re-implement import-sql using Python #378

Draft · wants to merge 5 commits into base: master
6 changes: 4 additions & 2 deletions README.md
@@ -432,7 +432,9 @@ import-borders load borders.csv # Load borders.csv into a table
 This utility requires PostgreSQL's PG* environment variables, and optionally uses `PBF_DATA_DIR, BORDERS_PBF_FILE, BORDERS_CSV_FILE, BORDERS_TABLE_NAME`.

 ## Importing into Postgres
-The `import-sql` script can execute a single SQL file in Postgres when the file is given as the first parameter.
+The `run-psql` helper script runs psql with one or more files provided. It can also run them in parallel. In case of an error, `run-psql` will not start any new files, but will wait for all started files to finish. Once finished, it re-prints the last few lines of STDERR to simplify debugging. Unlike the legacy `import-sql`, `run-psql` does not make any assumptions about SQL directories.
+
+The legacy `import-sql` script can execute a single SQL file in Postgres when the file is given as the first parameter. The script is obsolete and should not be used.

 If ran without any arguments, `import-sql` executes all of the following:
 * SQL files from `$SQL_TOOLS_DIR` - contains all SQL files to be imported before the ones generated by the OpenMapTiles project. By default it contains the OMT fork of Mapbox's [postgis-vt-util.sql](https://github.com/openmaptiles/postgis-vt-util) helper functions and the [sql/language.sql](sql/zzz_language.sql) script (imported as `10_postgis-vt-util.sql` - the file has to be imported before the files in `/sql`)
@@ -441,7 +443,7 @@ If ran without any arguments, `import-sql` executes all of the following:
 Generating and importing SQL could be done in a single step with `&&`, e.g.

 ```bash
-generate-sqltomvt openmaptiles.yaml > "$SQL_DIR/mvt.sql" && import-sql
+generate-sqltomvt openmaptiles.yaml > "$SQL_DIR/mvt.sql" && run-psql "$SQL_DIR/mvt.sql"
 ```

 Optionally you may pass extra arguments to `psql` by using the `PSQL_OPTIONS` environment variable. For example `PSQL_OPTIONS=-a` makes psql echo all commands read from a file into stdout.
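The error handling described for `run-psql` (re-printing the last few lines of STDERR after all imports finish) amounts to a tail over captured output. A minimal sketch in Python, with illustrative names:

```python
from typing import List

def tail(chunks: List[bytes], count: int) -> List[str]:
    """Return the last `count` lines of captured byte chunks."""
    text = b''.join(chunks).decode('utf-8').strip()
    if not text or count <= 0:
        return []
    return text.split('\n')[-count:]

# The newest lines survive; older ones are dropped.
print(tail([b'line1\nline2\n', b'line3\n'], 2))  # → ['line2', 'line3']
```

This mirrors why a bounded tail is enough for debugging: psql's last error message is what identifies the failing statement.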
49 changes: 16 additions & 33 deletions bin/import-sql
@@ -1,7 +1,7 @@
 #!/usr/bin/env bash
-set -o errexit
+set -o errexit  # set -e
 set -o pipefail
-set -o nounset
+set -o nounset  # set -u
 shopt -s nullglob

 # For backward compatibility, allow both PG* and POSTGRES_* forms,
@@ -13,69 +13,52 @@ export PGUSER="${POSTGRES_USER:-${PGUSER?}}"
 export PGPASSWORD="${POSTGRES_PASSWORD:-${PGPASSWORD?}}"
 export PGPORT="${POSTGRES_PORT:-${PGPORT:-5432}}"

-function exec_psql_file() {
-  local file_name="$1"
-  # Allows additional parameters to be passed to psql
-  # For example, PSQL_OPTIONS='-a -A' would echo everything and disable output alignment
-  # Using eval allows complex cases with quotes, like PSQL_OPTIONS=" -a -c 'select ...' "
-  eval "local psql_opts=(${PSQL_OPTIONS:-})"
-
-  echo "Importing $file_name (md5 $(md5sum < "$file_name") $(wc -l < "$file_name") lines) into Postgres..."
-
-  # shellcheck disable=SC2154
-  psql \
-    -v ON_ERROR_STOP="1" \
-    -c '\timing on' \
-    -f "$file_name" \
-    "${psql_opts[@]}"
-}
+PSQL="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/run-psql"

 function import_all_sql_files() {
   local dir="$1"
-  local sql_file

   if [[ -d "$dir/parallel" ]]; then
-    # Assume this dir may contain run_first.sql, parallel/*.sql, and run_last.sql
+    # use parallel execution
     if [[ -f "$dir/run_first.sql" ]]; then
-      exec_psql_file "$dir/run_first.sql"
+      "${PSQL}" "$dir/run_first.sql"
     else
       echo "File $dir/run_first.sql not found, skipping"
     fi

-    # Run import-sql script in parallel, up to MAX_PARALLEL_PSQL processes at the same time
-    : "${MAX_PARALLEL_PSQL:=5}"
-    echo "Importing $(find "$dir/parallel" -name "*.sql" | wc -l) sql files from $dir/parallel/, up to $MAX_PARALLEL_PSQL files at the same time"
-    find "$dir/parallel" -name "*.sql" -print0 | xargs -0 -I{} -P "$MAX_PARALLEL_PSQL" sh -c "\"$0\" \"{}\" || exit 255"
-    echo "Finished importing sql files matching '$dir/parallel/*.sql'"
+    "${PSQL}" --parallel "${MAX_PARALLEL_PSQL:=1}" "$dir/parallel"

     if [[ -f "$dir/run_last.sql" ]]; then
-      exec_psql_file "$dir/run_last.sql"
+      "${PSQL}" "$dir/run_last.sql"
     else
       echo "File $dir/run_last.sql not found, skipping"
     fi
   else
-    for sql_file in "$dir"/*.sql; do
-      exec_psql_file "$sql_file"
-    done
+    "${PSQL}" "$dir"
   fi
 }

 # If there are no arguments, imports everything,
 # otherwise the first argument is the name of a file to be imported.
 if [[ $# -eq 0 ]]; then
-  if [[ "${SQL_TOOLS_DIR:-}" == "" ]]; then
-    echo "ENV variable SQL_TOOLS_DIR is not set. It should contain directory with .sql files, e.g. postgis-vt-util.sql and language.sql"
+  if [[ "${SQL_TOOLS_DIR:-}" == "" ]] || [[ ! -d "${SQL_TOOLS_DIR}" ]]; then
+    echo "ENV variable SQL_TOOLS_DIR is not set or is not a directory. It should point to a directory with .sql files, e.g. postgis-vt-util.sql and language.sql"
     exit 1
   else
     echo "SQL_TOOLS_DIR=$SQL_TOOLS_DIR"
   fi
-  if [[ "${SQL_DIR:-}" == "" ]]; then
-    echo "ENV variable SQL_DIR is not set. It should contain directory with .sql files, e.g. tileset.sql, or run_first.sql, run_last.sql, and parallel/*.sql"
+  if [[ "${SQL_DIR:-}" == "" ]] || [[ ! -d "${SQL_DIR}" ]]; then
+    echo "ENV variable SQL_DIR is not set or is not a directory. It should point to a directory with .sql files, e.g. tileset.sql, or run_first.sql, run_last.sql, and parallel/*.sql"
     exit 1
   else
    echo "SQL_DIR=$SQL_DIR"
   fi

   import_all_sql_files "$SQL_TOOLS_DIR"  # import postgis-vt-util.sql and all other tools SQL files from /sql
   import_all_sql_files "$SQL_DIR"        # import compiled tileset.sql
 else
-  exec_psql_file "$1"
+  "${PSQL}" "$1"
 fi
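The xargs fan-out removed above can be sketched on its own. Here the recursive `import-sql` call is replaced with a plain `echo`, and the file names are illustrative:

```shell
# -P runs up to 2 jobs concurrently; a job exiting 255 makes xargs stop
# launching new jobs, which is what gave import-sql its stop-on-error behavior.
mkdir -p /tmp/parallel_demo
printf 'SELECT 1;\n' > /tmp/parallel_demo/a.sql
printf 'SELECT 2;\n' > /tmp/parallel_demo/b.sql
find /tmp/parallel_demo -name '*.sql' -print0 \
  | xargs -0 -I{} -P 2 sh -c 'echo "processing {}" || exit 255'
```

`run-psql --parallel` replaces this pattern with a Python process pool, which makes it possible to collect each worker's STDERR tail instead of only aborting.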
186 changes: 186 additions & 0 deletions bin/run-psql
@@ -0,0 +1,186 @@
#!/usr/bin/env python
"""
Usage:
  run-psql <path>...
           [--parallel=<num> [--print-err=<num>]]
           [--pghost=<host>] [--pgport=<port>] [--dbname=<db>]
           [--user=<user>] [--password=<password>] [--verbose]
           [-- <psql-args>...]
  run-psql --help
  run-psql --version

Options:
  <path>                 One or more SQL files or directories to run using psql.
  <psql-args>            Override default psql parameters.
  -p --parallel=<num>    Run all files/dirs specified with <path>... in parallel,
                         up to <num> at the same time.  [default: 1]
  -e --print-err=<num>   If psql exits with an error, print last <num> output lines
                         after all other psql imports finish. Disabled if 0.  [default: 10]
  -v --verbose           Print additional debugging information.
  --help                 Show this screen.
  --version              Show version.

PostgreSQL Options:
  -h --pghost=<host>     Postgres hostname. By default uses PGHOST env or "localhost" if not set.
  -P --pgport=<port>     Postgres port. By default uses PGPORT env or "5432" if not set.
  -d --dbname=<db>       Postgres db name. By default uses PGDATABASE env or "openmaptiles" if not set.
  -U --user=<user>       Postgres user. By default uses PGUSER env or "openmaptiles" if not set.
  --password=<password>  Postgres password. By default uses PGPASSWORD env or "openmaptiles" if not set.

These legacy environment variables should not be used, but they are still supported:
  POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD
"""
import collections
import os
from dataclasses import dataclass
from hashlib import md5
from multiprocessing import Pool, Value
from pathlib import Path
from select import select
from subprocess import Popen, PIPE, list2cmdline
from sys import stdout, stderr
from typing import List, Optional

# noinspection PyProtectedMember
from docopt import docopt, DocoptExit

import openmaptiles
from openmaptiles.pgutils import parse_pg_args


@dataclass
class Params:
    print_on_err: int
    psql_args: Optional[List[str]]
    pghost: str
    pgport: str
    dbname: str
    user: str
    password: str
    verbose: bool


stop_value: Optional[Value] = None


def last_lines(output: List[bytes], count: int, name: str) -> str:
    if output and count > 0:
        text = b''.join(output).decode('utf-8').strip()
        if text:
            lines = text.split('\n')[-count:]
            out = '\n'.join(lines)
            return f'\n###### Last {len(lines)} lines of {name}:\n{out}'
    return ''


def run_psql(file: Path, params: Params):
    if stop_value.value != 0:
        print(f'Skipping {file}', file=stderr)
        return None

    cmd = ['stdbuf', '-oL', '-e0', 'psql', '-f', file]
    if params.psql_args is None:
        cmd.extend(['-v', 'ON_ERROR_STOP=1', '-c', r'\timing on'])
    else:
        cmd.extend(params.psql_args)

    env = {
        'PGHOST': params.pghost,
        'PGDATABASE': params.dbname,
        'PGUSER': params.user,
        'PGPASSWORD': params.password,
        'PGPORT': params.pgport,
    }

    sql_content = file.read_bytes()
    md5sum = md5(sql_content).hexdigest()
    lines = sql_content.decode('utf-8').count('\n')
    print(f'Importing {file} (md5 {md5sum} {lines} lines) into Postgres...')
    if params.verbose:
        print(f'  {list2cmdline(cmd)}')
        print('Environment Variables:')
        for k, v in sorted(env.items()):
            print(f'  {k}={v}')

    stderr_buf = []
    with Popen(cmd, stdout=PIPE, stderr=PIPE, env=env) as proc:
        readable = {
            proc.stdout.fileno(): stdout.buffer,
            proc.stderr.fileno(): stderr.buffer,
        }
        while readable:
            for fd in select(readable, [], [])[0]:
                data = os.read(fd, 1024)  # read available
                if not data:
                    del readable[fd]
                else:
                    if params.print_on_err > 0 and fd == proc.stderr.fileno():
                        stderr_buf.append(data)
                    readable[fd].write(data)
                    readable[fd].flush()
        # there should be no more output, should be safe to wait
        exit_code = proc.wait()
    if exit_code == 0:
        print(f'Finished importing {file}...')
        return None

    stop_value.value = 1
    result = f'File {file} failed with error {exit_code}'
    print(result)
    return result + last_lines(stderr_buf, params.print_on_err, 'STDERR')


def main(args):
    pghost, pgport, dbname, user, password = parse_pg_args(args)
    paths = args['<path>']
    if '--' in paths:
        pos = paths.index('--')
        psql_args = paths[pos + 1:]
        paths = paths[:pos]
    else:
        psql_args = None
    params = Params(int(args['--print-err']), psql_args, pghost, pgport,
                    dbname, user, password, args['--verbose'])

    paths = [Path(v).resolve() for v in paths]
    dups = [vv for vv, cnt in collections.Counter((str(v) for v in paths)).items() if cnt > 1]
    if dups:
        raise DocoptExit(f'ERROR: Duplicate paths: [{"], [".join(dups)}]')
    missing = []
    files = []
    for path in paths:
        if not path.exists():
            missing.append(path)
        elif path.is_dir():
            for file in path.glob('*.sql'):
                if file.is_file():
                    files.append(file)
        else:
            files.append(path)
    if missing:
        raise DocoptExit(f'ERROR: Path does not exist: [{"], [".join(str(v) for v in missing)}]')
    if not files:
        # For compatibility, allow empty dir import
        print('No .sql files were found')
        exit(0)

    global stop_value
    stop_value = Value('b', 0)

    parallel = int(args['--parallel'])
    with Pool(processes=parallel) as pool:
        print(f'Importing {len(files)} files...')
        tasks = [(file, params) for file in sorted(files)]
        async_result = pool.starmap_async(run_psql, tasks, chunksize=1)
        results = async_result.get()

    if stop_value.value != 0:
        if parallel > 1:
            print('-------------------------- ERROR SUMMARY ---------------------------------')
            print('\n'.join(v for v in results if v))
        exit(1)
    else:
        print('All SQL files successfully executed')


if __name__ == '__main__':
    main(docopt(__doc__, version=openmaptiles.__version__))
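The `select()`-based streaming inside `run_psql` can be reduced to a standalone sketch: multiplex a child's stdout and stderr as data arrives, keeping a copy of stderr so its tail can be re-printed after a failure. The child command here is illustrative, not part of the PR:

```python
import os
from select import select
from subprocess import Popen, PIPE

proc = Popen(['sh', '-c', 'echo out-line; echo err-line >&2'],
             stdout=PIPE, stderr=PIPE)
captured = {proc.stdout.fileno(): [], proc.stderr.fileno(): []}
readable = set(captured)
while readable:
    # select blocks until at least one pipe has data or reaches EOF
    for fd in select(readable, [], [])[0]:
        data = os.read(fd, 1024)   # read whatever is currently available
        if not data:
            readable.discard(fd)   # EOF on this pipe
        else:
            captured[fd].append(data)
exit_code = proc.wait()
stderr_tail = b''.join(captured[proc.stderr.fileno()]).decode('utf-8')
```

Reading both pipes from one loop avoids the deadlock that occurs when a child fills one pipe buffer while the parent blocks reading the other.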
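The early-stop coordination between pool workers relies on a shared `multiprocessing.Value`: the first failing task flips the flag, and tasks that have not started yet see it and skip. A minimal sketch — task payloads are illustrative, and an explicit initializer is used here instead of the fork-inherited global the script relies on, for portability:

```python
from multiprocessing import Pool, Value

stop = None

def init_worker(shared):
    # runs once in each worker process; store the shared flag globally
    global stop
    stop = shared

def task(n):
    if stop.value != 0:          # a previous task failed: skip the work
        return f'skipped {n}'
    if n == 2:
        stop.value = 1           # signal tasks that have not started yet
        return f'failed {n}'
    return f'ok {n}'

if __name__ == '__main__':
    shared = Value('b', 0)
    with Pool(2, initializer=init_worker, initargs=(shared,)) as pool:
        results = pool.map(task, range(6), chunksize=1)
    # The flag lives in shared memory, so the parent sees the failure too.
    assert shared.value == 1
```

`chunksize=1` matters for this pattern: it hands tasks to workers one at a time, so a task submitted after the failure still consults the flag before running.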
1 change: 1 addition & 0 deletions tests/sql/test-sql.sh
@@ -28,6 +28,7 @@ done
 sleep 1

 # Import all pre-required SQL code
+mkdir -p "$SQL_DIR"
 source import-sql

 echo "++++++++++++++++++++++++"