diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 41431754..da50dccd 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -104,7 +104,7 @@ jobs:
       fail-fast: false
       matrix:
         tox-environments:
-          - integration-ha-ha
+          - integration-ha
     name: ${{ matrix.tox-environments }}
     needs:
       - lint
diff --git a/.gitignore b/.gitignore
index 4b4412e7..f48d73e8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,4 +7,5 @@ __pycache__/
 *.py[cod]
 .vscode
 .idea
-.python-version
\ No newline at end of file
+.python-version
+last_written_value
diff --git a/last_written_value b/last_written_value
new file mode 100644
index 00000000..885bfe9b
--- /dev/null
+++ b/last_written_value
@@ -0,0 +1 @@
+20052,0
\ No newline at end of file
diff --git a/lib/charms/kafka/v0/client.py b/lib/charms/kafka/v0/client.py
index 00db6903..0016a112 100644
--- a/lib/charms/kafka/v0/client.py
+++ b/lib/charms/kafka/v0/client.py
@@ -81,6 +81,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):
 from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
 from kafka.admin import NewTopic
+from kafka.errors import KafkaError
 
 logger = logging.getLogger(__name__)
 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@@ -93,7 +94,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 1
+LIBPATCH = 2
 
 
 class KafkaClient:
@@ -126,6 +127,7 @@ def __init__(
         self.mtls = self.security_protocol == "SSL"
 
         self._subscription = None
+        self._consumer_group_prefix = None
 
     @cached_property
     def _admin_client(self) -> KafkaAdminClient:
@@ -174,7 +176,7 @@ def _consumer_client(self) -> KafkaConsumer:
             ssl_certfile=self.certfile_path if self.ssl else None,
             ssl_keyfile=self.keyfile_path if self.mtls else None,
             api_version=KafkaClient.API_VERSION if self.mtls else None,
-            group_id=self._consumer_group_prefix or None,
+            group_id=self._consumer_group_prefix,
             enable_auto_commit=True,
             auto_offset_reset="earliest",
             consumer_timeout_ms=15000,
@@ -188,10 +190,17 @@ def create_topic(self, topic: NewTopic) -> None:
 
         Args:
             topic: the configuration of the topic to create
-
         """
         self._admin_client.create_topics(new_topics=[topic], validate_only=False)
 
+    def delete_topics(self, topics: list[str]) -> None:
+        """Deletes topics.
+
+        Args:
+            topics: list of topics to delete
+        """
+        self._admin_client.delete_topics(topics=topics)
+
     def subscribe_to_topic(
         self, topic_name: str, consumer_group_prefix: Optional[str] = None
     ) -> None:
@@ -240,8 +249,19 @@ def produce_message(self, topic_name: str, message_content: str) -> None:
         """
         item_content = f"Message #{message_content}"
         future = self._producer_client.send(topic_name, str.encode(item_content))
-        future.get(timeout=60)
-        logger.info(f"Message published to topic={topic_name}, message content: {item_content}")
+        try:
+            future.get(timeout=60)
+            logger.info(
+                f"Message published to topic={topic_name}, message content: {item_content}"
+            )
+        except KafkaError as e:
+            logger.error(f"Error producing message {message_content} to topic {topic_name}: {e}")
+
+    def close(self) -> None:
+        """Close the connection to the client."""
+        self._admin_client.close()
+        self._producer_client.close()
+        self._consumer_client.close()
 
 
 if __name__ == "__main__":
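
A minimal sketch of driving the expanded KafkaClient lifecycle end to end. The endpoint and credentials below are illustrative placeholders; in the tests they come from relation data:

    from charms.kafka.v0.client import KafkaClient
    from kafka.admin import NewTopic

    # Hypothetical connection details, not values from this changeset.
    client = KafkaClient(
        servers=["10.0.0.1:9092"],
        username="admin",
        password="secret",
        security_protocol="SASL_PLAINTEXT",
    )
    try:
        # create_topic/delete_topics both go through the underlying KafkaAdminClient.
        client.create_topic(topic=NewTopic(name="demo", num_partitions=1, replication_factor=1))
        # produce_message now logs a KafkaError instead of letting it propagate.
        client.produce_message(topic_name="demo", message_content="0")
        client.delete_topics(topics=["demo"])
    finally:
        client.close()  # tears down the admin, producer and consumer connections
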
diff --git a/tests/integration/ha/__init__.py b/tests/integration/ha/__init__.py
new file mode 100644
index 00000000..db3bfe1a
--- /dev/null
+++ b/tests/integration/ha/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
diff --git a/tests/integration/ha/continuous_writes.py b/tests/integration/ha/continuous_writes.py
new file mode 100644
index 00000000..f75332e0
--- /dev/null
+++ b/tests/integration/ha/continuous_writes.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+import asyncio
+import logging
+import os
+from multiprocessing import Event, Process, Queue
+from types import SimpleNamespace
+
+from charms.kafka.v0.client import KafkaClient
+from kafka.admin import NewTopic
+from kafka.errors import KafkaTimeoutError
+from pytest_operator.plugin import OpsTest
+from tenacity import (
+    RetryError,
+    Retrying,
+    retry,
+    stop_after_attempt,
+    stop_after_delay,
+    wait_fixed,
+    wait_random,
+)
+
+from integration.helpers import DUMMY_NAME, get_provider_data
+
+logger = logging.getLogger(__name__)
+
+
+class ContinuousWrites:
+    """Utility class for managing continuous writes."""
+
+    TOPIC_NAME = "ha-test-topic"
+    LAST_WRITTEN_VAL_PATH = "last_written_value"
+
+    def __init__(self, ops_test: OpsTest, app: str):
+        self._ops_test = ops_test
+        self._app = app
+        self._is_stopped = True
+        self._event = None
+        self._queue = None
+        self._process = None
+
+    @retry(
+        wait=wait_fixed(wait=5) + wait_random(0, 5),
+        stop=stop_after_attempt(5),
+    )
+    def start(self) -> None:
+        """Run continuous writes in the background."""
+        if not self._is_stopped:
+            self.clear()
+
+        # create topic
+        self._create_replicated_topic()
+
+        # create process
+        self._create_process()
+
+        # pass the model full name to the process once it starts
+        self.update()
+
+        # start writes
+        self._process.start()
+
+    def update(self):
+        """Update cluster related conf. Useful in cases such as scaling, password change etc."""
+        self._queue.put(SimpleNamespace(model_full_name=self._ops_test.model_full_name))
+
+    @retry(
+        wait=wait_fixed(wait=5) + wait_random(0, 5),
+        stop=stop_after_attempt(5),
+    )
+    def clear(self) -> None:
+        """Stop writes and delete the topic."""
+        if not self._is_stopped:
+            self.stop()
+
+        client = self._client()
+        try:
+            client.delete_topics(topics=[self.TOPIC_NAME])
+        finally:
+            client.close()
+
+    def consumed_messages(self) -> list | None:
+        """Consume the messages in the topic."""
+        client = self._client()
+        try:
+            for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)):
+                with attempt:
+                    client.subscribe_to_topic(topic_name=self.TOPIC_NAME)
+                    # FIXME: loading whole list of consumed messages into memory might not be the best idea
+                    return list(client.messages())
+        except RetryError:
+            return []
+        finally:
+            client.close()
+
+    def _create_replicated_topic(self):
+        """Create topic with replication_factor = 3."""
+        client = self._client()
+        topic_config = NewTopic(
+            name=self.TOPIC_NAME,
+            num_partitions=1,
+            replication_factor=3,
+        )
+        client.create_topic(topic=topic_config)
+
+    @retry(
+        wait=wait_fixed(wait=5) + wait_random(0, 5),
+        stop=stop_after_attempt(5),
+    )
+    def stop(self) -> SimpleNamespace:
+        """Stop the continuous writes process and return a summary of the writes."""
+        if not self._is_stopped:
+            self._stop_process()
+
+        result = SimpleNamespace()
+
+        # messages count
+        consumed_messages = self.consumed_messages()
+        result.count = len(consumed_messages)
+        result.last_message = consumed_messages[-1] if consumed_messages else None
+
+        # last expected message stored on disk
+        try:
+            for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
+                with attempt:
+                    with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
+                        result.last_expected_message, result.lost_messages = (
+                            f.read().rstrip().split(",", maxsplit=2)
+                        )
+        except RetryError:
+            result.last_expected_message = result.lost_messages = -1
+
+        return result
+
+    def _create_process(self):
+        self._is_stopped = False
+        self._event = Event()
+        self._queue = Queue()
+        self._process = Process(
+            target=ContinuousWrites._run_async,
+            name="continuous_writes",
+            args=(self._event, self._queue, 0),
+        )
+
+    def _stop_process(self):
+        self._event.set()
+        self._process.join()
+        self._queue.close()
+        self._is_stopped = True
+
+    def _client(self):
+        """Build a Kafka client."""
+        relation_data = get_provider_data(
+            unit_name=f"{DUMMY_NAME}/0",
+            model_full_name=self._ops_test.model_full_name,
+            endpoint="kafka-client-admin",
+        )
+        return KafkaClient(
+            servers=relation_data["endpoints"].split(","),
+            username=relation_data["username"],
+            password=relation_data["password"],
+            security_protocol="SASL_PLAINTEXT",
+        )
+
+    @staticmethod
+    async def _run(event: Event, data_queue: Queue, starting_number: int) -> None:  # noqa: C901
+        """Continuous writing."""
+        initial_data = data_queue.get(True)
+
+        def _client():
+            """Build a Kafka client."""
+            relation_data = get_provider_data(
+                unit_name=f"{DUMMY_NAME}/0",
+                model_full_name=initial_data.model_full_name,
+                endpoint="kafka-client-admin",
+            )
+            return KafkaClient(
+                servers=relation_data["endpoints"].split(","),
+                username=relation_data["username"],
+                password=relation_data["password"],
+                security_protocol="SASL_PLAINTEXT",
+            )
+
+        write_value = starting_number
+        lost_messages = 0
+        client = _client()
+
+        while True:
+            if not data_queue.empty():  # currently evaluates to false as we don't make updates
+                data_queue.get(False)
+                client.close()
+                client = _client()
+
+            try:
+                client.produce_message(
+                    topic_name=ContinuousWrites.TOPIC_NAME, message_content=str(write_value)
+                )
+            except KafkaTimeoutError:
+                client.close()
+                client = _client()
+                lost_messages += 1
+            finally:
+                # process termination requested
+                if event.is_set():
+                    break
+
+            write_value += 1
+
+        # write last expected written value on disk
+        with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "w") as f:
+            f.write(f"{str(write_value)},{str(lost_messages)}")
+            f.flush()
+            os.fsync(f)
+
+        client.close()
+
+    @staticmethod
+    def _run_async(event: Event, data_queue: Queue, starting_number: int):
+        """Run async code."""
+        asyncio.run(ContinuousWrites._run(event, data_queue, starting_number))
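
The writer process above coordinates with the test through a multiprocessing Event (shutdown request) and Queue (config pushes). A stripped-down, runnable sketch of that handshake, with illustrative names standing in for the Kafka-specific pieces:

    import time
    from multiprocessing import Event, Process, Queue

    def worker(event, queue) -> None:
        config = queue.get(True)  # block until the parent sends the initial config
        count = 0
        while not event.is_set():  # the parent sets the event to request termination
            count += 1  # stand-in for producing one message
            time.sleep(0.01)
        print(f"{config}: wrote {count} values")

    if __name__ == "__main__":
        stop, queue = Event(), Queue()
        process = Process(target=worker, args=(stop, queue))
        process.start()
        queue.put("model-a")  # counterpart of ContinuousWrites.update()
        time.sleep(0.5)
        stop.set()  # counterpart of ContinuousWrites._stop_process()
        process.join()
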
diff --git a/tests/integration/ha/ha_helpers.py b/tests/integration/ha/ha_helpers.py
new file mode 100644
index 00000000..c89b8a04
--- /dev/null
+++ b/tests/integration/ha/ha_helpers.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+import logging
+import re
+from subprocess import PIPE, check_output
+
+from pytest_operator.plugin import OpsTest
+
+from integration.helpers import APP_NAME, get_address
+from literals import SECURITY_PROTOCOL_PORTS
+from snap import KafkaSnap
+
+PROCESS = "kafka.Kafka"
+SERVICE_DEFAULT_PATH = "/etc/systemd/system/snap.charmed-kafka.daemon.service"
+
+logger = logging.getLogger(__name__)
+
+
+class ProcessError(Exception):
+    """Raised when a process fails."""
+
+
+class ProcessRunningError(Exception):
+    """Raised when a process is running when it is not expected to be."""
+
+
+async def get_topic_leader(ops_test: OpsTest, topic: str) -> int:
+    """Get the broker with the topic leader.
+
+    Args:
+        ops_test: OpsTest utility class
+        topic: the desired topic to check
+    """
+    bootstrap_server = (
+        await get_address(ops_test=ops_test)
+        + f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
+    )
+
+    result = check_output(
+        f"JUJU_MODEL={ops_test.model_full_name} juju ssh kafka/0 sudo -i 'charmed-kafka.topics --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --describe --topic {topic}'",
+        stderr=PIPE,
+        shell=True,
+        universal_newlines=True,
+    )
+
+    return int(re.search(r"Leader: (\d+)", result)[1])
+
+
+async def get_topic_offsets(ops_test: OpsTest, topic: str, unit_name: str) -> list[str]:
+    """Get the offsets of a topic on a unit.
+
+    Args:
+        ops_test: OpsTest utility class
+        topic: the desired topic to check
+        unit_name: unit to check the offsets on
+    """
+    bootstrap_server = (
+        await get_address(ops_test=ops_test)
+        + f":{SECURITY_PROTOCOL_PORTS['SASL_PLAINTEXT'].client}"
+    )
+
+    # example of topic offset output: 'test-topic:0:10'
+    result = check_output(
+        f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.get-offsets --bootstrap-server {bootstrap_server} --command-config {KafkaSnap.CONF_PATH}/client.properties --topic {topic}'",
+        stderr=PIPE,
+        shell=True,
+        universal_newlines=True,
+    )
+
+    return re.search(rf"{topic}:(\d+:\d+)", result)[1].split(":")
+
+
+async def send_control_signal(
+    ops_test: OpsTest, unit_name: str, kill_code: str, app_name: str = APP_NAME
+) -> None:
+    """Send a signal to the Kafka process on a unit, scaling the app to 3 units first if needed."""
+    if len(ops_test.model.applications[app_name].units) < 3:
+        await ops_test.model.applications[app_name].add_unit(count=1)
+        await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
+
+    kill_cmd = f"exec --unit {unit_name} -- pkill --signal {kill_code} -f {PROCESS}"
+    return_code, stdout, stderr = await ops_test.juju(*kill_cmd.split())
+
+    if return_code != 0:
+        raise Exception(
+            f"Expected kill command {kill_cmd} to succeed instead it failed: {return_code}, {stdout}, {stderr}"
+        )
+
+
+async def patch_restart_delay(ops_test: OpsTest, unit_name: str, delay: int) -> None:
+    """Adds a restart delay in the Kafka service file.
+
+    When the service fails it will now wait for `delay` number of seconds.
+    """
+    add_delay_cmd = (
+        f"exec --unit {unit_name} -- "
+        f"sudo sed -i -e '/^\\[Service\\]/a RestartSec={delay}' "
+        f"{SERVICE_DEFAULT_PATH}"
+    )
+    await ops_test.juju(*add_delay_cmd.split(), check=True)
+
+    # reload the daemon for systemd to reflect changes
+    reload_cmd = f"exec --unit {unit_name} -- sudo systemctl daemon-reload"
+    await ops_test.juju(*reload_cmd.split(), check=True)
+
+
+async def remove_restart_delay(ops_test: OpsTest, unit_name: str) -> None:
+    """Removes the restart delay from the service."""
+    remove_delay_cmd = (
+        f"exec --unit {unit_name} -- sed -i -e '/^RestartSec=.*/d' {SERVICE_DEFAULT_PATH}"
+    )
+    await ops_test.juju(*remove_delay_cmd.split(), check=True)
+
+    # reload the daemon for systemd to reflect changes
+    reload_cmd = f"exec --unit {unit_name} -- sudo systemctl daemon-reload"
+    await ops_test.juju(*reload_cmd.split(), check=True)
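
For reference, how the two regexes above behave on sample CLI output. The sample strings are illustrative, shaped like the comment in get_topic_offsets and typical `--describe` output:

    import re

    describe = "Topic: ha-test-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 1,2,0"
    offsets = "ha-test-topic:0:2140"

    # get_topic_leader extracts the broker id of the partition leader.
    assert int(re.search(r"Leader: (\d+)", describe)[1]) == 1
    # get_topic_offsets returns [partition, end_offset] for the topic.
    assert re.search(r"ha-test-topic:(\d+:\d+)", offsets)[1].split(":") == ["0", "2140"]
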
diff --git a/tests/integration/ha/helpers.py b/tests/integration/ha/helpers.py
deleted file mode 100644
index d8123d28..00000000
--- a/tests/integration/ha/helpers.py
+++ /dev/null
@@ -1,129 +0,0 @@
-#!/usr/bin/env python3
-# Copyright 2023 Canonical Ltd.
-# See LICENSE file for licensing details.
-import logging
-from pathlib import Path
-from subprocess import PIPE, check_output
-from typing import Any, Dict
-
-import yaml
-from charms.kafka.v0.client import KafkaClient
-from kafka.admin import NewTopic
-
-from snap import KafkaSnap
-
-METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
-APP_NAME = METADATA["name"]
-ZK_NAME = "zookeeper"
-REL_NAME_ADMIN = "kafka-client-admin"
-
-logger = logging.getLogger(__name__)
-
-
-def produce_and_check_logs(
-    model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str
-) -> None:
-    """Produces messages from HN to chosen Kafka topic.
-
-    Args:
-        model_full_name: the full name of the model
-        kafka_unit_name: the kafka unit to checks logs on
-        provider_unit_name: the app to grab credentials from
-        topic: the desired topic to produce to
-
-    Raises:
-        KeyError: if missing relation data
-        AssertionError: if logs aren't found for desired topic
-    """
-    relation_data = get_provider_data(
-        unit_name=provider_unit_name,
-        model_full_name=model_full_name,
-        endpoint="kafka-client-admin",
-    )
-    topic = topic
-    username = relation_data.get("username", None)
-    password = relation_data.get("password", None)
-    servers = relation_data.get("endpoints", "").split(",")
-    security_protocol = "SASL_PLAINTEXT"
-
-    if not (username and password and servers):
-        raise KeyError("missing relation data from app charm")
-
-    client = KafkaClient(
-        servers=servers,
-        username=username,
-        password=password,
-        security_protocol=security_protocol,
-    )
-    topic_config = NewTopic(
-        name=topic,
-        num_partitions=5,
-        replication_factor=1,
-    )
-
-    client.create_topic(topic=topic_config)
-    for i in range(15):
-        message = f"Message #{i}"
-        client.produce_message(topic_name=topic, message_content=message)
-
-    check_logs(model_full_name, kafka_unit_name, topic)
-
-
-def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None:
-    """Checks if messages for a topic have been produced.
-
-    Args:
-        model_full_name: the full name of the model
-        kafka_unit_name: the kafka unit to checks logs on
-        topic: the desired topic to check
-    """
-    logs = check_output(
-        f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'",
-        stderr=PIPE,
-        shell=True,
-        universal_newlines=True,
-    ).splitlines()
-
-    logger.debug(f"{logs=}")
-
-    passed = False
-    for log in logs:
-        if topic and "index" in log:
-            passed = True
-            break
-
-    assert passed, "logs not found"
-
-
-def get_provider_data(
-    unit_name: str, model_full_name: str, endpoint: str = "kafka-client"
-) -> Dict[str, str]:
-    result = show_unit(unit_name=unit_name, model_full_name=model_full_name)
-    relations_info = result[unit_name]["relation-info"]
-    logger.info(f"Relation info: {relations_info}")
-    provider_relation_data = {}
-    for info in relations_info:
-        if info["endpoint"] == endpoint:
-            logger.info(f"Relation data: {info}")
-            provider_relation_data["username"] = info["application-data"]["username"]
-            provider_relation_data["password"] = info["application-data"]["password"]
-            provider_relation_data["endpoints"] = info["application-data"]["endpoints"]
-            provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"]
-            provider_relation_data["tls"] = info["application-data"]["tls"]
-            if "consumer-group-prefix" in info["application-data"]:
-                provider_relation_data["consumer-group-prefix"] = info["application-data"][
-                    "consumer-group-prefix"
-                ]
-            provider_relation_data["topic"] = info["application-data"]["topic"]
-    return provider_relation_data
-
-
-def show_unit(unit_name: str, model_full_name: str) -> Any:
-    result = check_output(
-        f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}",
-        stderr=PIPE,
-        shell=True,
-        universal_newlines=True,
-    )
-
-    return yaml.safe_load(result)
diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py
index 94c69417..4a60804a 100644
--- a/tests/integration/ha/test_ha.py
+++ b/tests/integration/ha/test_ha.py
@@ -4,30 +4,69 @@
 
 import asyncio
 import logging
+import time
 
 import pytest
-from helpers import (
+from pytest_operator.plugin import OpsTest
+
+from integration.ha.continuous_writes import ContinuousWrites
+from integration.ha.ha_helpers import (
+    get_topic_leader,
+    get_topic_offsets,
+    patch_restart_delay,
+    remove_restart_delay,
+    send_control_signal,
+)
+from integration.helpers import (
     APP_NAME,
+    DUMMY_NAME,
     REL_NAME_ADMIN,
+    TEST_DEFAULT_MESSAGES,
     ZK_NAME,
     check_logs,
     produce_and_check_logs,
 )
-from pytest_operator.plugin import OpsTest
 
 logger = logging.getLogger(__name__)
 
-DUMMY_NAME = "app"
+
+@pytest.fixture()
+async def c_writes(ops_test: OpsTest):
+    """Creates instance of the ContinuousWrites."""
+    app = APP_NAME
+    return ContinuousWrites(ops_test, app)
+
+
+@pytest.fixture()
+async def c_writes_runner(ops_test: OpsTest, c_writes: ContinuousWrites):
+    """Starts continuous write operations and clears writes at the end of the test."""
+    c_writes.start()
+    yield
+    c_writes.clear()
+    logger.info("\n\n\n\nThe writes have been cleared.\n\n\n\n")
+
+
+@pytest.fixture()
+async def restart_delay(ops_test: OpsTest):
+    """Patches a restart delay onto every Kafka unit for the duration of a test."""
+    for unit in ops_test.model.applications[APP_NAME].units:
+        await patch_restart_delay(ops_test=ops_test, unit_name=unit.name, delay=5)
+    yield
+    for unit in ops_test.model.applications[APP_NAME].units:
+        await remove_restart_delay(ops_test=ops_test, unit_name=unit.name)
 
 
 @pytest.mark.abort_on_fail
-async def test_build_and_deploy(ops_test: OpsTest, kafka_charm):
+async def test_build_and_deploy(ops_test: OpsTest, kafka_charm, app_charm):
     await asyncio.gather(
         ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"),
         ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"),
+        ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
+    )
+    await ops_test.model.wait_for_idle(
+        apps=[APP_NAME, ZK_NAME, DUMMY_NAME],
+        idle_period=30,
+        timeout=3600,
     )
-    await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600)
     assert ops_test.model.applications[APP_NAME].status == "blocked"
     assert ops_test.model.applications[ZK_NAME].status == "active"
@@ -37,8 +76,69 @@ async def test_build_and_deploy(ops_test: OpsTest, kafka_charm):
     assert ops_test.model.applications[APP_NAME].status == "active"
     assert ops_test.model.applications[ZK_NAME].status == "active"
 
+    await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
+    await ops_test.model.wait_for_idle(apps=[APP_NAME, DUMMY_NAME], idle_period=30)
+    assert ops_test.model.applications[APP_NAME].status == "active"
+    assert ops_test.model.applications[DUMMY_NAME].status == "active"
+
+
+async def test_replicated_events(ops_test: OpsTest):
+    await ops_test.model.applications[APP_NAME].add_units(count=2)
+    await ops_test.model.wait_for_idle(
+        apps=[APP_NAME], status="active", timeout=600, idle_period=120, wait_for_exact_units=3
+    )
+    logger.info("Producing messages and checking on all units")
+    produce_and_check_logs(
+        model_full_name=ops_test.model_full_name,
+        kafka_unit_name=f"{APP_NAME}/0",
+        provider_unit_name=f"{DUMMY_NAME}/0",
+        topic="replicated-topic",
+        replication_factor=3,
+        num_partitions=1,
+    )
+    assert await get_topic_offsets(
+        ops_test=ops_test, topic="replicated-topic", unit_name=f"{APP_NAME}/0"
+    ) == ["0", str(TEST_DEFAULT_MESSAGES)]
+
+
+async def test_kill_broker_with_topic_leader(
+    ops_test: OpsTest,
+    c_writes: ContinuousWrites,
+    c_writes_runner: ContinuousWrites,
+    restart_delay,
+):
+    initial_leader_num = await get_topic_leader(
+        ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME
+    )
+    initial_offsets = await get_topic_offsets(
+        ops_test=ops_test,
+        topic=ContinuousWrites.TOPIC_NAME,
+        unit_name=f"kafka/{initial_leader_num}",
+    )
+
+    logger.info(
+        f"Killing broker of leader for topic '{ContinuousWrites.TOPIC_NAME}': {initial_leader_num}"
+    )
+    await send_control_signal(
+        ops_test=ops_test, unit_name=f"{APP_NAME}/{initial_leader_num}", kill_code="SIGKILL"
+    )
+    # Give time for the service to restart
+    time.sleep(10)
+
+    # Check that leader changed
+    next_leader_num = await get_topic_leader(ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME)
+    next_offsets = await get_topic_offsets(
+        ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME, unit_name=f"kafka/{next_leader_num}"
+    )
+    res = c_writes.stop()
+
+    assert initial_leader_num != next_leader_num
+    assert int(next_offsets[-1]) > int(initial_offsets[-1])
+    assert res.lost_messages == "0"
+    assert res.count == int(res.last_expected_message)
+
 
-async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm):
+async def test_multi_cluster_isolation(ops_test: OpsTest, kafka_charm):
     second_kafka_name = f"{APP_NAME}-two"
     second_zk_name = f"{ZK_NAME}-two"
 
@@ -49,23 +149,18 @@ async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm):
         ops_test.model.deploy(
             ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy"
         ),
-        ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
     )
 
     await ops_test.model.wait_for_idle(
-        apps=[second_kafka_name, second_zk_name, DUMMY_NAME],
+        apps=[second_kafka_name, second_zk_name],
        idle_period=30,
         timeout=3600,
     )
     assert ops_test.model.applications[second_kafka_name].status == "blocked"
 
     await ops_test.model.add_relation(second_kafka_name, second_zk_name)
-
-    # Relate "app" to the *first* cluster
-    await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")
-
     await ops_test.model.wait_for_idle(
-        apps=[second_kafka_name, second_zk_name, DUMMY_NAME, APP_NAME],
+        apps=[second_kafka_name, second_zk_name, APP_NAME],
         idle_period=30,
     )
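
The same fixtures are designed to compose for further fault-injection tests. A hypothetical skeleton, not part of this changeset, showing the intended pattern:

    import pytest

    @pytest.mark.abort_on_fail
    async def test_writes_survive_fault(ops_test, c_writes, c_writes_runner, restart_delay):
        # ...inject a fault here while c_writes_runner keeps producing in the background...
        res = c_writes.stop()
        assert res.lost_messages == "0"
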
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 45c232d9..7f97a757 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -22,9 +22,9 @@
 METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
 APP_NAME = METADATA["name"]
 ZK_NAME = "zookeeper"
-REL_NAME_ADMIN = "kafka-client-admin"
 DUMMY_NAME = "app"
-TEST_MESSAGE_COUNT = 15
+REL_NAME_ADMIN = "kafka-client-admin"
+TEST_DEFAULT_MESSAGES = 15
 
 logger = logging.getLogger(__name__)
 
@@ -122,6 +122,7 @@ def get_kafka_zk_relation_data(unit_name: str, model_full_name: str) -> Dict[str
             zk_relation_data["uris"] = info["application-data"]["uris"]
             zk_relation_data["username"] = info["application-data"]["username"]
             zk_relation_data["tls"] = info["application-data"]["tls"]
+            break
     return zk_relation_data
 
 
@@ -254,7 +255,13 @@ def consume_and_check(model_full_name: str, provider_unit_name: str, topic: str)
 
 
 def produce_and_check_logs(
-    model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str
+    model_full_name: str,
+    kafka_unit_name: str,
+    provider_unit_name: str,
+    topic: str,
+    create_topic: bool = True,
+    replication_factor: int = 1,
+    num_partitions: int = 5,
 ) -> None:
     """Produces 15 messages from HN to chosen Kafka topic.
@@ -263,6 +270,9 @@
         kafka_unit_name: the kafka unit to checks logs on
         provider_unit_name: the app to grab credentials from
         topic: the desired topic to produce to
+        create_topic: if the topic needs to be created
+        replication_factor: replication factor of the created topic
+        num_partitions: number of partitions for the topic
 
     Raises:
         KeyError: if missing relation data
@@ -273,29 +283,21 @@
         model_full_name=model_full_name,
         endpoint="kafka-client-admin",
     )
-    topic = topic
-    username = relation_data.get("username", None)
-    password = relation_data.get("password", None)
-    servers = relation_data.get("endpoints", "").split(",")
-    security_protocol = "SASL_PLAINTEXT"
-
-    if not (username and password and servers):
-        raise KeyError("missing relation data from app charm")
-
     client = KafkaClient(
-        servers=servers,
-        username=username,
-        password=password,
-        security_protocol=security_protocol,
-    )
-    topic_config = NewTopic(
-        name=topic,
-        num_partitions=5,
-        replication_factor=1,
+        servers=relation_data["endpoints"].split(","),
+        username=relation_data["username"],
+        password=relation_data["password"],
+        security_protocol="SASL_PLAINTEXT",
     )
 
-    client.create_topic(topic=topic_config)
-    for i in range(TEST_MESSAGE_COUNT):
+    if create_topic:
+        topic_config = NewTopic(
+            name=topic,
+            num_partitions=num_partitions,
+            replication_factor=replication_factor,
+        )
+        client.create_topic(topic=topic_config)
+
+    for i in range(TEST_DEFAULT_MESSAGES):
         message = f"Message #{i}"
         client.produce_message(topic_name=topic, message_content=message)
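
With the new keyword arguments, callers can control the topic layout or target an existing topic. For example (the unit and model names are illustrative):

    produce_and_check_logs(
        model_full_name="controller:admin/test-model",
        kafka_unit_name="kafka/0",
        provider_unit_name="app/0",
        topic="replicated-topic",
        create_topic=False,  # topic already exists, so only produce and verify
    )
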
diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py
index a045274f..1f4d0c54 100644
--- a/tests/integration/test_charm.py
+++ b/tests/integration/test_charm.py
@@ -16,6 +16,7 @@
 
 from .helpers import (
     APP_NAME,
+    DUMMY_NAME,
     REL_NAME_ADMIN,
     ZK_NAME,
     check_socket,
@@ -26,7 +27,6 @@
 
 logger = logging.getLogger(__name__)
 
-DUMMY_NAME = "app"
 SAME_ZK = f"{ZK_NAME}-same"
 SAME_KAFKA = f"{APP_NAME}-same"
 
diff --git a/tests/integration/test_scaling.py b/tests/integration/test_scaling.py
index 70fdf718..5f3b45e0 100644
--- a/tests/integration/test_scaling.py
+++ b/tests/integration/test_scaling.py
@@ -5,10 +5,8 @@
 import asyncio
 import logging
 import time
-from pathlib import Path
 
 import pytest
-import yaml
 from pytest_operator.plugin import OpsTest
 
 from literals import CHARM_KEY, ZK
@@ -18,9 +16,6 @@
 
 logger = logging.getLogger(__name__)
 
-METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
-DUMMY_NAME_1 = "app"
-
 
 @pytest.mark.abort_on_fail
 async def test_kafka_simple_scale_up(ops_test: OpsTest, kafka_charm):
diff --git a/tox.ini b/tox.ini
index 5342d8bd..80a4e1b7 100644
--- a/tox.ini
+++ b/tox.ini
@@ -26,7 +26,7 @@ set_env =
     scaling: TEST_FILE=test_scaling.py
     password-rotation: TEST_FILE=test_password_rotation.py
     tls: TEST_FILE=test_tls.py
-    ha: TEST_FILE=test_ha.py
+    ha: TEST_FILE=ha/test_ha.py
 
 pass_env =
     PYTHONPATH
@@ -85,7 +85,7 @@ commands =
     poetry install --with integration
     poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/
 
-[testenv:integration-{charm,provider,scaling,password-rotation,tls}]
+[testenv:integration-{charm,provider,scaling,password-rotation,tls,ha}]
 description = Run integration tests
 pass_env =
     {[testenv]pass_env}
@@ -94,13 +94,3 @@ pass_env =
     CI
     CI_PACKED_CHARMS
 commands =
     poetry install --with integration
     poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE}
-
-[testenv:integration-ha-{ha}]
-description = Run integration tests for high availability
-pass_env =
-    {[testenv]pass_env}
-    CI
-    CI_PACKED_CHARMS
-commands =
-    poetry install --with integration
-    poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE}
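
With the separate integration-ha-{ha} factor removed, the HA suite is invoked like every other integration suite, e.g. `tox -e integration-ha`, which now resolves TEST_FILE to ha/test_ha.py inside the shared testenv.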