Simplify the startup of workers
hellais committed Apr 15, 2024
1 parent 77947f8 commit 333845c
Showing 4 changed files with 33 additions and 52 deletions.
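In practice this removes the separate `hatch run oonipipeline start-workers` step: each CLI command now connects to Temporal, runs a worker inline for the duration of the workflow it submits, and shuts that worker down when the workflow finishes. A self-contained sketch of the pattern follows; the workflow, activity, and task-queue names are placeholders, and only the temporalio API usage mirrors `run_workflow()` in the diff below.

```python
# Sketch of the inline-worker pattern this commit adopts. Placeholder names
# throughout; only the temporalio usage mirrors run_workflow() in the diff.
import asyncio
import concurrent.futures
import multiprocessing
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client as TemporalClient
from temporalio.worker import SharedStateManager, Worker

TASK_QUEUE_NAME = "example-task-queue"  # placeholder


@activity.defn
def make_example_in_day(day: str) -> str:
    # Synchronous activity: runs inside the process pool configured below.
    return f"processed {day}"


@workflow.defn
class ExampleWorkflow:
    @workflow.run
    async def run(self, day: str) -> str:
        return await workflow.execute_activity(
            make_example_in_day, day, start_to_close_timeout=timedelta(minutes=5)
        )


async def run_workflow_inline(
    day: str, process_count: int = 5, temporal_address: str = "localhost:7233"
) -> str:
    client = await TemporalClient.connect(temporal_address)
    # The worker lives only for the duration of this `async with` block, so no
    # separate start-workers process is needed.
    async with Worker(
        client,
        task_queue=TASK_QUEUE_NAME,
        workflows=[ExampleWorkflow],
        activities=[make_example_in_day],
        activity_executor=concurrent.futures.ProcessPoolExecutor(process_count),
        # Required when the activity executor is a process pool: it carries
        # heartbeat and cancellation state across processes.
        shared_state_manager=SharedStateManager.create_from_multiprocessing(
            multiprocessing.Manager()
        ),
    ):
        return await client.execute_workflow(
            ExampleWorkflow.run,
            day,
            id="example-workflow",
            task_queue=TASK_QUEUE_NAME,
        )


if __name__ == "__main__":
    print(asyncio.run(run_workflow_inline("2024-01-01")))
```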
5 changes: 0 additions & 5 deletions oonipipeline/Readme.md
@@ -31,11 +31,6 @@ mkdir -p clickhouse-data
 clickhouse server
 ```
 
-You should then start the workers by running:
-```
-hatch run oonipipeline start-workers
-```
-
 You can then start the desired workflow, for example to create signal observations for the US:
 ```
 hatch run oonipipeline mkobs --probe-cc US --test-name signal --start-day 2024-01-01 --end-day 2024-01-02
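Note that `run_workflow` still needs a reachable Temporal server (it defaults to `localhost:7233`); only the separate worker process goes away. An illustrative connectivity check, not part of the repository:

```python
# Illustrative check (not repository code) that a Temporal server is reachable
# at the default address run_workflow connects to, before invoking mkobs.
import asyncio

from temporalio.client import Client


async def main() -> None:
    await Client.connect("localhost:7233")
    print("Temporal server reachable at localhost:7233")


asyncio.run(main())
```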
69 changes: 32 additions & 37 deletions oonipipeline/src/oonipipeline/cli/commands.py
@@ -20,8 +20,8 @@
 
 import concurrent.futures
 
-from temporalio.client import Client
-from temporalio.worker import Worker
+from temporalio.client import Client as TemporalClient
+from temporalio.worker import Worker, SharedStateManager
 
 from temporalio.types import MethodAsyncSingleParam, SelfType, ParamType, ReturnType
 
@@ -48,16 +48,36 @@
 
 
 async def run_workflow(
-    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType], arg: ParamType
+    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
+    arg: ParamType,
+    process_count: int = 5,
+    temporal_address: str = "localhost:7233",
 ):
-    client = await Client.connect("localhost:7233")
-
-    await client.execute_workflow(
-        workflow,
-        arg,
-        id=TASK_QUEUE_NAME,
+    client = await TemporalClient.connect(temporal_address)
+    async with Worker(
+        client,
         task_queue=TASK_QUEUE_NAME,
-    )
+        workflows=[
+            ObservationsWorkflow,
+            GroundTruthsWorkflow,
+            AnalysisWorkflow,
+        ],
+        activities=[
+            make_observation_in_day,
+            make_ground_truths_in_day,
+            make_analysis_in_a_day,
+        ],
+        activity_executor=concurrent.futures.ProcessPoolExecutor(process_count),
+        shared_state_manager=SharedStateManager.create_from_multiprocessing(
+            multiprocessing.Manager()
+        ),
+    ):
+        await client.execute_workflow(
+            workflow,
+            arg,
+            id=TASK_QUEUE_NAME,
+            task_queue=TASK_QUEUE_NAME,
+        )
 
 
 def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
@@ -193,14 +213,14 @@ def mkobs(
         end_day=end_day,
         clickhouse=clickhouse,
         data_dir=str(data_dir),
-        parallelism=parallelism,
         fast_fail=fast_fail,
     )
     click.echo(f"starting to make observations with arg={arg}")
     asyncio.run(
         run_workflow(
             ObservationsWorkflow.run,
             arg,
+            process_count=parallelism,
         )
     )
 
@@ -271,6 +291,7 @@ def mkanalysis(
         run_workflow(
             AnalysisWorkflow.run,
             arg,
+            process_count=parallelism,
         )
     )
 
@@ -403,29 +424,3 @@ def checkdb(
 
     with ClickhouseConnection(clickhouse) as db:
         list_all_table_diffs(db)
-
-
-async def run_workers():
-    client = await Client.connect("localhost:7233")
-    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
-        worker = Worker(
-            client,
-            task_queue=TASK_QUEUE_NAME,
-            workflows=[
-                ObservationsWorkflow,
-                GroundTruthsWorkflow,
-                AnalysisWorkflow,
-            ],
-            activities=[
-                make_observation_in_day,
-                make_ground_truths_in_day,
-                make_analysis_in_a_day,
-            ],
-            activity_executor=activity_executor,
-        )
-        await worker.run()
-
-
-@cli.command()
-def start_workers():
-    asyncio.run(run_workers())
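Besides where the worker runs, the activity executor also changes: the removed `run_workers()` ran synchronous activities on a `ThreadPoolExecutor(max_workers=100)`, while the inline worker in `run_workflow()` uses a `ProcessPoolExecutor` sized by `process_count`, which in temporalio additionally requires a `SharedStateManager`. A small illustrative helper (not repository code) contrasting the two wirings:

```python
# Illustrative comparison (not repository code) of the two activity-executor
# setups seen in this diff: threads (old run_workers) vs. processes (new
# inline worker). With a process pool, temporalio also needs a
# SharedStateManager so heartbeats and cancellation reach activity processes.
import concurrent.futures
import multiprocessing
from typing import Any, Dict, Optional

from temporalio.worker import SharedStateManager


def executor_kwargs(process_count: Optional[int] = None) -> Dict[str, Any]:
    """Build the executor-related keyword arguments for temporalio's Worker."""
    if process_count is None:
        # Old style: a large thread pool, no shared state manager required.
        return {
            "activity_executor": concurrent.futures.ThreadPoolExecutor(max_workers=100)
        }
    # New style: a sized process pool plus a multiprocessing-backed manager.
    return {
        "activity_executor": concurrent.futures.ProcessPoolExecutor(process_count),
        "shared_state_manager": SharedStateManager.create_from_multiprocessing(
            multiprocessing.Manager()
        ),
    }
```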
2 changes: 0 additions & 2 deletions oonipipeline/src/oonipipeline/workflows/observations.py
@@ -129,7 +129,6 @@ class ObservationsWorkflowParams:
     end_day: str
     clickhouse: str
     data_dir: str
-    parallelism: int
     fast_fail: bool
     log_level: int = logging.INFO
 
@@ -178,7 +177,6 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
 
     total_msmt_count = 0
     for batch in file_entry_batches:
-        # TODO(art): add extra parallelism here
         msmt_cnt = make_observations_for_file_entry_batch(
             batch,
             params.clickhouse,
9 changes: 1 addition & 8 deletions oonipipeline/tests/test_workflows.py
@@ -3,7 +3,7 @@
 from pathlib import Path
 import time
 
-from oonipipeline.cli.commands import cli, run_workers
+from oonipipeline.cli.commands import cli
 
 
 def wait_for_mutations(db, table_name):
@@ -16,10 +16,6 @@ def wait_for_mutations(db, table_name):
         time.sleep(1)
 
 
-def start_workers():
-    asyncio.run(run_workers())
-
-
 def test_full_workflow(
     db,
     cli_runner,
@@ -29,9 +25,6 @@ def test_full_workflow(
     tmp_path: Path,
     temporal_dev_server,
 ):
-    # simulate the starting of workers
-    Process(target=start_workers, args=(), daemon=True).start()
-
     result = cli_runner.invoke(
         cli,
         [
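Because each command now embeds its own worker, a test can drive the CLI end-to-end without first spawning a worker process. A hedged sketch of that shape (option names beyond those shown in the Readme, and the fixture wiring, are assumptions; ClickHouse and a Temporal dev server still need to be running):

```python
# Hedged sketch, not the repository's test: invoking the CLI end-to-end now
# exercises the inline worker directly. Assumes ClickHouse and a Temporal dev
# server are reachable; option names beyond those in the Readme are guesses.
from pathlib import Path

from click.testing import CliRunner

from oonipipeline.cli.commands import cli


def test_mkobs_with_inline_worker(tmp_path: Path) -> None:
    runner = CliRunner()
    result = runner.invoke(
        cli,
        [
            "mkobs",
            "--probe-cc", "US",
            "--test-name", "signal",
            "--start-day", "2024-01-01",
            "--end-day", "2024-01-02",
            "--data-dir", str(tmp_path),  # assumed option name
        ],
    )
    assert result.exit_code == 0
```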
