Skip to content

Commit

Permalink
test sample airbyte connection
Browse files Browse the repository at this point in the history
  • Loading branch information
kelkawi-a committed Jun 13, 2024
1 parent e94b94f commit eb35a9e
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 6 deletions.
5 changes: 3 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from charm_helpers import create_env
from literals import (
AIRBYTE_API_PORT,
BUCKET_CONFIGS,
CONNECTOR_BUILDER_SERVER_API_PORT,
CONTAINERS,
Expand Down Expand Up @@ -282,8 +283,7 @@ def _update(self, event):
self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}")
return

env = create_env(self.model.name, self.app.name, self.config, self._state)
self.model.unit.set_ports(INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)
self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)

for container_name in list(CONTAINERS.keys()):
container = self.unit.get_container(container_name)
Expand All @@ -303,6 +303,7 @@ def _update(self, event):
permissions=0o755,
)

env = create_env(self.model.name, self.app.name, container_name, self.config, self._state)
pebble_layer = get_pebble_layer(container_name, env)
container.add_layer(container_name, pebble_layer, combine=True)
container.replan()
Expand Down
7 changes: 6 additions & 1 deletion src/charm_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
from structured_config import StorageType


def create_env(model_name, app_name, config, state):
def create_env(model_name, app_name, container_name, config, state):
"""Create set of environment variables for application.
Args:
model_name: Name of the juju model.
app_name: Name of the application.
container_name: Name of Airbyte container.
config: Charm config.
state: Charm state.
Expand Down Expand Up @@ -70,6 +71,10 @@ def create_env(model_name, app_name, config, state):
"AIRBYTE_URL": config["webapp-url"],
}

# https://github.com/airbytehq/airbyte/issues/29506#issuecomment-1775148609
if container_name == "airbyte-api-server":
env.update({"INTERNAL_API_HOST": f"http://{app_name}:{INTERNAL_API_PORT}"})

if config["storage-type"].value == StorageType.minio and state.minio:
minio_endpoint = construct_svc_endpoint(
state.minio["service"],
Expand Down
140 changes: 139 additions & 1 deletion tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Temporal charm integration test helpers."""
"""Charm integration test helpers."""

import logging
from pathlib import Path

import requests
import yaml
from pytest_operator.plugin import OpsTest
from temporal_client.activities import say_hello
Expand All @@ -22,6 +23,9 @@
APP_NAME_TEMPORAL_ADMIN = "temporal-admin-k8s"
APP_NAME_TEMPORAL_UI = "temporal-ui-k8s"

GET_HEADERS = {"accept": "application/json"}
POST_HEADERS = {"accept": "application/json", "content-type": "application/json"}


def get_airbyte_charm_resources():
return {
Expand Down Expand Up @@ -136,3 +140,137 @@ async def perform_airbyte_integrations(ops_test: OpsTest):
)

assert ops_test.model.applications[APP_NAME_AIRBYTE_SERVER].units[0].workload_status == "active"


def get_airbyte_workspace_id(api_url):
"""Get Airbyte default workspace ID.
Args:
api_url: Airbyte API base URL.
"""
url = f"{api_url}/v1/workspaces?includeDeleted=false&limit=20&offset=0"
response = requests.get(url, headers=GET_HEADERS, timeout=300)

assert response.status_code == 200
return response.json().get("data")[0]["workspaceId"]


def create_airbyte_source(api_url, workspace_id):
"""Create Airbyte sample source.
Args:
api_url: Airbyte API base URL.
workspace_id: default workspace ID.
"""
url = f"{api_url}/v1/sources"
payload = {
"configuration": {"sourceType": "pokeapi", "pokemon_name": "pikachu"},
"name": "API Test",
"workspaceId": workspace_id,
}

response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)

assert response.status_code == 200
return response.json().get("sourceId")


def create_airbyte_destination(api_url, workspace_id, password):
"""Create Airbyte sample destination.
Args:
api_url: Airbyte API base URL.
workspace_id: default workspace ID.
password: database password.
"""
url = f"{api_url}/v1/destinations"
payload = {
"configuration": {
"destinationType": "postgres",
"port": 5432,
"schema": "pokeapi",
"ssl_mode": {"mode": "disable"},
"tunnel_method": {"tunnel_method": "NO_TUNNEL"},
"host": "http://postgresql-k8s-primary",
"database": "airbyte-k8s_db",
"username": "operator",
"password": password,
},
"workspaceId": workspace_id,
"name": "Postgres",
}

