Rewrite observation generation using temporal.io
hellais committed Mar 6, 2024
1 parent 584306b commit ea54a88
Showing 8 changed files with 181 additions and 423 deletions.
12 changes: 12 additions & 0 deletions debug-temporal.sh
@@ -0,0 +1,12 @@
# >>> json.dumps(asdict(ObservationsWorkflowParams(probe_cc=["IT"], start_day="2024-01-01", end_day="2024-01-02", clickhouse="clickhouse://localhost/", data_dir="/Users/art/repos/ooni/data/tests/data/", parallelism=10, fast_fail=False, test_name=["signal"])))
#
#
INPUT_JSON="{\"probe_cc\": [\"IT\"], \"test_name\": [\"signal\"], \"start_day\": \"2024-01-01\", \"end_day\": \"2024-01-20\", \"clickhouse\": \"clickhouse://localhost/\", \"data_dir\": \"$(pwd)/tests/data/datadir/\", \"parallelism\": 10, \"fast_fail\": false, \"log_level\": 20}"

echo "$INPUT_JSON"
temporal workflow start \
--task-queue oonidatapipeline-task-queue \
--type ObservationsWorkflow \
--namespace default \
--input "$INPUT_JSON"
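
For reference, the JSON above mirrors the fields of the ObservationsWorkflowParams dataclass mentioned in the comment at the top of this script. A plausible sketch of that dataclass, inferred purely from the INPUT_JSON keys (the actual definition lives in the pipeline's workflows module and may differ in types and defaults):

from dataclasses import dataclass
from typing import List


# Sketch inferred from INPUT_JSON above; field names match the JSON keys,
# but the types and the log_level default are assumptions.
@dataclass
class ObservationsWorkflowParams:
    probe_cc: List[str]
    test_name: List[str]
    start_day: str  # ISO date, e.g. "2024-01-01"
    end_day: str
    clickhouse: str  # clickhouse:// connection URL
    data_dir: str
    parallelism: int
    fast_fail: bool
    log_level: int = 20  # logging.INFO

temporal workflow start passes --input as a JSON payload, and the Python SDK deserializes it into whatever type annotates the workflow's run method, so the keys here must stay in sync with the dataclass fields.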

295 changes: 0 additions & 295 deletions oonidata/cli/command.py
@@ -16,13 +16,6 @@
from oonidata.db.connections import ClickhouseConnection
from oonidata.db.create_tables import create_queries, list_all_table_diffs
from oonidata.netinfo import NetinfoDB
from oonidata.workers import (
    start_fingerprint_hunter,
    start_observation_maker,
    start_ground_truth_builder,
    start_response_archiver,
)
from oonidata.workers.analysis import start_analysis


log = logging.getLogger("oonidata")
@@ -125,294 +118,6 @@ def sync(
    )


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str)
@click.option(
    "--data-dir",
    type=Path,
    required=True,
    help="data directory to store fingerprint and geoip databases",
)
@click.option(
    "--parallelism",
    type=int,
    default=multiprocessing.cpu_count() + 2,
    help="number of processes to use. Only works when writing to a database",
)
@click.option(
    "--fast-fail",
    is_flag=True,
    help="should we fail immediately when we encounter an error?",
)
@click.option(
    "--create-tables",
    is_flag=True,
    help="should we attempt to create the required clickhouse tables",
)
@click.option(
    "--drop-tables",
    is_flag=True,
    help="should we drop tables before creating them",
)
def mkobs(
    probe_cc: List[str],
    test_name: List[str],
    start_day: date,
    end_day: date,
    clickhouse: Optional[str],
    data_dir: Path,
    parallelism: int,
    fast_fail: bool,
    create_tables: bool,
    drop_tables: bool,
):
    """
    Make observations for OONI measurements and write them into clickhouse or a CSV file
    """
    if create_tables:
        if not clickhouse:
            click.echo("--clickhouse needs to be specified when creating tables")
            return 1
        if drop_tables:
            click.confirm(
                "Are you sure you want to drop the tables before creation?", abort=True
            )

        with ClickhouseConnection(clickhouse) as db:
            for query, table_name in create_queries:
                if drop_tables:
                    db.execute(f"DROP TABLE IF EXISTS {table_name};")
                db.execute(query)

    NetinfoDB(datadir=data_dir, download=True)

    start_observation_maker(
        probe_cc=probe_cc,
        test_name=test_name,
        start_day=start_day,
        end_day=end_day,
        clickhouse=clickhouse,
        data_dir=data_dir,
        parallelism=parallelism,
        fast_fail=fast_fail,
    )


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str, required=True)
@click.option(
    "--data-dir",
    type=Path,
    required=True,
    help="data directory to store fingerprint and geoip databases",
)
@click.option(
    "--parallelism",
    type=int,
    default=multiprocessing.cpu_count(),
    help="number of processes to use. Only works when writing to a database",
)
@click.option(
    "--fast-fail",
    is_flag=True,
    help="should we fail immediately when we encounter an error?",
)
@click.option(
    "--create-tables",
    is_flag=True,
    help="should we attempt to create the required clickhouse tables",
)
@click.option(
    "--rebuild-ground-truths",
    is_flag=True,
    help="should we force the rebuilding of ground truths",
)
def mker(
    probe_cc: List[str],
    test_name: List[str],
    start_day: date,
    end_day: date,
    clickhouse: str,
    data_dir: Path,
    parallelism: int,
    fast_fail: bool,
    create_tables: bool,
    rebuild_ground_truths: bool,
):
    if create_tables:
        with ClickhouseConnection(clickhouse) as db:
            for query, table_name in create_queries:
                click.echo(f"Running create query for {table_name}")
                db.execute(query)
    raise Exception("Run this via the analysis command")


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str, required=True)
@click.option(
    "--data-dir",
    type=Path,
    required=True,
    help="data directory to store fingerprint and geoip databases",
)
@click.option(
    "--parallelism",
    type=int,
    default=multiprocessing.cpu_count() + 2,
    help="number of processes to use. Only works when writing to a database",
)
@click.option(
    "--fast-fail",
    is_flag=True,
    help="should we fail immediately when we encounter an error?",
)
@click.option(
    "--create-tables",
    is_flag=True,
    help="should we attempt to create the required clickhouse tables",
)
@click.option(
    "--rebuild-ground-truths",
    is_flag=True,
    help="should we force the rebuilding of ground truths",
)
def mkanalysis(
    probe_cc: List[str],
    test_name: List[str],
    start_day: date,
    end_day: date,
    clickhouse: str,
    data_dir: Path,
    parallelism: int,
    fast_fail: bool,
    create_tables: bool,
    rebuild_ground_truths: bool,
):
    if create_tables:
        with ClickhouseConnection(clickhouse) as db:
            for query, table_name in create_queries:
                click.echo(f"Running create query for {table_name}")
                db.execute(query)

    start_analysis(
        probe_cc=probe_cc,
        test_name=test_name,
        start_day=start_day,
        end_day=end_day,
        clickhouse=clickhouse,
        data_dir=data_dir,
        parallelism=parallelism,
        fast_fail=fast_fail,
        rebuild_ground_truths=rebuild_ground_truths,
    )


@cli.command()
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str, required=True)
@click.option(
    "--data-dir",
    type=Path,
    required=True,
    help="data directory to store fingerprint and geoip databases",
)
@click.option(
    "--parallelism",
    type=int,
    default=multiprocessing.cpu_count() + 2,
    help="number of processes to use. Only works when writing to a database",
)
def mkgt(
    start_day: date,
    end_day: date,
    clickhouse: str,
    data_dir: Path,
    parallelism: int,
):
    start_ground_truth_builder(
        start_day=start_day,
        end_day=end_day,
        clickhouse=clickhouse,
        data_dir=data_dir,
        parallelism=parallelism,
    )


@cli.command()
@probe_cc_option
@test_name_option
@start_day_option
@end_day_option
@click.option("--clickhouse", type=str)
@click.option("--data-dir", type=Path, required=True)
@click.option("--archives-dir", type=Path, required=True)
@click.option(
    "--parallelism",
    type=int,
    default=multiprocessing.cpu_count() + 2,
    help="number of processes to use. Only works when writing to a database",
)
def mkbodies(
    probe_cc: List[str],
    test_name: List[str],
    start_day: date,
    end_day: date,
    clickhouse: str,
    data_dir: Path,
    archives_dir: Path,
    parallelism: int,
):
    """
    Make response body archives
    """
    start_response_archiver(
        probe_cc=probe_cc,
        test_name=test_name,
        start_day=start_day,
        end_day=end_day,
        data_dir=data_dir,
        archives_dir=archives_dir,
        clickhouse=clickhouse,
        parallelism=parallelism,
    )


@cli.command()
@click.option(
    "--data-dir",
    type=Path,
    required=True,
    help="data directory to store fingerprint and geoip databases",
)
@click.option("--archives-dir", type=Path, required=True)
@click.option(
    "--parallelism",
    type=int,
    default=multiprocessing.cpu_count() + 2,
    help="number of processes to use",
)
def fphunt(data_dir: Path, archives_dir: Path, parallelism: int):
    click.echo("🏹 starting the hunt for blockpage fingerprints!")
    start_fingerprint_hunter(
        archives_dir=archives_dir,
        data_dir=data_dir,
        parallelism=parallelism,
    )


@cli.command()
@click.option("--clickhouse", type=str)
@click.option(
File renamed without changes.
31 changes: 31 additions & 0 deletions oonidata/datapipeline/run_worker.py
@@ -0,0 +1,31 @@
import asyncio
import concurrent.futures

from temporalio.client import Client
from temporalio.worker import Worker

from .workflows.observations import ObservationsWorkflow, make_observation_in_day


async def async_main():
    client = await Client.connect("localhost:7233")
    # Plain (non-async) activity functions run on this executor; a generously
    # sized thread pool lets many I/O-bound activity executions proceed
    # concurrently.
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="oonidatapipeline-task-queue",
            workflows=[ObservationsWorkflow],
            activities=[make_observation_in_day],
            activity_executor=activity_executor,
        )

        await worker.run()


def main():
    asyncio.run(async_main())


if __name__ == "__main__":
    main()
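
The worker above registers ObservationsWorkflow together with the make_observation_in_day activity, but the workflow definition itself is not part of this diff. As a rough sketch of the Temporal pattern in play (the argument shape, timeout, and one-activity-per-day loop are assumptions, not the actual implementation):

from datetime import datetime, timedelta

from temporalio import workflow

# Assumes ObservationsWorkflowParams and make_observation_in_day as defined
# in workflows/observations.py (see the params sketch earlier).


@workflow.defn
class ObservationsWorkflow:
    @workflow.run
    async def run(self, params: ObservationsWorkflowParams) -> None:
        # Fan out one activity invocation per day in [start_day, end_day).
        day = datetime.strptime(params.start_day, "%Y-%m-%d").date()
        end = datetime.strptime(params.end_day, "%Y-%m-%d").date()
        while day < end:
            await workflow.execute_activity(
                make_observation_in_day,
                args=[params, day.strftime("%Y-%m-%d")],  # hypothetical signature
                start_to_close_timeout=timedelta(hours=48),
            )
            day += timedelta(days=1)

With the worker running (python -m oonidata.datapipeline.run_worker, assuming the package is importable), debug-temporal.sh can then submit an ObservationsWorkflow execution to the oonidatapipeline-task-queue it polls.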
Empty file.
