diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 4f4b342..dca6fe1 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -110,6 +110,33 @@ struct RowGroupFilter int strategy; }; +/* + * Indexes of FDW-private information stored in fdw_private lists. + * + * These items are indexed with the enum FdwScanPrivateIndex, so an item + * can be fetched with list_nth(). For example, to get the filenames: + * sql = strVal(list_nth(fdw_private, FdwScanPrivateFileNames)); + */ +enum FdwScanPrivateIndex +{ + /* List of paths to Parquet files */ + FdwScanPrivateFileNames, + /* List of Attributes actually used in query */ + FdwScanPrivateAttributesUsed, + /* List of columns that Parquet files are presorted by */ + FdwScanPrivateAttributesSorted, + /* use_mmap flag (as an integer Value node) */ + FdwScanPrivateUseMmap, + /* use_threads flag (as an integer Value node) */ + FdwScanPrivateUse_Threads, + /* ReaderType of Parquet files */ + FdwScanPrivateType, + /* The limit for the number of Parquet files open simultaneously. */ + FdwScanPrivateMaxOpenFiles, + /* List of Lists (per filename) */ + FdwScanPrivateRowGroups, +}; + /* * Plain C struct for fdw_state */ @@ -248,6 +275,15 @@ extract_rowgroup_filters(List *scan_clauses, else continue; + /* + * System columns should not be extract to filter, since + * we don't make any effort to ensure that local and + * remote values match (tableoid, in particular, almost + * certainly doesn't match). + */ + if (v->varattno < 0) + continue; + RowGroupFilter f { .attnum = v->varattno, @@ -1256,7 +1292,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, NULL, /* no pathkeys */ - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) fdw_private); if (!enable_multifile && is_multi) @@ -1282,7 +1318,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, pathkeys, - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) private_sort); @@ -1321,7 +1357,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, use_pathkeys ? pathkeys : NULL, - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) private_parallel); @@ -1356,7 +1392,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, pathkeys, - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) private_parallel_merge); @@ -1466,30 +1502,30 @@ parquetBeginForeignScan(ForeignScanState *node, int /* eflags */) { switch(i) { - case 0: + case FdwScanPrivateFileNames: filenames = (List *) lfirst(lc); break; - case 1: + case FdwScanPrivateAttributesUsed: attrs_list = (List *) lfirst(lc); foreach (lc2, attrs_list) attrs_used.insert(lfirst_int(lc2)); break; - case 2: + case FdwScanPrivateAttributesSorted: attrs_sorted = (List *) lfirst(lc); break; - case 3: + case FdwScanPrivateUseMmap: use_mmap = (bool) intVal((Value *) lfirst(lc)); break; - case 4: + case FdwScanPrivateUse_Threads: use_threads = (bool) intVal((Value *) lfirst(lc)); break; - case 5: + case FdwScanPrivateType: reader_type = (ReaderType) intVal((Value *) lfirst(lc)); break; - case 6: + case FdwScanPrivateMaxOpenFiles: max_open_files = intVal((Value *) lfirst(lc)); break; - case 7: + case FdwScanPrivateRowGroups: rowgroups_list = (List *) lfirst(lc); break; } @@ -1785,8 +1821,8 @@ parquetExplainForeignScan(ForeignScanState *node, ExplainState *es) fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; filenames = (List *) linitial(fdw_private); - reader_type = (ReaderType) intVal((Value *) list_nth(fdw_private, 5)); - rowgroups_list = (List *) llast(fdw_private); + reader_type = (ReaderType) intVal((Value *) list_nth(fdw_private, FdwScanPrivateType)); + rowgroups_list = (List *) list_nth(fdw_private, FdwScanPrivateRowGroups); switch (reader_type) { @@ -1854,6 +1890,11 @@ parquetIsForeignScanParallelSafe(PlannerInfo * /* root */, RelOptInfo *rel, RangeTblEntry * /* rte */) { + /* Plan nodes that reference a correlated SubPlan is always parallel restricted. + * Therefore, return false when there is lateral join. + */ + if (rel->lateral_relids) + return false; return true; } diff --git a/src/reader.cpp b/src/reader.cpp index 0343ad8..519ceed 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -1028,7 +1028,7 @@ class DefaultParquetReader : public ParquetReader void rescan(void) { - this->row_group = 0; + this->row_group = -1; this->row = 0; this->num_rows = 0; } @@ -1380,7 +1380,7 @@ class CachingParquetReader : public ParquetReader void rescan(void) { - this->row_group = 0; + this->row_group = -1; this->row = 0; this->num_rows = 0; }