response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)

assert response.status_code == 200
return response.json().get("destinationId")


def create_airbyte_connection(api_url, source_id, destination_id):
"""Create Airbyte connection.
Args:
api_url: Airbyte API base URL.
source_id: Airbyte source ID.
destination_id: Airbyte destination ID.
"""
url = f"{api_url}/v1/connections"
payload = {
"schedule": {"scheduleType": "manual"},
"dataResidency": "auto",
"namespaceDefinition": "destination",
"namespaceFormat": None,
"nonBreakingSchemaUpdatesBehavior": "ignore",
"sourceId": source_id,
"destinationId": destination_id,
}

response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)

assert response.status_code == 200
return response.json().get("connectionId")


def trigger_airbyte_connection(api_url, connection_id):
"""Trigger Airbyte connection.
Args:
api_url: Airbyte API base URL.
connection_id: Airbyte connection ID.
"""
url = f"{api_url}/v1/jobs"
payload = {"jobType": "sync", "connectionId": connection_id}
response = requests.post(url, json=payload, headers=POST_HEADERS, timeout=300)

assert response.status_code == 200
return response.json().get("jobId")


def check_airbyte_job_status(api_url, job_id):
"""Get Airbyte sync job status.
Args:
api_url: Airbyte API base URL.
job_id: Sync job ID.
"""
url = f"{api_url}/v1/jobs/{job_id}"
response = requests.get(url, headers=GET_HEADERS, timeout=300)

assert response.status_code == 200
return response.json().get("status")


async def get_db_password(ops_test):
"""Get PostgreSQL DB admin password.
Args:
ops_test: PyTest object.
"""
postgresql_unit = ops_test.model.applications["postgresql-k8s"].units[0]
for i in range(10):
action = await postgresql_unit.run_action("get-password")
result = await action.wait()
logger.info(f"attempt {i} -> action result {result.status} {result.results}")
if "password" in result.results:
return result.results["password"]
time.sleep(2)
46 changes: 44 additions & 2 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@
# See LICENSE file for licensing details.

import logging
import time

import pytest
import requests
from conftest import deploy # noqa: F401, pylint: disable=W0611
from helpers import APP_NAME_AIRBYTE_SERVER, get_unit_url
from helpers import (
APP_NAME_AIRBYTE_SERVER,
check_airbyte_job_status,
create_airbyte_connection,
create_airbyte_destination,
create_airbyte_source,
get_airbyte_workspace_id,
get_db_password,
get_unit_url,
trigger_airbyte_connection,
)
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)
Expand All @@ -23,6 +34,37 @@ async def test_deployment(self, ops_test: OpsTest):
logger.info("curling app address: %s", url)

response = requests.get(f"{url}/api/v1/health", timeout=300)
print(response.json())

assert response.status_code == 200
assert response.json().get("available")

async def test_sync_job(self, ops_test: OpsTest):
# Create connection
api_url = await get_unit_url(ops_test, application=APP_NAME_AIRBYTE_SERVER, unit=0, port=8006)
logger.info("curling app address: %s", api_url)
workspace_id = get_airbyte_workspace_id(api_url)
db_password = await get_db_password(ops_test)

# Create Source
source_id = create_airbyte_source(api_url, workspace_id)

# Create destination
destination_id = create_airbyte_destination(api_url, workspace_id, db_password)

# Create connection
connection_id = create_airbyte_connection(api_url, source_id, destination_id)

# Trigger sync job
job_id = trigger_airbyte_connection(api_url, connection_id)

# Wait until job is successful
job_successful = False
for _ in range(20):
status = check_airbyte_job_status(api_url, job_id)

if status == "succeeded":
job_successful = True
break
time.sleep(20)

assert job_successful
3 changes: 3 additions & 0 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ def create_plan(container_name, storage_type):
},
}

if container_name == "airbyte-api-server":
want_plan["services"][container_name]["environment"].update({"INTERNAL_API_HOST": f"http://airbyte-k8s:8001"})

if storage_type == StorageType.minio:
want_plan["services"][container_name]["environment"].update(
{
Expand Down

0 comments on commit eb35a9e

Please sign in to comment.