diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 69aca6eb..bf575c18 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -5,7 +5,7 @@ on:
       - main
   pull_request:
     branches:
-      - '*'
+      - "*"
 jobs:
   Tests:
     name: ${{ matrix.os }} / ${{ matrix.python-version }}
@@ -44,11 +44,23 @@ jobs:
           path: "$HOME/.cache/pypoetry/virtualenvs"
           key: venv-${{ runner.os }}-${{ steps.full-python-version.outputs.version }}-${{ hashFiles('**/poetry.lock') }}
 
-      - name: Set up test data cache
+      - name: Set up datadir cache
         uses: actions/cache@v3
         with:
-          path: tests/data/
-          key: tests-data-${{ hashFiles('tests/conftest.py') }}
+          path: tests/data/datadir/
+          key: tests-data-datadir
+
+      - name: Set up measurements cache
+        uses: actions/cache@v3
+        with:
+          path: tests/data/measurements/
+          key: tests-data-measurements-${{ hashFiles('tests/_sample_measurements.py') }}
+
+      - name: Set up raw_measurements cache
+        uses: actions/cache@v3
+        with:
+          path: tests/data/raw_measurements/
+          key: tests-data-raw_measurements-${{ hashFiles('tests/conftest.py') }}
 
       - name: Install clickhouse
         run: |
diff --git a/Readme.md b/Readme.md
index 42e9bef1..8e81ddfe 100644
--- a/Readme.md
+++ b/Readme.md
@@ -44,6 +44,7 @@ The analysis engine is made up of several components:
 Below we explain each step of this process in detail
 
 At a high level the pipeline looks like this:
+
 ```mermaid
 graph
     M{{Measurement}} --> OGEN[[make_observations]]
diff --git a/benchmarks/conftest.py b/benchmarks/conftest.py
index 55513f68..6b29041e 100644
--- a/benchmarks/conftest.py
+++ b/benchmarks/conftest.py
@@ -5,12 +5,10 @@
 
 import pytest
 
-import orjson
 
 from oonidata.fingerprintdb import FingerprintDB
 from oonidata.netinfo import NetinfoDB
 from oonidata.dataclient import sync_measurements
-from oonidata.apiclient import get_measurement_dict, get_raw_measurement
 
 FIXTURE_PATH = (
     Path(os.path.dirname(os.path.realpath(__file__))) / ".." / "tests" / "data"
diff --git a/oonidata/apiclient.py b/oonidata/apiclient.py
index cfa4bbbb..23abcd66 100644
--- a/oonidata/apiclient.py
+++ b/oonidata/apiclient.py
@@ -22,3 +22,10 @@ def get_measurement_dict(report_id: str, input: Optional[str] = None) -> dict:
     msmt_dict = orjson.loads(j["raw_measurement"])
     msmt_dict["measurement_uid"] = j["measurement_uid"]
     return msmt_dict
+
+
+def get_measurement_dict_by_uid(measurement_uid: str) -> dict:
+    r = requests.get(f"https://api.ooni.io/api/v1/measurement/{measurement_uid}")
+    msmt_dict = r.json()
+    msmt_dict["measurement_uid"] = measurement_uid
+    return msmt_dict
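The new get_measurement_dict_by_uid helper fetches a measurement straight from the API by its UID, so callers no longer need to carry a (report_id, input) pair around. A minimal usage sketch (the UID is one of the samples added in tests/_sample_measurements.py below; network access to api.ooni.io is assumed):

```python
from oonidata.apiclient import get_measurement_dict_by_uid

# Fetch the raw measurement dict by UID; the helper injects the
# "measurement_uid" key into the dict it returns.
uid = "20221020235950.432819_NL_signal_27b05458f186a906"
msmt = get_measurement_dict_by_uid(uid)
assert msmt["measurement_uid"] == uid
```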
diff --git a/oonidata/db/connections.py b/oonidata/db/connections.py
index 1b7ce1f0..0e4daeaf 100644
--- a/oonidata/db/connections.py
+++ b/oonidata/db/connections.py
@@ -29,6 +29,7 @@ class ClickhouseConnection(DatabaseConnection):
     def __init__(self, conn_url, row_buffer_size=0, max_block_size=1_000_000):
         from clickhouse_driver import Client
 
+        self.clickhouse_url = conn_url
         self.client = Client.from_url(conn_url)
         self.row_buffer_size = row_buffer_size
@@ -79,18 +80,27 @@ def flush_rows(self, table_name, rows):
             log.error(
                 f"Failed to write {len(rows)} rows. Trying to savage what is savageable. ({exc})"
             )
-            for row in rows:
+            for idx, row in enumerate(rows):
                 try:
-                    self.execute(query_str, [row])
+                    self.execute(
+                        query_str,
+                        [row],
+                        types_check=True,
+                        query_id=f"oonidata-savage-{idx}-{time.time()}",
+                    )
+                    time.sleep(0.1)
                 except Exception as exc:
-                    log.error(f"Failed to write {row} ({exc})")
+                    log.error(f"Failed to write {row} ({exc}) {query_str}")
                     with open(f"failing-rows.pickle", "ab") as out_file:
                         pickle.dump({"query_str": query_str, "row": row}, out_file)
 
-    def close(self):
+    def flush_all_rows(self):
         for table_name, rows in self._row_buffer.items():
             self.flush_rows(table_name=table_name, rows=rows)
             self._row_buffer[table_name] = []
+
+    def close(self):
+        self.flush_all_rows()
         self.client.disconnect()
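Splitting flush_all_rows() out of close() lets callers force the per-table row buffer to disk without disconnecting, while close() still flushes before disconnect, which is what makes the with-statement pattern adopted throughout this PR safe. A minimal sketch of that contract (table and columns borrowed from the new test in tests/test_db.py; assumes the table already exists):

```python
from oonidata.db.connections import ClickhouseConnection

with ClickhouseConnection(
    "clickhouse://localhost/testing_oonidata", row_buffer_size=5
) as db:
    # Rows are buffered per table and flushed once the buffer fills up...
    db.write_rows("tmp_test_recovery", [[1, "one"], [2, "two"]], ["col1", "col2"])
    db.flush_all_rows()  # ...or explicitly, without closing the connection
# Leaving the with-block calls close(), which now delegates to
# flush_all_rows() before disconnecting.
```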
diff --git a/oonidata/transforms/__init__.py b/oonidata/transforms/__init__.py
index e3388007..ec4de232 100644
--- a/oonidata/transforms/__init__.py
+++ b/oonidata/transforms/__init__.py
@@ -1,6 +1,9 @@
 from oonidata.netinfo import NetinfoDB
 from oonidata.transforms.nettests.dnscheck import DNSCheckTransformer
+from oonidata.transforms.nettests.http_header_field_manipulation import (
+    HTTPHeaderFieldManipulationTransformer,
+)
 from oonidata.transforms.nettests.signal import SignalTransformer
 from oonidata.transforms.nettests.telegram import TelegramTransformer
 from oonidata.transforms.nettests.tor import TorTransformer
@@ -14,6 +17,7 @@
     "signal": SignalTransformer,
     "telegram": TelegramTransformer,
     "tor": TorTransformer,
+    "http_header_field_manipulation": HTTPHeaderFieldManipulationTransformer,
     "http_invalid_request_line": HTTPInvalidRequestLineTransformer,
     "web_connectivity": WebConnectivityTransformer,
 }
diff --git a/oonidata/workers/observations.py b/oonidata/workers/observations.py
index 07d9e422..7bed992a 100644
--- a/oonidata/workers/observations.py
+++ b/oonidata/workers/observations.py
@@ -6,6 +6,8 @@
 from typing import (
     List,
     Optional,
+    Sequence,
+    Tuple,
 )
 
 import statsd
@@ -17,6 +19,7 @@
 from oonidata.analysis.datasources import load_measurement
 from oonidata.datautils import PerfTimer
+from oonidata.models.nettests import SupportedDataformats
 from oonidata.netinfo import NetinfoDB
@@ -29,7 +32,6 @@
 )
 from oonidata.db.connections import (
     ClickhouseConnection,
-    CSVConnection,
 )
 from oonidata.transforms import measurement_to_observations
 from oonidata.workers.common import (
@@ -41,80 +43,84 @@
 log = logging.getLogger("oonidata.processing")
 
 
+def write_observations_to_db(
+    msmt: SupportedDataformats,
+    netinfodb: NetinfoDB,
+    db: ClickhouseConnection,
+    bucket_date: str,
+):
+    for observations in measurement_to_observations(msmt, netinfodb=netinfodb):
+        if len(observations) == 0:
+            continue
+
+        column_names = [f.name for f in dataclasses.fields(observations[0])]
+        table_name, rows = make_db_rows(
+            bucket_date=bucket_date,
+            dc_list=observations,
+            column_names=column_names,
+        )
+        db.write_rows(table_name=table_name, rows=rows, column_names=column_names)
+
+
 def make_observations_for_file_entry_batch(
-    file_entry_batch,
-    clickhouse,
-    row_buffer_size,
-    data_dir,
-    bucket_date,
-    probe_cc,
-    fast_fail,
+    file_entry_batch: Sequence[Tuple[str, str, str, int]],
+    clickhouse: str,
+    row_buffer_size: int,
+    data_dir: pathlib.Path,
+    bucket_date: str,
+    probe_cc: str,
+    fast_fail: bool,
 ):
     netinfodb = NetinfoDB(datadir=data_dir, download=False)
     tbatch = PerfTimer()
-    db = ClickhouseConnection(clickhouse, row_buffer_size=row_buffer_size)
-    statsd_client = statsd.StatsClient("localhost", 8125)
-    ccs = ccs_set(probe_cc)
-    idx = 0
-    for bucket_name, s3path, ext, fe_size in file_entry_batch:
-        log.info(f"processing file s3://{bucket_name}/{s3path}")
-        t = PerfTimer()
-        try:
-            for msmt_dict in stream_measurements(
-                bucket_name=bucket_name, s3path=s3path, ext=ext
-            ):
-                # Legacy cans don't allow us to pre-filter on the probe_cc, so
-                # we need to check for probe_cc consistency in here.
-                if ccs and msmt_dict["probe_cc"] not in ccs:
-                    continue
-                msmt = None
-                try:
-                    t = PerfTimer()
-                    msmt = load_measurement(msmt_dict)
-                    if not msmt.test_keys:
-                        log.error(
-                            f"measurement with empty test_keys: ({msmt.measurement_uid})",
-                            exc_info=True,
-                        )
+    with ClickhouseConnection(clickhouse, row_buffer_size=row_buffer_size) as db:
+        statsd_client = statsd.StatsClient("localhost", 8125)
+        ccs = ccs_set(probe_cc)
+        idx = 0
+        for bucket_name, s3path, ext, fe_size in file_entry_batch:
+            log.info(f"processing file s3://{bucket_name}/{s3path}")
+            t = PerfTimer()
+            try:
+                for msmt_dict in stream_measurements(
+                    bucket_name=bucket_name, s3path=s3path, ext=ext
+                ):
+                    # Legacy cans don't allow us to pre-filter on the probe_cc, so
+                    # we need to check for probe_cc consistency in here.
+                    if ccs and msmt_dict["probe_cc"] not in ccs:
                         continue
-                    for observations in measurement_to_observations(
-                        msmt, netinfodb=netinfodb
-                    ):
-                        if len(observations) == 0:
+                    msmt = None
+                    try:
+                        t = PerfTimer()
+                        msmt = load_measurement(msmt_dict)
+                        if not msmt.test_keys:
+                            log.error(
+                                f"measurement with empty test_keys: ({msmt.measurement_uid})",
+                                exc_info=True,
+                            )
                             continue
+                        write_observations_to_db(msmt, netinfodb, db, bucket_date)
+                        # following types ignored due to https://github.com/jsocol/pystatsd/issues/146
+                        statsd_client.timing("oonidata.make_observations.timed", t.ms, rate=0.1)  # type: ignore
+                        statsd_client.incr("oonidata.make_observations.msmt_count", rate=0.1)  # type: ignore
+                        idx += 1
+                    except Exception as exc:
+                        msmt_str = msmt_dict.get("report_id", None)
+                        if msmt:
+                            msmt_str = msmt.measurement_uid
+                        log.error(f"failed at idx: {idx} ({msmt_str})", exc_info=True)
 
-                        column_names = [
-                            f.name for f in dataclasses.fields(observations[0])
-                        ]
-                        table_name, rows = make_db_rows(
-                            bucket_date=bucket_date,
-                            dc_list=observations,
-                            column_names=column_names,
-                        )
-                        db.write_rows(
-                            table_name=table_name, rows=rows, column_names=column_names
-                        )
-                    # following types ignored due to https://github.com/jsocol/pystatsd/issues/146
-                    statsd_client.timing("oonidata.make_observations.timed", t.ms, rate=0.1)  # type: ignore
-                    statsd_client.incr("oonidata.make_observations.msmt_count", rate=0.1)  # type: ignore
-                    idx += 1
-                except Exception as exc:
-                    msmt_str = msmt_dict.get("report_id", None)
-                    if msmt:
-                        msmt_str = msmt.measurement_uid
-                    log.error(f"failed at idx: {idx} ({msmt_str})", exc_info=True)
-
-                    if fast_fail:
-                        db.close()
-                        raise exc
-            log.info(f"done processing file s3://{bucket_name}/{s3path}")
-        except Exception as exc:
-            log.error(f"failed to stream measurements from s3://{bucket_name}/{s3path}")
-            log.error(exc)
-        statsd_client.timing("oonidata.dataclient.stream_file_entry.timed", t.ms, rate=0.1)  # type: ignore
-        statsd_client.gauge("oonidata.dataclient.file_entry.kb_per_sec.gauge", fe_size / 1024 / t.s, rate=0.1)  # type: ignore
-    statsd_client.timing("oonidata.dataclient.batch.timed", tbatch.ms)  # type: ignore
-    db.close()
+                        if fast_fail:
+                            db.close()
+                            raise exc
+                log.info(f"done processing file s3://{bucket_name}/{s3path}")
+            except Exception as exc:
+                log.error(
+                    f"failed to stream measurements from s3://{bucket_name}/{s3path}"
+                )
+                log.error(exc)
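write_observations_to_db is the per-measurement write path factored out of the batch loop above, and it is what the new tests call directly. A sketch mirroring test_write_observations in tests/test_workers.py (the test name here is illustrative; measurements, netinfodb and db are the pytest fixtures from tests/conftest.py):

```python
from oonidata.analysis.datasources import load_measurement
from oonidata.workers.observations import write_observations_to_db


def test_write_one_observation(measurements, netinfodb, db):
    # Load a cached sample measurement and write its observations under
    # the bucket_date used for partitioning.
    msmt = load_measurement(
        msmt_path=measurements[
            "20210101190046.780850_US_webconnectivity_3296f126f79ca186"
        ]
    )
    write_observations_to_db(msmt, netinfodb, db, "2021-01-01")
```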
+            statsd_client.timing("oonidata.dataclient.stream_file_entry.timed", t.ms, rate=0.1)  # type: ignore
+            statsd_client.gauge("oonidata.dataclient.file_entry.kb_per_sec.gauge", fe_size / 1024 / t.s, rate=0.1)  # type: ignore
+        statsd_client.timing("oonidata.dataclient.batch.timed", tbatch.ms)  # type: ignore
 
     return idx
@@ -128,24 +134,24 @@ def make_observation_in_day(
     day: date,
 ):
     statsd_client = statsd.StatsClient("localhost", 8125)
-    db = ClickhouseConnection(clickhouse, row_buffer_size=10_000)
     bucket_date = day.strftime("%Y-%m-%d")
 
-    prev_ranges = []
-    for table_name in ["obs_web"]:
-        prev_ranges.append(
-            (
-                table_name,
-                get_prev_range(
-                    db=db,
-                    table_name=table_name,
-                    bucket_date=bucket_date,
-                    test_name=test_name,
-                    probe_cc=probe_cc,
-                ),
+    with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db:
+        prev_ranges = []
+        for table_name in ["obs_web"]:
+            prev_ranges.append(
+                (
+                    table_name,
+                    get_prev_range(
+                        db=db,
+                        table_name=table_name,
+                        bucket_date=bucket_date,
+                        test_name=test_name,
+                        probe_cc=probe_cc,
+                    ),
+                )
             )
-        )
 
     t = PerfTimer()
     total_t = PerfTimer()
@@ -187,10 +193,10 @@ def make_observation_in_day(
     statsd_client.timing("oonidata.dataclient.daily.timed", total_t.ms)
 
     if len(prev_ranges) > 0:
-        for table_name, pr in prev_ranges:
-            maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name)
+        with ClickhouseConnection(clickhouse, row_buffer_size=10_000) as db:
+            for table_name, pr in prev_ranges:
+                maybe_delete_prev_range(db=db, prev_range=pr, table_name=table_name)
 
-    db.close()
     return total_size, total_msmt_count
@@ -210,7 +216,6 @@ def start_observation_maker(
         n_workers=parallelism,
     )
 
-    db = ClickhouseConnection(clickhouse)
     t_total = PerfTimer()
     total_size, total_msmt_count = 0, 0
     day_list = list(date_interval(start_day, end_day))
@@ -233,19 +238,20 @@ def start_observation_maker(
         log.info(
             f"finished processing {day} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
         )
-        db.execute(
-            "INSERT INTO oonidata_processing_logs (key, timestamp, runtime_ms, bytes, msmt_count, comment) VALUES",
-            [
+        with ClickhouseConnection(clickhouse) as db:
+            db.execute(
+                "INSERT INTO oonidata_processing_logs (key, timestamp, runtime_ms, bytes, msmt_count, comment) VALUES",
                 [
-                    "oonidata.bucket_processed",
-                    datetime.utcnow(),
-                    int(t.ms),
-                    size,
-                    msmt_count,
-                    day.strftime("%Y-%m-%d"),
-                ]
-            ],
-        )
+                    [
+                        "oonidata.bucket_processed",
+                        datetime.utcnow(),
+                        int(t.ms),
+                        size,
+                        msmt_count,
+                        day.strftime("%Y-%m-%d"),
+                    ]
+                ],
+            )
 
     mb_per_sec = round(total_size / t_total.s / 10**6, 1)
     msmt_per_sec = round(total_msmt_count / t_total.s)
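With the (measurement_uid, report_id, input) triples gone, tests now address cached sample measurements purely by UID, e.g. measurements["20230427235943.206438_US_telegram_ac585306869eca7b"]; the matching fixture change in tests/conftest.py follows below.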
"20220627134426.194308_DE_webconnectivity_15675b61ec62e268", # "https://thepiratebay.org/", + "20220627125833.737451_FR_webconnectivity_bca9ad9d3371919a", # "https://thepiratebay.org/", + "20220625234824.235023_HU_webconnectivity_3435a5df0e743d39", # "https://thepiratebay.org/", + "20220924222854.036406_IR_webconnectivity_7aedefe4aaac824c", # "https://doh.dns.apple.com/dns-query?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB", + "20221020235950.432819_NL_signal_27b05458f186a906", + "20221016235944.266268_GB_signal_1265ff650ee17b44", + "20210926222047.205897_UZ_signal_95fab4a2e669573f", + "20221018174612.488229_IR_signal_f8640b28061bec06", + "20221013000000.517636_US_dnscheck_bfd6d991e70afa0e", + "20221114002335.786418_BR_webconnectivity_6b203219ec4ded0e", + "20230427235943.206438_US_telegram_ac585306869eca7b", + "20210101181154.037019_CH_webconnectivity_68ce38aa9e3182c2", + "20210101190046.780850_US_webconnectivity_3296f126f79ca186", + "20231031032643.267235_GR_dnscheck_abcbfc460b9424b6", + "20231101164541.763506_NP_httpinvalidrequestline_0cf676868fa36cc4", + "20231101164544.534107_BR_httpheaderfieldmanipulation_4caa0b0556f0b141", + "20231101164649.235575_RU_tor_ccf7519bf683c022", + "20221101055235.141387_RU_webconnectivity_046ce024dd76b564", # ru_blocks_twitter + "20230907000740.785053_BR_httpinvalidrequestline_bdfe6d70dcbda5e9", # middlebox detected +] diff --git a/tests/conftest.py b/tests/conftest.py index c2a31d85..4f132dba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,115 +6,18 @@ import pytest import orjson +from oonidata.db.connections import ClickhouseConnection from oonidata.fingerprintdb import FingerprintDB from oonidata.netinfo import NetinfoDB from oonidata.dataclient import sync_measurements -from oonidata.apiclient import get_measurement_dict, get_raw_measurement +from oonidata.apiclient import get_measurement_dict_by_uid -from .explorer_urls import EXPLORER_URLS, get_report_id_input +from ._sample_measurements import SAMPLE_MEASUREMENTS FIXTURE_PATH = Path(os.path.dirname(os.path.realpath(__file__))) / "data" DATA_DIR = FIXTURE_PATH / "datadir" -SAMPLE_MEASUREMENTS = [ - ( - "20220107222458.184469_IL_webconnectivity_d32af5597d7eeccc", - "20220107T222039Z_webconnectivity_IL_42925_n1_18Kwpmtx9nYVVoeM", - "https://ooni.org/", - ), - ( - "20220607115854.978538_BR_webconnectivity_d47c958eb0986d1b", - "20220607T115805Z_webconnectivity_BR_270374_n1_69vdpoRbUpU1Lwjz", - "https://ooni.org/", - ), - ( - "20220608132401.787399_AM_webconnectivity_2285fc373f62729e", - "20220608T131504Z_webconnectivity_AM_49800_n1_AqEZWsh35AuSmwMv", - "http://hahr.am", - ), - ( - "20220608155654.044764_AM_webconnectivity_ccb727b4812234a5", - "20220608T154458Z_webconnectivity_AM_49800_n1_Xz3UTlXhINnvPC0o", - "https://aysor.am", - ), - ( - "20220608122138.241075_IR_webconnectivity_c4240e52c7ca025f", - "20220608T122003Z_webconnectivity_IR_58224_n1_AcrDNmCaHeCbDoNj", - "https://www.youtube.com/", - ), - ( - "20220608121828.356206_RU_webconnectivity_80e3fa60eb2cd026", - "20220608T120927Z_webconnectivity_RU_41668_n1_wuoaKW00hbGU12Yw", - "http://proxy.org/", - ), - ( - "20220627131742.081225_GB_webconnectivity_e1e2cf4db492b748", - "20220627T131610Z_webconnectivity_GB_5089_n1_hPwPFmWSlBooLToC", - "https://ooni.org/", - ), - ( - "20220627030703.592775_IR_webconnectivity_80e199b3c572f8d3", - "20220626T215355Z_webconnectivity_IR_206065_n1_aoeFoexkL6onyiqN", - "https://thepiratebay.org/", - ), - ( - "20220627134426.194308_DE_webconnectivity_15675b61ec62e268", - 
"20220627T134105Z_webconnectivity_DE_3209_n1_OxtDrquootq2Ud5G", - "https://thepiratebay.org/", - ), - ( - "20220627125833.737451_FR_webconnectivity_bca9ad9d3371919a", - "20220627T125710Z_webconnectivity_FR_5410_n1_KMkIWk9q4gZRq9gS", - "https://thepiratebay.org/", - ), - ( - "20220625234824.235023_HU_webconnectivity_3435a5df0e743d39", - "20220625T234722Z_webconnectivity_HU_20845_n1_Kg7ARyGpKG58zIZU", - "https://thepiratebay.org/", - ), - ( - "20220924222854.036406_IR_webconnectivity_7aedefe4aaac824c", - "20220924T215758Z_webconnectivity_IR_206065_n1_2CRoWBNJkWc7VyAs", - "https://doh.dns.apple.com/dns-query?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB", - ), - ( - "20221020235950.432819_NL_signal_27b05458f186a906", - "20221020T235949Z_signal_NL_13127_n1_3OMvDeaFEYRUpmMQ", - None, - ), - ( - "20221016235944.266268_GB_signal_1265ff650ee17b44", - "20221016T235943Z_signal_GB_2856_n1_sFk2dryEB6FQDSqM", - None, - ), - ( - "20210926222047.205897_UZ_signal_95fab4a2e669573f", - "20210926T222024Z_signal_UZ_201767_n1_DRRr4WJQ5hKHYhs2", - None, - ), - ( - "20221018174612.488229_IR_signal_f8640b28061bec06", - "20221018T174538Z_signal_IR_44244_n1_aCmeIoeeYLKBkyxo", - None, - ), - ( - "20221013000000.517636_US_dnscheck_bfd6d991e70afa0e", - "20221012T235950Z_dnscheck_US_10396_n1_EhBJEeRzCdMRTLDH", - "dot://dns.quad9.net/", - ), - ( - "20221114002335.786418_BR_webconnectivity_6b203219ec4ded0e", - "20221114T002124Z_webconnectivity_BR_27699_n1_knqvcofoEIxHMpzj", - "https://cdt.org/", - ), - ( - "20230427235943.206438_US_telegram_ac585306869eca7b", - "20230427T235935Z_telegram_US_7018_n1_pDGUuqH8ozxn8zJ4", - None, - ), -] - @pytest.fixture def datadir(): @@ -158,32 +61,38 @@ def measurements(): measurement_dir.mkdir(parents=True, exist_ok=True) sampled_measurements = {} - for msmt_uid, report_id, input_ in SAMPLE_MEASUREMENTS: + for msmt_uid in SAMPLE_MEASUREMENTS: sampled_measurements[msmt_uid] = measurement_dir / f"{msmt_uid}.json" if sampled_measurements[msmt_uid].exists(): continue - msmt = get_measurement_dict(report_id=report_id, input=input_) + msmt = get_measurement_dict_by_uid(msmt_uid) with sampled_measurements[msmt_uid].open("wb") as out_file: out_file.write(orjson.dumps(msmt)) return sampled_measurements @pytest.fixture -def explorer_urls(): - sampled_measurements = {} - - measurement_dir = FIXTURE_PATH / "measurements" / "explorer_urls" - measurement_dir.mkdir(parents=True, exist_ok=True) - for key, explorer_url in EXPLORER_URLS.items(): - report_id, input_ = get_report_id_input(explorer_url) - sampled_measurements[key] = measurement_dir / f"{key}.json" - - msmt = get_measurement_dict(report_id=report_id, input=input_) - with sampled_measurements[key].open("wb") as out_file: - out_file.write(orjson.dumps(msmt)) - return sampled_measurements +def cli_runner(): + return CliRunner() @pytest.fixture -def cli_runner(): - return CliRunner() +def db(): + from oonidata.db.create_tables import create_queries + + try: + with ClickhouseConnection(conn_url="clickhouse://localhost/") as db: + db.execute("CREATE DATABASE IF NOT EXISTS testing_oonidata") + except: + pytest.skip("no database connection") + + db = ClickhouseConnection(conn_url="clickhouse://localhost/testing_oonidata") + try: + db.execute("SELECT 1") + except: + pytest.skip("no database connection") + for query, table_name in create_queries: + db.execute(f"DROP TABLE IF EXISTS {table_name};") + db.execute(query) + + return db diff --git a/tests/explorer_urls.py b/tests/explorer_urls.py deleted file mode 100644 index 917317f9..00000000 --- 
diff --git a/tests/explorer_urls.py b/tests/explorer_urls.py
deleted file mode 100644
index 917317f9..00000000
--- a/tests/explorer_urls.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from typing import Optional, Tuple
-from urllib.parse import urlparse, parse_qs
-
-EXPLORER_URLS = {
-    "20221101_ru_tcp_blocked_twitter": "https://explorer.ooni.org/measurement/20221101T055122Z_webconnectivity_RU_8402_n1_lG7OkFM4GicboQ36?input=https%3A%2F%2Ftwitter.com%2F"
-}
-
-
-def get_report_id_input(explorer_url: str) -> Tuple[str, Optional[str]]:
-    p = urlparse(explorer_url)
-    input_ = parse_qs(p.query).get("input", [None])[0]
-    report_id = p.path.split("/")[-1]
-    return report_id, input_
diff --git a/tests/sample_measurement.py b/tests/sample_measurement.py
deleted file mode 100644
index 4edbc725..00000000
--- a/tests/sample_measurement.py
+++ /dev/null
@@ -1,85 +0,0 @@
-import sys
-from typing import Optional
-import requests
-from urllib.parse import urlparse, parse_qs
-
-
-def print_sample_line(report_id: str, input: Optional[str]):
-    params = params = {"report_id": report_id}
-    if input:
-        params["input"] = input
-    r = requests.get("https://api.ooni.io/api/v1/measurement_meta", params=params)
-    j = r.json()
-    measurement_uid = j["measurement_uid"]
-    line = f'("{measurement_uid}", "{report_id}", '
-    if input:
-        line += f'"{input}"),'
-    else:
-        line += "None),"
-    print(line)
-    return measurement_uid
-
-
-samples = [
-    (
-        "20220627T131610Z_webconnectivity_GB_5089_n1_hPwPFmWSlBooLToC",
-        "https://ooni.org/",
-    ),
-    (
-        "20220608T122003Z_webconnectivity_IR_58224_n1_AcrDNmCaHeCbDoNj",
-        "https://www.youtube.com/",
-    ),
-    (
-        "20220608T120927Z_webconnectivity_RU_41668_n1_wuoaKW00hbGU12Yw",
-        "http://proxy.org/",
-    ),
-    (
-        "20220626T215355Z_webconnectivity_IR_206065_n1_aoeFoexkL6onyiqN",
-        "https://thepiratebay.org/",
-    ),
-    (
-        "20220627T134105Z_webconnectivity_DE_3209_n1_OxtDrquootq2Ud5G",
-        "https://thepiratebay.org/",
-    ),
-    (
-        "20220627T125710Z_webconnectivity_FR_5410_n1_KMkIWk9q4gZRq9gS",
-        "https://thepiratebay.org/",
-    ),
-    (
-        "20220625T234722Z_webconnectivity_HU_20845_n1_Kg7ARyGpKG58zIZU",
-        "https://thepiratebay.org/",
-    ),
-]
-
-
-def print_samples():
-    uids = []
-    for report_id, input in samples:
-        uids.append(print_sample_line(report_id, input))
-    for u in uids:
-        print(f'measurements["{u}"]')
-
-
-def main():
-    if len(sys.argv) < 2:
-        print("Usage: sample_measurement.py report_id | explorer_url [input]")
-        sys.exit(1)
-
-    input = None
-    report_id = sys.argv[1]
-    if len(sys.argv) > 2:
-        input = sys.argv[2]
-
-    if "explorer.ooni.org" in report_id:
-        p = urlparse(report_id)
-        report_id = p.path.split("/")[-1]
-        qs = parse_qs(p.query)
-        if "input" in qs:
-            input = qs["input"][0]
-
-    uid = print_sample_line(report_id, input)
-    print(f'measurements["{uid}"]')
-
-
-if __name__ == "__main__":
-    main()
diff --git a/tests/test_analysis.py b/tests/test_analysis.py
index 61a1d720..1ad29a6d 100644
--- a/tests/test_analysis.py
+++ b/tests/test_analysis.py
@@ -18,7 +18,6 @@
 
 
 def test_signal(fingerprintdb, netinfodb, measurements):
-
     signal_old_ca = load_measurement(
         msmt_path=measurements["20221016235944.266268_GB_signal_1265ff650ee17b44"]
     )
diff --git a/tests/test_cli.py b/tests/test_cli.py
index a9e1c508..440c6698 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -29,9 +29,7 @@ def test_sync(cli_runner, tmp_path: Path):
     assert len(list((tmp_path / "telegram" / "2022-01-01").iterdir())) == 24
 
 
-def wait_for_mutations(table_name):
-    db = ClickhouseConnection(conn_url="clickhouse://localhost")
-
+def wait_for_mutations(db, table_name):
     while True:
         res = db.execute(
             f"SELECT * FROM system.mutations WHERE is_done=0 AND table='{table_name}';"
@@ -41,13 +39,9 @@
         time.sleep(1)
 
 
-def test_full_workflow(cli_runner, fingerprintdb, netinfodb, datadir, tmp_path: Path):
-    db = ClickhouseConnection(conn_url="clickhouse://localhost")
-    try:
-        db.execute("SELECT 1")
-    except:
-        pytest.skip("no database connection")
-
+def test_full_workflow(
+    db, cli_runner, fingerprintdb, netinfodb, datadir, tmp_path: Path
+):
     result = cli_runner.invoke(
         cli,
         [
@@ -64,7 +58,7 @@
             "--data-dir",
             datadir,
             "--clickhouse",
-            "clickhouse://localhost/",
+            "clickhouse://localhost/testing_oonidata",
             # "--archives-dir",
             # tmp_path.absolute(),
         ],
@@ -96,13 +90,13 @@
             "--data-dir",
             datadir,
             "--clickhouse",
-            "clickhouse://localhost/",
+            "clickhouse://localhost/testing_oonidata",
         ],
     )
     assert result.exit_code == 0
 
     # Wait for the mutation to finish running
-    wait_for_mutations("obs_web")
+    wait_for_mutations(db, "obs_web")
     res = db.execute(
         "SELECT COUNT() FROM obs_web WHERE bucket_date = '2022-10-20' AND probe_cc = 'BA'"
     )
@@ -120,7 +114,7 @@
             "--data-dir",
             datadir,
             "--clickhouse",
-            "clickhouse://localhost/",
+            "clickhouse://localhost/testing_oonidata",
         ],
     )
     assert result.exit_code == 0
@@ -152,7 +146,7 @@
             "--data-dir",
             datadir,
             "--clickhouse",
-            "clickhouse://localhost/",
+            "clickhouse://localhost/testing_oonidata",
         ],
    )
     assert result.exit_code == 0
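A note on wait_for_mutations: ClickHouse runs ALTER ... DELETE statements asynchronously as mutations, so the test polls system.mutations until is_done flips before asserting on row counts; it now reuses the fixture-provided connection instead of opening its own.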
diff --git a/tests/test_db.py b/tests/test_db.py
index 6af7d88d..7c7ce936 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -5,6 +5,36 @@
 from oonidata.db.connections import ClickhouseConnection
 
 
+def test_flush_rows(db):
+    db.execute("DROP TABLE IF EXISTS tmp_test_recovery")
+    db.execute(
+        """
+    CREATE TABLE IF NOT EXISTS tmp_test_recovery (
+        col1 UInt32,
+        col2 String
+    )
+    ENGINE = MergeTree()
+    PRIMARY KEY (col1)
+    """
+    )
+    db.row_buffer_size = 5
+
+    rows = [
+        [1, "one"],
+        [2, "two"],
+        [3, None],  # Invalid column type
+        [4, "four"],
+        [5, "five"],
+        [6, "six"],
+    ]
+    db.write_rows("tmp_test_recovery", rows, ["col1", "col2"])
+    db.flush_all_rows()
+    res = db.execute("SELECT COUNT() FROM tmp_test_recovery")
+    # We should have 5 rows, just excluding the one with an invalid column type
+    assert res[0][0] == 5
+    db.execute("DROP TABLE tmp_test_recovery")
+
+
 def test_clickhouse(monkeypatch):
     mock_client = MagicMock()
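The expected count of 5 in test_flush_rows follows from the salvage path added to flush_rows above: however the buffer is flushed, the block containing [3, None] fails its bulk insert, the row-by-row fallback recovers the valid rows from that block (pickling the failing one to failing-rows.pickle), and all rows except the single invalid one end up in the table.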
diff --git a/tests/test_scoring.py b/tests/test_scoring.py
index 24d6fd24..d11d3927 100644
--- a/tests/test_scoring.py
+++ b/tests/test_scoring.py
@@ -9,8 +9,12 @@
 from oonidata.transforms import measurement_to_observations
 
 
-def test_tcp_scoring(explorer_urls, netinfodb, fingerprintdb):
-    msmt = load_measurement(msmt_path=explorer_urls["20221101_ru_tcp_blocked_twitter"])
+def test_tcp_scoring(measurements, netinfodb, fingerprintdb):
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20221101055235.141387_RU_webconnectivity_046ce024dd76b564"
+        ]
+    )
     web_observations, web_control_observations = measurement_to_observations(
         msmt, netinfodb=netinfodb
     )
diff --git a/tests/test_workers.py b/tests/test_workers.py
index f5a854b0..b06860f4 100644
--- a/tests/test_workers.py
+++ b/tests/test_workers.py
@@ -1,20 +1,83 @@
 import gzip
 from pathlib import Path
 import sqlite3
+from typing import List
 from unittest.mock import MagicMock
+
 
 from oonidata.analysis.datasources import load_measurement
 from oonidata.dataclient import stream_jsonl
+from oonidata.db.connections import ClickhouseConnection
 from oonidata.models.nettests.dnscheck import DNSCheck
 from oonidata.models.nettests.web_connectivity import WebConnectivity
+from oonidata.models.nettests.http_invalid_request_line import HTTPInvalidRequestLine
+from oonidata.models.observations import HTTPMiddleboxObservation
+from oonidata.workers.observations import (
+    make_observations_for_file_entry_batch,
+    write_observations_to_db,
+)
 from oonidata.workers.response_archiver import ResponseArchiver
 from oonidata.workers.fingerprint_hunter import fingerprint_hunter
 from oonidata.transforms import measurement_to_observations
 from oonidata.transforms.nettests.measurement_transformer import MeasurementTransformer
 
 
-def test_insert_query_for_observation(measurements, netinfodb):
+def test_make_file_entry_batch(datadir, db):
+    file_entry_batch = [
+        (
+            "ooni-data-eu-fra",
+            "raw/20231031/15/VE/whatsapp/2023103115_VE_whatsapp.n1.0.tar.gz",
+            "tar.gz",
+            52964,
+        )
+    ]
+    msmt_count = make_observations_for_file_entry_batch(
+        file_entry_batch, db.clickhouse_url, 100, datadir, "2023-10-31", "VE", False
+    )
+    assert msmt_count == 5
+
+
+def test_write_observations(measurements, netinfodb, db):
+    msmt_uids = [
+        ("20210101190046.780850_US_webconnectivity_3296f126f79ca186", "2021-01-01"),
+        ("20210101181154.037019_CH_webconnectivity_68ce38aa9e3182c2", "2021-01-01"),
+        ("20231031032643.267235_GR_dnscheck_abcbfc460b9424b6", "2023-10-31"),
+        (
+            "20231101164541.763506_NP_httpinvalidrequestline_0cf676868fa36cc4",
+            "2023-10-31",
+        ),
+        (
+            "20231101164544.534107_BR_httpheaderfieldmanipulation_4caa0b0556f0b141",
+            "2023-10-31",
+        ),
+        ("20231101164649.235575_RU_tor_ccf7519bf683c022", "2023-10-31"),
+        (
+            "20230907000740.785053_BR_httpinvalidrequestline_bdfe6d70dcbda5e9",
+            "2023-09-07",
+        ),
+    ]
+    for msmt_uid, bucket_date in msmt_uids:
+        msmt = load_measurement(msmt_path=measurements[msmt_uid])
+        write_observations_to_db(msmt, netinfodb, db, bucket_date)
+    db.close()
+
+
+def test_hirl_observations(measurements, netinfodb):
+    msmt = load_measurement(
+        msmt_path=measurements[
+            "20230907000740.785053_BR_httpinvalidrequestline_bdfe6d70dcbda5e9"
+        ]
+    )
+    assert isinstance(msmt, HTTPInvalidRequestLine)
+    middlebox_obs: List[HTTPMiddleboxObservation] = measurement_to_observations(
+        msmt, netinfodb=netinfodb
+    )[0]
+    assert isinstance(middlebox_obs[0], HTTPMiddleboxObservation)
+    assert middlebox_obs[0].hirl_success == True
+    assert middlebox_obs[0].hirl_sent_0 != middlebox_obs[0].hirl_received_0
+
+
+def test_insert_query_for_observation(measurements, netinfodb):
     http_blocked = load_measurement(
         msmt_path=measurements[
             "20220608121828.356206_RU_webconnectivity_80e3fa60eb2cd026"
@@ -87,12 +150,13 @@ def test_archive_http_transaction(measurements, tmpdir):
     status_code = http_transaction.response.code or 0
     response_headers = http_transaction.response.headers_list_bytes or []
     response_body = http_transaction.response.body_bytes
+    assert response_body
     archiver.archive_http_transaction(
         request_url=request_url,
         status_code=status_code,
         response_headers=response_headers,
         response_body=response_body,
-        matched_fingerprints=[]
+        matched_fingerprints=[],
     )
 
     warc_files = list(dst_dir.glob("*.warc.gz"))
@@ -125,12 +189,13 @@ def test_fingerprint_hunter(fingerprintdb, measurements, tmpdir):
     status_code = http_transaction.response.code or 0
     response_headers = http_transaction.response.headers_list_bytes or []
     response_body = http_transaction.response.body_bytes
+    assert response_body
     response_archiver.archive_http_transaction(
         request_url=request_url,
         status_code=status_code,
         response_headers=response_headers,
        response_body=response_body,
-        matched_fingerprints=[]
+        matched_fingerprints=[],
     )
 
     archive_path = list(archives_dir.glob("*.warc.gz"))[0]