Skip to content

Commit

Permalink
Address charm listing review comments (#16)
Browse files Browse the repository at this point in the history
* address charm listing review comments
  • Loading branch information
kelkawi-a authored Jul 19, 2024
1 parent d8234bc commit 01bd849
Show file tree
Hide file tree
Showing 16 changed files with 119 additions and 59 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/integration_test.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
name: Integration tests

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

on:
pull_request:

Expand All @@ -10,7 +14,7 @@ jobs:
with:
channel: 1.28-strict/stable
modules: '["test_charm.py"]'
juju-channel: 3.1/stable
juju-channel: 3.4/stable
self-hosted-runner: true
self-hosted-runner-label: "xlarge"
microk8s-addons: "dns ingress rbac storage metallb:10.15.119.2-10.15.119.4 registry"
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ workflows are as follows:
library checks which run on every pull request.
- `integration_test.yaml`: This runs the suite of integration tests included
with the charm and runs on every pull request.
- `test_and_publish_charm.yaml`: This runs either by manual dispatch or on every
- `publish_charm.yaml`: This runs either by manual dispatch or on every
push to the main branch or a special track/\*\* branch. Once a PR is merged
with one of these branches, this workflow runs to ensure the tests have passed
before building the charm and publishing the new version to the edge channel
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[![Charmhub Badge](https://charmhub.io/airbyte-k8s/badge.svg)](https://charmhub.io/airbyte-k8s)
[![Release Edge](https://github.com/canonical/airbyte-k8s-operator/actions/workflows/publish_charm.yaml/badge.svg)](https://github.com/canonical/airbyte-k8s-operator/actions/workflows/publish_charm.yaml)

# Airbyte K8s Operator

This is the Kubernetes Python Operator for [Airbyte](https://airbyte.com/).
Expand Down Expand Up @@ -39,6 +42,10 @@ juju deploy postgresql-k8s --channel 14/edge --trust
juju relate airbyte-k8s postgresql-k8s
```

Note: The `--trust` flag is required when deploying Charmed Airbyte K8s to enable it
to create Kubernetes pods for sync jobs. The charm contains a script which periodically
cleans up these resources once they complete their function.

### Deploying Minio

Airbyte uses Minio for storing state and relevant logs. The Airbyte and Minio
Expand All @@ -57,7 +64,7 @@ The Temporal operators can be deployed and connected to each other using the
Juju command line as follows:

```bash
juju deploy temporal-k8s
juju deploy temporal-k8s --config num-history-shards=512
juju deploy temporal-admin-k8s
juju relate temporal-k8s:db postgresql-k8s:database
juju relate temporal-k8s:visibility postgresql-k8s:database
Expand Down
10 changes: 9 additions & 1 deletion charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ description: |
links:
documentation: https://discourse.charmhub.io/t/charmed-airbyte-k8s-overview/14530
source:
- https://github.com/canonical/airbyte-k8s-operator
issues:
- https://github.com/canonical/airbyte-k8s-operator/issues

# (Required for 'charm' type)
bases:
Expand All @@ -25,10 +29,13 @@ bases:
- name: ubuntu
channel: "22.04"

assumes:
- juju >= 3.1
- k8s-api

# Metadata
peers:
peer:
airbyte-peer:
interface: airbyte

requires:
Expand All @@ -38,6 +45,7 @@ requires:

object-storage:
interface: object-storage
limit: 1
schema:
v1:
provides:
Expand Down
18 changes: 9 additions & 9 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
AIRBYTE_VERSION,
BUCKET_CONFIGS,
CONNECTOR_BUILDER_SERVER_API_PORT,
CONTAINERS,
CONTAINER_HEALTH_CHECK_MAP,
INTERNAL_API_PORT,
LOGS_BUCKET_CONFIG,
REQUIRED_S3_PARAMETERS,
Expand Down Expand Up @@ -63,7 +63,7 @@ def get_pebble_layer(application_name, context):
},
}

application_info = CONTAINERS[application_name]
application_info = CONTAINER_HEALTH_CHECK_MAP[application_name]
if application_info is not None:
pebble_layer["services"][application_name].update(
{
Expand Down Expand Up @@ -104,10 +104,10 @@ def __init__(self, *args):
args: Ignore.
"""
super().__init__(*args)
self._state = State(self.app, lambda: self.model.get_relation("peer"))
self._state = State(self.app, lambda: self.model.get_relation("airbyte-peer"))

self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.peer_relation_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.airbyte_peer_relation_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.update_status, self._on_update_status)

# Handle postgresql relation.
Expand All @@ -123,7 +123,7 @@ def __init__(self, *args):
# Handle UI relation
self.airbyte_ui = AirbyteServerProvider(self)

for container_name in list(CONTAINERS.keys()):
for container_name in CONTAINER_HEALTH_CHECK_MAP:
self.framework.observe(self.on[container_name].pebble_ready, self._on_pebble_ready)

@log_event_handler(logger)
Expand Down Expand Up @@ -157,8 +157,8 @@ def _on_update_status(self, event):
return

all_valid_plans = True
for container_name in list(CONTAINERS.keys()):
if not CONTAINERS[container_name]:
for container_name, settings in CONTAINER_HEALTH_CHECK_MAP.items():
if not settings:
continue

container = self.unit.get_container(container_name)
Expand All @@ -167,7 +167,6 @@ def _on_update_status(self, event):
if not valid_pebble_plan:
logger.debug(f"failed to validate pebble plan for {container_name}, attempting creation again")
all_valid_plans = False
self._update(event)
continue

logger.info(f"performing up check for {container_name}")
Expand All @@ -178,6 +177,7 @@ def _on_update_status(self, event):
return

if not all_valid_plans:
self._update(event)
return

self.unit.set_workload_version(f"v{AIRBYTE_VERSION}")
Expand Down Expand Up @@ -287,7 +287,7 @@ def _update(self, event):

self.model.unit.set_ports(AIRBYTE_API_PORT, INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)

for container_name in list(CONTAINERS.keys()):
for container_name in CONTAINER_HEALTH_CHECK_MAP:
container = self.unit.get_container(container_name)
if not container.can_connect():
event.defer()
Expand Down
4 changes: 2 additions & 2 deletions src/literals.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Literals."""
"""Charm literals."""

CONNECTOR_BUILDER_SERVER_API_PORT = 80
INTERNAL_API_PORT = 8001
AIRBYTE_API_PORT = 8006
WORKLOAD_API_PORT = 8007

CONTAINERS = {
CONTAINER_HEALTH_CHECK_MAP = {
"airbyte-api-server": {
"port": AIRBYTE_API_PORT,
"health_endpoint": "/health",
Expand Down
3 changes: 2 additions & 1 deletion src/relations/minio.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def _get_interfaces(self):
charm = self.charm
# Hack: get_interfaces checks for peer relation which does not exist under
# requires/provides list in charmcraft.yaml
del charm.meta.relations["peer"]
if "airbyte-peer" in charm.meta.relations:
del charm.meta.relations["airbyte-peer"]
interfaces = get_interfaces(charm)
except NoVersionsListed as err:
raise ErrorWithStatus(err, WaitingStatus) from err
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Fixtures for jenkins-k8s charm tests."""
"""Fixtures for charm tests."""

import pytest

Expand Down
3 changes: 1 addition & 2 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

"""Charm integration test config."""

import asyncio
import logging

import pytest_asyncio
Expand Down Expand Up @@ -37,7 +36,7 @@ async def deploy(ops_test: OpsTest):
config={"num-history-shards": 4},
)
await ops_test.model.deploy(APP_NAME_TEMPORAL_ADMIN, channel="edge")
await ops_test.model.deploy("postgresql-k8s", channel="14/edge", trust=True)
await ops_test.model.deploy("postgresql-k8s", channel="14/stable", trust=True)
await ops_test.model.deploy("minio", channel="edge")

async with ops_test.fast_forward():
Expand Down
39 changes: 34 additions & 5 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@


def get_airbyte_charm_resources():
"""Fetch charm resources from charmcraft.yaml.
Returns:
Charm resources.
"""
return {
"airbyte-api-server": METADATA["resources"]["airbyte-api-server"]["upstream-source"],
"airbyte-bootloader": METADATA["resources"]["airbyte-bootloader"]["upstream-source"],
Expand Down Expand Up @@ -153,6 +158,9 @@ def get_airbyte_workspace_id(api_url):
Args:
api_url: Airbyte API base URL.
Returns:
Airbyte workspace ID.
"""
url = f"{api_url}/v1/workspaces?includeDeleted=false&limit=20&offset=0"
logger.info("fetching Airbyte workspace ID")
Expand All @@ -168,6 +176,9 @@ def create_airbyte_source(api_url, workspace_id):
Args:
api_url: Airbyte API base URL.
workspace_id: default workspace ID.
Returns:
Created source ID.
"""
url = f"{api_url}/v1/sources"
payload = {
Expand All @@ -190,7 +201,10 @@ def create_airbyte_destination(api_url, model_name, workspace_id, db_password):
api_url: Airbyte API base URL.
model_name: name of the juju model.
workspace_id: default workspace ID.
password: database password.
db_password: database password.
Returns:
Created destination ID.
"""
url = f"{api_url}/v1/destinations"
payload = {
Expand Down Expand Up @@ -223,6 +237,9 @@ def create_airbyte_connection(api_url, source_id, destination_id):
api_url: Airbyte API base URL.
source_id: Airbyte source ID.
destination_id: Airbyte destination ID.
Returns:
Created connection ID.
"""
url = f"{api_url}/v1/connections"
payload = {
Expand All @@ -248,6 +265,9 @@ def trigger_airbyte_connection(api_url, connection_id):
Args:
api_url: Airbyte API base URL.
connection_id: Airbyte connection ID.
Returns:
Created job ID.
"""
url = f"{api_url}/v1/jobs"
payload = {"jobType": "sync", "connectionId": connection_id}
Expand All @@ -264,6 +284,9 @@ def check_airbyte_job_status(api_url, job_id):
Args:
api_url: Airbyte API base URL.
job_id: Sync job ID.
Returns:
Job status.
"""
url = f"{api_url}/v1/jobs/{job_id}"
logger.info("fetching Airbyte job status")
Expand All @@ -279,6 +302,9 @@ def cancel_airbyte_job(api_url, job_id):
Args:
api_url: Airbyte API base URL.
job_id: Sync job ID.
Returns:
Job status.
"""
url = f"{api_url}/v1/jobs/{job_id}"
logger.info("cancelling Airbyte job")
Expand All @@ -293,6 +319,9 @@ async def get_db_password(ops_test):
Args:
ops_test: PyTest object.
Returns:
PostgreSQL DB admin password.
"""
postgresql_unit = ops_test.model.applications["postgresql-k8s"].units[0]
for i in range(10):
Expand Down Expand Up @@ -328,24 +357,24 @@ async def run_test_sync_job(ops_test):

# Trigger sync job
for i in range(2):
logger.info(f"attempt {i+1} to trigger new job")
logger.info(f"attempt {i + 1} to trigger new job")
job_id = trigger_airbyte_connection(api_url, connection_id)

# Wait until job is successful
job_successful = False
for j in range(15):
logger.info(f"job {i+1} attempt {j+1}: getting job status")
logger.info(f"job {i + 1} attempt {j + 1}: getting job status")
status = check_airbyte_job_status(api_url, job_id)

if status == "failed":
break

if status == "succeeded":
logger.info(f"job {i+1} attempt {j+1}: job successful!")
logger.info(f"job {i + 1} attempt {j + 1}: job successful!")
job_successful = True
break

logger.info(f"job {i+1} attempt {j+1}: job still running, retrying in 20 seconds")
logger.info(f"job {i + 1} attempt {j + 1}: job still running, retrying in 20 seconds")
time.sleep(20)

if job_successful:
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/temporal_client/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

"""Temporal client sample workflow."""

import asyncio
from datetime import timedelta
from typing import List

from temporalio import workflow

Expand Down
2 changes: 0 additions & 2 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
# See LICENSE file for licensing details.

import logging
import time

import pytest
import requests
from conftest import deploy # noqa: F401, pylint: disable=W0611
from helpers import APP_NAME_AIRBYTE_SERVER, get_unit_url, run_test_sync_job
from pytest_operator.plugin import OpsTest

Expand Down
Loading

0 comments on commit 01bd849

Please sign in to comment.