Skip to content

Commit

Permalink
[DPE-2367] Add two cluster ha test (#126)
Browse files Browse the repository at this point in the history
* add two cluster ha test
  • Loading branch information
zmraul committed Oct 17, 2023
1 parent a3b4301 commit db2a2b4
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 46 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,46 @@ jobs:
run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}'
env:
CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }}

integration-test-ha:
strategy:
fail-fast: false
matrix:
tox-environments:
- integration-ha-ha
name: ${{ matrix.tox-environments }}
needs:
- lint
- unit-test
- build
- integration-test
runs-on: ubuntu-latest
timeout-minutes: 120
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Setup operator environment
# TODO: Replace with custom image on self-hosted runner
uses: charmed-kubernetes/actions-operator@main
with:
provider: lxd
bootstrap-options: "--agent-version 2.9.38"
- name: Download packed charm(s)
uses: actions/download-artifact@v3
with:
name: ${{ needs.build.outputs.artifact-name }}
- name: Select tests
id: select-tests
run: |
if [ "${{ github.event_name }}" == "schedule" ]
then
echo Running unstable and stable tests
echo "mark_expression=" >> $GITHUB_OUTPUT
else
echo Skipping unstable tests
echo "mark_expression=not unstable" >> $GITHUB_OUTPUT
fi
- name: Run integration tests
run: tox run -e ${{ matrix.tox-environments }} -- -m '${{ steps.select-tests.outputs.mark_expression }}'
env:
CI_PACKED_CHARMS: ${{ needs.build.outputs.charms }}
129 changes: 129 additions & 0 deletions tests/integration/ha/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
from pathlib import Path
from subprocess import PIPE, check_output
from typing import Any, Dict

import yaml
from charms.kafka.v0.client import KafkaClient
from kafka.admin import NewTopic

from snap import KafkaSnap

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
APP_NAME = METADATA["name"]
ZK_NAME = "zookeeper"
REL_NAME_ADMIN = "kafka-client-admin"

logger = logging.getLogger(__name__)


def produce_and_check_logs(
model_full_name: str, kafka_unit_name: str, provider_unit_name: str, topic: str
) -> None:
"""Produces messages from HN to chosen Kafka topic.
Args:
model_full_name: the full name of the model
kafka_unit_name: the kafka unit to checks logs on
provider_unit_name: the app to grab credentials from
topic: the desired topic to produce to
Raises:
KeyError: if missing relation data
AssertionError: if logs aren't found for desired topic
"""
relation_data = get_provider_data(
unit_name=provider_unit_name,
model_full_name=model_full_name,
endpoint="kafka-client-admin",
)
topic = topic
username = relation_data.get("username", None)
password = relation_data.get("password", None)
servers = relation_data.get("endpoints", "").split(",")
security_protocol = "SASL_PLAINTEXT"

if not (username and password and servers):
raise KeyError("missing relation data from app charm")

client = KafkaClient(
servers=servers,
username=username,
password=password,
security_protocol=security_protocol,
)
topic_config = NewTopic(
name=topic,
num_partitions=5,
replication_factor=1,
)

client.create_topic(topic=topic_config)
for i in range(15):
message = f"Message #{i}"
client.produce_message(topic_name=topic, message_content=message)

check_logs(model_full_name, kafka_unit_name, topic)


def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None:
"""Checks if messages for a topic have been produced.
Args:
model_full_name: the full name of the model
kafka_unit_name: the kafka unit to checks logs on
topic: the desired topic to check
"""
logs = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'",
stderr=PIPE,
shell=True,
universal_newlines=True,
).splitlines()

logger.debug(f"{logs=}")

passed = False
for log in logs:
if topic and "index" in log:
passed = True
break

assert passed, "logs not found"


def get_provider_data(
unit_name: str, model_full_name: str, endpoint: str = "kafka-client"
) -> Dict[str, str]:
result = show_unit(unit_name=unit_name, model_full_name=model_full_name)
relations_info = result[unit_name]["relation-info"]
logger.info(f"Relation info: {relations_info}")
provider_relation_data = {}
for info in relations_info:
if info["endpoint"] == endpoint:
logger.info(f"Relation data: {info}")
provider_relation_data["username"] = info["application-data"]["username"]
provider_relation_data["password"] = info["application-data"]["password"]
provider_relation_data["endpoints"] = info["application-data"]["endpoints"]
provider_relation_data["zookeeper-uris"] = info["application-data"]["zookeeper-uris"]
provider_relation_data["tls"] = info["application-data"]["tls"]
if "consumer-group-prefix" in info["application-data"]:
provider_relation_data["consumer-group-prefix"] = info["application-data"][
"consumer-group-prefix"
]
provider_relation_data["topic"] = info["application-data"]["topic"]
return provider_relation_data


def show_unit(unit_name: str, model_full_name: str) -> Any:
result = check_output(
f"JUJU_MODEL={model_full_name} juju show-unit {unit_name}",
stderr=PIPE,
shell=True,
universal_newlines=True,
)

return yaml.safe_load(result)
85 changes: 85 additions & 0 deletions tests/integration/ha/test_ha.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import asyncio
import logging

