From 3f022453e75e7b4aa1b1b1b54f02ac2ff32469c7 Mon Sep 17 00:00:00 2001
From: Ali Kelkawi
Date: Thu, 13 Jun 2024 14:57:36 +0300
Subject: [PATCH] test sample airbyte connection

---
Review notes (text in this section is ignored by git-am):
- tests/integration/helpers.py: get_db_password() called time.sleep() but the
  module never imports time (NameError on the first retry); it now awaits
  asyncio.sleep() and the module imports asyncio.
- tests/integration/helpers.py: the Postgres destination "host" must be a bare
  hostname, so the "http://" scheme was dropped.
- tests/integration/test_charm.py: the job-status polling loop awaits
  asyncio.sleep() instead of blocking the event loop with time.sleep(); the
  first comment in test_sync_job was reworded to match what the code does.
- tests/unit/test_charm.py: removed the f-prefix from a string without
  placeholders.
- Leading whitespace of context lines was reconstructed and the blob ids on the
  "index" lines were not recomputed; regenerate with git format-patch before
  applying.

 src/charm.py                    |   5 +-
 src/charm_helpers.py            |   7 +-
 tests/integration/helpers.py    | 148 +++++++++++++++++++++++++++++++-
 tests/integration/test_charm.py |  49 ++++++++++-
 tests/unit/test_charm.py        |   3 +
 5 files changed, 206 insertions(+), 6 deletions(-)

diff --git a/src/charm.py b/src/charm.py
index e6c9c02..1f19868 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -17,6 +17,7 @@
 
 from charm_helpers import create_env
 from literals import (
+    AIRBYTE_API_PORT,
     BUCKET_CONFIGS,
     CONNECTOR_BUILDER_SERVER_API_PORT,
     CONTAINERS,
@@ -282,8 +283,7 @@ def _update(self, event):
                 self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}")
                 return
 
-        env = create_env(self.model.name, self.app.name, self.config, self._state)
-        self.model.unit.set_ports(INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)
+        self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)
 
         for container_name in list(CONTAINERS.keys()):
             container = self.unit.get_container(container_name)
@@ -303,6 +303,7 @@ def _update(self, event):
                         permissions=0o755,
                     )
 
+            env = create_env(self.model.name, self.app.name, container_name, self.config, self._state)
             pebble_layer = get_pebble_layer(container_name, env)
             container.add_layer(container_name, pebble_layer, combine=True)
             container.replan()
diff --git a/src/charm_helpers.py b/src/charm_helpers.py
index a4bb6d4..f18a3da 100644
--- a/src/charm_helpers.py
+++ b/src/charm_helpers.py
@@ -15,12 +15,13 @@
 from structured_config import StorageType
 
 
