Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update resource sample worker #33

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 3 additions & 95 deletions resource_sample_py/resource_sample/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,10 @@

import asyncio
import logging
import os

from activities.activity1 import compose_greeting
from activities.activity2 import vault_test
from activities.db_activity import database_test
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
from temporallib.auth import (
AuthOptions,
GoogleAuthOptions,
KeyPair,
MacaroonAuthOptions,
)
from temporallib.client import Client, Options
from temporallib.encryption import EncryptionOptions
from temporallib.worker import SentryOptions, Worker, WorkerOptions
Expand All @@ -27,101 +19,17 @@
logger = logging.getLogger(__name__)


def _get_auth_header():
"""Get auth options based on provider.

Returns:
AuthOptions object.
"""
if os.getenv("TWC_AUTH_PROVIDER") == "candid":
return MacaroonAuthOptions(
keys=KeyPair(
private=os.getenv("TWC_CANDID_PRIVATE_KEY"),
public=os.getenv("TWC_CANDID_PUBLIC_KEY"),
),
macaroon_url=os.getenv("TWC_CANDID_URL"),
username=os.getenv("TWC_CANDID_USERNAME"),
)

if os.getenv("TWC_AUTH_PROVIDER") == "google":
return GoogleAuthOptions(
type="service_account",
project_id=os.getenv("TWC_OIDC_PROJECT_ID"),
private_key_id=os.getenv("TWC_OIDC_PRIVATE_KEY_ID"),
private_key=os.getenv("TWC_OIDC_PRIVATE_KEY"),
client_email=os.getenv("TWC_OIDC_CLIENT_EMAIL"),
client_id=os.getenv("TWC_OIDC_CLIENT_ID"),
auth_uri=os.getenv("TWC_OIDC_AUTH_URI"),
token_uri=os.getenv("TWC_OIDC_TOKEN_URI"),
auth_provider_x509_cert_url=os.getenv("TWC_OIDC_AUTH_CERT_URL"),
client_x509_cert_url=os.getenv("TWC_OIDC_CLIENT_CERT_URL"),
)

return None


def _init_runtime_with_prometheus(port: int) -> Runtime:
"""Create runtime for use with Prometheus metrics.

Args:
port: Port of prometheus.

Returns:
Runtime for temporalio with prometheus.
"""
return Runtime(
telemetry=TelemetryConfig(
metrics=PrometheusConfig(bind_address=f"0.0.0.0:{port}")
)
)


async def run_worker():
"""Connect Temporal worker to Temporal server."""
client_config = Options(
host=os.getenv("TWC_HOST"),
namespace=os.getenv("TWC_NAMESPACE"),
queue=os.getenv("TWC_QUEUE"),
client = await Client.connect(
client_opt=Options(encryption=EncryptionOptions()),
)

if os.getenv("TWC_TLS_ROOT_CAS", "").strip() != "":
client_config.tls_root_cas = os.getenv("TWC_TLS_ROOT_CAS")

if os.getenv("TWC_AUTH_PROVIDER", "").strip() != "":
client_config.auth = AuthOptions(
provider=os.getenv("TWC_AUTH_PROVIDER"), config=_get_auth_header()
)

if os.getenv("TWC_ENCRYPTION_KEY", "").strip() != "":
client_config.encryption = EncryptionOptions(
key=os.getenv("TWC_ENCRYPTION_KEY"), compress=True
)

worker_opt = None
dsn = os.getenv("TWC_SENTRY_DSN", "").strip()
if dsn != "":
sentry = SentryOptions(
dsn=dsn,
release=os.getenv("TWC_SENTRY_RELEASE", "").strip() or None,
environment=os.getenv("TWC_SENTRY_ENVIRONMENT", "").strip() or None,
redact_params=os.getenv("TWC_SENTRY_REDACT_PARAMS", False),
sample_rate=os.getenv("TWC_SENTRY_SAMPLE_RATE", 1.0),
)

worker_opt = WorkerOptions(sentry=sentry)

runtime = None
if os.getenv("TWC_PROMETHEUS_PORT"):
runtime = _init_runtime_with_prometheus(int(os.getenv("TWC_PROMETHEUS_PORT")))

client = await Client.connect(client_config, runtime=runtime)

worker = Worker(
client=client,
task_queue=os.getenv("TWC_QUEUE"),
workflows=[GreetingWorkflow, VaultWorkflow, DatabaseWorkflow],
activities=[compose_greeting, vault_test, database_test],
worker_opt=worker_opt,
worker_opt=WorkerOptions(sentry=SentryOptions()),
)

await worker.run()
Expand Down
Loading