From 7cf44a85afb5ec853acf788d8b3a188039875a56 Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Tue, 25 Aug 2020 19:02:09 -0700 Subject: [PATCH 1/8] wip parquet code and utils --- cdx_toolkit/athena.py | 338 ++++++++++++++++++++++++++++++++++++ cdx_toolkit/cli.py | 148 ++++++++++++++-- scripts/cc-check-parquet.py | 72 ++++++++ scripts/parquet-stats.py | 256 +++++++++++++++++++++++++++ setup.py | 3 +- 5 files changed, 806 insertions(+), 11 deletions(-) create mode 100755 cdx_toolkit/athena.py create mode 100755 scripts/cc-check-parquet.py create mode 100644 scripts/parquet-stats.py diff --git a/cdx_toolkit/athena.py b/cdx_toolkit/athena.py new file mode 100755 index 0000000..6b117e1 --- /dev/null +++ b/cdx_toolkit/athena.py @@ -0,0 +1,338 @@ +import math +from operator import attrgetter +import logging +import sys +import os +import os.path +import json + +from pyathena import connect +import pyathena.error + +LOGGER = logging.getLogger(__name__) + + +def all_results_properties(cursor): + properties = ('database', 'query_id', 'query', 'statement_type', + 'state', 'state_change_reason', 'completion_date_time', + 'submission_date_time', 'data_scanned_in_bytes', + 'execution_time_in_millis', 'output_location', + 'encryption_option', 'kms_key', 'work_group') + return dict([(p, attrgetter(p)(cursor)) for p in properties]) + + +def estimate_athena_cost(cursor, cost_per_tib=5.0): + data_scanned_in_bytes = cursor.data_scanned_in_bytes or 0 + data_scanned_in_mibytes = math.ceil(data_scanned_in_bytes / 1_000_000) + return max(data_scanned_in_mibytes, 10) * cost_per_tib / 1_000_000 + + +def print_text_messages(connection, location): + if location is None or not location.endswith('.txt'): + return [] + + parts = location.split('/') + bucket = parts[2] + key = '/'.join(parts[3:]) + LOGGER.info('looking for text messages in bucket {} key {}'.format(bucket, key)) + + s3 = connection.session.client('s3') # reuse the connection parameters we already set up + response = s3.get_object(Bucket=bucket, Key=key) + messages = response['Body'].read().decode().splitlines() + for m in messages: + LOGGER.info(m) + if messages: + return True + + +database_name = 'ccindex' +table_name = 'ccindex' + +# XXX ccindex -> "ccindex"."ccindex" +create_table = ''' +CREATE EXTERNAL TABLE IF NOT EXISTS ccindex ( + url_surtkey STRING, + url STRING, + url_host_name STRING, + url_host_tld STRING, + url_host_2nd_last_part STRING, + url_host_3rd_last_part STRING, + url_host_4th_last_part STRING, + url_host_5th_last_part STRING, + url_host_registry_suffix STRING, + url_host_registered_domain STRING, + url_host_private_suffix STRING, + url_host_private_domain STRING, + url_protocol STRING, + url_port INT, + url_path STRING, + url_query STRING, + fetch_time TIMESTAMP, + fetch_status SMALLINT, + content_digest STRING, + content_mime_type STRING, + content_mime_detected STRING, + content_charset STRING, + content_languages STRING, + warc_filename STRING, + warc_record_offset INT, + warc_record_length INT, + warc_segment STRING) +PARTITIONED BY ( + crawl STRING, + subset STRING) +STORED AS parquet +LOCATION 's3://commoncrawl/cc-index/table/cc-main/warc/'; +''' + + +find_captures = ''' +SELECT url, warc_filename, warc_record_offset, warc_record_length +FROM "ccindex"."ccindex" +WHERE crawl = '%(INDEX)s' + AND subset = '%(SUBSET)s' + AND regexp_like(url_path, '(job|career|employ|openings|opportunities)') + AND url_host_registered_domain = 'museums.ca' +''' +# alternately WHERE (crawl = 'x' OR crawl = 'y') +# apparently WHERE contains(ARRAY 
['CC-MAIN-2019-04', 'CC-MAIN-2019-09', 'CC-MAIN-=2019-13'], crawl) works but will read the entire row (bug mentioned in amazon docs) +# LIMIT 100 + +find_captures_params = { + 'INDEX': 'CC-MAIN-2018-43', + 'SUBSET': 'warc', +} + +jobs_surt = ''' +SELECT url, + warc_filename, + warc_record_offset, + warc_record_length +FROM "ccindex"."ccindex" +WHERE (crawl = 'CC-MAIN-2019-51') + AND subset = 'warc' + AND regexp_like(url_path, '(job|career|employ|openings|opportunities)') + AND url_host_registered_domain = 'museums.ca' + AND url_surtkey LIKE 'ca,museums)%' +''' + +jobs_surt_only = ''' +SELECT url, + warc_filename, + warc_record_offset, + warc_record_length +FROM "ccindex"."ccindex" +WHERE (crawl = 'CC-MAIN-2019-51') + AND subset = 'warc' + AND regexp_like(url_path, '(job|career|employ|openings|opportunities)') + AND url_surtkey LIKE 'ca,museums)%' +''' + +''' +See https://github.com/commoncrawl/cc-index-table#query-the-table-in-aws-athena for more SQL examples +''' + +''' +See https://github.com/commoncrawl/cc-index-table/blob/master/src/main/java/org/commoncrawl/spark/examples/CCIndexWarcExport.java +for an example of extracting the subset of warcs based on a sql query + +conf.set("warc.export.description", "Common Crawl WARC export from " + tablePath + " for query: " + sqlQuery); +''' + +''' +since pyathena is using boto3, then the two keys will come +from (in order) ~/.aws/credentials, ~/.aws/config, /etc/boto.cfg, ~/.boto +region_name ought to match where the cc data is (us-east-1) +region_name's name is different from the usual name region (?) +need a way to specify an alternate profile: AWS_PROFILE env or profile_name when creating a session +s3_staging_dir really ought to be in the same region as cc data (us-east-1) + +can read back configured variables? +dev_s3_client = session.client('s3') +''' + + +def print_debug_info(params=None): + print('here are some debugging hints: add at least one -v early in the command line', file=sys.stderr) + if params: + print('params:', file=sys.stderr) + for k, v in params: + print(' ', k, v, file=sys.stderr) + for k, v in os.environ.items(): + if k.startswith('AWS_'): + print(k, v, file=sys.stderr) + for f in ('~/.aws/credentials', '~/.aws/config', '/etc/boto.cfg', '~/.boto'): + if os.path.exists(os.path.expanduser(f)): + print(f, 'exists', file=sys.stderr) + + +def get_athena(**kwargs): + LOGGER.info('connecting to athena') + + try: + connection = connect(**kwargs) + except Exception: + print_debug_info(params=kwargs) + raise + + return connection + + +def asetup(connection, **kwargs): + + LOGGER.info('creating database') + create_database = 'CREATE DATABASE ccindex' + try: + cursor = my_execute(connection, create_database, warn_for_cost=True, **kwargs) + except pyathena.error.OperationalError as e: + if 'Database ccindex already exists' in str(e): + LOGGER.info('database ccindex already exists') + else: + cursor = connection.cursor() + print_text_messages(connection, cursor.output_location) + raise + + LOGGER.info('creating table') + my_execute(connection, create_table, warn_for_cost=True, **kwargs) + + LOGGER.info('repairing table') + # ccindex -> "ccindex"."ccindex" + repair_table = ''' +MSCK REPAIR TABLE ccindex; + ''' + my_execute(connection, repair_table, warn_for_cost=True, **kwargs) + + +class WrapCursor: + ''' + Make the cursor iterator easier to use, returns a dict with keys for the field names + XXX consider making this a subclass of pyathena.cursor.Cursor ? 
+ ''' + def __init__(self, cursor): + self.cursor = cursor + self.fields = [d[0] for d in cursor.description] + if self.fields: + LOGGER.info('observed fields of %s', ','.join(self.fields)) + + def __next__(self): + row = next(self.cursor) + return dict(zip(self.fields, row)) + + def __iter__(self): + return self + + +def my_execute(connection, sql, params={}, dry_run=False, + print_cost=True, print_messages=True, + warn_for_cost=False, raise_for_messages=False): + + try: + sql = sql % params + except KeyError as e: + raise KeyError('sql template referenced an unknown parameter: '+str(e)) + except ValueError as e: + if 'unsupported format character' in str(e): + raise ValueError('did you forget to quote a percent sign?: '+str(e)) + + if dry_run: + print('final sql is:', file=sys.stderr) + print(sql, file=sys.stderr) + return [] # callers expect an iterable + + cursor = connection.cursor() + + try: + cursor.execute(sql) + except Exception: + print_debug_info() + raise + + m = None + if print_messages or raise_for_messages: + m = print_text_messages(connection, cursor.output_location) + if print_cost or warn_for_cost: + c = estimate_athena_cost(cursor) + if warn_for_cost and c > 0.009: + LOGGER.warn('estimated cost $%.6f', c) + elif print_cost: + LOGGER.info('estimated cost $%.6f', c) + + if m and raise_for_messages: + raise ValueError('Expected no messages') + + return WrapCursor(cursor) + + +def iter(connection, **kwargs): + # form the query: + # all verbatim queries to be passed in + # if not verbatim: + # fields -- from kwargs[fields] -- or all_fields + # LIMIT NNN -- from kwargs[limit] + # crawls -- from kwargs[crawl] [] + # SQL WHERE clauses -- from kwargs[filter] plus url, needs translation + + LOGGER.info('executing the iter command') + return my_execute(connection, jobs_surt, **kwargs) + + +def get_all_crawls(connection, **kwargs): + get_all_crawls = ''' + SELECT DISTINCT crawl + FROM "ccindex"."ccindex";''' + + LOGGER.info('executing get_all_crawls') + + cursor = my_execute(connection, get_all_crawls, + warn_for_cost=True, raise_for_messages=True, **kwargs) + + ret = [] + for row in cursor: + ret.append(row['crawl']) + + return sorted(ret) + + +def get_summary(connection, **kwargs): + count_by_partition = ''' +SELECT COUNT(*) as n_captures, + crawl, + subset +FROM "ccindex"."ccindex" +GROUP BY crawl, subset; +''' + LOGGER.info('executing get_summary') + + cursor = my_execute(connection, count_by_partition, + warn_for_cost=True, raise_for_messages=True, **kwargs) + + return '\n'.join([json.dumps(row, sort_keys=True) for row in cursor]) + + +def run_sql_from_file(connection, cmd, **kwargs): + with open(cmd.file, 'r') as fd: + sql = fd.read() + + params = {} + if cmd.param: + for p in cmd.param: + if '=' not in p: + raise ValueError('paramters should have a single equals sign') + k, v = p.split('=', 1) + params[k] = v + + LOGGER.info('executing sql from file %s', cmd.file) + cursor = my_execute(connection, sql, params=params, **kwargs) + + return '\n'.join([json.dumps(row, sort_keys=True) for row in cursor]) + + +#if __name__ == '__main__': +# s3_staging_dir = 's3://dshfjhfkjhdfshjgdghj/staging/' # AWS_ATHENA_S3_STAGING_DIR +# ga_kwargs = dict(profile_name='greg', # AWS_PROFILE +# schema_name='ccindex', # needed for a couple of actions that don't mention the database +# s3_staging_dir=s3_staging_dir, # AWS_ATHENA_S3_STAGING_DIR optional if work_group is set +# region_name='us-east-1') # AWS_DEFAULT_REGION +# connection = get_athena(**ga_kwargs) +# +# print(get_all_crawls(connection)) 
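(A minimal usage sketch of the athena module added above, mirroring its commented-out `__main__` block. The profile name and staging bucket are placeholders, not real values; the staging bucket must be writable by you and should live in us-east-1 next to the Common Crawl data.)

```
import logging

from cdx_toolkit import athena

logging.basicConfig(level=logging.INFO)

# connection parameters mirror the commented-out __main__ block above;
# 'cdx_athena' and the staging bucket name are placeholders
connection = athena.get_athena(
    profile_name='cdx_athena',                          # or set AWS_PROFILE
    schema_name='ccindex',                              # database name, needed by a few statements
    s3_staging_dir='s3://my-staging-bucket/staging/',   # must be writable, ideally in us-east-1
    region_name='us-east-1',                            # where the Common Crawl data lives
)

# one-time setup: CREATE DATABASE, CREATE EXTERNAL TABLE, MSCK REPAIR TABLE
athena.asetup(connection)

# cheap sanity check: list the crawl partitions Athena can now see
print(athena.get_all_crawls(connection))
```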
diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py index dbf3b37..8a70718 100644 --- a/cdx_toolkit/cli.py +++ b/cdx_toolkit/cli.py @@ -6,26 +6,41 @@ import os import cdx_toolkit +from . import athena LOGGER = logging.getLogger(__name__) -def main(args=None): - parser = ArgumentParser(description='cdx_toolkit iterator command line tool') +def split_fields(fields): + ret = [] + dedup = set() + for f in fields.split(','): + if f not in dedup: + dedup.add(f) + ret.append(f) + return ret + +def add_global_args(parser): parser.add_argument('--version', '-V', action='version', version=get_version()) parser.add_argument('--verbose', '-v', action='count', help='set logging level to INFO (-v) or DEBUG (-vv)') + parser.add_argument('--limit', type=int, action='store') + parser.add_argument('--from', action='store') # XXX default for cc + parser.add_argument('--to', action='store') + parser.add_argument('--filter', action='append', help='see CDX API documentation for usage') + + +def main(args=None): + parser = ArgumentParser(description='cdx_toolkit iterator command line tool') + + add_global_args(parser) parser.add_argument('--cc', action='store_const', const='cc', help='direct the query to the Common Crawl CDX/WARCs') parser.add_argument('--ia', action='store_const', const='ia', help='direct the query to the Internet Archive CDX/wayback') parser.add_argument('--source', action='store', help='direct the query to this CDX server') parser.add_argument('--wb', action='store', help='direct replays for content to this wayback') - parser.add_argument('--limit', type=int, action='store') parser.add_argument('--cc-mirror', action='store', help='use this Common Crawl index mirror') parser.add_argument('--cc-sort', action='store', help='default mixed, alternatively: ascending') - parser.add_argument('--from', action='store') # XXX default for cc - parser.add_argument('--to', action='store') - parser.add_argument('--filter', action='append', help='see CDX API documentation for usage') parser.add_argument('--get', action='store_true', help='use a single get instead of a paged iteration. default limit=1000') parser.add_argument('--closest', action='store', help='get the closest capture to this timestamp. 
use with --get') @@ -34,7 +49,7 @@ def main(args=None): iterate = subparsers.add_parser('iter', help='iterate printing captures') iterate.add_argument('--all-fields', action='store_true') - iterate.add_argument('--fields', action='store', default='url,status,timestamp', help='try --all-fields if you need the list') + iterate.add_argument('--fields', action='store', default='url,status,timestamp', help='try --all-fields if you need the complete list') iterate.add_argument('--jsonl', action='store_true') iterate.add_argument('--csv', action='store_true') iterate.add_argument('url') @@ -53,7 +68,7 @@ def main(args=None): size = subparsers.add_parser('size', help='imprecise count of how many results are available') size.add_argument('--details', action='store_true', help='show details of each subindex') - size.add_argument('url') + size.add_argument('url', help='') size.set_defaults(func=sizer) if args is not None: @@ -70,6 +85,62 @@ def main(args=None): cmd.func(cmd, cmdline) +def add_athena_args(parser): + parser.add_argument('--profile-name', action='store', help='choose which section of your boto conf files is used') + parser.add_argument('--role-arn', action='store', help='Amazon resource name roley') + parser.add_argument('--work-group', action='store', help='Amazon Athena work group name') + parser.add_argument('--s3-staging-dir', action='store', help='an s3 bucket to hold outputs') + parser.add_argument('--region-name', action='store', default='us-east-1', + help='AWS region to use, you probably want the one the commoncrawl data is in (us-east-1)') + parser.add_argument('--dry-run', '-n', action='store_true', help='print the SQL and exit without executing it') + + +def main_athena(args=None): + parser = ArgumentParser(description='CommonCrawl column database command line tool') + + add_global_args(parser) + add_athena_args(parser) + + subparsers = parser.add_subparsers(dest='cmd') + subparsers.required = True + + asetup = subparsers.add_parser('setup', help='set up amazon athena ccindex database and table') + asetup.set_defaults(func=asetuper) + + asummarize = subparsers.add_parser('summarize', help='summarize the partitions currently in the table') + asummarize.set_defaults(func=asummarizer) + + asql = subparsers.add_parser('sql', help='run arbitrary SQL statement from a file') + asql.add_argument('--param', action='append', help='parameteres for templating the SQL, e.g. SUBSET=warc') + asql.add_argument('file', help='') + asql.set_defaults(func=asqler) + + aiter = subparsers.add_parser('iter', help='iterate printing captures') + aiter.add_argument('--all-fields', action='store_true') + aiter.add_argument('--fields', action='store', default='url,warc_filename,warc_record_offset,warc_record_length', help='try --all-fields if you need the list') + aiter.add_argument('--jsonl', action='store_true') + aiter.add_argument('--csv', action='store_true') + aiter.add_argument('--subset', action='append', default='warc', help='e.g. 
warc, robotstxt, crawldiagnostics') + aiter.add_argument('--crawl', action='append', help='crawl to process, you can specify more than one') + aiter.add_argument('--limit', type=int, action='store', help='maximum records to return, good for debugging') + aiter.add_argument('--filter', action='append', help='CDX-style filter, see CDX API documentation for usage') + aiter.add_argument('url', help='') + aiter.set_defaults(func=aiterator) + + if args is not None: + cmdline = ' '.join(args) + else: # pragma: no cover + # there's something magic about args and console_scripts + # this fallback is needed when installed by setuptools + if len(sys.argv) > 1: + cmdline = 'cdxt ' + ' '.join(sys.argv[1:]) + else: + cmdline = 'cdxt' + cmd = parser.parse_args(args=args) + set_loglevel(cmd) + cmd.func(cmd, cmdline) + + def set_loglevel(cmd): loglevel = os.getenv('LOGLEVEL') or 'WARNING' if cmd.verbose: @@ -144,9 +215,13 @@ def print_line(cmd, writer, printme): def iterator(cmd, cmdline): cdx, kwargs = setup(cmd) - fields = set(cmd.fields.split(',')) + + fields = split_fields(cmd.fields) + + if cmd.csv and cmd.all_fields: + raise NotImplementedError('Sorry, the comination of csv and all-fields is not yet implemented') if cmd.csv: - writer = csv.DictWriter(sys.stdout, fieldnames=sorted(list(fields))) + writer = csv.DictWriter(sys.stdout, fieldnames=fields) writer.writeheader() else: writer = None @@ -212,3 +287,56 @@ def sizer(cmd, cmdline): size = cdx.get_size_estimate(cmd.url, **kwargs) print(size) + + +def athena_init(cmd): + conn_kwargs = {} + # these args are all permissions-related + for k in ('profile_name', 'role_arn', 'work_group', 's3_staging_dir', 'region_name', 'dry_run'): + if k in cmd: + value = cmd.__dict__[k] + if value is not None: + conn_kwargs[k] = value + kwargs = {} + if 'dry_run' in cmd and cmd.dry_run: + kwargs['dry_run'] = True + + connection = athena.get_athena(**conn_kwargs) + return connection, kwargs + + +def asetuper(cmd, cmdline): + connection, kwargs = athena_init(cmd) + athena.asetup(connection, **kwargs) + print('crawl partitions:', athena.get_all_crawls(connection, **kwargs)) + + +def asummarizer(cmd, cmdline): + connection, kwargs = athena_init(cmd) + print(athena.get_summary(connection, **kwargs)) + + +def asqler(cmd, cmdline): + connection, kwargs = athena_init(cmd) + print(athena.run_sql_from_file(connection, cmd, **kwargs)) + + +def aiterator(cmd, cmdline): + connection, kwargs = athena_init(cmd) + + fields = split_fields(cmd.fields) + + if cmd.csv and cmd.all_fields: + raise NotImplementedError('Sorry, the comination of csv and all-fields is not yet implemented') + if cmd.csv: + writer = csv.DictWriter(sys.stdout, fieldnames=sorted(list(fields))) + writer.writeheader() + else: + writer = None + + # csv fields for all-fields are not present until the cursor.execute has run + # XXX what should winnow_fields do in this loop? + + for obj in athena.iter(connection, **kwargs): + printme = winnow_fields(cmd, fields, obj) + print_line(cmd, writer, printme) diff --git a/scripts/cc-check-parquet.py b/scripts/cc-check-parquet.py new file mode 100755 index 0000000..e1239cb --- /dev/null +++ b/scripts/cc-check-parquet.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python + +import sys +from collections import defaultdict +import json + +import jsonlines + +''' +Criteria for a good CommonCrawl crawl + +'parquet-mr version ' > 1.8.1 +physical_type has exactly one entry + +all url_host_* fields have min and max + plus url_path, url_query, url_surtkey -- is this url_* ? 
+ ok url_protocol and url_port are the only ones not on this list + url_port is very small and is lacking a lot of min and max + +all fields, small dictionary overflows are rare (?) + +''' + + +def analyze_parquet_version(created_by, complaints): + parts = created_by.split(' ') + assert parts[0] == 'parquet-mr' + assert parts[1] == 'version' + semver = parts[2].split('.') + if semver[0] == '1' and int(semver[1]) < 10: + complaints.append('parquet version too old: '+parts[2]) + + +def analyze(obj): + complaints = [] + group = obj['_group'] + created_by = obj['_created_by'].keys() + for cb in created_by: + analyze_parquet_version(cb, complaints) + + for k, v in obj.items(): + if k.startswith('_'): + continue + if 'physical_type' not in v: + print('no physical type', k) + if len(v['physical_type']) != 1: + complaints.append('multiple phsyical_types in field: '+k) + if 'small dictionary overflowed to plain' in v['overflow']: + small = 0 + for k2, v2 in v['overflow'].items(): + if k2.startswith('small '): + small += v2 + ratio = v['overflow']['small dictionary overflowed to plain'] / small + if ratio > 0.2: + complaints.append('field has {:.1f}% small dictionary overflows: {}'.format(ratio*100, k)) + if k.startswith('url_'): + if 'min_max_absent' in v or 'min_max_no_stats_set' in v: + present = v.get('min_max_present', 0) + absent = v.get('min_max_absent', 0) + v.get('min_max_no_stats_set', 0) + percent = absent / (present + absent) * 100. + complaints.append('field is missing stats {:4.1f}% of the time: {}'.format(percent, k)) + + return group, complaints + + +overall = {} +with jsonlines.open(sys.argv[1]) as reader: + for obj in reader.iter(type=dict, skip_invalid=True): + group, complaints = analyze(obj) + overall[group] = list(sorted(complaints)) + +print(json.dumps(overall, sort_keys=True, indent=4)) diff --git a/scripts/parquet-stats.py b/scripts/parquet-stats.py new file mode 100644 index 0000000..ae65139 --- /dev/null +++ b/scripts/parquet-stats.py @@ -0,0 +1,256 @@ +from argparse import ArgumentParser +from collections import defaultdict +import json +import sys +import math +import glob + +import s3fs +import pyarrow.parquet as pq + +from progress.bar import Bar + + +def main(args=None): + parser = ArgumentParser(description='parquet-stats command line tool') + parser.add_argument('--anon', action='store_true', help='Use an anonymous connection, public buckets only') + parser.add_argument('--requester-pays', action='store_true', help='use requester-pays access, will read S3 creds from env vars or files') + parser.add_argument('--limit', action='append', help='limit analysis to this partition, e.g. date=2018-01-31') + parser.add_argument('--aggregate', action='append', help='aggregate analysis across this partition, e.g. date') + parser.add_argument('--overflow', action='store_true', help='try to analyze dictionary overflows') + parser.add_argument('--overflow-print', action='store_true', help='print parquet filenames for overflowed dictionary columns') + parser.add_argument('--pretty', action='store_true', help='pretty-print jsonl output') + parser.add_argument('--dump-groups', action='store_true', help='just print the file groups and exit early') + parser.add_argument('path', help='e.g. 
s3://commoncrawl/cc-index/table/cc-main/warc/*/*/*.parquet or **/*.parquet') + + cmd = parser.parse_args(args=args) + print('cmd', cmd, file=sys.stderr) + + kwargs = {} + if cmd.anon: + kwargs['anon'] = True + if cmd.requester_pays: + kwargs['requester_pays'] = True # defaults to False + + fs = None + if cmd.path.startswith('s3://'): + fs = s3fs.S3FileSystem(**kwargs) + all_files = fs.glob(path=cmd.path, recursive=True) + else: + all_files = glob.glob(cmd.path, recursive=True) + + print('have', len(all_files), 'files', file=sys.stderr) + + all_files = apply_limit(all_files, cmd) + print('have', len(all_files), 'files after limit applied', file=sys.stderr) + + file_groups = get_aggregate_groups(all_files, cmd) + if file_groups: + print('have', len(file_groups), 'file groups', file=sys.stderr) + else: + return + + if cmd.dump_groups: + print(json.dumps(list(file_groups), sort_keys=True, indent=4)) + return + + results = do_work(file_groups, fs, cmd) + print_result(results, pretty=cmd.pretty) + + +def apply_limit(all_files, cmd): + if not cmd.limit: + return all_files + ret = [] + for f in all_files: + if cmd.limit: + for limit in cmd.limit: + if limit not in f: + break + else: + ret.append(f) + return ret + + +def get_aggregate_groups(all_files, cmd): + if not all_files: + return + if not cmd.aggregate: + return [(None, all_files)] + + ret = defaultdict(list) + for f in all_files: + labels = [] + for a in cmd.aggregate: + parts = f.split('/') + for p in parts: + if p.startswith(a): + labels.append(p.replace(a, '', 1).replace('=', '', 1)) + if labels: + # the order should be preserved + key = ' '.join(labels) + ret[key].append(f) + else: + # probably a bad sign, but... + print('no label for', f, file=sys.stderr) + + return ret.items() + + +def check_same(thing1, thing2, what, fname): + if thing1 is not None and thing1 != thing2: + print('observed unusual value of {} for {} in {}'.format(what, thing2, fname)) + return thing2 + + +def analyze_dictionary_overflow(column, statspath, fname, row_group_index, path, cmd): + if 'hist_size' not in statspath: + statspath['hist_size'] = defaultdict(int) + + try: + bucket = int(math.log10(column.total_compressed_size)) + statspath['hist_size']['{:,}-{:,}'.format(10**bucket, 10**(bucket+1))] += 1 + except Exception as e: + print('exception doing bucket math', e, file=sys.stderr) + + if 'overflow' not in statspath: + statspath['overflow'] = defaultdict(int) + + if column.total_compressed_size < 100_000: + statspath['overflow']['very small size'] += 1 + return + + if column.total_compressed_size < 1_000_000: + if 'PLAIN_DICTIONARY' not in column.encodings: + statspath['overflow']['small no dictionary'] += 1 + return + + if 'PLAIN' in column.encodings: + statspath['overflow']['small dictionary overflowed to plain'] += 1 + if cmd.overflow_print: + print('small overflow', fname, row_group_index, path, file=sys.stderr) + return + + statspath['overflow']['small dictionary not overflowed'] += 1 + return + + if 'PLAIN_DICTIONARY' not in column.encodings: + statspath['overflow']['big no dictionary'] += 1 + return + + if 'PLAIN' in column.encodings: + statspath['overflow']['big dictionary overflowed to plain'] += 1 + if cmd.overflow_print: + print('big overflow', fname, row_group_index, path, file=sys.stderr) + return + + statspath['overflow']['big dictionary not overflowed'] += 1 + + +def analyze_one(metadata, stats, num_columns, fname, cmd): + stats['_created_by'][metadata.created_by] += 1 + num_columns = check_same(num_columns, metadata.num_columns, 'num_columns', 
fname) + + for row_group_index in range(metadata.num_row_groups): + row_group = metadata.row_group(row_group_index) + num_columns = check_same(num_columns, row_group.num_columns, 'num_columns', fname) + stats['_row_groups'] += 1 + + for column_index in range(row_group.num_columns): + column = row_group.column(column_index) + path = column.path_in_schema + + if path not in stats: + stats[path] = defaultdict(int) + + if 'compression' not in stats[path]: + stats[path]['compression'] = defaultdict(int) + stats[path]['compression']['total_compressed_size'] += column.total_compressed_size + stats[path]['compression']['total_uncompressed_size'] += column.total_uncompressed_size + + if 'physical_type' not in stats[path]: + stats[path]['physical_type'] = defaultdict(int) + stats[path]['physical_type'][column.physical_type] += 1 + + encodings = ','.join(sorted(column.encodings)) # seems to be reordered by PyArrow so sort for stability + if 'encodings' not in stats[path]: + stats[path]['encodings'] = defaultdict(int) + stats[path]['encodings'][encodings] += 1 + + if column.is_stats_set: + statistics = column.statistics + if statistics.has_min_max: + stats[path]['min_max_present'] += 1 + else: + stats[path]['min_max_absent'] += 1 + else: + stats[path]['min_max_no_stats_set'] += 1 + + if cmd.overflow: + analyze_dictionary_overflow(column, stats[path], fname, row_group_index, path, cmd) + return num_columns + + +def my_smart_open(fname, fs): + if fs: # fname.startswith('s3://'): + fp = fs.open(fname, mode='rb') + else: + fp = open(fname, mode='rb') + return fp + + +def do_work(file_groups, fs, cmd): + ret = [] + + bar = Bar('files', max=sum([len(files) for group, files in file_groups]), suffix='%(index)d / %(max)d - %(percent).1f%%') + + for group, files in file_groups: + num_columns = None + stats = defaultdict(int) + stats['_created_by'] = defaultdict(int) + for fname in files: + bar.next(1) + try: + fp = my_smart_open(fname, fs) + pqf = pq.ParquetFile(fp) + metadata = pqf.metadata + except Exception as e: + print(file=sys.stderr) + print('exception {} processing file {}'.format(str(e), fname), file=sys.stderr) + continue + + num_columns = analyze_one(metadata, stats, num_columns, fname, cmd) + + for path in stats: + sp = stats[path] + if isinstance(sp, int): + continue + if 'compression' in sp: + spc = sp['compression'] + if 'total_compressed_size' in spc and 'total_uncompressed_size' in spc: + cr = spc['total_uncompressed_size']/spc['total_compressed_size'] + if cr >= 10.0: + spc['total_compression_ratio'] = '{:.0f}:1'.format(int(cr)) + else: + spc['total_compression_ratio'] = '{:.1f}:1'.format(cr) + + ret.append((group, stats)) + bar.finish() + return ret + + +def print_result(results, pretty=False): + if not results: + return + + kwargs = {} + if pretty: + kwargs['indent'] = 4 + + for group, stats in results: + stats['_group'] = group + print(json.dumps(stats, sort_keys=True, **kwargs)) + + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index c20ab5d..5ceadb7 100755 --- a/setup.py +++ b/setup.py @@ -48,6 +48,7 @@ def run_tests(self): use_scm_version=True, description='A toolkit for working with CDX indices', long_description=description, + long_description_content_type='text/markdown', author='Greg Lindahl and others', author_email='lindahl@pbm.com', url='https://github.com/cocrawler/cdx_toolkit', @@ -58,7 +59,7 @@ def run_tests(self): entry_points=''' [console_scripts] cdxt = cdx_toolkit.cli:main - #ccathena = cdx_toolkit.cli:main_athena + ccathena = 
cdx_toolkit.cli:main_athena ''', scripts=scripts, license='Apache 2.0', From 711d4a75f0ba50c2cb7df01e65f80c963a43f417 Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Tue, 25 Aug 2020 21:30:17 -0700 Subject: [PATCH 2/8] fixes for 3.5 and requires --- cdx_toolkit/athena.py | 4 ++-- scripts/parquet-stats.py | 4 ++-- setup.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cdx_toolkit/athena.py b/cdx_toolkit/athena.py index 6b117e1..47a3c7e 100755 --- a/cdx_toolkit/athena.py +++ b/cdx_toolkit/athena.py @@ -23,8 +23,8 @@ def all_results_properties(cursor): def estimate_athena_cost(cursor, cost_per_tib=5.0): data_scanned_in_bytes = cursor.data_scanned_in_bytes or 0 - data_scanned_in_mibytes = math.ceil(data_scanned_in_bytes / 1_000_000) - return max(data_scanned_in_mibytes, 10) * cost_per_tib / 1_000_000 + data_scanned_in_mibytes = math.ceil(data_scanned_in_bytes / 1000000) + return max(data_scanned_in_mibytes, 10) * cost_per_tib / 1000000 def print_text_messages(connection, location): diff --git a/scripts/parquet-stats.py b/scripts/parquet-stats.py index ae65139..4889555 100644 --- a/scripts/parquet-stats.py +++ b/scripts/parquet-stats.py @@ -116,11 +116,11 @@ def analyze_dictionary_overflow(column, statspath, fname, row_group_index, path, if 'overflow' not in statspath: statspath['overflow'] = defaultdict(int) - if column.total_compressed_size < 100_000: + if column.total_compressed_size < 100000: statspath['overflow']['very small size'] += 1 return - if column.total_compressed_size < 1_000_000: + if column.total_compressed_size < 1000000: if 'PLAIN_DICTIONARY' not in column.encodings: statspath['overflow']['small no dictionary'] += 1 return diff --git a/setup.py b/setup.py index 5ceadb7..9f1fd23 100755 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ def run_tests(self): 'cdx_toolkit', ] -requires = ['requests', 'warcio'] +requires = ['requests', 'warcio', 'pyathena'] test_requirements = ['pytest>=3.0.0'] # 'coverage', 'pytest-cov'] From a1e8770a96189a7053f4606c57a976986a6f9b2e Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Wed, 26 Aug 2020 07:08:06 -0700 Subject: [PATCH 3/8] packaging --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index ed86538..ce42bd8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ requests==2.24.0 warcio==1.7.4 +pyathena==1.11.1 # used by Makefile coverage==5.2.1 From 25c1fe6a2171054004cef66f3a4d8de317555511 Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Tue, 4 Jan 2022 22:40:49 -0800 Subject: [PATCH 4/8] packaging --- azure-pipelines.yml | 4 ++-- requirements.txt | 3 ++- setup.py | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ee56d4a..03275f3 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -37,7 +37,7 @@ jobs: echo "Note: using 3.6 because packages are deprecating 3.5 support" pip install -r requirements.txt fi - pip --use-feature=in-tree-build install . .[test] + pip --use-feature=in-tree-build install . .[test] .[athena] displayName: 'Install dependencies' - script: | @@ -74,7 +74,7 @@ jobs: - script: | python -m pip install --upgrade pip - pip --use-feature=in-tree-build install . .[test] + pip --use-feature=in-tree-build install . 
.[test] .[athena] displayName: 'Install dependencies' - script: | diff --git a/requirements.txt b/requirements.txt index c092952..5cc6d1d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ # Install with "python -m pip install -r requirements.txt". -# must be kept in sync with setup.py +# names must be kept in sync with setup.py +# exact versions are tested in the azure-pipelines.yml config requests==2.25.1 warcio==1.7.4 diff --git a/setup.py b/setup.py index c61bba9..6a3c462 100755 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ package_requirements = ['twine', 'setuptools', 'setuptools-scm'] extras_require = { + 'athena': ['pyathena'], 'test': test_requirements, # setup no longer tests, so make them an extra 'package': package_requirements, } From 567d09b8aeca916b5760933b1b089a49f5d8bb7d Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Fri, 7 Jan 2022 19:25:04 -0800 Subject: [PATCH 5/8] WIP --- CHANGELOG.md | 3 + cdx_toolkit/athena.py | 143 +++++++++++++++++++++++++++++++----------- cdx_toolkit/cli.py | 4 +- tests/test_cli.py | 11 +++- 4 files changed, 122 insertions(+), 39 deletions(-) mode change 100755 => 100644 cdx_toolkit/athena.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ee908ac..c726db5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +- 0.9.34 (not yet tagged) + + experimental support for CC's columnar index + - 0.9.33 + rename master to main + drop python 3.5 testing because of setuptools-scm diff --git a/cdx_toolkit/athena.py b/cdx_toolkit/athena.py old mode 100755 new mode 100644 index 47a3c7e..329c101 --- a/cdx_toolkit/athena.py +++ b/cdx_toolkit/athena.py @@ -8,6 +8,11 @@ from pyathena import connect import pyathena.error +from pyathena.cursor import Cursor, DictCursor +from pyathena.pandas.cursor import PandasCursor +from pyathena.async_cursor import AsyncCursor, AsyncDictCursor +from pyathena.pandas.async_cursor import AsyncPandasCursor +from pyathena.utils import parse_output_location LOGGER = logging.getLogger(__name__) @@ -31,9 +36,7 @@ def print_text_messages(connection, location): if location is None or not location.endswith('.txt'): return [] - parts = location.split('/') - bucket = parts[2] - key = '/'.join(parts[3:]) + bucket, key = parse_output_location(location) LOGGER.info('looking for text messages in bucket {} key {}'.format(bucket, key)) s3 = connection.session.client('s3') # reuse the connection parameters we already set up @@ -45,10 +48,25 @@ def print_text_messages(connection, location): return True +def download_result_csv(connection, location, output_file): + if location is None or not location.endswith('.csv'): + raise ValueError('athena query did not return a csv') + + bucket, key = parse_output_location(location) + LOGGER.info('looking for csv in bucket {} key {}'.format(bucket, key)) + s3 = connection.session.client('s3') # reuse the connection parameters we already set up + try: + s3.Bucket(bucket).download_file(key, output_file) + except Exception: + raise + + database_name = 'ccindex' table_name = 'ccindex' -# XXX ccindex -> "ccindex"."ccindex" +# depends on schema_name="ccindex' +# https://github.com/commoncrawl/cc-index-table/blob/master/src/sql/athena/cc-index-create-table-flat.sql + create_table = ''' CREATE EXTERNAL TABLE IF NOT EXISTS ccindex ( url_surtkey STRING, @@ -63,6 +81,7 @@ def print_text_messages(connection, location): url_host_registered_domain STRING, url_host_private_suffix STRING, url_host_private_domain STRING, + url_host_name_reversed STRING, url_protocol STRING, url_port INT, url_path 
STRING, @@ -152,36 +171,107 @@ def print_text_messages(connection, location): ''' -def print_debug_info(params=None): +def debug_credentials(params=None): print('here are some debugging hints: add at least one -v early in the command line', file=sys.stderr) + print('these are in the same order that boto3 uses them:', file=sys.stderr) + if params: print('params:', file=sys.stderr) for k, v in params: print(' ', k, v, file=sys.stderr) + profile_name = params.get('profile_name') + + print('environment variables', file=sys.stderr) for k, v in os.environ.items(): if k.startswith('AWS_'): + if k in {'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN', 'AWS_SECURITY_TOKEN'}: + v = '' print(k, v, file=sys.stderr) - for f in ('~/.aws/credentials', '~/.aws/config', '/etc/boto.cfg', '~/.boto'): + if k == 'AWS_SECURITY_TOKEN': + print('AWS_SECURITY_TOKEN is deprecated', file=sys.stderr) + + if 'AWS_PROFILE' not in os.environ and not profile_name: + print('NOTE: AWS_PROFILE is not set, so we will look for the default profile', file=sys.stderr) + profile_name = 'default' + elif not profile_name: + profile_name = os.environ.get('AWS_PROFILE', 'default') + + scf = os.environ.get('AWS_SHARED_CREDENTIALS_FILE', '~/.aws/credentials') + scf = os.path.expanduser(scf) + if os.path.exists(scf): + print(scf, 'exists', file=sys.stderr) + # XXX read it + # only allowed to have 3 keys. every section is a profile name + # aws_access_key_id, aws_secret_access_key, aws_session_token + + if os.path.exists(os.path.expanduser('~/.aws/config')): + print('~/.aws/config', 'exists', file=sys.stderr) + # XXX read it + # profiles have to have section names like "profile prod" + # in addition to profiles with the 3 keys, this can have region, region_name, s3_staging_dir + # can also have "assume role provider" in a profile + # source_profile=foo will look for foo in either config or credentials + + for f in ('/etc/boto.cfg', '~/.boto'): if os.path.exists(os.path.expanduser(f)): print(f, 'exists', file=sys.stderr) + # XXX only uses the [Credentials] section + + print('finally: instance metadata if running in EC2 with IAM', file=sys.stderr) + + +def log_session_info(session): + LOGGER.debug('Session info:') + LOGGER.debug('session.profile_name: ' + session.profile_name) + LOGGER.debug('session.available_profiles: ' + session.available_profiles) + LOGGER.debug('session.region_name: ' + session.region_name) + # get_credentials ? 
botocore.credential.Credential + + +kinds = { + 'default': {'cursor': Cursor}, + 'dict': {'cursor': DictCursor}, + 'pandas': {'cursor': PandasCursor}, + 'default_async': {'cursor': AsyncCursor}, + 'dict_async': {'cursor': AsyncDictCursor}, + 'pandas_async': {'cursor': AsyncPandasCursor}, +} def get_athena(**kwargs): LOGGER.info('connecting to athena') + # XXX verify that s3_staging_dir is set, and not set to common crawl's bucket (which is ro) + # XXX verify that there is a schema_name + + cursor_class = kwargs.pop('cursor_class', None) + if not cursor_class: + kind = kwargs.pop('kind', 'default') + cursor_class = kinds.get(kind) + if not cursor_class: + raise ValueError('Unknown cursor kind of '+kind) + elif kind in kwargs and kwargs[kind]: + LOGGER.warning('User specified both cursor_class and kind, ignoring kind') + try: - connection = connect(**kwargs) + connection = connect(cursor_class=cursor_class, **kwargs) except Exception: - print_debug_info(params=kwargs) + debug_credentials(params=kwargs) raise + log_session_info(connection.session) + + # XXX cursor._result_set_class = OurExponentialResultClass return connection def asetup(connection, **kwargs): - LOGGER.info('creating database') + + # XXX verify that there is a schema_name? needed for create and repair + create_database = 'CREATE DATABASE ccindex' + try: cursor = my_execute(connection, create_database, warn_for_cost=True, **kwargs) except pyathena.error.OperationalError as e: @@ -196,32 +286,13 @@ def asetup(connection, **kwargs): my_execute(connection, create_table, warn_for_cost=True, **kwargs) LOGGER.info('repairing table') - # ccindex -> "ccindex"."ccindex" + # depends on schema_name="ccindex' repair_table = ''' MSCK REPAIR TABLE ccindex; ''' my_execute(connection, repair_table, warn_for_cost=True, **kwargs) -class WrapCursor: - ''' - Make the cursor iterator easier to use, returns a dict with keys for the field names - XXX consider making this a subclass of pyathena.cursor.Cursor ? 
- ''' - def __init__(self, cursor): - self.cursor = cursor - self.fields = [d[0] for d in cursor.description] - if self.fields: - LOGGER.info('observed fields of %s', ','.join(self.fields)) - - def __next__(self): - row = next(self.cursor) - return dict(zip(self.fields, row)) - - def __iter__(self): - return self - - def my_execute(connection, sql, params={}, dry_run=False, print_cost=True, print_messages=True, warn_for_cost=False, raise_for_messages=False): @@ -244,7 +315,7 @@ def my_execute(connection, sql, params={}, dry_run=False, try: cursor.execute(sql) except Exception: - print_debug_info() + debug_credentials() raise m = None @@ -258,12 +329,12 @@ def my_execute(connection, sql, params={}, dry_run=False, LOGGER.info('estimated cost $%.6f', c) if m and raise_for_messages: - raise ValueError('Expected no messages') + raise ValueError('Expected no messages, see above') - return WrapCursor(cursor) + return cursor -def iter(connection, **kwargs): +def aiter(connection, **kwargs): # form the query: # all verbatim queries to be passed in # if not verbatim: @@ -293,7 +364,7 @@ def get_all_crawls(connection, **kwargs): return sorted(ret) -def get_summary(connection, **kwargs): +def asummary(connection, **kwargs): count_by_partition = ''' SELECT COUNT(*) as n_captures, crawl, @@ -309,14 +380,14 @@ def get_summary(connection, **kwargs): return '\n'.join([json.dumps(row, sort_keys=True) for row in cursor]) -def run_sql_from_file(connection, cmd, **kwargs): +def asql(connection, cmd, **kwargs): with open(cmd.file, 'r') as fd: sql = fd.read() params = {} if cmd.param: for p in cmd.param: - if '=' not in p: + if p.count('=') != 1: raise ValueError('paramters should have a single equals sign') k, v = p.split('=', 1) params[k] = v diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py index 8a70718..c57b0db 100644 --- a/cdx_toolkit/cli.py +++ b/cdx_toolkit/cli.py @@ -313,12 +313,12 @@ def asetuper(cmd, cmdline): def asummarizer(cmd, cmdline): connection, kwargs = athena_init(cmd) - print(athena.get_summary(connection, **kwargs)) + print(athena.asummarize(connection, **kwargs)) def asqler(cmd, cmdline): connection, kwargs = athena_init(cmd) - print(athena.run_sql_from_file(connection, cmd, **kwargs)) + print(athena.asql(connection, cmd, **kwargs)) def aiterator(cmd, cmdline): diff --git a/tests/test_cli.py b/tests/test_cli.py index a69f922..71aa173 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -185,7 +185,7 @@ def one_ia_corner(tmpdir, cmdline): def test_warc_ia_corners(tmpdir, caplog): ''' To test these more properly, need to add a --exact-warcname and then postprocess. - For now, these tests show up in the coverage report + For now, these are just crash-only tests ''' # revisit vivification @@ -203,3 +203,12 @@ def test_warc_ia_corners(tmpdir, caplog): # warcing a 404 is a corner case in myrequests cmdline = '--ia --from 20080512074145 --to 20080512074145 warc http://www.pbm.com/oly/archive/design94/0074.html' one_ia_corner(tmpdir, cmdline) + + +def test_athena(): + # tests of configuration file reading? can be unit tests? + + # asetup (must be done first, but it's implicit in all of the following) + # asummarize + # asql (test an example that's in the docs) + # aiter (something short, using the new reverse hostname field? 
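(The `sql_params` plumbing in this patch -- `my_execute()` and the `--param` flag of the `sql` subcommand -- is plain Python `%`-mapping interpolation, as the KeyError/ValueError hints in `my_execute()` suggest. A small sketch of the intended substitution, reusing the `find_captures` query from earlier in athena.py; the LIMIT and the doubled percent sign are illustrative additions.)

```
# Placeholders use Python mapping interpolation, so a literal percent sign in
# the SQL (e.g. in a LIKE pattern) has to be written as '%%', otherwise
# my_execute() raises its "did you forget to quote a percent sign?" error.
sql = '''
SELECT url, warc_filename, warc_record_offset, warc_record_length
FROM "ccindex"."ccindex"
WHERE crawl = '%(INDEX)s'
  AND subset = '%(SUBSET)s'
  AND url_surtkey LIKE 'ca,museums)%%'
LIMIT 10
'''

# equivalent to passing: --param INDEX=CC-MAIN-2018-43 --param SUBSET=warc
sql_params = {'INDEX': 'CC-MAIN-2018-43', 'SUBSET': 'warc'}

print(sql % sql_params)
```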
From 2962322e24ba5bbb275f8c9275334eacb3a67715 Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Fri, 7 Jan 2022 20:25:25 -0800 Subject: [PATCH 6/8] typos --- cdx_toolkit/athena.py | 2 +- tests/test_cli.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/cdx_toolkit/athena.py b/cdx_toolkit/athena.py index 329c101..edf0887 100644 --- a/cdx_toolkit/athena.py +++ b/cdx_toolkit/athena.py @@ -12,7 +12,7 @@ from pyathena.pandas.cursor import PandasCursor from pyathena.async_cursor import AsyncCursor, AsyncDictCursor from pyathena.pandas.async_cursor import AsyncPandasCursor -from pyathena.utils import parse_output_location +from pyathena.util import parse_output_location LOGGER = logging.getLogger(__name__) diff --git a/tests/test_cli.py b/tests/test_cli.py index 71aa173..1acc574 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -212,3 +212,4 @@ def test_athena(): # asummarize # asql (test an example that's in the docs) # aiter (something short, using the new reverse hostname field? + pass From 7d258f5570655b4b8642ebf3b6edf35c80fcf7b4 Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Fri, 7 Jan 2022 20:33:35 -0800 Subject: [PATCH 7/8] bump versions and note athena api change --- requirements.txt | 16 ++++++++-------- setup.py | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5cc6d1d..427a1a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,17 +2,17 @@ # names must be kept in sync with setup.py # exact versions are tested in the azure-pipelines.yml config -requests==2.25.1 +requests==2.27.1 warcio==1.7.4 -pyathena==1.11.1 +pyathena==2.3.2 # used by Makefile -pytest==6.2.4 -pytest-cov==2.12.1 +pytest==6.2.5 +pytest-cov==3.0.0 pytest-sugar==0.9.4 -coveralls==3.1.0 +coveralls==3.3.1 # packaging -twine==3.4.1 -setuptools==57.0.0 -setuptools-scm==6.0.1 +twine==3.7.1 +setuptools==59.6.0 +setuptools-scm==6.3.2 diff --git a/setup.py b/setup.py index 6a3c462..b06474b 100755 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ package_requirements = ['twine', 'setuptools', 'setuptools-scm'] extras_require = { - 'athena': ['pyathena'], + 'athena': ['pyathena>=2'], 'test': test_requirements, # setup no longer tests, so make them an extra 'package': package_requirements, } From 28ad01c39ce87595301ffea801b9c0d376f7af14 Mon Sep 17 00:00:00 2001 From: Greg Lindahl Date: Fri, 14 Jan 2022 11:40:19 -0800 Subject: [PATCH 8/8] WIP --- .gitignore | 1 + ATHENA.md | 44 ++++++++++++++++++++++++++++++ README.md | 6 ++++- cdx_toolkit/athena.py | 48 +++++++++++++++++++++------------ cdx_toolkit/cli.py | 63 +++++++++++++++---------------------------- setup.py | 2 +- 6 files changed, 104 insertions(+), 60 deletions(-) create mode 100644 ATHENA.md diff --git a/.gitignore b/.gitignore index dd8c712..f3e1c4b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ dist/ __pycache__ cdx_toolkit.egg-info .coverage +.eggs/ diff --git a/ATHENA.md b/ATHENA.md new file mode 100644 index 0000000..d0a6caf --- /dev/null +++ b/ATHENA.md @@ -0,0 +1,44 @@ +# Using cdx_toolkit's columnar index with Athena + +## Installing + +``` +$ pip install cdx_toolkit[athena] +``` + +## Credentials and Configuration + +In addition to having AWS credentials, a few more configuration items are needed. 
+ +credentials: can be done multiple ways, here is one: ~/.aws/config and [profile cdx_athena] + +aws_access_key_id +aws_secret_access_key + +s3_staging_dir=, needs to be writeable, need to explain how to clear this bucket +schema_name= will default to 'ccindex', this is the database name, not the table name + +region=us-east-1 # this is the default, and this is where CC's data is stored +# "When specifying a Region inline during client initialization, this property is named region_name." +s3_staging_dir=s3://dshfjhfkjhdfshjgdghj/staging + + +## Initializing the database + +asetup +asummary +get_all_crawls + +## Arbitrary queries + +asql +explain the partitions +explain how to override the safety-belt LIMIT + +## Iterating similar to the CDX index + +## Generating subset WARCs from an sql query or iteration + +## Clearing the staging directory + +configure rclone diff --git a/README.md b/README.md index c23f818..cae13ab 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,10 @@ hides these differences as best it can. cdx_toolkit also knits together the monthly Common Crawl CDX indices into a single, virtual index. +CommonCrawl also has a non-CDX "columnar index" hosted on AWS, +accessible via the (paid) Amazon Athena service. This index can be +queried using SQL, and has a few columns not present in the CDX index. + Finally, cdx_toolkit allows extracting archived pages from CC and IA into WARC files. If you're looking to create subsets of CC or IA data and then process them into WET or WAT files, this is a feature you'll @@ -218,7 +222,7 @@ cdx_toolkit has reached the beta-testing stage of development. ## License -Copyright 2018-2020 Greg Lindahl and others +Copyright 2018-2022 Greg Lindahl and others Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License. 
diff --git a/cdx_toolkit/athena.py b/cdx_toolkit/athena.py index edf0887..d6e8939 100644 --- a/cdx_toolkit/athena.py +++ b/cdx_toolkit/athena.py @@ -15,6 +15,7 @@ from pyathena.util import parse_output_location LOGGER = logging.getLogger(__name__) +saved_connect_kwargs = None def all_results_properties(cursor): @@ -175,10 +176,15 @@ def debug_credentials(params=None): print('here are some debugging hints: add at least one -v early in the command line', file=sys.stderr) print('these are in the same order that boto3 uses them:', file=sys.stderr) + if saved_connect_kwargs: + print('athena connect kwargs:', file=sys.stderr) + for k, v in params.items(): + print(' ', k, v, file=sys.stderr) if params: - print('params:', file=sys.stderr) - for k, v in params: + print('params for this call:', file=sys.stderr) + for k, v in params.items(): print(' ', k, v, file=sys.stderr) + profile_name = params.get('profile_name') print('environment variables', file=sys.stderr) @@ -186,12 +192,12 @@ def debug_credentials(params=None): if k.startswith('AWS_'): if k in {'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN', 'AWS_SECURITY_TOKEN'}: v = '' - print(k, v, file=sys.stderr) + print(' ', k, v, file=sys.stderr) if k == 'AWS_SECURITY_TOKEN': print('AWS_SECURITY_TOKEN is deprecated', file=sys.stderr) if 'AWS_PROFILE' not in os.environ and not profile_name: - print('NOTE: AWS_PROFILE is not set, so we will look for the default profile', file=sys.stderr) + print(' ', 'NOTE: AWS_PROFILE is not set, so we will look for the default profile', file=sys.stderr) profile_name = 'default' elif not profile_name: profile_name = os.environ.get('AWS_PROFILE', 'default') @@ -223,7 +229,7 @@ def debug_credentials(params=None): def log_session_info(session): LOGGER.debug('Session info:') LOGGER.debug('session.profile_name: ' + session.profile_name) - LOGGER.debug('session.available_profiles: ' + session.available_profiles) + LOGGER.debug('session.available_profiles: ' + str(session.available_profiles)) LOGGER.debug('session.region_name: ' + session.region_name) # get_credentials ? botocore.credential.Credential @@ -238,7 +244,7 @@ def log_session_info(session): } -def get_athena(**kwargs): +def aconnect(**kwargs): LOGGER.info('connecting to athena') # XXX verify that s3_staging_dir is set, and not set to common crawl's bucket (which is ro) @@ -247,16 +253,19 @@ def get_athena(**kwargs): cursor_class = kwargs.pop('cursor_class', None) if not cursor_class: kind = kwargs.pop('kind', 'default') - cursor_class = kinds.get(kind) + cursor_class = kinds.get(kind).get('cursor') if not cursor_class: raise ValueError('Unknown cursor kind of '+kind) elif kind in kwargs and kwargs[kind]: LOGGER.warning('User specified both cursor_class and kind, ignoring kind') + global saved_connect_kwargs + saved_connect_kwargs = kwargs + try: connection = connect(cursor_class=cursor_class, **kwargs) except Exception: - debug_credentials(params=kwargs) + debug_credentials() raise log_session_info(connection.session) @@ -269,6 +278,7 @@ def asetup(connection, **kwargs): LOGGER.info('creating database') # XXX verify that there is a schema_name? needed for create and repair + # XXX schema_name can be specified when you create the cursor, pass it to .cursor() create_database = 'CREATE DATABASE ccindex' @@ -279,26 +289,28 @@ def asetup(connection, **kwargs): LOGGER.info('database ccindex already exists') else: cursor = connection.cursor() + # XXX does this really work? a fresh cursor has the previous output_location in it ??? 
print_text_messages(connection, cursor.output_location) raise LOGGER.info('creating table') + # depends on schema_name="ccindex" my_execute(connection, create_table, warn_for_cost=True, **kwargs) LOGGER.info('repairing table') - # depends on schema_name="ccindex' + # depends on schema_name="ccindex" repair_table = ''' MSCK REPAIR TABLE ccindex; ''' my_execute(connection, repair_table, warn_for_cost=True, **kwargs) -def my_execute(connection, sql, params={}, dry_run=False, +def my_execute(connection, sql, sql_params={}, dry_run=False, print_cost=True, print_messages=True, - warn_for_cost=False, raise_for_messages=False): + warn_for_cost=False, raise_for_messages=False, **kwargs): try: - sql = sql % params + sql = sql % sql_params except KeyError as e: raise KeyError('sql template referenced an unknown parameter: '+str(e)) except ValueError as e: @@ -310,12 +322,13 @@ def my_execute(connection, sql, params={}, dry_run=False, print(sql, file=sys.stderr) return [] # callers expect an iterable - cursor = connection.cursor() + cursor = connection.cursor() # XXX kwargs? schema_name= exists in both Connection and Cursor objects try: - cursor.execute(sql) - except Exception: - debug_credentials() + cursor.execute(sql) # XXX kwargs? + except Exception as e: + LOGGER.warning('saw exception {} in my_execute'.format(str(e))) + debug_credentials(**kwargs) raise m = None @@ -359,7 +372,8 @@ def get_all_crawls(connection, **kwargs): ret = [] for row in cursor: - ret.append(row['crawl']) + # XXX this needs to take into account the cursor class? + ret.append(row['crawl']) # XXX row is a tuple, return sorted(ret) diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py index c57b0db..10f7dd2 100644 --- a/cdx_toolkit/cli.py +++ b/cdx_toolkit/cli.py @@ -30,11 +30,17 @@ def add_global_args(parser): parser.add_argument('--filter', action='append', help='see CDX API documentation for usage') -def main(args=None): - parser = ArgumentParser(description='cdx_toolkit iterator command line tool') +def add_athena_args(parser): + parser.add_argument('--profile-name', action='store', help='choose which section of your boto conf files is used') + parser.add_argument('--role-arn', action='store', help='Amazon resource name roley') + parser.add_argument('--work-group', action='store', help='Amazon Athena work group name') + parser.add_argument('--s3-staging-dir', action='store', help='an s3 bucket to hold outputs') + parser.add_argument('--region-name', action='store', default='us-east-1', + help='AWS region to use, you probably want the one the commoncrawl data is in (us-east-1)') + parser.add_argument('--dry-run', '-n', action='store_true', help='print the SQL and exit without executing it') - add_global_args(parser) +def add_cdx_args(parser): parser.add_argument('--cc', action='store_const', const='cc', help='direct the query to the Common Crawl CDX/WARCs') parser.add_argument('--ia', action='store_const', const='ia', help='direct the query to the Internet Archive CDX/wayback') parser.add_argument('--source', action='store', help='direct the query to this CDX server') @@ -44,6 +50,14 @@ def main(args=None): parser.add_argument('--get', action='store_true', help='use a single get instead of a paged iteration. default limit=1000') parser.add_argument('--closest', action='store', help='get the closest capture to this timestamp. 
use with --get') + +def main(args=None): + parser = ArgumentParser(description='cdx_toolkit iterator command line tool') + + add_global_args(parser) + add_athena_args(parser) + add_cdx_args(parser) + subparsers = parser.add_subparsers(dest='cmd') subparsers.required = True @@ -71,51 +85,18 @@ def main(args=None): size.add_argument('url', help='') size.set_defaults(func=sizer) - if args is not None: - cmdline = ' '.join(args) - else: # pragma: no cover - # there's something magic about args and console_scripts - # this fallback is needed when installed by setuptools - if len(sys.argv) > 1: - cmdline = 'cdxt ' + ' '.join(sys.argv[1:]) - else: - cmdline = 'cdxt' - cmd = parser.parse_args(args=args) - set_loglevel(cmd) - cmd.func(cmd, cmdline) - - -def add_athena_args(parser): - parser.add_argument('--profile-name', action='store', help='choose which section of your boto conf files is used') - parser.add_argument('--role-arn', action='store', help='Amazon resource name roley') - parser.add_argument('--work-group', action='store', help='Amazon Athena work group name') - parser.add_argument('--s3-staging-dir', action='store', help='an s3 bucket to hold outputs') - parser.add_argument('--region-name', action='store', default='us-east-1', - help='AWS region to use, you probably want the one the commoncrawl data is in (us-east-1)') - parser.add_argument('--dry-run', '-n', action='store_true', help='print the SQL and exit without executing it') - - -def main_athena(args=None): - parser = ArgumentParser(description='CommonCrawl column database command line tool') - - add_global_args(parser) - add_athena_args(parser) - - subparsers = parser.add_subparsers(dest='cmd') - subparsers.required = True - - asetup = subparsers.add_parser('setup', help='set up amazon athena ccindex database and table') + asetup = subparsers.add_parser('asetup', help='set up amazon athena ccindex database and table') asetup.set_defaults(func=asetuper) - asummarize = subparsers.add_parser('summarize', help='summarize the partitions currently in the table') + asummarize = subparsers.add_parser('asummarize', help='summarize the partitions currently in the table') asummarize.set_defaults(func=asummarizer) - asql = subparsers.add_parser('sql', help='run arbitrary SQL statement from a file') + asql = subparsers.add_parser('asql', help='run arbitrary SQL statement from a file') asql.add_argument('--param', action='append', help='parameteres for templating the SQL, e.g. 
SUBSET=warc') asql.add_argument('file', help='') asql.set_defaults(func=asqler) - aiter = subparsers.add_parser('iter', help='iterate printing captures') + aiter = subparsers.add_parser('aiter', help='iterate printing captures') aiter.add_argument('--all-fields', action='store_true') aiter.add_argument('--fields', action='store', default='url,warc_filename,warc_record_offset,warc_record_length', help='try --all-fields if you need the list') aiter.add_argument('--jsonl', action='store_true') @@ -301,7 +282,7 @@ def athena_init(cmd): if 'dry_run' in cmd and cmd.dry_run: kwargs['dry_run'] = True - connection = athena.get_athena(**conn_kwargs) + connection = athena.aconnect(**conn_kwargs) return connection, kwargs diff --git a/setup.py b/setup.py index b06474b..bbaf7a9 100755 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ package_requirements = ['twine', 'setuptools', 'setuptools-scm'] extras_require = { - 'athena': ['pyathena>=2'], + 'athena': ['pyathena[pandas]>=2'], 'test': test_requirements, # setup no longer tests, so make them an extra 'package': package_requirements, }
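(The XXX notes left in `get_all_crawls()` and `aiterator()` flag the open tuple-vs-dict row question. A sketch, under the same placeholder-credentials assumptions as above, of how the cursor kinds introduced in PATCH 5/8 could settle it: `kind='dict'` selects pyathena's DictCursor so rows come back keyed by column name. The query is the one `get_all_crawls()` already runs; it scans the whole partition column, so it is cheap but not free.)

```
from cdx_toolkit import athena

# profile and staging bucket are placeholders, as in the earlier examples
connection = athena.aconnect(
    kind='dict',                                        # pyathena DictCursor: dict rows
    profile_name='cdx_athena',
    s3_staging_dir='s3://my-staging-bucket/staging/',
    region_name='us-east-1',
)

cursor = athena.my_execute(
    connection,
    'SELECT DISTINCT crawl FROM "ccindex"."ccindex"',
    warn_for_cost=True,
)

for row in cursor:
    # with the default cursor this row would be a positional tuple
    print(row['crawl'])
```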