diff --git a/.gitignore b/.gitignore
index dd8c712..f3e1c4b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ dist/
 __pycache__
 cdx_toolkit.egg-info
 .coverage
+.eggs/
diff --git a/ATHENA.md b/ATHENA.md
new file mode 100644
index 0000000..d0a6caf
--- /dev/null
+++ b/ATHENA.md
@@ -0,0 +1,44 @@
+# Using cdx_toolkit's columnar index with Athena
+
+## Installing
+
+```
+$ pip install cdx_toolkit[athena]
+```
+
+## Credentials and Configuration
+
+In addition to having AWS credentials, a few more configuration items are needed.
+
+credentials: these can be supplied several ways; one way is ~/.aws/config with a [profile cdx_athena] section containing
+
+aws_access_key_id
+aws_secret_access_key
+
+s3_staging_dir= needs to be writable; we still need to explain how to clear this bucket
+schema_name= defaults to 'ccindex'; this is the database name, not the table name
+
+region=us-east-1  # this is the default, and it is the region where CC's data is stored
+# "When specifying a Region inline during client initialization, this property is named region_name."
+s3_staging_dir=s3://dshfjhfkjhdfshjgdghj/staging
+
+
+## Initializing the database
+
+asetup
+asummary
+get_all_crawls
+
+## Arbitrary queries
+
+asql
+explain the partitions
+explain how to override the safety-belt LIMIT
+
+## Iterating similar to the CDX index
+
+## Generating subset WARCs from an SQL query or iteration
+
+## Clearing the staging directory
+
+configure rclone
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee908ac..c726db5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+- 0.9.34 (not yet tagged)
+  + experimental support for CC's columnar index
+
 - 0.9.33
   + rename master to main
   + drop python 3.5 testing because of setuptools-scm
diff --git a/README.md b/README.md
index c23f818..cae13ab 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,10 @@ hides these differences as best it can.
 cdx_toolkit also knits together the monthly Common Crawl CDX indices
 into a single, virtual index.
 
+Common Crawl also has a non-CDX "columnar index" hosted on AWS,
+accessible via the (paid) Amazon Athena service. This index can be
+queried using SQL, and has a few columns not present in the CDX index.
+
 Finally, cdx_toolkit allows extracting archived pages from CC and IA
 into WARC files. If you're looking to create subsets of CC or IA data
 and then process them into WET or WAT files, this is a feature you'll
@@ -218,7 +222,7 @@ cdx_toolkit has reached the beta-testing stage of development.
 
 ## License
 
-Copyright 2018-2020 Greg Lindahl and others
+Copyright 2018-2022 Greg Lindahl and others
 
 Licensed under the Apache License, Version 2.0 (the "License"); you
 may not use this software except in compliance with the License.
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index ee56d4a..03275f3 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -37,7 +37,7 @@ jobs:
         echo "Note: using 3.6 because packages are deprecating 3.5 support"
         pip install -r requirements.txt
       fi
-      pip --use-feature=in-tree-build install . .[test]
+      pip --use-feature=in-tree-build install . .[test] .[athena]
     displayName: 'Install dependencies'
 
   - script: |
@@ -74,7 +74,7 @@ jobs:
   - script: |
       python -m pip install --upgrade pip
-      pip --use-feature=in-tree-build install . .[test]
+      pip --use-feature=in-tree-build install .
.[test] .[athena] displayName: 'Install dependencies' - script: | diff --git a/cdx_toolkit/athena.py b/cdx_toolkit/athena.py new file mode 100644 index 0000000..d6e8939 --- /dev/null +++ b/cdx_toolkit/athena.py @@ -0,0 +1,423 @@ +import math +from operator import attrgetter +import logging +import sys +import os +import os.path +import json + +from pyathena import connect +import pyathena.error +from pyathena.cursor import Cursor, DictCursor +from pyathena.pandas.cursor import PandasCursor +from pyathena.async_cursor import AsyncCursor, AsyncDictCursor +from pyathena.pandas.async_cursor import AsyncPandasCursor +from pyathena.util import parse_output_location + +LOGGER = logging.getLogger(__name__) +saved_connect_kwargs = None + + +def all_results_properties(cursor): + properties = ('database', 'query_id', 'query', 'statement_type', + 'state', 'state_change_reason', 'completion_date_time', + 'submission_date_time', 'data_scanned_in_bytes', + 'execution_time_in_millis', 'output_location', + 'encryption_option', 'kms_key', 'work_group') + return dict([(p, attrgetter(p)(cursor)) for p in properties]) + + +def estimate_athena_cost(cursor, cost_per_tib=5.0): + data_scanned_in_bytes = cursor.data_scanned_in_bytes or 0 + data_scanned_in_mibytes = math.ceil(data_scanned_in_bytes / 1000000) + return max(data_scanned_in_mibytes, 10) * cost_per_tib / 1000000 + + +def print_text_messages(connection, location): + if location is None or not location.endswith('.txt'): + return [] + + bucket, key = parse_output_location(location) + LOGGER.info('looking for text messages in bucket {} key {}'.format(bucket, key)) + + s3 = connection.session.client('s3') # reuse the connection parameters we already set up + response = s3.get_object(Bucket=bucket, Key=key) + messages = response['Body'].read().decode().splitlines() + for m in messages: + LOGGER.info(m) + if messages: + return True + + +def download_result_csv(connection, location, output_file): + if location is None or not location.endswith('.csv'): + raise ValueError('athena query did not return a csv') + + bucket, key = parse_output_location(location) + LOGGER.info('looking for csv in bucket {} key {}'.format(bucket, key)) + s3 = connection.session.client('s3') # reuse the connection parameters we already set up + try: + s3.Bucket(bucket).download_file(key, output_file) + except Exception: + raise + + +database_name = 'ccindex' +table_name = 'ccindex' + +# depends on schema_name="ccindex' +# https://github.com/commoncrawl/cc-index-table/blob/master/src/sql/athena/cc-index-create-table-flat.sql + +create_table = ''' +CREATE EXTERNAL TABLE IF NOT EXISTS ccindex ( + url_surtkey STRING, + url STRING, + url_host_name STRING, + url_host_tld STRING, + url_host_2nd_last_part STRING, + url_host_3rd_last_part STRING, + url_host_4th_last_part STRING, + url_host_5th_last_part STRING, + url_host_registry_suffix STRING, + url_host_registered_domain STRING, + url_host_private_suffix STRING, + url_host_private_domain STRING, + url_host_name_reversed STRING, + url_protocol STRING, + url_port INT, + url_path STRING, + url_query STRING, + fetch_time TIMESTAMP, + fetch_status SMALLINT, + content_digest STRING, + content_mime_type STRING, + content_mime_detected STRING, + content_charset STRING, + content_languages STRING, + warc_filename STRING, + warc_record_offset INT, + warc_record_length INT, + warc_segment STRING) +PARTITIONED BY ( + crawl STRING, + subset STRING) +STORED AS parquet +LOCATION 's3://commoncrawl/cc-index/table/cc-main/warc/'; +''' + + +find_captures = ''' 
+SELECT url, warc_filename, warc_record_offset, warc_record_length +FROM "ccindex"."ccindex" +WHERE crawl = '%(INDEX)s' + AND subset = '%(SUBSET)s' + AND regexp_like(url_path, '(job|career|employ|openings|opportunities)') + AND url_host_registered_domain = 'museums.ca' +''' +# alternately WHERE (crawl = 'x' OR crawl = 'y') +# apparently WHERE contains(ARRAY ['CC-MAIN-2019-04', 'CC-MAIN-2019-09', 'CC-MAIN-=2019-13'], crawl) works but will read the entire row (bug mentioned in amazon docs) +# LIMIT 100 + +find_captures_params = { + 'INDEX': 'CC-MAIN-2018-43', + 'SUBSET': 'warc', +} + +jobs_surt = ''' +SELECT url, + warc_filename, + warc_record_offset, + warc_record_length +FROM "ccindex"."ccindex" +WHERE (crawl = 'CC-MAIN-2019-51') + AND subset = 'warc' + AND regexp_like(url_path, '(job|career|employ|openings|opportunities)') + AND url_host_registered_domain = 'museums.ca' + AND url_surtkey LIKE 'ca,museums)%' +''' + +jobs_surt_only = ''' +SELECT url, + warc_filename, + warc_record_offset, + warc_record_length +FROM "ccindex"."ccindex" +WHERE (crawl = 'CC-MAIN-2019-51') + AND subset = 'warc' + AND regexp_like(url_path, '(job|career|employ|openings|opportunities)') + AND url_surtkey LIKE 'ca,museums)%' +''' + +''' +See https://github.com/commoncrawl/cc-index-table#query-the-table-in-aws-athena for more SQL examples +''' + +''' +See https://github.com/commoncrawl/cc-index-table/blob/master/src/main/java/org/commoncrawl/spark/examples/CCIndexWarcExport.java +for an example of extracting the subset of warcs based on a sql query + +conf.set("warc.export.description", "Common Crawl WARC export from " + tablePath + " for query: " + sqlQuery); +''' + +''' +since pyathena is using boto3, then the two keys will come +from (in order) ~/.aws/credentials, ~/.aws/config, /etc/boto.cfg, ~/.boto +region_name ought to match where the cc data is (us-east-1) +region_name's name is different from the usual name region (?) +need a way to specify an alternate profile: AWS_PROFILE env or profile_name when creating a session +s3_staging_dir really ought to be in the same region as cc data (us-east-1) + +can read back configured variables? 
+dev_s3_client = session.client('s3') +''' + + +def debug_credentials(params=None): + print('here are some debugging hints: add at least one -v early in the command line', file=sys.stderr) + print('these are in the same order that boto3 uses them:', file=sys.stderr) + + if saved_connect_kwargs: + print('athena connect kwargs:', file=sys.stderr) + for k, v in params.items(): + print(' ', k, v, file=sys.stderr) + if params: + print('params for this call:', file=sys.stderr) + for k, v in params.items(): + print(' ', k, v, file=sys.stderr) + + profile_name = params.get('profile_name') + + print('environment variables', file=sys.stderr) + for k, v in os.environ.items(): + if k.startswith('AWS_'): + if k in {'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN', 'AWS_SECURITY_TOKEN'}: + v = '' + print(' ', k, v, file=sys.stderr) + if k == 'AWS_SECURITY_TOKEN': + print('AWS_SECURITY_TOKEN is deprecated', file=sys.stderr) + + if 'AWS_PROFILE' not in os.environ and not profile_name: + print(' ', 'NOTE: AWS_PROFILE is not set, so we will look for the default profile', file=sys.stderr) + profile_name = 'default' + elif not profile_name: + profile_name = os.environ.get('AWS_PROFILE', 'default') + + scf = os.environ.get('AWS_SHARED_CREDENTIALS_FILE', '~/.aws/credentials') + scf = os.path.expanduser(scf) + if os.path.exists(scf): + print(scf, 'exists', file=sys.stderr) + # XXX read it + # only allowed to have 3 keys. every section is a profile name + # aws_access_key_id, aws_secret_access_key, aws_session_token + + if os.path.exists(os.path.expanduser('~/.aws/config')): + print('~/.aws/config', 'exists', file=sys.stderr) + # XXX read it + # profiles have to have section names like "profile prod" + # in addition to profiles with the 3 keys, this can have region, region_name, s3_staging_dir + # can also have "assume role provider" in a profile + # source_profile=foo will look for foo in either config or credentials + + for f in ('/etc/boto.cfg', '~/.boto'): + if os.path.exists(os.path.expanduser(f)): + print(f, 'exists', file=sys.stderr) + # XXX only uses the [Credentials] section + + print('finally: instance metadata if running in EC2 with IAM', file=sys.stderr) + + +def log_session_info(session): + LOGGER.debug('Session info:') + LOGGER.debug('session.profile_name: ' + session.profile_name) + LOGGER.debug('session.available_profiles: ' + str(session.available_profiles)) + LOGGER.debug('session.region_name: ' + session.region_name) + # get_credentials ? 
botocore.credential.Credential
+
+kinds = {
+    'default': {'cursor': Cursor},
+    'dict': {'cursor': DictCursor},
+    'pandas': {'cursor': PandasCursor},
+    'default_async': {'cursor': AsyncCursor},
+    'dict_async': {'cursor': AsyncDictCursor},
+    'pandas_async': {'cursor': AsyncPandasCursor},
+}
+
+
+def aconnect(**kwargs):
+    LOGGER.info('connecting to athena')
+
+    # XXX verify that s3_staging_dir is set, and not set to common crawl's bucket (which is ro)
+    # XXX verify that there is a schema_name
+
+    cursor_class = kwargs.pop('cursor_class', None)
+    if not cursor_class:
+        kind = kwargs.pop('kind', 'default')
+        cursor_class = kinds.get(kind).get('cursor')
+        if not cursor_class:
+            raise ValueError('Unknown cursor kind of '+kind)
+    elif 'kind' in kwargs and kwargs['kind']:
+        LOGGER.warning('User specified both cursor_class and kind, ignoring kind')
+
+    global saved_connect_kwargs
+    saved_connect_kwargs = kwargs
+
+    try:
+        connection = connect(cursor_class=cursor_class, **kwargs)
+    except Exception:
+        debug_credentials()
+        raise
+
+    log_session_info(connection.session)
+
+    # XXX cursor._result_set_class = OurExponentialResultClass
+    return connection
+
+
+def asetup(connection, **kwargs):
+    LOGGER.info('creating database')
+
+    # XXX verify that there is a schema_name? needed for create and repair
+    # XXX schema_name can be specified when you create the cursor, pass it to .cursor()
+
+    create_database = 'CREATE DATABASE ccindex'
+
+    try:
+        cursor = my_execute(connection, create_database, warn_for_cost=True, **kwargs)
+    except pyathena.error.OperationalError as e:
+        if 'Database ccindex already exists' in str(e):
+            LOGGER.info('database ccindex already exists')
+        else:
+            cursor = connection.cursor()
+            # XXX does this really work? a fresh cursor has the previous output_location in it ???
+            print_text_messages(connection, cursor.output_location)
+            raise
+
+    LOGGER.info('creating table')
+    # depends on schema_name="ccindex"
+    my_execute(connection, create_table, warn_for_cost=True, **kwargs)
+
+    LOGGER.info('repairing table')
+    # depends on schema_name="ccindex"
+    repair_table = '''
+MSCK REPAIR TABLE ccindex;
+    '''
+    my_execute(connection, repair_table, warn_for_cost=True, **kwargs)
+
+
+def my_execute(connection, sql, sql_params={}, dry_run=False,
+               print_cost=True, print_messages=True,
+               warn_for_cost=False, raise_for_messages=False, **kwargs):
+
+    try:
+        sql = sql % sql_params
+    except KeyError as e:
+        raise KeyError('sql template referenced an unknown parameter: '+str(e))
+    except ValueError as e:
+        if 'unsupported format character' in str(e):
+            raise ValueError('did you forget to quote a percent sign?: '+str(e))
+        raise
+
+    if dry_run:
+        print('final sql is:', file=sys.stderr)
+        print(sql, file=sys.stderr)
+        return []  # callers expect an iterable
+
+    cursor = connection.cursor()  # XXX kwargs? schema_name= exists in both Connection and Cursor objects
+
+    try:
+        cursor.execute(sql)  # XXX kwargs?
+    except Exception as e:
+        LOGGER.warning('saw exception {} in my_execute'.format(str(e)))
+        debug_credentials(params=kwargs)
+        raise
+
+    m = None
+    if print_messages or raise_for_messages:
+        m = print_text_messages(connection, cursor.output_location)
+    if print_cost or warn_for_cost:
+        c = estimate_athena_cost(cursor)
+        if warn_for_cost and c > 0.009:
+            LOGGER.warning('estimated cost $%.6f', c)
+        elif print_cost:
+            LOGGER.info('estimated cost $%.6f', c)
+
+    if m and raise_for_messages:
+        raise ValueError('Expected no messages, see above')
+
+    return cursor
+
+
+def aiter(connection, **kwargs):
+    # form the query:
+    #  all verbatim queries to be passed in
+    #  if not verbatim:
+    #   fields -- from kwargs[fields] -- or all_fields
+    #   LIMIT NNN -- from kwargs[limit]
+    #   crawls -- from kwargs[crawl] []
+    #   SQL WHERE clauses -- from kwargs[filter] plus url, needs translation
+
+    LOGGER.info('executing the iter command')
+    return my_execute(connection, jobs_surt, **kwargs)
+
+
+def get_all_crawls(connection, **kwargs):
+    get_all_crawls = '''
+    SELECT DISTINCT crawl
+    FROM "ccindex"."ccindex";'''
+
+    LOGGER.info('executing get_all_crawls')
+
+    cursor = my_execute(connection, get_all_crawls,
+                        warn_for_cost=True, raise_for_messages=True, **kwargs)
+
+    ret = []
+    for row in cursor:
+        # XXX this needs to take into account the cursor class?
+        ret.append(row['crawl'])  # XXX row is a tuple,
+
+    return sorted(ret)
+
+
+def asummary(connection, **kwargs):
+    count_by_partition = '''
+SELECT COUNT(*) as n_captures,
+       crawl,
+       subset
+FROM "ccindex"."ccindex"
+GROUP BY crawl, subset;
+'''
+    LOGGER.info('executing get_summary')
+
+    cursor = my_execute(connection, count_by_partition,
+                        warn_for_cost=True, raise_for_messages=True, **kwargs)
+
+    return '\n'.join([json.dumps(row, sort_keys=True) for row in cursor])
+
+
+def asql(connection, cmd, **kwargs):
+    with open(cmd.file, 'r') as fd:
+        sql = fd.read()
+
+    params = {}
+    if cmd.param:
+        for p in cmd.param:
+            if p.count('=') != 1:
+                raise ValueError('parameters should have a single equals sign')
+            k, v = p.split('=', 1)
+            params[k] = v
+
+    LOGGER.info('executing sql from file %s', cmd.file)
+    cursor = my_execute(connection, sql, sql_params=params, **kwargs)
+
+    return '\n'.join([json.dumps(row, sort_keys=True) for row in cursor])
+
+
+#if __name__ == '__main__':
+#    s3_staging_dir = 's3://dshfjhfkjhdfshjgdghj/staging/'  # AWS_ATHENA_S3_STAGING_DIR
+#    ga_kwargs = dict(profile_name='greg',  # AWS_PROFILE
+#                     schema_name='ccindex',  # needed for a couple of actions that don't mention the database
+#                     s3_staging_dir=s3_staging_dir,  # AWS_ATHENA_S3_STAGING_DIR optional if work_group is set
+#                     region_name='us-east-1')  # AWS_DEFAULT_REGION
+#    connection = aconnect(**ga_kwargs)
+#
+#    print(get_all_crawls(connection))
diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py
index dbf3b37..10f7dd2 100644
--- a/cdx_toolkit/cli.py
+++ b/cdx_toolkit/cli.py
@@ -6,35 +6,64 @@ import os
 
 import cdx_toolkit
 
+from .
import athena LOGGER = logging.getLogger(__name__) -def main(args=None): - parser = ArgumentParser(description='cdx_toolkit iterator command line tool') +def split_fields(fields): + ret = [] + dedup = set() + for f in fields.split(','): + if f not in dedup: + dedup.add(f) + ret.append(f) + return ret + +def add_global_args(parser): parser.add_argument('--version', '-V', action='version', version=get_version()) parser.add_argument('--verbose', '-v', action='count', help='set logging level to INFO (-v) or DEBUG (-vv)') + parser.add_argument('--limit', type=int, action='store') + parser.add_argument('--from', action='store') # XXX default for cc + parser.add_argument('--to', action='store') + parser.add_argument('--filter', action='append', help='see CDX API documentation for usage') + + +def add_athena_args(parser): + parser.add_argument('--profile-name', action='store', help='choose which section of your boto conf files is used') + parser.add_argument('--role-arn', action='store', help='Amazon resource name roley') + parser.add_argument('--work-group', action='store', help='Amazon Athena work group name') + parser.add_argument('--s3-staging-dir', action='store', help='an s3 bucket to hold outputs') + parser.add_argument('--region-name', action='store', default='us-east-1', + help='AWS region to use, you probably want the one the commoncrawl data is in (us-east-1)') + parser.add_argument('--dry-run', '-n', action='store_true', help='print the SQL and exit without executing it') + +def add_cdx_args(parser): parser.add_argument('--cc', action='store_const', const='cc', help='direct the query to the Common Crawl CDX/WARCs') parser.add_argument('--ia', action='store_const', const='ia', help='direct the query to the Internet Archive CDX/wayback') parser.add_argument('--source', action='store', help='direct the query to this CDX server') parser.add_argument('--wb', action='store', help='direct replays for content to this wayback') - parser.add_argument('--limit', type=int, action='store') parser.add_argument('--cc-mirror', action='store', help='use this Common Crawl index mirror') parser.add_argument('--cc-sort', action='store', help='default mixed, alternatively: ascending') - parser.add_argument('--from', action='store') # XXX default for cc - parser.add_argument('--to', action='store') - parser.add_argument('--filter', action='append', help='see CDX API documentation for usage') parser.add_argument('--get', action='store_true', help='use a single get instead of a paged iteration. default limit=1000') parser.add_argument('--closest', action='store', help='get the closest capture to this timestamp. 
use with --get')
+
+def main(args=None):
+    parser = ArgumentParser(description='cdx_toolkit iterator command line tool')
+
+    add_global_args(parser)
+    add_athena_args(parser)
+    add_cdx_args(parser)
+
     subparsers = parser.add_subparsers(dest='cmd')
     subparsers.required = True
 
     iterate = subparsers.add_parser('iter', help='iterate printing captures')
     iterate.add_argument('--all-fields', action='store_true')
-    iterate.add_argument('--fields', action='store', default='url,status,timestamp', help='try --all-fields if you need the list')
+    iterate.add_argument('--fields', action='store', default='url,status,timestamp', help='try --all-fields if you need the complete list')
     iterate.add_argument('--jsonl', action='store_true')
     iterate.add_argument('--csv', action='store_true')
     iterate.add_argument('url')
@@ -53,9 +82,32 @@ def main(args=None):
     size = subparsers.add_parser('size', help='imprecise count of how many results are available')
     size.add_argument('--details', action='store_true', help='show details of each subindex')
-    size.add_argument('url')
+    size.add_argument('url', help='')
     size.set_defaults(func=sizer)
 
+    asetup = subparsers.add_parser('asetup', help='set up amazon athena ccindex database and table')
+    asetup.set_defaults(func=asetuper)
+
+    asummarize = subparsers.add_parser('asummarize', help='summarize the partitions currently in the table')
+    asummarize.set_defaults(func=asummarizer)
+
+    asql = subparsers.add_parser('asql', help='run arbitrary SQL statement from a file')
+    asql.add_argument('--param', action='append', help='parameters for templating the SQL, e.g. SUBSET=warc')
+    asql.add_argument('file', help='')
+    asql.set_defaults(func=asqler)
+
+    aiter = subparsers.add_parser('aiter', help='iterate printing captures')
+    aiter.add_argument('--all-fields', action='store_true')
+    aiter.add_argument('--fields', action='store', default='url,warc_filename,warc_record_offset,warc_record_length', help='try --all-fields if you need the complete list')
+    aiter.add_argument('--jsonl', action='store_true')
+    aiter.add_argument('--csv', action='store_true')
+    aiter.add_argument('--subset', action='append', default=['warc'], help='e.g. warc, robotstxt, crawldiagnostics')
+    aiter.add_argument('--crawl', action='append', help='crawl to process, you can specify more than one')
+    aiter.add_argument('--limit', type=int, action='store', help='maximum records to return, good for debugging')
+    aiter.add_argument('--filter', action='append', help='CDX-style filter, see CDX API documentation for usage')
+    aiter.add_argument('url', help='')
+    aiter.set_defaults(func=aiterator)
+
     if args is not None:
         cmdline = ' '.join(args)
     else:  # pragma: no cover
@@ -144,9 +196,13 @@ def print_line(cmd, writer, printme):
 
 def iterator(cmd, cmdline):
     cdx, kwargs = setup(cmd)
-    fields = set(cmd.fields.split(','))
+
+    fields = split_fields(cmd.fields)
+
+    if cmd.csv and cmd.all_fields:
+        raise NotImplementedError('Sorry, the combination of csv and all-fields is not yet implemented')
 
     if cmd.csv:
-        writer = csv.DictWriter(sys.stdout, fieldnames=sorted(list(fields)))
+        writer = csv.DictWriter(sys.stdout, fieldnames=fields)
         writer.writeheader()
     else:
         writer = None
@@ -212,3 +268,56 @@ def sizer(cmd, cmdline):
     size = cdx.get_size_estimate(cmd.url, **kwargs)
 
     print(size)
+
+
+def athena_init(cmd):
+    conn_kwargs = {}
+    # these args are all permissions-related
+    for k in ('profile_name', 'role_arn', 'work_group', 's3_staging_dir', 'region_name', 'dry_run'):
+        if k in cmd:
+            value = cmd.__dict__[k]
+            if value is not None:
+                conn_kwargs[k] = value
+    kwargs = {}
+    if 'dry_run' in cmd and cmd.dry_run:
+        kwargs['dry_run'] = True
+
+    connection = athena.aconnect(**conn_kwargs)
+    return connection, kwargs
+
+
+def asetuper(cmd, cmdline):
+    connection, kwargs = athena_init(cmd)
+    athena.asetup(connection, **kwargs)
+    print('crawl partitions:', athena.get_all_crawls(connection, **kwargs))
+
+
+def asummarizer(cmd, cmdline):
+    connection, kwargs = athena_init(cmd)
+    print(athena.asummary(connection, **kwargs))
+
+
+def asqler(cmd, cmdline):
+    connection, kwargs = athena_init(cmd)
+    print(athena.asql(connection, cmd, **kwargs))
+
+
+def aiterator(cmd, cmdline):
+    connection, kwargs = athena_init(cmd)
+
+    fields = split_fields(cmd.fields)
+
+    if cmd.csv and cmd.all_fields:
+        raise NotImplementedError('Sorry, the combination of csv and all-fields is not yet implemented')
+    if cmd.csv:
+        writer = csv.DictWriter(sys.stdout, fieldnames=fields)
+        writer.writeheader()
+    else:
+        writer = None
+
+    # csv fields for all-fields are not present until the cursor.execute has run
+    # XXX what should winnow_fields do in this loop?
+
+    for obj in athena.aiter(connection, **kwargs):
+        printme = winnow_fields(cmd, fields, obj)
+        print_line(cmd, writer, printme)
diff --git a/requirements.txt b/requirements.txt
index 2d0357f..427a1a9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,16 +1,18 @@
 # Install with "python -m pip install -r requirements.txt".
-# must be kept in sync with setup.py +# names must be kept in sync with setup.py +# exact versions are tested in the azure-pipelines.yml config -requests==2.25.1 +requests==2.27.1 warcio==1.7.4 +pyathena==2.3.2 # used by Makefile -pytest==6.2.4 -pytest-cov==2.12.1 +pytest==6.2.5 +pytest-cov==3.0.0 pytest-sugar==0.9.4 -coveralls==3.1.0 +coveralls==3.3.1 # packaging -twine==3.4.1 -setuptools==57.0.0 -setuptools-scm==6.0.1 +twine==3.7.1 +setuptools==59.6.0 +setuptools-scm==6.3.2 diff --git a/scripts/cc-check-parquet.py b/scripts/cc-check-parquet.py new file mode 100755 index 0000000..e1239cb --- /dev/null +++ b/scripts/cc-check-parquet.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +import sys +from collections import defaultdict +import json + +import jsonlines + +''' +Criteria for a good CommonCrawl crawl + +'parquet-mr version ' > 1.8.1 +physical_type has exactly one entry + +all url_host_* fields have min and max + plus url_path, url_query, url_surtkey -- is this url_* ? + ok url_protocol and url_port are the only ones not on this list + url_port is very small and is lacking a lot of min and max + +all fields, small dictionary overflows are rare (?) + +''' + + +def analyze_parquet_version(created_by, complaints): + parts = created_by.split(' ') + assert parts[0] == 'parquet-mr' + assert parts[1] == 'version' + semver = parts[2].split('.') + if semver[0] == '1' and int(semver[1]) < 10: + complaints.append('parquet version too old: '+parts[2]) + + +def analyze(obj): + complaints = [] + group = obj['_group'] + created_by = obj['_created_by'].keys() + for cb in created_by: + analyze_parquet_version(cb, complaints) + + for k, v in obj.items(): + if k.startswith('_'): + continue + if 'physical_type' not in v: + print('no physical type', k) + if len(v['physical_type']) != 1: + complaints.append('multiple phsyical_types in field: '+k) + if 'small dictionary overflowed to plain' in v['overflow']: + small = 0 + for k2, v2 in v['overflow'].items(): + if k2.startswith('small '): + small += v2 + ratio = v['overflow']['small dictionary overflowed to plain'] / small + if ratio > 0.2: + complaints.append('field has {:.1f}% small dictionary overflows: {}'.format(ratio*100, k)) + if k.startswith('url_'): + if 'min_max_absent' in v or 'min_max_no_stats_set' in v: + present = v.get('min_max_present', 0) + absent = v.get('min_max_absent', 0) + v.get('min_max_no_stats_set', 0) + percent = absent / (present + absent) * 100. 
+ complaints.append('field is missing stats {:4.1f}% of the time: {}'.format(percent, k)) + + return group, complaints + + +overall = {} +with jsonlines.open(sys.argv[1]) as reader: + for obj in reader.iter(type=dict, skip_invalid=True): + group, complaints = analyze(obj) + overall[group] = list(sorted(complaints)) + +print(json.dumps(overall, sort_keys=True, indent=4)) diff --git a/scripts/parquet-stats.py b/scripts/parquet-stats.py new file mode 100644 index 0000000..4889555 --- /dev/null +++ b/scripts/parquet-stats.py @@ -0,0 +1,256 @@ +from argparse import ArgumentParser +from collections import defaultdict +import json +import sys +import math +import glob + +import s3fs +import pyarrow.parquet as pq + +from progress.bar import Bar + + +def main(args=None): + parser = ArgumentParser(description='parquet-stats command line tool') + parser.add_argument('--anon', action='store_true', help='Use an anonymous connection, public buckets only') + parser.add_argument('--requester-pays', action='store_true', help='use requester-pays access, will read S3 creds from env vars or files') + parser.add_argument('--limit', action='append', help='limit analysis to this partition, e.g. date=2018-01-31') + parser.add_argument('--aggregate', action='append', help='aggregate analysis across this partition, e.g. date') + parser.add_argument('--overflow', action='store_true', help='try to analyze dictionary overflows') + parser.add_argument('--overflow-print', action='store_true', help='print parquet filenames for overflowed dictionary columns') + parser.add_argument('--pretty', action='store_true', help='pretty-print jsonl output') + parser.add_argument('--dump-groups', action='store_true', help='just print the file groups and exit early') + parser.add_argument('path', help='e.g. s3://commoncrawl/cc-index/table/cc-main/warc/*/*/*.parquet or **/*.parquet') + + cmd = parser.parse_args(args=args) + print('cmd', cmd, file=sys.stderr) + + kwargs = {} + if cmd.anon: + kwargs['anon'] = True + if cmd.requester_pays: + kwargs['requester_pays'] = True # defaults to False + + fs = None + if cmd.path.startswith('s3://'): + fs = s3fs.S3FileSystem(**kwargs) + all_files = fs.glob(path=cmd.path, recursive=True) + else: + all_files = glob.glob(cmd.path, recursive=True) + + print('have', len(all_files), 'files', file=sys.stderr) + + all_files = apply_limit(all_files, cmd) + print('have', len(all_files), 'files after limit applied', file=sys.stderr) + + file_groups = get_aggregate_groups(all_files, cmd) + if file_groups: + print('have', len(file_groups), 'file groups', file=sys.stderr) + else: + return + + if cmd.dump_groups: + print(json.dumps(list(file_groups), sort_keys=True, indent=4)) + return + + results = do_work(file_groups, fs, cmd) + print_result(results, pretty=cmd.pretty) + + +def apply_limit(all_files, cmd): + if not cmd.limit: + return all_files + ret = [] + for f in all_files: + if cmd.limit: + for limit in cmd.limit: + if limit not in f: + break + else: + ret.append(f) + return ret + + +def get_aggregate_groups(all_files, cmd): + if not all_files: + return + if not cmd.aggregate: + return [(None, all_files)] + + ret = defaultdict(list) + for f in all_files: + labels = [] + for a in cmd.aggregate: + parts = f.split('/') + for p in parts: + if p.startswith(a): + labels.append(p.replace(a, '', 1).replace('=', '', 1)) + if labels: + # the order should be preserved + key = ' '.join(labels) + ret[key].append(f) + else: + # probably a bad sign, but... 
+ print('no label for', f, file=sys.stderr) + + return ret.items() + + +def check_same(thing1, thing2, what, fname): + if thing1 is not None and thing1 != thing2: + print('observed unusual value of {} for {} in {}'.format(what, thing2, fname)) + return thing2 + + +def analyze_dictionary_overflow(column, statspath, fname, row_group_index, path, cmd): + if 'hist_size' not in statspath: + statspath['hist_size'] = defaultdict(int) + + try: + bucket = int(math.log10(column.total_compressed_size)) + statspath['hist_size']['{:,}-{:,}'.format(10**bucket, 10**(bucket+1))] += 1 + except Exception as e: + print('exception doing bucket math', e, file=sys.stderr) + + if 'overflow' not in statspath: + statspath['overflow'] = defaultdict(int) + + if column.total_compressed_size < 100000: + statspath['overflow']['very small size'] += 1 + return + + if column.total_compressed_size < 1000000: + if 'PLAIN_DICTIONARY' not in column.encodings: + statspath['overflow']['small no dictionary'] += 1 + return + + if 'PLAIN' in column.encodings: + statspath['overflow']['small dictionary overflowed to plain'] += 1 + if cmd.overflow_print: + print('small overflow', fname, row_group_index, path, file=sys.stderr) + return + + statspath['overflow']['small dictionary not overflowed'] += 1 + return + + if 'PLAIN_DICTIONARY' not in column.encodings: + statspath['overflow']['big no dictionary'] += 1 + return + + if 'PLAIN' in column.encodings: + statspath['overflow']['big dictionary overflowed to plain'] += 1 + if cmd.overflow_print: + print('big overflow', fname, row_group_index, path, file=sys.stderr) + return + + statspath['overflow']['big dictionary not overflowed'] += 1 + + +def analyze_one(metadata, stats, num_columns, fname, cmd): + stats['_created_by'][metadata.created_by] += 1 + num_columns = check_same(num_columns, metadata.num_columns, 'num_columns', fname) + + for row_group_index in range(metadata.num_row_groups): + row_group = metadata.row_group(row_group_index) + num_columns = check_same(num_columns, row_group.num_columns, 'num_columns', fname) + stats['_row_groups'] += 1 + + for column_index in range(row_group.num_columns): + column = row_group.column(column_index) + path = column.path_in_schema + + if path not in stats: + stats[path] = defaultdict(int) + + if 'compression' not in stats[path]: + stats[path]['compression'] = defaultdict(int) + stats[path]['compression']['total_compressed_size'] += column.total_compressed_size + stats[path]['compression']['total_uncompressed_size'] += column.total_uncompressed_size + + if 'physical_type' not in stats[path]: + stats[path]['physical_type'] = defaultdict(int) + stats[path]['physical_type'][column.physical_type] += 1 + + encodings = ','.join(sorted(column.encodings)) # seems to be reordered by PyArrow so sort for stability + if 'encodings' not in stats[path]: + stats[path]['encodings'] = defaultdict(int) + stats[path]['encodings'][encodings] += 1 + + if column.is_stats_set: + statistics = column.statistics + if statistics.has_min_max: + stats[path]['min_max_present'] += 1 + else: + stats[path]['min_max_absent'] += 1 + else: + stats[path]['min_max_no_stats_set'] += 1 + + if cmd.overflow: + analyze_dictionary_overflow(column, stats[path], fname, row_group_index, path, cmd) + return num_columns + + +def my_smart_open(fname, fs): + if fs: # fname.startswith('s3://'): + fp = fs.open(fname, mode='rb') + else: + fp = open(fname, mode='rb') + return fp + + +def do_work(file_groups, fs, cmd): + ret = [] + + bar = Bar('files', max=sum([len(files) for group, files in 
file_groups]), suffix='%(index)d / %(max)d - %(percent).1f%%') + + for group, files in file_groups: + num_columns = None + stats = defaultdict(int) + stats['_created_by'] = defaultdict(int) + for fname in files: + bar.next(1) + try: + fp = my_smart_open(fname, fs) + pqf = pq.ParquetFile(fp) + metadata = pqf.metadata + except Exception as e: + print(file=sys.stderr) + print('exception {} processing file {}'.format(str(e), fname), file=sys.stderr) + continue + + num_columns = analyze_one(metadata, stats, num_columns, fname, cmd) + + for path in stats: + sp = stats[path] + if isinstance(sp, int): + continue + if 'compression' in sp: + spc = sp['compression'] + if 'total_compressed_size' in spc and 'total_uncompressed_size' in spc: + cr = spc['total_uncompressed_size']/spc['total_compressed_size'] + if cr >= 10.0: + spc['total_compression_ratio'] = '{:.0f}:1'.format(int(cr)) + else: + spc['total_compression_ratio'] = '{:.1f}:1'.format(cr) + + ret.append((group, stats)) + bar.finish() + return ret + + +def print_result(results, pretty=False): + if not results: + return + + kwargs = {} + if pretty: + kwargs['indent'] = 4 + + for group, stats in results: + stats['_group'] = group + print(json.dumps(stats, sort_keys=True, **kwargs)) + + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index d9e9ad1..bbaf7a9 100755 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ package_requirements = ['twine', 'setuptools', 'setuptools-scm'] extras_require = { + 'athena': ['pyathena[pandas]>=2'], 'test': test_requirements, # setup no longer tests, so make them an extra 'package': package_requirements, } @@ -44,7 +45,7 @@ entry_points=''' [console_scripts] cdxt = cdx_toolkit.cli:main - #ccathena = cdx_toolkit.cli:main_athena + ccathena = cdx_toolkit.cli:main_athena ''', scripts=scripts, license='Apache 2.0', diff --git a/tests/test_cli.py b/tests/test_cli.py index a69f922..1acc574 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -185,7 +185,7 @@ def one_ia_corner(tmpdir, cmdline): def test_warc_ia_corners(tmpdir, caplog): ''' To test these more properly, need to add a --exact-warcname and then postprocess. - For now, these tests show up in the coverage report + For now, these are just crash-only tests ''' # revisit vivification @@ -203,3 +203,13 @@ def test_warc_ia_corners(tmpdir, caplog): # warcing a 404 is a corner case in myrequests cmdline = '--ia --from 20080512074145 --to 20080512074145 warc http://www.pbm.com/oly/archive/design94/0074.html' one_ia_corner(tmpdir, cmdline) + + +def test_athena(): + # tests of configuration file reading? can be unit tests? + + # asetup (must be done first, but it's implicit in all of the following) + # asummarize + # asql (test an example that's in the docs) + # aiter (something short, using the new reverse hostname field? + pass
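
The connection settings described in ATHENA.md map directly onto the keyword arguments that `athena.aconnect()` forwards to pyathena. A minimal sketch, assuming a `[profile cdx_athena]` section in ~/.aws/config and a placeholder staging bucket name:

```
from cdx_toolkit import athena

# profile and bucket names here are placeholders; region_name should stay
# us-east-1 because that is where the Common Crawl data lives
connection = athena.aconnect(
    profile_name='cdx_athena',                   # section of ~/.aws/config or ~/.aws/credentials
    schema_name='ccindex',                       # the Athena database name, not the table name
    s3_staging_dir='s3://YOUR-BUCKET/staging/',  # must be writable by these credentials
    region_name='us-east-1',
)

# requires the one-time athena.asetup(connection) to have been run already
print(athena.get_all_crawls(connection))
```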
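The `asql` subcommand and `my_execute()` use plain Python %-templating: the SQL contains `%(NAME)s` placeholders, each `--param NAME=value` supplies one value, and a literal percent sign (for example in a LIKE pattern) has to be doubled, which is what the "did you forget to quote a percent sign?" error is about. A small sketch of the same substitution, using a hypothetical template in the style of `find_captures` above:

```
# my_execute() applies the parameters with a plain "sql % sql_params"
sql_template = '''
SELECT url, warc_filename, warc_record_offset, warc_record_length
FROM "ccindex"."ccindex"
WHERE crawl = '%(INDEX)s'
  AND subset = '%(SUBSET)s'
  AND url_surtkey LIKE 'ca,museums)%%'  -- doubled so templating leaves a single percent behind
'''

params = {'INDEX': 'CC-MAIN-2018-43', 'SUBSET': 'warc'}
print(sql_template % params)
```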
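`estimate_athena_cost()` above encodes Athena's pricing rule of thumb, roughly $5 per TB scanned with a 10 MB minimum per query, in the same decimal megabyte/terabyte units the function uses. A quick arithmetic check of that formula, with illustrative byte counts:

```
import math

def estimate_cost(data_scanned_in_bytes, cost_per_tb=5.0):
    # round up to whole decimal megabytes, apply the 10 MB minimum,
    # then convert megabytes to terabytes for the per-TB price
    mb = math.ceil(data_scanned_in_bytes / 1_000_000)
    return max(mb, 10) * cost_per_tb / 1_000_000

print(estimate_cost(0))            # 5e-05   (the 10 MB minimum)
print(estimate_cost(250_000_000))  # 0.00125 (a 250 MB scan)
```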
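ATHENA.md's "Generating subset WARCs" section is still to be written, but the columns selected by `find_captures` and `aiter` (warc_filename, warc_record_offset, warc_record_length) are already enough to pull individual records, since each Common Crawl record is stored as its own gzip member. A hedged sketch using requests and warcio, both already dependencies; the filename and offsets below are placeholders, not real values:

```
import requests
from warcio.archiveiterator import ArchiveIterator

# values as returned by an aiter/asql query; these are placeholders
warc_filename = 'crawl-data/CC-MAIN-2018-43/segments/EXAMPLE/warc/EXAMPLE.warc.gz'
offset, length = 1234567, 8901

resp = requests.get(
    'https://data.commoncrawl.org/' + warc_filename,
    headers={'Range': 'bytes={}-{}'.format(offset, offset + length - 1)},
    stream=True,
)

# the ranged response is a complete gzipped WARC record, so warcio can parse it directly
for record in ArchiveIterator(resp.raw):
    print(record.rec_headers.get_header('WARC-Target-URI'))
```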