-def create_env(model_name, app_name, config, state):
+def create_env(model_name, app_name, container_name, config, state):
     """Create set of environment variables for application.
 
     Args:
         model_name: Name of the juju model.
         app_name: Name of the application.
+        container_name: Name of Airbyte container.
         config: Charm config.
         state: Charm state.
 
@@ -70,6 +71,10 @@ def create_env(model_name, app_name, config, state):
         "AIRBYTE_URL": config["webapp-url"],
     }
 
+    # https://github.com/airbytehq/airbyte/issues/29506#issuecomment-1775148609
+    if container_name == "airbyte-api-server":
+        env.update({"INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}"})
+
     if config["storage-type"].value == StorageType.minio and state.minio:
         minio_endpoint = construct_svc_endpoint(
             state.minio["service"],
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index b0ed688..0bb4e9f 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -2,11 +2,13 @@
 # Copyright 2024 Canonical Ltd.
 # See LICENSE file for licensing details.
 
-"""Temporal charm integration test helpers."""
+"""Charm integration test helpers."""
 
+import asyncio
 import logging
 from pathlib import Path
 
+import requests
 import yaml
 from pytest_operator.plugin import OpsTest
 from temporal_client.activities import say_hello
@@ -22,6 +24,9 @@
 APP_NAME_TEMPORAL_ADMIN = "temporal-admin-k8s"
 APP_NAME_TEMPORAL_UI = "temporal-ui-k8s"
 
+GET_HEADERS = {"accept": "application/json"}
+POST_HEADERS = {"accept": "application/json", "content-type": "application/json"}
+
 
 def get_airbyte_charm_resources():
     return {
@@ -136,3 +141,144 @@ async def perform_airbyte_integrations(ops_test: OpsTest):
     )
 
     assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active"
+
+
+def get_airbyte_workspace_id(api_url):
+    """Get Airbyte default workspace ID.
+
+    Args:
+        api_url: Airbyte API base URL.
+    """
+    url = f"{api_url}/v1/workspaces?includeDeleted=false&limit=20&offset=0"
+    logger.info("fetching Airbyte workspace ID")
+    response = requests.get(url, headers=GET_HEADERS, timeout=300)
+
+    assert response.status_code == 200
+    return response.json().get("data")[0]["workspaceId"]
+
+
+def create_airbyte_source(api_url, workspace_id):
+    """Create Airbyte sample source.
+
+    Args:
+        api_url: Airbyte API base URL.
+        workspace_id: default workspace ID.
+    """
+    url = f"{api_url}/v1/sources"
+    payload = {
+        "configuration": {"sourceType": "pokeapi", "pokemon_name": "pikachu"},
+        "name": "API Test",
+        "workspaceId": workspace_id,
+    }
+
+    logger.info("creating Airbyte source")
+    response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)
+
+    assert response.status_code == 200
+    return response.json().get("sourceId")
+
+
+def create_airbyte_destination(api_url, workspace_id, password):
+    """Create Airbyte sample destination.
+
+    Args:
+        api_url: Airbyte API base URL.
+        workspace_id: default workspace ID.
+        password: database password.
+    """
+    url = f"{api_url}/v1/destinations"
+    payload = {
+        "configuration": {
+            "destinationType": "postgres",
+            "port": 5432,
+            "schema": "pokeapi",
+            "ssl_mode": {"mode": "disable"},
+            "tunnel_method": {"tunnel_method": "NO_TUNNEL"},
+            "host": "postgresql-k8s-primary",
+            "database": "airbyte-k8s_db",
+            "username": "operator",
+            "password": password,
+        },
+        "workspaceId": workspace_id,
+        "name": "Postgres",
+    }
+
+    logger.info("creating Airbyte destination")
+    response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)
+
+    assert response.status_code == 200
+    return response.json().get("destinationId")
+
+
+def create_airbyte_connection(api_url, source_id, destination_id):
+    """Create Airbyte connection.
+
+    Args:
+        api_url: Airbyte API base URL.
+        source_id: Airbyte source ID.
+        destination_id: Airbyte destination ID.
+    """
+    url = f"{api_url}/v1/connections"
+    payload = {
+        "schedule": {"scheduleType": "manual"},
+        "dataResidency": "auto",
+        "namespaceDefinition": "destination",
+        "namespaceFormat": None,
+        "nonBreakingSchemaUpdatesBehavior": "ignore",
+        "sourceId": source_id,
+        "destinationId": destination_id,
+    }
+
+    logger.info("creating Airbyte connection")
+    response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)
+
+    assert response.status_code == 200
+    return response.json().get("connectionId")
+
+
+def trigger_airbyte_connection(api_url, connection_id):
+    """Trigger Airbyte connection.
+
+    Args:
+        api_url: Airbyte API base URL.
+        connection_id: Airbyte connection ID.
+    """
+    url = f"{api_url}/v1/jobs"
+    payload = {"jobType": "sync", "connectionId": connection_id}
+    logger.info("triggering Airbyte connection")
+    response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)
+
+    assert response.status_code == 200
+    return response.json().get("jobId")
+
+
+def check_airbyte_job_status(api_url, job_id):
+    """Get Airbyte sync job status.
+
+    Args:
+        api_url: Airbyte API base URL.
+        job_id: Sync job ID.
+    """
+    url = f"{api_url}/v1/jobs/{job_id}"
+    logger.info("fetching Airbyte job status")
+    response = requests.get(url, headers=GET_HEADERS, timeout=120)
+    logger.info(response.json())
+
+    assert response.status_code == 200
+    return response.json().get("status")
+
+
+async def get_db_password(ops_test):
+    """Get PostgreSQL DB admin password.
+
+    Args:
+        ops_test: PyTest object.
+    """
+    postgresql_unit = ops_test.model.applications["postgresql-k8s"].units[0]
+    for i in range(10):
+        action = await postgresql_unit.run_action("get-password")
+        result = await action.wait()
+        logger.info(f"attempt {i} -> action result {result.status} {result.results}")
+        if "password" in result.results:
+            return result.results["password"]
+        await asyncio.sleep(2)
diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py
index 78be12d..853ccd0 100644
--- a/tests/integration/test_charm.py
+++ b/tests/integration/test_charm.py
@@ -3,11 +3,22 @@
 # See LICENSE file for licensing details.
 
+import asyncio
 import logging
 
 import pytest
 import requests
 from conftest import deploy  # noqa: F401, pylint: disable=W0611
-from helpers import APP_NAME_AIRBYTE_SERVER, get_unit_url
+from helpers import (
+    APP_NAME_AIRBYTE_SERVER,
+    check_airbyte_job_status,
+    create_airbyte_connection,
+    create_airbyte_destination,
+    create_airbyte_source,
+    get_airbyte_workspace_id,
+    get_db_password,
+    get_unit_url,
+    trigger_airbyte_connection,
+)
 from pytest_operator.plugin import OpsTest
 
 logger = logging.getLogger(__name__)
@@ -23,6 +34,40 @@ async def test_deployment(self, ops_test: OpsTest):
         logger.info("curling app address: %s", url)
 
         response = requests.get(f"{url}/api/v1/health", timeout=300)
-        print(response.json())
+
         assert response.status_code == 200
         assert response.json().get("available")
+
+    async def test_sync_job(self, ops_test: OpsTest):
+        # Resolve API URL, default workspace and database password
+        api_url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8006)
+        logger.info("curling app address: %s", api_url)
+        workspace_id = get_airbyte_workspace_id(api_url)
+        db_password = await get_db_password(ops_test)
+
+        # Create Source
+        source_id = create_airbyte_source(api_url, workspace_id)
+
+        # Create destination
+        destination_id = create_airbyte_destination(api_url, workspace_id, db_password)
+
+        # Create connection
+        connection_id = create_airbyte_connection(api_url, source_id, destination_id)
+
+        # Trigger sync job
+        job_id = trigger_airbyte_connection(api_url, connection_id)
+
+        # Wait until job is successful
+        job_successful = False
+        for i in range(20):
+            logger.info(f"attempt {i} to get job status")
+            status = check_airbyte_job_status(api_url, job_id)
+
+            if status == "succeeded":
+                job_successful = True
+                break
+
+            logger.info(f"attempt {i}: job not yet successful")
+            await asyncio.sleep(20)
+
+        assert job_successful
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 7368e18..ca13a03 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -375,6 +375,9 @@ def create_plan(container_name, storage_type):
         },
     }
 
+    if container_name == "airbyte-api-server":
+        want_plan["services"][container_name]["environment"].update({"INTERNAL_API_HOST": "http://airbyte-k8s:8001"})
+
     if storage_type == StorageType.minio:
         want_plan["services"][container_name]["environment"].update(
             {