[DPE-2366][DPE-2365] HA tests #129

Merged: 22 commits, Sep 14, 2023. Changes from all commits.
.github/workflows/ci.yaml (2 changes: 1 addition & 1 deletion)
@@ -96,7 +96,7 @@ jobs:
       fail-fast: false
       matrix:
         tox-environments:
-          - integration-ha-ha
+          - integration-ha
     name: ${{ matrix.tox-environments }}
     needs:
       - lint
.gitignore (3 changes: 2 additions & 1 deletion)
@@ -7,4 +7,5 @@ __pycache__/
 *.py[cod]
 .vscode
 .idea
-.python-version
+.python-version
+last_written_value
last_written_value (new file, 1 addition)
@@ -0,0 +1 @@
20052,0
(This committed file is the writer's on-disk artifact: the last written value and the lost-message count, comma-separated, as produced by ContinuousWrites below.)
lib/charms/kafka/v0/client.py (30 changes: 25 additions & 5 deletions)
@@ -81,6 +81,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):

 from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
 from kafka.admin import NewTopic
+from kafka.errors import KafkaError

 logger = logging.getLogger(__name__)
 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
@@ -93,7 +94,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):

 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 1
+LIBPATCH = 2


 class KafkaClient:
@@ -126,6 +127,7 @@ def __init__(
         self.mtls = self.security_protocol == "SSL"

         self._subscription = None
+        self._consumer_group_prefix = None

     @cached_property
     def _admin_client(self) -> KafkaAdminClient:
@@ -174,7 +176,7 @@ def _consumer_client(self) -> KafkaConsumer:
             ssl_certfile=self.certfile_path if self.ssl else None,
             ssl_keyfile=self.keyfile_path if self.mtls else None,
             api_version=KafkaClient.API_VERSION if self.mtls else None,
-            group_id=self._consumer_group_prefix or None,
+            group_id=self._consumer_group_prefix,
             enable_auto_commit=True,
             auto_offset_reset="earliest",
             consumer_timeout_ms=15000,
@@ -188,10 +190,17 @@ def create_topic(self, topic: NewTopic) -> None:

         Args:
             topic: the configuration of the topic to create
-
         """
         self._admin_client.create_topics(new_topics=[topic], validate_only=False)

+    def delete_topics(self, topics: list[str]) -> None:
+        """Deletes the provided topics.
+
+        Args:
+            topics: list of topics to delete
+        """
+        self._admin_client.delete_topics(topics=topics)
+
     def subscribe_to_topic(
         self, topic_name: str, consumer_group_prefix: Optional[str] = None
     ) -> None:
@@ -240,8 +249,19 @@ def produce_message(self, topic_name: str, message_content: str) -> None:
         """
         item_content = f"Message #{message_content}"
         future = self._producer_client.send(topic_name, str.encode(item_content))
-        future.get(timeout=60)
-        logger.info(f"Message published to topic={topic_name}, message content: {item_content}")
+        try:
+            future.get(timeout=60)
+            logger.info(
+                f"Message published to topic={topic_name}, message content: {item_content}"
+            )
+        except KafkaError as e:
+            logger.error(f"Error producing message {message_content} to topic {topic_name}: {e}")
+
+    def close(self) -> None:
+        """Close the connection to the client."""
+        self._admin_client.close()
+        self._producer_client.close()
+        self._consumer_client.close()


 if __name__ == "__main__":
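For orientation, here is a minimal usage sketch of the patched client; it is not part of this diff. The broker address and credentials are placeholder values; in the charm tests they come from relation data:

from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic

client = KafkaClient(
    servers=["10.1.36.5:9092"],  # placeholder broker address
    username="admin",
    password="password",  # placeholder credentials
    security_protocol="SASL_PLAINTEXT",
)
try:
    client.create_topic(NewTopic(name="demo-topic", num_partitions=1, replication_factor=1))
    # as of this change, produce_message logs a KafkaError instead of letting it propagate
    client.produce_message(topic_name="demo-topic", message_content="0")
    client.delete_topics(topics=["demo-topic"])
finally:
    client.close()  # new in LIBPATCH 2: closes the admin, producer and consumer clients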
tests/integration/ha/__init__.py (new file, 2 additions)
@@ -0,0 +1,2 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
tests/integration/ha/continuous_writes.py (new file, 220 additions)
@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import logging
import os
from multiprocessing import Event, Process, Queue
from types import SimpleNamespace

from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic
from kafka.errors import KafkaTimeoutError
from pytest_operator.plugin import OpsTest
from tenacity import (
    RetryError,
    Retrying,
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_fixed,
    wait_random,
)

from integration.helpers import DUMMY_NAME, get_provider_data

logger = logging.getLogger(__name__)


class ContinuousWrites:
    """Utility class for managing continuous writes."""

    TOPIC_NAME = "ha-test-topic"
    LAST_WRITTEN_VAL_PATH = "last_written_value"

    def __init__(self, ops_test: OpsTest, app: str):
        self._ops_test = ops_test
        self._app = app
        self._is_stopped = True
        self._event = None
        self._queue = None
        self._process = None

    @retry(
        wait=wait_fixed(wait=5) + wait_random(0, 5),
        stop=stop_after_attempt(5),
    )
    def start(self) -> None:
        """Run continuous writes in the background."""
        if not self._is_stopped:
            self.clear()

        # create topic
        self._create_replicated_topic()

        # create process
        self._create_process()

        # pass the model full name to the process once it starts
        self.update()

        # start writes
        self._process.start()

    def update(self):
        """Update cluster-related config. Useful in cases such as scaling, password changes, etc."""
        self._queue.put(SimpleNamespace(model_full_name=self._ops_test.model_full_name))

    @retry(
        wait=wait_fixed(wait=5) + wait_random(0, 5),
        stop=stop_after_attempt(5),
    )
    def clear(self) -> None:
        """Stop writes and delete the topic."""
        if not self._is_stopped:
            self.stop()

        client = self._client()
        try:
            client.delete_topics(topics=[self.TOPIC_NAME])
        finally:
            client.close()

    def consumed_messages(self) -> list | None:
        """Consume the messages in the topic."""
        client = self._client()
        try:
            for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(5)):
                with attempt:
                    client.subscribe_to_topic(topic_name=self.TOPIC_NAME)
                    # FIXME: loading the whole list of consumed messages into memory might not be the best idea
                    return list(client.messages())
        except RetryError:
            return []
        finally:
            client.close()

    def _create_replicated_topic(self):
        """Create topic with replication_factor = 3."""
        client = self._client()
        topic_config = NewTopic(
            name=self.TOPIC_NAME,
            num_partitions=1,
            replication_factor=3,
        )
        client.create_topic(topic=topic_config)

    @retry(
        wait=wait_fixed(wait=5) + wait_random(0, 5),
        stop=stop_after_attempt(5),
    )
    def stop(self) -> SimpleNamespace:
        """Stop the continuous writes process and return a summary of the writes."""
        if not self._is_stopped:
            self._stop_process()

        result = SimpleNamespace()

        # messages count
        consumed_messages = self.consumed_messages()
        result.count = len(consumed_messages)
        result.last_message = consumed_messages[-1]

        # last expected message stored on disk
        try:
            for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)):
                with attempt:
                    with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "r") as f:
                        # the file holds "<last written value>,<lost messages>"
                        result.last_expected_message, result.lost_messages = (
                            f.read().rstrip().split(",", maxsplit=1)
                        )
        except RetryError:
            result.last_expected_message = result.lost_messages = -1

        return result

    def _create_process(self):
        self._is_stopped = False
        self._event = Event()
        self._queue = Queue()
        self._process = Process(
            target=ContinuousWrites._run_async,
            name="continuous_writes",
            args=(self._event, self._queue, 0),
        )

    def _stop_process(self):
        self._event.set()
        self._process.join()
        self._queue.close()
        self._is_stopped = True

    def _client(self):
        # Review comment (nit): this could perhaps be externalized in a helper, so it
        # can be called from the _run._client() function without having to duplicate
        # it. (A sketch of the suggested helper follows the file.)
        """Build a Kafka client."""
        relation_data = get_provider_data(
            unit_name=f"{DUMMY_NAME}/0",
            model_full_name=self._ops_test.model_full_name,
            endpoint="kafka-client-admin",
        )
        return KafkaClient(
            servers=relation_data["endpoints"].split(","),
            username=relation_data["username"],
            password=relation_data["password"],
            security_protocol="SASL_PLAINTEXT",
        )

    @staticmethod
    async def _run(event: Event, data_queue: Queue, starting_number: int) -> None:  # noqa: C901
        """Continuous writing."""
        initial_data = data_queue.get(True)

        def _client():
            """Build a Kafka client."""
            relation_data = get_provider_data(
                unit_name=f"{DUMMY_NAME}/0",
                model_full_name=initial_data.model_full_name,
                endpoint="kafka-client-admin",
            )
            return KafkaClient(
                servers=relation_data["endpoints"].split(","),
                username=relation_data["username"],
                password=relation_data["password"],
                security_protocol="SASL_PLAINTEXT",
            )

        write_value = starting_number
        lost_messages = 0
        client = _client()

        while True:
            if not data_queue.empty():  # currently evaluates to False, as no updates are made after start
                data_queue.get(False)
                client.close()
                client = _client()

            try:
                client.produce_message(
                    topic_name=ContinuousWrites.TOPIC_NAME, message_content=str(write_value)
                )
            except KafkaTimeoutError:
                client.close()
                client = _client()
                lost_messages += 1
            finally:
                # process termination requested
                if event.is_set():
                    break

            write_value += 1

        # write last expected written value on disk
        with open(ContinuousWrites.LAST_WRITTEN_VAL_PATH, "w") as f:
            f.write(f"{write_value},{lost_messages}")
            f.flush()  # flush Python-level buffers before fsync so the data reaches disk
            os.fsync(f)

        client.close()

    @staticmethod
    def _run_async(event: Event, data_queue: Queue, starting_number: int):
        """Run async code."""
        asyncio.run(ContinuousWrites._run(event, data_queue, starting_number))
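Following up on the inline review comment above, here is a sketch of the suggested refactor; it is hypothetical and not part of this PR. A module-level helper in continuous_writes.py could be shared by ContinuousWrites._client and the nested _run._client, since the two bodies differ only in where model_full_name comes from. The name build_kafka_client is an assumption:

def build_kafka_client(model_full_name: str) -> KafkaClient:
    """Build an admin Kafka client from the app's relation data (suggested shared helper)."""
    relation_data = get_provider_data(
        unit_name=f"{DUMMY_NAME}/0",
        model_full_name=model_full_name,
        endpoint="kafka-client-admin",
    )
    return KafkaClient(
        servers=relation_data["endpoints"].split(","),
        username=relation_data["username"],
        password=relation_data["password"],
        security_protocol="SASL_PLAINTEXT",
    )

With this in place, ContinuousWrites._client would reduce to return build_kafka_client(self._ops_test.model_full_name), and _run would call build_kafka_client(initial_data.model_full_name) directly.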
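Finally, a hedged sketch of how an HA test might drive this utility. The test name, the "kafka" app name, and the restart_broker helper are illustrative assumptions, not taken from this diff:

async def test_writes_survive_broker_restart(ops_test, restart_broker):
    # restart_broker stands in for whatever fault-injection helper the suite provides (assumed)
    writes = ContinuousWrites(ops_test, app="kafka")
    writes.start()  # creates the replicated topic and begins producing in a background process
    await restart_broker(ops_test)  # disrupt the cluster while writes continue
    result = writes.stop()  # join the writer process and collect the summary
    # every write that did not time out should be consumable from the topic
    assert result.count >= int(result.last_expected_message) - int(result.lost_messages)
    writes.clear()  # delete the test topic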