Improvements to unit testing
* Enable processing of the http_header_field_manipulation transformer
* Wrap calls to ClickhouseConnection in a context manager (see the sketch below)
* Refactor unit tests to make use of measurement_uids
* Improve the GitHub CI cache
* Run salvage queries more slowly to avoid overloading the DB
* Add tests for writing observations to the DB
* Add more tests
hellais authored Nov 3, 2023
1 parent 691db1c commit cb4eebd
Showing 16 changed files with 314 additions and 344 deletions.
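
The ClickhouseConnection context-manager change is the thread running through most of the refactor below. A minimal usage sketch, assuming the class implements the context-manager protocol so that leaving the block flushes buffered rows and disconnects the client (the connection URL, table name, and row values here are placeholders, not taken from the commit):

```python
from oonidata.db.connections import ClickhouseConnection

# Placeholder rows and column names purely for illustration.
rows = [["2023-11-03", "IT", "web_connectivity"]]
column_names = ["bucket_date", "probe_cc", "test_name"]

# Placeholder connection URL.
with ClickhouseConnection("clickhouse://localhost/", row_buffer_size=10_000) as db:
    db.write_rows(table_name="obs_web", rows=rows, column_names=column_names)
    # No explicit db.close() is needed: buffered rows are assumed to be
    # flushed and the client disconnected when the block exits.
```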
20 changes: 16 additions & 4 deletions .github/workflows/tests.yml
@@ -5,7 +5,7 @@ on:
- main
pull_request:
branches:
- '*'
- "*"
jobs:
Tests:
name: ${{ matrix.os }} / ${{ matrix.python-version }}
@@ -44,11 +44,23 @@ jobs:
path: "$HOME/.cache/pypoetry/virtualenvs"
key: venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('**/poetry.lock') }}

- name: Set up test data cache
- name: Set up datadir cache
uses: actions/cache@v3
with:
path: tests/data/
key: tests-data-${{ hashFiles('tests/conftest.py') }}
path: tests/data/datadir/
key: tests-data-datadir

- name: Set up measurements cache
uses: actions/cache@v3
with:
path: tests/data/measurements/
key: tests-data-measurements-${{ hashFiles('tests/_sample_measurements.py') }}

- name: Set up raw_measurements cache
uses: actions/cache@v3
with:
path: tests/data/measurements/
key: tests-data-raw_measurements-${{ hashFiles('tests/conftest.py') }}

- name: Install clickhouse
run: |
1 change: 1 addition & 0 deletions Readme.md
@@ -44,6 +44,7 @@ The analysis engine is made up of several components:
Below we explain each step of this process in detail

At a high level the pipeline looks like this:

```mermaid
graph
M{{Measurement}} --> OGEN[[make_observations]]
2 changes: 0 additions & 2 deletions benchmarks/conftest.py
@@ -5,12 +5,10 @@

import pytest

import orjson

from oonidata.fingerprintdb import FingerprintDB
from oonidata.netinfo import NetinfoDB
from oonidata.dataclient import sync_measurements
from oonidata.apiclient import get_measurement_dict, get_raw_measurement

FIXTURE_PATH = (
Path(os.path.dirname(os.path.realpath(__file__))) / ".." / "tests" / "data"
7 changes: 7 additions & 0 deletions oonidata/apiclient.py
@@ -22,3 +22,10 @@ def get_measurement_dict(report_id: str, input: Optional[str] = None) -> dict:
msmt_dict = orjson.loads(j["raw_measurement"])
msmt_dict["measurement_uid"] = j["measurement_uid"]
return msmt_dict


def get_measurement_dict_by_uid(measurement_uid: str) -> dict:
r = requests.get(f"https://api.ooni.io/api/v1/measurement/{measurement_uid}")
msmt_dict = r.json()
msmt_dict["measurement_uid"] = measurement_uid
return msmt_dict
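
A rough usage sketch for the new helper, tying into the measurement_uid-based test refactor (the UID below is a made-up placeholder):

```python
from oonidata.analysis.datasources import load_measurement
from oonidata.apiclient import get_measurement_dict_by_uid

# Hypothetical measurement UID, purely for illustration.
uid = "20231103000000.000000_IT_webconnectivity_0000000000000000"

msmt_dict = get_measurement_dict_by_uid(uid)
assert msmt_dict["measurement_uid"] == uid

# The raw dict can then be parsed into a typed measurement object,
# as done elsewhere in the pipeline.
msmt = load_measurement(msmt_dict)
```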
18 changes: 14 additions & 4 deletions oonidata/db/connections.py
@@ -29,6 +29,7 @@ class ClickhouseConnection(DatabaseConnection):
def __init__(self, conn_url, row_buffer_size=0, max_block_size=1_000_000):
from clickhouse_driver import Client

self.clickhouse_url = conn_url
self.client = Client.from_url(conn_url)

self.row_buffer_size = row_buffer_size
@@ -79,18 +80,27 @@ def flush_rows(self, table_name, rows):
log.error(
f"Failed to write {len(rows)} rows. Trying to savage what is savageable. ({exc})"
)
for row in rows:
for idx, row in enumerate(rows):
try:
self.execute(query_str, [row])
self.execute(
query_str,
[row],
types_check=True,
query_id=f"oonidata-savage-{idx}-{time.time()}",
)
time.sleep(0.1)
except Exception as exc:
log.error(f"Failed to write {row} ({exc})")
log.error(f"Failed to write {row} ({exc}) {query_str}")
with open(f"failing-rows.pickle", "ab") as out_file:
pickle.dump({"query_str": query_str, "row": row}, out_file)

def close(self):
def flush_all_rows(self):
for table_name, rows in self._row_buffer.items():
self.flush_rows(table_name=table_name, rows=rows)
self._row_buffer[table_name] = []

def close(self):
self.flush_all_rows()
self.client.disconnect()
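
A minimal sketch of what the flush_all_rows()-on-close behavior means for buffered writes (connection URL, table, and rows are placeholders; write_rows() is assumed to buffer per table until row_buffer_size rows have accumulated):

```python
from oonidata.db.connections import ClickhouseConnection

db = ClickhouseConnection("clickhouse://localhost/", row_buffer_size=100)

# Fewer rows than row_buffer_size: these stay in the in-memory buffer...
db.write_rows(
    table_name="obs_web",
    rows=[["2023-11-03", "IT"]],
    column_names=["bucket_date", "probe_cc"],
)

# ...until close(), which now calls flush_all_rows() before disconnecting,
# so buffered rows are written out rather than silently dropped.
db.close()
```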


4 changes: 4 additions & 0 deletions oonidata/transforms/__init__.py
@@ -1,6 +1,9 @@
from oonidata.netinfo import NetinfoDB

from oonidata.transforms.nettests.dnscheck import DNSCheckTransformer
from oonidata.transforms.nettests.http_header_field_manipulation import (
HTTPHeaderFieldManipulationTransformer,
)
from oonidata.transforms.nettests.signal import SignalTransformer
from oonidata.transforms.nettests.telegram import TelegramTransformer
from oonidata.transforms.nettests.tor import TorTransformer
@@ -14,6 +17,7 @@
"signal": SignalTransformer,
"telegram": TelegramTransformer,
"tor": TorTransformer,
"http_header_field_manipulation": HTTPHeaderFieldManipulationTransformer,
"http_invalid_request_line": HTTPInvalidRequestLineTransformer,
"web_connectivity": WebConnectivityTransformer,
}
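
With the new entry registered, http_header_field_manipulation measurements can flow through the same observation-generation path as the other nettests. A rough sketch, assuming measurement_to_observations() dispatches on the measurement's test_name via this mapping (the UID and data directory are placeholders):

```python
from pathlib import Path

from oonidata.analysis.datasources import load_measurement
from oonidata.apiclient import get_measurement_dict_by_uid
from oonidata.netinfo import NetinfoDB
from oonidata.transforms import measurement_to_observations

netinfodb = NetinfoDB(datadir=Path("tests/data/datadir"), download=False)

# Hypothetical http_header_field_manipulation measurement UID.
uid = "20231103000000.000000_US_httpheaderfieldmanipulation_0000000000000000"
msmt = load_measurement(get_measurement_dict_by_uid(uid))

for observations in measurement_to_observations(msmt, netinfodb=netinfodb):
    print(f"generated {len(observations)} observations")
```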
202 changes: 104 additions & 98 deletions oonidata/workers/observations.py
@@ -6,6 +6,8 @@
from typing import (
List,
Optional,
Sequence,
Tuple,
)

import statsd
@@ -17,6 +19,7 @@

from oonidata.analysis.datasources import load_measurement
from oonidata.datautils import PerfTimer
from oonidata.models.nettests import SupportedDataformats

from oonidata.netinfo import NetinfoDB

@@ -29,7 +32,6 @@
)
from oonidata.db.connections import (
ClickhouseConnection,
CSVConnection,
)
from oonidata.transforms import measurement_to_observations
from oonidata.workers.common import (
@@ -41,80 +43,84 @@
log = logging.getLogger("oonidata.processing")


def write_observations_to_db(
msmt: SupportedDataformats,
netinfodb: NetinfoDB,
db: ClickhouseConnection,
bucket_date: str,
):
for observations in measurement_to_observations(msmt, netinfodb=netinfodb):
if len(observations) == 0:
continue

column_names = [f.name for f in dataclasses.fields(observations[0])]
table_name, rows = make_db_rows(
bucket_date=bucket_date,
dc_list=observations,
column_names=column_names,
)
db.write_rows(table_name=table_name, rows=rows, column_names=column_names)
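
A minimal sketch of the kind of test this enables (the fixture names, UID, and query are hypothetical; it assumes `netinfodb` and `db` are fixtures pointing at a test data directory and a disposable ClickHouse database, and that db.execute() returns rows for SELECT queries):

```python
from oonidata.analysis.datasources import load_measurement
from oonidata.apiclient import get_measurement_dict_by_uid
from oonidata.workers.observations import write_observations_to_db


def test_write_observations_to_db(netinfodb, db):
    # Hypothetical web_connectivity measurement UID.
    uid = "20231103000000.000000_IT_webconnectivity_0000000000000000"
    msmt = load_measurement(get_measurement_dict_by_uid(uid))

    write_observations_to_db(msmt, netinfodb, db, bucket_date="2023-11-03")
    db.flush_all_rows()

    res = db.execute("SELECT COUNT() FROM obs_web WHERE bucket_date = '2023-11-03'")
    assert res[0][0] > 0
```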


def make_observations_for_file_entry_batch(
file_entry_batch,
clickhouse,
row_buffer_size,
data_dir,
bucket_date,
probe_cc,
fast_fail,
file_entry_batch: Sequence[Tuple[str, str, str, int]],
clickhouse: str,
row_buffer_size: int,
data_dir: pathlib.Path,
bucket_date: str,
probe_cc: str,
fast_fail: bool,
):
netinfodb = NetinfoDB(datadir=data_dir, download=False)
tbatch = PerfTimer()
db = ClickhouseConnection(clickhouse, row_buffer_size=row_buffer_size)
statsd_client = statsd.StatsClient("localhost", 8125)
ccs = ccs_set(probe_cc)
idx = 0
for bucket_name, s3path, ext, fe_size in file_entry_batch:
log.info(f"processing file s3://{bucket_name}/{s3path}")
t = PerfTimer()
try:
for msmt_dict in stream_measurements(
bucket_name=bucket_name, s3path=s3path, ext=ext
):
# Legacy cans don't allow us to pre-filter on the probe_cc, so
# we need to check for probe_cc consistency in here.
if ccs and msmt_dict["probe_cc"] not in ccs:
continue
msmt = None
try:
t = PerfTimer()
msmt = load_measurement(msmt_dict)
if not msmt.test_keys:
log.error(
f"measurement with empty test_keys: ({msmt.measurement_uid})",
exc_info=True,
)
with ClickhouseConnection(clickhouse, row_buffer_size=row_buffer_size) as db:
statsd_client = statsd.StatsClient("localhost", 8125)
ccs = ccs_set(probe_cc)
idx = 0
for bucket_name, s3path, ext, fe_size in file_entry_batch:
log.info(f"processing file s3://{bucket_name}/{s3path}")
t = PerfTimer()
try:
for msmt_dict in stream_measurements(
bucket_name=bucket_name, s3path=s3path, ext=ext
):
# Legacy cans don't allow us to pre-filter on the probe_cc, so
# we need to check for probe_cc consistency in here.
if ccs and msmt_dict["probe_cc"] not in ccs:
continue
for observations in measurement_to_observations(
msmt, netinfodb=netinfodb
):
if len(observations) == 0:
msmt = None
try:
t = PerfTimer()
msmt = load_measurement(msmt_dict)
if not msmt.test_keys:
log.error(
f"measurement with empty test_keys: ({msmt.measurement_uid})",
exc_info=True,
)
continue
write_observations_to_db(msmt, netinfodb, db, bucket_date)
# following types ignored due to https://github.com/jsocol/pystatsd/issues/146
statsd_client.timing("oonidata.make_observations.timed", t.ms, rate=0.1) # type: ignore
statsd_client.incr("oonidata.make_observations.msmt_count", rate=0.1) # type: ignore
idx += 1
except Exception as exc:
msmt_str = msmt_dict.get("report_id", None)
if msmt:
msmt_str = msmt.measurement_uid
log.error(f"failed at idx: {idx} ({msmt_str})", exc_info=True)

column_names = [
f.name for f in dataclasses.fields(observations[0])
]
table_name, rows = make_db_rows(
bucket_date=bucket_date,
dc_list=observations,
column_names=column_names,
)
db.write_rows(
table_name=table_name, rows=rows, column_names=column_names
)
# following types ignored due to https://github.com/jsocol/pystatsd/issues/146
statsd_client.timing("oonidata.make_observations.timed", t.ms, rate=0.1) # type: ignore
statsd_client.incr("oonidata.make_observations.msmt_count", rate=0.1) # type: ignore
idx += 1
except Exception as exc:
msmt_str = msmt_dict.get("report_id", None)
if msmt:
msmt_str = msmt.measurement_uid
log.error(f"failed at idx: {idx} ({msmt_str})", exc_info=True)

if fast_fail:
db.close()
raise exc
log.info(f"done processing file s3://{bucket_name}/{s3path}")
except Exception as exc:
log.error(f"failed to stream measurements from s3://{bucket_name}/{s3path}")
log.error(exc)
statsd_client.timing("oonidata.dataclient.stream_file_entry.timed", t.ms, rate=0.1) # type: ignore
statsd_client.gauge("oonidata.dataclient.file_entry.kb_per_sec.gauge", fe_size / 1024 / t.s, rate=0.1) # type: ignore
statsd_client.timing("oonidata.dataclient.batch.timed", tbatch.ms) # type: ignore
db.close()
if fast_fail:
db.close()
raise exc
log.info(f"done processing file s3://{bucket_name}/{s3path}")
except Exception as exc:
log.error(
f"failed to stream measurements from s3://{bucket_name}/{s3path}"
)
log.error(exc)
statsd_client.timing("oonidata.dataclient.stream_file_entry.timed", t.ms, rate=0.1) # type: ignore
statsd_client.gauge("oonidata.dataclient.file_entry.kb_per_sec.gauge", fe_size / 1024 / t.s, rate=0.1) # type: ignore
statsd_client.timing("oonidata.dataclient.batch.timed", tbatch.ms) # type: ignore
return idx


@@ -128,24 +134,24 @@ def make_observation_in_day(
day: date,
):
statsd_client = statsd.StatsClient("localhost", 8125)
db = ClickhouseConnection(clickhouse, row_buffer_size=10_000)

bucket_date = day.strftime("%Y-%m-%d")

prev_ranges = []
for table_name in ["obs_web"]:
prev_ranges.append(
(
table_name,
get_prev_range(
db=db,
table_name=table_name,
bucket_date=bucket_date,
test_name=test_name,
probe_cc=probe_cc,
),
with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db:
prev_ranges = []
for table_name in ["obs_web"]:
prev_ranges.append(
(
table_name,
get_prev_range(
db=db,
table_name=table_name,
bucket_date=bucket_date,
test_name=test_name,
probe_cc=probe_cc,
),
)
)
)

t = PerfTimer()
total_t = PerfTimer()
@@ -187,10 +193,10 @@ def make_observation_in_day(
statsd_client.timing("oonidata.dataclient.daily.timed", total_t.ms)

if len(prev_ranges) > 0:
for table_name, pr in prev_ranges:
maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name)
with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db:
for table_name, pr in prev_ranges:
maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name)

db.close()
return total_size, total_msmt_count


@@ -210,7 +216,6 @@ def start_observation_maker(
n_workers=parallelism,
)

db = ClickhouseConnection(clickhouse)
t_total = PerfTimer()
total_size, total_msmt_count = 0, 0
day_list = list(date_interval(start_day, end_day))
@@ -233,19 +238,20 @@
log.info(
f"finished processing {day} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
)
db.execute(
"INSERT INTO oonidata_processing_logs (key, timestamp, runtime_ms, bytes, msmt_count, comment) VALUES",
[
with ClickhouseConnection(clickhouse) as db:
db.execute(
"INSERT INTO oonidata_processing_logs (key, timestamp, runtime_ms, bytes, msmt_count, comment) VALUES",
[
"oonidata.bucket_processed",
datetime.utcnow(),
int(t.ms),
size,
msmt_count,
day.strftime("%Y-%m-%d"),
]
],
)
[
"oonidata.bucket_processed",
datetime.utcnow(),
int(t.ms),
size,
msmt_count,
day.strftime("%Y-%m-%d"),
]
],
)

mb_per_sec = round(total_size / t_total.s / 10**6, 1)
msmt_per_sec = round(total_msmt_count / t_total.s)