diff --git a/README.md b/README.md index 01f9630..0949ecb 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ Foreign table may be created for a single Parquet file and for a set of files. I Following table options are supported: * **filename** - space separated list of paths to Parquet files to read; * **sorted** - space separated list of columns that Parquet files are presorted by; that would help postgres to avoid redundant sorting when running query with `ORDER BY` clause or in other cases when having a presorted set is beneficial (Group Aggregate, Merge Join); +* **files_in_order** - specifies that files specified by `filename` or returned by `files_func` are ordered according to `sorted` option and have no intersection rangewise; this allows to use `Gather Merge` node on top of parallel Multifile scan (default `false`); * **use_mmap** - whether memory map operations will be used instead of file read operations (default `false`); * **use_threads** - enables Apache Arrow's parallel columns decoding/decompression (default `false`); * **files_func** - user defined function that is used by parquet_fdw to retrieve the list of parquet files on each query; function must take one `JSONB` argument and return text array of full paths to parquet files; diff --git a/input/parquet_fdw.source b/input/parquet_fdw.source index 2d7376f..4482619 100644 --- a/input/parquet_fdw.source +++ b/input/parquet_fdw.source @@ -164,6 +164,8 @@ EXPLAIN (COSTS OFF) SELECT * FROM example_seq ORDER BY two; EXPLAIN (COSTS OFF) SELECT * FROM example_sorted; EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one; EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY two; +ALTER FOREIGN TABLE example_sorted OPTIONS (ADD files_in_order 'true'); +EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one; EXPLAIN (COSTS OFF) SELECT * FROM example1; SELECT SUM(one) FROM example1; diff --git a/output/parquet_fdw.source b/output/parquet_fdw.source index 8a6e4f4..c604a99 100644 --- a/output/parquet_fdw.source +++ b/output/parquet_fdw.source @@ -437,16 +437,18 @@ EXPLAIN (COSTS OFF) SELECT * FROM example_sorted; (7 rows) EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one; - QUERY PLAN ------------------------------------------------ + QUERY PLAN +----------------------------------------------------- Gather Merge Workers Planned: 2 - -> Parallel Foreign Scan on example_sorted - Reader: Multifile - Row groups: - example1.parquet: 1, 2 - example2.parquet: 1 -(7 rows) + -> Sort + Sort Key: one + -> Parallel Foreign Scan on example_sorted + Reader: Multifile + Row groups: + example1.parquet: 1, 2 + example2.parquet: 1 +(9 rows) EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY two; QUERY PLAN @@ -462,6 +464,19 @@ EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY two; example2.parquet: 1 (9 rows) +ALTER FOREIGN TABLE example_sorted OPTIONS (ADD files_in_order 'true'); +EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one; + QUERY PLAN +----------------------------------------------- + Gather Merge + Workers Planned: 2 + -> Parallel Foreign Scan on example_sorted + Reader: Multifile + Row groups: + example1.parquet: 1, 2 + example2.parquet: 1 +(7 rows) + EXPLAIN (COSTS OFF) SELECT * FROM example1; QUERY PLAN ----------------------------------------- diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index a8e58b1..3d5c6b5 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -109,6 +109,7 @@ struct ParquetFdwPlanState bool use_mmap; bool use_threads; int32 max_open_files; + bool files_in_order; List *rowgroups; /* List of Lists (per filename) */ uint64 ntuples; ReaderType type; @@ -832,6 +833,7 @@ get_table_options(Oid relid, ParquetFdwPlanState *fdw_private) fdw_private->use_mmap = false; fdw_private->use_threads = false; fdw_private->max_open_files = 0; + fdw_private->files_in_order = false; table = GetForeignTable(relid); foreach(lc, table->options) @@ -857,25 +859,21 @@ get_table_options(Oid relid, ParquetFdwPlanState *fdw_private) } else if (strcmp(def->defname, "use_mmap") == 0) { - if (!parse_bool(defGetString(def), &fdw_private->use_mmap)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for boolean option \"%s\": %s", - def->defname, defGetString(def)))); + fdw_private->use_mmap = defGetBoolean(def); } else if (strcmp(def->defname, "use_threads") == 0) { - if (!parse_bool(defGetString(def), &fdw_private->use_threads)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for boolean option \"%s\": %s", - def->defname, defGetString(def)))); + fdw_private->use_threads = defGetBoolean(def); } else if (strcmp(def->defname, "max_open_files") == 0) { /* check that int value is valid */ fdw_private->max_open_files = pg_atoi(defGetString(def), sizeof(int32), '\0'); } + else if (strcmp(def->defname, "files_in_order") == 0) + { + fdw_private->files_in_order = defGetBoolean(def); + } else elog(ERROR, "unknown option '%s'", def->defname); } @@ -1128,18 +1126,22 @@ parquetGetForeignPaths(PlannerInfo *root, if (baserel->consider_parallel > 0) { ParquetFdwPlanState *private_parallel; + bool use_pathkeys = false; private_parallel = (ParquetFdwPlanState *) palloc(sizeof(ParquetFdwPlanState)); memcpy(private_parallel, fdw_private, sizeof(ParquetFdwPlanState)); private_parallel->type = is_multi ? RT_MULTI : RT_SINGLE; + /* For mutifile reader only use pathkeys when files are in order */ + use_pathkeys = is_sorted && (!is_multi || (is_multi && fdw_private->files_in_order)); + Path *parallel_path = (Path *) create_foreignscan_path(root, baserel, NULL, /* default pathtarget */ baserel->rows, startup_cost, total_cost, - pathkeys, + use_pathkeys ? pathkeys : NULL, NULL, /* no outer rel either */ NULL, /* no extra plan */ (List *) private_parallel); @@ -1152,14 +1154,6 @@ parquetGetForeignPaths(PlannerInfo *root, parallel_path->parallel_aware = true; parallel_path->parallel_safe = true; - /* Create GatherMerge path for sorted parquet files */ - if (is_sorted) - { - GatherMergePath *gather_merge = - create_gather_merge_path(root, baserel, parallel_path, NULL, - pathkeys, NULL, NULL); - add_path(baserel, (Path *) gather_merge); - } add_partial_path(baserel, parallel_path); } } @@ -1852,30 +1846,23 @@ parquet_fdw_validator_impl(PG_FUNCTION_ARGS) else if (strcmp(def->defname, "use_mmap") == 0) { /* Check that bool value is valid */ - bool use_mmap; - - if (!parse_bool(defGetString(def), &use_mmap)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for boolean option \"%s\": %s", - def->defname, defGetString(def)))); + (void) defGetBoolean(def); } else if (strcmp(def->defname, "use_threads") == 0) { /* Check that bool value is valid */ - bool use_threads; - - if (!parse_bool(defGetString(def), &use_threads)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("invalid value for boolean option \"%s\": %s", - def->defname, defGetString(def)))); + (void) defGetBoolean(def); } else if (strcmp(def->defname, "max_open_files") == 0) { /* check that int value is valid */ pg_atoi(defGetString(def), sizeof(int32), '\0'); } + else if (strcmp(def->defname, "files_in_order") == 0) + { + /* Check that bool value is valid */ + (void) defGetBoolean(def); + } else { ereport(ERROR,