Skip to content

Commit

Permalink
Add files_in_order table option
Browse files Browse the repository at this point in the history
  • Loading branch information
zilder committed Jul 14, 2021
1 parent 06dc3dd commit 086e567
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 41 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Foreign table may be created for a single Parquet file and for a set of files. I
The following table options are supported:
* **filename** - space separated list of paths to Parquet files to read;
* **sorted** - space separated list of columns that the Parquet files are presorted by; this helps PostgreSQL avoid redundant sorting when running queries with an `ORDER BY` clause or in other cases where a presorted set is beneficial (Group Aggregate, Merge Join);
* **files_in_order** - indicates that the files listed in `filename` or returned by `files_func` are ordered according to the `sorted` option and that their value ranges do not overlap; this allows a `Gather Merge` node to be used on top of a parallel Multifile scan (default `false`);
* **use_mmap** - whether memory map operations will be used instead of file read operations (default `false`);
* **use_threads** - enables Apache Arrow's parallel columns decoding/decompression (default `false`);
* **files_func** - user-defined function that parquet_fdw calls on each query to retrieve the list of Parquet files; the function must take one `JSONB` argument and return a text array of full paths to Parquet files;
Expand Down
2 changes: 2 additions & 0 deletions input/parquet_fdw.source
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ EXPLAIN (COSTS OFF) SELECT * FROM example_seq ORDER BY two;
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted;
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one;
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY two;
ALTER FOREIGN TABLE example_sorted OPTIONS (ADD files_in_order 'true');
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one;
EXPLAIN (COSTS OFF) SELECT * FROM example1;
SELECT SUM(one) FROM example1;

Expand Down
31 changes: 23 additions & 8 deletions output/parquet_fdw.source
Original file line number Diff line number Diff line change
Expand Up @@ -437,16 +437,18 @@ EXPLAIN (COSTS OFF) SELECT * FROM example_sorted;
(7 rows)

EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one;
QUERY PLAN
-----------------------------------------------
QUERY PLAN
-----------------------------------------------------
Gather Merge
Workers Planned: 2
-> Parallel Foreign Scan on example_sorted
Reader: Multifile
Row groups:
example1.parquet: 1, 2
example2.parquet: 1
(7 rows)
-> Sort
Sort Key: one
-> Parallel Foreign Scan on example_sorted
Reader: Multifile
Row groups:
example1.parquet: 1, 2
example2.parquet: 1
(9 rows)

EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY two;
QUERY PLAN
Expand All @@ -462,6 +464,19 @@ EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY two;
example2.parquet: 1
(9 rows)

ALTER FOREIGN TABLE example_sorted OPTIONS (ADD files_in_order 'true');
EXPLAIN (COSTS OFF) SELECT * FROM example_sorted ORDER BY one;
QUERY PLAN
-----------------------------------------------
Gather Merge
Workers Planned: 2
-> Parallel Foreign Scan on example_sorted
Reader: Multifile
Row groups:
example1.parquet: 1, 2
example2.parquet: 1
(7 rows)

EXPLAIN (COSTS OFF) SELECT * FROM example1;
QUERY PLAN
-----------------------------------------
Expand Down
53 changes: 20 additions & 33 deletions src/parquet_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ struct ParquetFdwPlanState
bool use_mmap;
bool use_threads;
int32 max_open_files;
bool files_in_order;
List *rowgroups; /* List of Lists (per filename) */
uint64 ntuples;
ReaderType type;
Expand Down Expand Up @@ -832,6 +833,7 @@ get_table_options(Oid relid, ParquetFdwPlanState *fdw_private)
fdw_private->use_mmap = false;
fdw_private->use_threads = false;
fdw_private->max_open_files = 0;
fdw_private->files_in_order = false;
table = GetForeignTable(relid);

foreach(lc, table->options)
Expand All @@ -857,25 +859,21 @@ get_table_options(Oid relid, ParquetFdwPlanState *fdw_private)
}
else if (strcmp(def->defname, "use_mmap") == 0)
{
if (!parse_bool(defGetString(def), &fdw_private->use_mmap))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for boolean option \"%s\": %s",
def->defname, defGetString(def))));
fdw_private->use_mmap = defGetBoolean(def);
}
else if (strcmp(def->defname, "use_threads") == 0)
{
if (!parse_bool(defGetString(def), &fdw_private->use_threads))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for boolean option \"%s\": %s",
def->defname, defGetString(def))));
fdw_private->use_threads = defGetBoolean(def);
}
else if (strcmp(def->defname, "max_open_files") == 0)
{
/* check that int value is valid */
fdw_private->max_open_files = pg_atoi(defGetString(def), sizeof(int32), '\0');
}
else if (strcmp(def->defname, "files_in_order") == 0)
{
fdw_private->files_in_order = defGetBoolean(def);
}
else
elog(ERROR, "unknown option '%s'", def->defname);
}
Expand Down Expand Up @@ -1128,18 +1126,22 @@ parquetGetForeignPaths(PlannerInfo *root,
if (baserel->consider_parallel > 0)
{
ParquetFdwPlanState *private_parallel;
bool use_pathkeys = false;

private_parallel = (ParquetFdwPlanState *) palloc(sizeof(ParquetFdwPlanState));
memcpy(private_parallel, fdw_private, sizeof(ParquetFdwPlanState));
private_parallel->type = is_multi ? RT_MULTI : RT_SINGLE;

/* For multifile reader only use pathkeys when files are in order */
use_pathkeys = is_sorted && (!is_multi || (is_multi && fdw_private->files_in_order));

Path *parallel_path = (Path *)
create_foreignscan_path(root, baserel,
NULL, /* default pathtarget */
baserel->rows,
startup_cost,
total_cost,
pathkeys,
use_pathkeys ? pathkeys : NULL,
NULL, /* no outer rel either */
NULL, /* no extra plan */
(List *) private_parallel);
Expand All @@ -1152,14 +1154,6 @@ parquetGetForeignPaths(PlannerInfo *root,
parallel_path->parallel_aware = true;
parallel_path->parallel_safe = true;

/* Create GatherMerge path for sorted parquet files */
if (is_sorted)
{
GatherMergePath *gather_merge =
create_gather_merge_path(root, baserel, parallel_path, NULL,
pathkeys, NULL, NULL);
add_path(baserel, (Path *) gather_merge);
}
add_partial_path(baserel, parallel_path);
}
}
Expand Down Expand Up @@ -1852,30 +1846,23 @@ parquet_fdw_validator_impl(PG_FUNCTION_ARGS)
else if (strcmp(def->defname, "use_mmap") == 0)
{
/* Check that bool value is valid */
bool use_mmap;

if (!parse_bool(defGetString(def), &use_mmap))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for boolean option \"%s\": %s",
def->defname, defGetString(def))));
(void) defGetBoolean(def);
}
else if (strcmp(def->defname, "use_threads") == 0)
{
/* Check that bool value is valid */
bool use_threads;

if (!parse_bool(defGetString(def), &use_threads))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid value for boolean option \"%s\": %s",
def->defname, defGetString(def))));
(void) defGetBoolean(def);
}
else if (strcmp(def->defname, "max_open_files") == 0)
{
/* check that int value is valid */
pg_atoi(defGetString(def), sizeof(int32), '\0');
}
else if (strcmp(def->defname, "files_in_order") == 0)
{
/* Check that bool value is valid */
(void) defGetBoolean(def);
}
else
{
ereport(ERROR,
Expand Down

0 comments on commit 086e567

Please sign in to comment.