import pytest
from helpers import (
APP_NAME,
REL_NAME_ADMIN,
ZK_NAME,
check_logs,
produce_and_check_logs,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)


DUMMY_NAME = "app"


@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest, kafka_charm):
await asyncio.gather(
ops_test.model.deploy(kafka_charm, application_name=APP_NAME, num_units=1, series="jammy"),
ops_test.model.deploy(ZK_NAME, channel="edge", num_units=1, series="jammy"),
)
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30, timeout=3600)
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"

await ops_test.model.add_relation(APP_NAME, ZK_NAME)
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[APP_NAME, ZK_NAME], idle_period=30)
assert ops_test.model.applications[APP_NAME].status == "active"
assert ops_test.model.applications[ZK_NAME].status == "active"


async def test_second_cluster(ops_test: OpsTest, kafka_charm, app_charm):
second_kafka_name = f"{APP_NAME}-two"
second_zk_name = f"{ZK_NAME}-two"

await asyncio.gather(
ops_test.model.deploy(
kafka_charm, application_name=second_kafka_name, num_units=1, series="jammy"
),
ops_test.model.deploy(
ZK_NAME, channel="edge", application_name=second_zk_name, num_units=1, series="jammy"
),
ops_test.model.deploy(app_charm, application_name=DUMMY_NAME, num_units=1, series="jammy"),
)

await ops_test.model.wait_for_idle(
apps=[second_kafka_name, second_zk_name, DUMMY_NAME],
idle_period=30,
timeout=3600,
)
assert ops_test.model.applications[second_kafka_name].status == "blocked"

await ops_test.model.add_relation(second_kafka_name, second_zk_name)

# Relate "app" to the *first* cluster
await ops_test.model.add_relation(APP_NAME, f"{DUMMY_NAME}:{REL_NAME_ADMIN}")

await ops_test.model.wait_for_idle(
apps=[second_kafka_name, second_zk_name, DUMMY_NAME, APP_NAME],
idle_period=30,
)

produce_and_check_logs(
model_full_name=ops_test.model_full_name,
kafka_unit_name=f"{APP_NAME}/0",
provider_unit_name=f"{DUMMY_NAME}/0",
topic="hot-topic",
)

# Check that logs are not found on the second cluster
with pytest.raises(AssertionError):
check_logs(
model_full_name=ops_test.model_full_name,
kafka_unit_name=f"{second_kafka_name}/0",
topic="hot-topic",
)
11 changes: 11 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,17 @@ def produce_and_check_logs(
message = f"Message #{i}"
client.produce_message(topic_name=topic, message_content=message)

check_logs(model_full_name, kafka_unit_name, topic)


def check_logs(model_full_name: str, kafka_unit_name: str, topic: str) -> None:
"""Checks if messages for a topic have been produced.
Args:
model_full_name: the full name of the model
kafka_unit_name: the kafka unit to checks logs on
topic: the desired topic to check
"""
logs = check_output(
f"JUJU_MODEL={model_full_name} juju ssh {kafka_unit_name} sudo -i 'find {KafkaSnap.DATA_PATH}/data'",
stderr=PIPE,
Expand Down
59 changes: 13 additions & 46 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ set_env =
PYTHONPATH = {tox_root}/lib:{[vars]src_path}
PYTHONBREAKPOINT=ipdb.set_trace
PY_COLORS=1
charm: TEST_FILE=test_charm.py
provider: TEST_FILE=test_provider.py
scaling: TEST_FILE=test_scaling.py
password-rotation: TEST_FILE=test_password_rotation.py
tls: TEST_FILE=test_tls.py
ha: TEST_FILE=test_ha.py

pass_env =
PYTHONPATH
CHARM_BUILD_DIR
Expand Down Expand Up @@ -78,62 +85,22 @@ commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/

[testenv:integration-charm]
description = Run base integration tests
pass_env =
{[testenv]pass_env}
CI
CI_PACKED_CHARMS
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_charm.py

[testenv:integration-provider]
description = Run integration tests for provider
pass_env =
{[testenv]pass_env}
CI
CI_PACKED_CHARMS
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_provider.py

[testenv:integration-scaling]
description = Run scaling integration tests
pass_env =
{[testenv]pass_env}
CI
CI_PACKED_CHARMS
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_scaling.py

[testenv:integration-password-rotation]
description = Run password rotation integration tests
pass_env =
{[testenv]pass_env}
CI
CI_PACKED_CHARMS
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_password_rotation.py

[testenv:integration-tls]
description = Run TLS integration tests
[testenv:integration-{charm,provider,scaling,password-rotation,tls}]
description = Run integration tests
pass_env =
{[testenv]pass_env}
CI
CI_PACKED_CHARMS
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_tls.py
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/{env:TEST_FILE}

[testenv:integration-upgrade]
description = Run in place upgrade integration tests
[testenv:integration-ha-{ha}]
description = Run integration tests for high availability
pass_env =
{[testenv]pass_env}
CI
CI_PACKED_CHARMS
commands =
poetry install --with integration
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_upgrade.py
poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/ha/{env:TEST_FILE}

0 comments on commit db2a2b4

Please sign in to comment.