[DPE-2367] Add two cluster ha test #126

Merged
43 changes: 43 additions & 0 deletions .github/workflows/ci.yaml
@@ -90,3 +90,46 @@ jobs:
run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}'
env:
CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }}

integration-test-ha:
strategy:
fail-fast: false
matrix:
tox-environments:
- integration-ha-ha
Contributor (review comment on `- integration-ha-ha`): question: What is this? Is it so it can be nested?

Contributor Author (reply): They will look different; this is just a placeholder at the moment. See the following PR.

name: ${{ matrix.tox-environments }}
needs:
- lint
- unit-test
- build
- integration-test
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Setup operator environment
# TODO: Replace with custom image on self-hosted runner
uses: charmed-kubernetes/actions-operator@main
with:
provider: lxd
bootstrap-options: "--agent-version 2.9.38"
- name: Download packed charm(s)
uses: actions/download-artifact@v3
with:
name: ${{ needs.build.outputs.artifact-name }}
- name: Select tests
id: select-tests
run: |
if [ "${{ github.event_name }}" == "schedule" ]
then
echo Running unstable and stable tests
echo "mark_expression=" >> $GITHUB_OUTPUT
else
echo Skipping unstable tests
echo "mark_expression=not unstable" >> $GITHUB_OUTPUT
fi
- name: Run integration tests
run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}'
env:
CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }}
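The `mark_expression` output above drives test selection: scheduled runs leave the expression empty so both stable and unstable tests run, while every other trigger deselects anything marked unstable. A minimal sketch of how a test picks up that filter, assuming the project registers an `unstable` pytest marker (the marker name comes from the workflow; the test names here are hypothetical):

```python
import pytest


@pytest.mark.unstable  # deselected by -m 'not unstable' on non-scheduled runs
def test_network_partition_recovery():
    """Would run only on scheduled CI, where mark_expression is empty."""


def test_basic_produce_consume():
    """Unmarked, so it matches 'not unstable' and runs on every trigger."""
```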
129 changes: 129 additions & 0 deletions tests/integration/ha/helpers.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
from pathlib import Path
from subprocess import PIPE, check_output
from typing import Any, Dict

import yaml
from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic

from snap import KafkaSnap

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
APP_NAME = METADATA["name"]
ZK_NAME = "zookeeper"
REL_NAME_ADMIN = "kafka-client-admin"

logger = logging.getLogger(__name__)


def produce_and_check_logs(
model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str
) -> None:
"""Produces messages from HN to chosen Kafka topic.

Args:
model_full_name: the full name of the model
kafka_unit_name: the kafka unit to check logs on
provider_unit_name: the app to grab credentials from
topic: the desired topic to produce to

Raises:
KeyError: if missing relation data
AssertionError: if logs aren't found for desired topic
"""
relation_data = get_provider_data(
unit_name=provider_unit_name,
model_full_name=model_full_name,
endpoint="kafka-client-admin",
)
username = relation_data.get("username", None)
password = relation_data.get("password", None)
servers = relation_data.get("endpoints", "").split(",")
security_protocol = "SASL_PLAINTEXT"

if not (username and password and servers):
raise KeyError("missing relation data from app charm")

client = KafkaClient(
servers=servers,
username=username,
password=password,
security_protocol=security_protocol,
)
topic_config = NewTopic(
name=topic,
num_partitions=5,
replication_factor=1,
)

client.create_topic(topic=topic_config)
for i in range(15):
message = f"Message #{i}"
client.produce_message(topic_name=topic, message_content=message)

check_logs(model_full_name, kafka_unit_name, topic)


def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None:
"""Checks if messages for a topic have been produced.

Args:
model_full_name: the full name of the model
kafka_unit_name: the kafka unit to check logs on
topic: the desired topic to check
"""
logs = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'",
stderr=PIPE,
shell=True,
universal_newlines=True,
).splitlines()

logger.debug(f"{logs=}")

    passed = False
    # Kafka data dirs are laid out as <topic>-<partition>/<segment>.index,
    # so a path containing both the topic name and "index" means messages
    # were produced and flushed for that topic.
    for log in logs:
        if topic in log and "index" in log:
            passed = True
            break

assert passed, "logs not found"


def get_provider_data(
unit_name: str, model_full_name: str, endpoint: str = "kafka-client"
) -> Dict[str, str]:
result = show_unit(unit_name=unit_name, model_full_name=model_full_name)
relations_info = result[unit_name]["relation-info"]
logger.info(f"Relation info: {relations_info}")
provider_relation_data = {}
for info in relations_info:
if info["endpoint"] == endpoint:
logger.info(f"Relation data: {info}")
provider_relation_data["username"] = info["application-data"]["username"]
provider_relation_data["password"] = info["application-data"]["password"]
provider_relation_data["endpoints"] = info["application-data"]["endpoints"]
provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"]
provider_relation_data["tls"] = info["application-data"]["tls"]
if "consumer-group-prefix" in info["application-data"]:
provider_relation_data["consumer-group-prefix"] = info["application-data"][
"consumer-group-prefix"
]
provider_relation_data["topic"] = info["application-data"]["topic"]
return provider_relation_data


def show_unit(unit_name: str, model_full_name: str) -> Any:
result = check_output(
f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return yaml.safe_load(result)
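For orientation, this is the shape of parsed `juju show-unit` output that `get_provider_data` walks. The key layout is taken from the lookups in the code above; every value is hypothetical:

```python
# Hypothetical parsed output of `juju show-unit app/0` (values illustrative,
# structure matching the fields get_provider_data() reads).
show_unit_result = {
    "app/0": {
        "relation-info": [
            {
                "endpoint": "kafka-client-admin",
                "application-data": {
                    "username": "relation-7",        # hypothetical
                    "password": "s3cr3tpassw0rd",    # hypothetical
                    "endpoints": "10.0.0.5:9092,10.0.0.6:9092",
                    "zookeeper-uris": "10.0.0.9:2181/kafka",
                    "tls": "disabled",
                    "topic": "hot-topic",
                },
            }
        ]
    }
}
```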
85 changes: 85 additions & 0 deletions tests/integration/ha/test_ha.py
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import logging

import pytest
from helpers import (
APP_NAME,
REL_NAME_ADMIN,
ZK_NAME,
check_logs,
produce_and_check_logs,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)


DUMMY_NAME = "app"


@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest, kafka_charm):
await asyncio.gather(
ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"),
ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"),
)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600)
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"

await ops_test.model.add_relation(APP_NAME, ZK_NAME)
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30)
assert ops_test.model.applications[APP_NAME].status == "active"
assert ops_test.model.applications[ZK_NAME].status == "active"


async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm):
second_kafka_name = f"{APP_NAME}-two"
second_zk_name = f"{ZK_NAME}-two"

await asyncio.gather(
ops_test.model.deploy(
kafka_charm, application_name=second_kafka_name, num_units=1, series="jammy"
),
ops_test.model.deploy(
ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy"
),
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
)

await ops_test.model.wait_for_idle(
apps=[second_kafka_name, second_zk_name, DUMMY_NAME],
idle_period=30,
timeout=3600,
)
assert ops_test.model.applications[second_kafka_name].status == "blocked"

await ops_test.model.add_relation(second_kafka_name, second_zk_name)

# Relate "app" to the *first* cluster
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")

await ops_test.model.wait_for_idle(
apps=[second_kafka_name, second_zk_name, DUMMY_NAME, APP_NAME],
idle_period=30,
)

produce_and_check_logs(
model_full_name=ops_test.model_full_name,
kafka_unit_name=f"{APP_NAME}/0",
provider_unit_name=f"{DUMMY_NAME}/0",
topic="hot-topic",
)

# Check that logs are not found on the second cluster
with pytest.raises(AssertionError):
check_logs(
model_full_name=ops_test.model_full_name,
kafka_unit_name=f"{second_kafka_name}/0",
topic="hot-topic",
)
11 changes: 11 additions & 0 deletions tests/integration/helpers.py
@@ -262,6 +262,17 @@ def produce_and_check_logs(
message = f"Message #{i}"
client.produce_message(topic_name=topic, message_content=message)

check_logs(model_full_name, kafka_unit_name, topic)


def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None:
"""Checks if messages for a topic have been produced.

Args:
model_full_name: the full name of the model
kafka_unit_name: the kafka unit to check logs on
topic: the desired topic to check
"""
logs = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'",
stderr=PIPE,
49 changes: 13 additions & 36 deletions tox.ini
@@ -21,6 +21,13 @@ set_env =
PYTHONPATH = {tox_root}/lib:{[vars]src_path}
PYTHONBREAKPOINT=ipdb.set_trace
PY_COLORS=1
charm: TEST_FILE=test_charm.py
provider: TEST_FILE=test_provider.py
scaling: TEST_FILE=test_scaling.py
password-rotation: TEST_FILE=test_password_rotation.py
tls: TEST_FILE=test_tls.py
ha: TEST_FILE=test_ha.py

pass_env =
PYTHONPATH
CHARM_BUILD_DIR
@@ -78,52 +85,22 @@ commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/

-[testenv:integration-charm]
-description = Run base integration tests
-pass_env =
-    {[testenv]pass_env}
-    CI
-    CI_PACKED_CHARMS
-commands =
-    poetry install --with integration
-    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_charm.py
-
-[testenv:integration-provider]
-description = Run integration tests for provider
-pass_env =
-    {[testenv]pass_env}
-    CI
-    CI_PACKED_CHARMS
-commands =
-    poetry install --with integration
-    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_provider.py
-
-[testenv:integration-scaling]
-description = Run scaling integration tests
-pass_env =
-    {[testenv]pass_env}
-    CI
-    CI_PACKED_CHARMS
-commands =
-    poetry install --with integration
-    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_scaling.py
-
-[testenv:integration-password-rotation]
-description = Run password rotation integration tests
+[testenv:integration-{charm,provider,scaling,password-rotation,tls}]
+description = Run integration tests
 pass_env =
     {[testenv]pass_env}
     CI
     CI_PACKED_CHARMS
 commands =
     poetry install --with integration
-    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_password_rotation.py
+    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE}
 
-[testenv:integration-tls]
-description = Run TLS integration tests
+[testenv:integration-ha-{ha}]
+description = Run integration tests for high availability
 pass_env =
     {[testenv]pass_env}
     CI
     CI_PACKED_CHARMS
 commands =
     poetry install --with integration
-    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_tls.py
+    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE}
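With this consolidation, `tox run -e integration-tls` expands the `integration-{charm,provider,scaling,password-rotation,tls}` factor list, the `tls: TEST_FILE=test_tls.py` line in the shared `set_env` block selects the module, and the single `commands` entry runs it. The same mechanism explains the doubled name the reviewer asked about: `integration-ha-{ha}` generates the env `integration-ha-ha` (hence the CI matrix entry above), its `ha` factor sets `TEST_FILE=test_ha.py`, and the command runs `tests/integration/ha/test_ha.py`.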