From c3e6aa261eafee3fb588c643fe29a3d46d7cec06 Mon Sep 17 00:00:00 2001 From: LeAnhMinh Date: Thu, 10 Mar 2022 11:50:18 +0700 Subject: [PATCH 1/5] Fix crash if WHERE clause contain system column: ignore extract to filter for system column in extract_rowgroup_filters() **Describe the bug** Run test by makecheck in debug mode: crash if WHERE clause contain system column **To Reproduce** 1. build to debug mode Makefile: -g -O0 2. Makecheck 3. Add these SQL query into test file (reference to test case of postgres_fdw.sql) CREATE FOREIGN TABLE ft1 ( c0 int, c1 int NOT NULL, c2 int NOT NULL, c3 text, -- c4 timestamptz, c5 timestamp, c6 text, c7 text, c8 text ) SERVER parquet_srv OPTIONS (filename :'/ported_postgres/ft1.parquet', sorted 'c1'); ALTER FOREIGN TABLE ft1 DROP COLUMN c0; EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE t1.ctid = '(0,2)'; **Expected behavior** QUERY PLAN -------------------------------------- Foreign Scan on public.ft1 t1 Output: c1, c2, c3, c5, c6, c7, c8 Filter: (t1.ctid = '(0,2)'::tid) Reader: Single File Row groups: 1 (5 rows) 4. Crash in testcase: SELECT * FROM ft1 t1 WHERE t1.ctid = '(0,2)'; TRAP: FailedAssertion("strlen(input) < NAMEDATALEN - 1", File: "src/common.cpp", Line: 143, PID: 25595) parquet_fdw.so(_Z11tolowercasePKcPc+0x48)[0x7fce0e1fed2f] parquet_fdw.so(_Z22extract_rowgroups_listPKcS0_PN3Aws2S38S3ClientEP13TupleDescDataRSt4listI14RowGroupFilterSaIS8_EEPmSC_+0x536)[0x7fce0e21df07] parquet_fdw.so(parquetGetForeignRelSize+0x243)[0x7fce0e22023b] **Additional context** Crash happen when makecheck in debug mode, and not happen on release mode. Because debug mode, the value of pg_comumn (extract_rowgroups_list) will not be optimized like release mode when receiving an invalid value (filter.attnum = -1 (system column)) --- src/parquet_impl.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 4f4b342..da3ff7d 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -248,6 +248,15 @@ extract_rowgroup_filters(List *scan_clauses, else continue; + /* + * System columns should not be extract to filter, since + * we don't make any effort to ensure that local and + * remote values match (tableoid, in particular, almost + * certainly doesn't match). + */ + if (v->varattno < 0) + continue; + RowGroupFilter f { .attnum = v->varattno, From 41c4fbe21e80e497263cd0721f8e0128189f8f9c Mon Sep 17 00:00:00 2001 From: LeAnhMinh Date: Thu, 10 Mar 2022 13:45:41 +0700 Subject: [PATCH 2/5] Fix crash when lateral join in parallel mode: reference a correlated SubPlan is always parallel restricted. **Describe the bug** Crash when lateral join in parallel mode **To Reproduce** 1. Run testcase (reference test case of postgres_fdw.sql) CREATE SCHEMA "S 1"; IMPORT FOREIGN SCHEMA "/ported_postgres" FROM SERVER parquet_s3_srv INTO "S 1" OPTIONS (sorted 'c1'); CREATE FOREIGN TABLE ft1 ( c0 int, c1 int NOT NULL, c2 int NOT NULL, c3 text, -- c4 timestamptz, c5 timestamp, c6 text, c7 text, c8 text ) SERVER parquet_srv OPTIONS (filename 'ported_postgres/ft1.parquet', sorted 'c1'); ALTER FOREIGN TABLE ft1 DROP COLUMN c0; CREATE FOREIGN TABLE ft2 ( c1 int NOT NULL, c2 int NOT NULL, cx int, c3 text, -- c4 timestamptz, c5 timestamp, c6 text, c7 text, c8 text ) SERVER parquet_srv OPTIONS (filename 'ported_postgres/ft1.parquet', sorted 'c1'); ALTER FOREIGN TABLE ft2 DROP COLUMN cx; EXPLAIN (VERBOSE, COSTS OFF) SELECT ref_0.c2, subq_1.* FROM "S 1"."T1" AS ref_0, LATERAL ( SELECT ref_0.c1 c1, subq_0.* FROM (SELECT ref_0.c2, ref_1.c3 FROM ft1 AS ref_1) AS subq_0 RIGHT JOIN ft2 AS ref_3 ON (subq_0.c3 = ref_3.c3) ) AS subq_1 WHERE ref_0.c1 < 10 AND subq_1.c3 = '00001' ORDER BY ref_0.c1; 2. Crash psql: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. 3. Log TRAP: FailedAssertion("bms_is_subset(baserel->lateral_relids, required_outer)", File: "relnode.c", Line: 1304, PID: 17124) [local] EXPLAIN(ExceptionalCondition+0xb9)[0xb0a40a] [local] EXPLAIN(get_baserel_parampathinfo+0x4b)[0x882524] [local] EXPLAIN(create_gather_path+0xac)[0x873966] [local] EXPLAIN(generate_gather_paths+0x9e)[0x8061aa] [local] EXPLAIN(generate_useful_gather_paths+0x7e)[0x806489] [local] EXPLAIN[0x802cd8] [local] EXPLAIN[0x802876] [local] EXPLAIN(make_one_rel+0x1ba)[0x8025d8] [local] EXPLAIN(query_planner+0x373)[0x83ed1f] **Expected behavior** QUERY PLAN -------------------------------------------------------------------------------------------- Nested Loop Output: ref_0.c2, ref_0.c1, (ref_0.c2), ref_1.c3, ref_0.c1 -> Nested Loop Output: ref_0.c2, ref_0.c1, ref_1.c3, (ref_0.c2) -> Foreign Scan on "S 1"."T1" ref_0 Output: ref_0.c1, ref_0.c2, ref_0.c3, ref_0.c5, ref_0.c6, ref_0.c7, ref_0.c8 Filter: (ref_0.c1 < 10) Reader: Single File Row groups: 1 -> Foreign Scan on public.ft1 ref_1 Output: ref_1.c3, ref_0.c2 Filter: (ref_1.c3 = '00001'::text) Reader: Single File Row groups: 1 -> Materialize Output: ref_3.c3 -> Foreign Scan on public.ft2 ref_3 Output: ref_3.c3 Filter: (ref_3.c3 = '00001'::text) Reader: Single File Row groups: 1 (21 rows) **Additional context** LATERAL JOIN is a correlated SubPlan and cannot execute in parallel mode. Refer parallel restricted: https://www.postgresql.org/docs/14/parallel-safety.html --- src/parquet_impl.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index da3ff7d..14f332f 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -1863,6 +1863,11 @@ parquetIsForeignScanParallelSafe(PlannerInfo * /* root */, RelOptInfo *rel, RangeTblEntry * /* rte */) { + /* Plan nodes that reference a correlated SubPlan is always parallel restricted. + * Therefore, return false when there is lateral join. + */ + if (rel->lateral_relids) + return false; return true; } From 7c89a77d615aed32a2f1f9da0b84f0b8d99c26c0 Mon Sep 17 00:00:00 2001 From: LeAnhMinh Date: Thu, 10 Mar 2022 14:05:59 +0700 Subject: [PATCH 3/5] Fix rescan wrong value for row_groups: row_group need rescan with -1 value. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Describe the bug** rescan wrong value for row_groups **To Reproduce** 1. Execute SQL select * from example1, example2 WHERE example1.one = 1; 2. Actual result one | two | three | four | five | six | seven | one | two | three | four | five | six | seven -----+-----+-------+------+------+-----+-------+-----+-----+-------+------+------+-----+------- (0 rows) **Expected behavior** one | two | three | four | five | six | seven | one | two | three | four | five | six | seven -----+---------+-------+---------------------+------------+-----+-------+-----+---------+-------+---------------------+------------+-----+------- 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 | 1 | {19,20} | eins | 2018-01-01 00:00:00 | 2018-01-01 | t | 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 | 3 | {21,22} | zwei | 2018-01-03 00:00:00 | 2018-01-03 | f | 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 | 5 | {23,24} | drei | 2018-01-05 00:00:00 | 2018-01-05 | t | 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 | 7 | {25,26} | vier | 2018-01-07 00:00:00 | 2018-01-07 | f | 1 | {1,2,3} | foo | 2018-01-01 00:00:00 | 2018-01-01 | t | 0.5 | 9 | {27,28} | fünf | 2018-01-09 00:00:00 | 2018-01-09 | t | (5 rows) **Additional context** Current row_group initialized with -1 (contructor function) When RescanForeignScan called. row_group is recaned with 0 -> wrong row group index Then the datas are get by IterateForeignScan will be missing --- src/reader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reader.cpp b/src/reader.cpp index 0343ad8..519ceed 100644 --- a/src/reader.cpp +++ b/src/reader.cpp @@ -1028,7 +1028,7 @@ class DefaultParquetReader : public ParquetReader void rescan(void) { - this->row_group = 0; + this->row_group = -1; this->row = 0; this->num_rows = 0; } @@ -1380,7 +1380,7 @@ class CachingParquetReader : public ParquetReader void rescan(void) { - this->row_group = 0; + this->row_group = -1; this->row = 0; this->num_rows = 0; } From 535c6d7410509e9c248566ae9a71826007535a88 Mon Sep 17 00:00:00 2001 From: LeAnhMinh Date: Thu, 10 Mar 2022 14:12:09 +0700 Subject: [PATCH 4/5] Support lateral join: add baserel->lateral_relids when create_foreignscan_path. **Describe the bug** Missing outer rel either (baserel->lateral_reldis). So crash when lateral outer join **To Reproduce** 1. Run test case (reference test case of postgres_fdw.sql) CREATE SCHEMA "S 1"; IMPORT FOREIGN SCHEMA "/ported_postgres" FROM SERVER parquet_s3_srv INTO "S 1" OPTIONS (sorted 'c1'); CREATE FOREIGN TABLE ft1 ( c0 int, c1 int NOT NULL, c2 int NOT NULL, c3 text, -- c4 timestamptz, c5 timestamp, c6 text, c7 text, c8 text ) SERVER parquet_srv OPTIONS (filename 'ported_postgres/ft1.parquet', sorted 'c1'); ALTER FOREIGN TABLE ft1 DROP COLUMN c0; CREATE FOREIGN TABLE ft2 ( c1 int NOT NULL, c2 int NOT NULL, cx int, c3 text, -- c4 timestamptz, c5 timestamp, c6 text, c7 text, c8 text ) SERVER parquet_srv OPTIONS (filename 'ported_postgres/ft1.parquet', sorted 'c1'); ALTER FOREIGN TABLE ft2 DROP COLUMN cx; EXPLAIN (VERBOSE, COSTS OFF) SELECT ref_0.c2, subq_1.* FROM "S 1"."T1" AS ref_0, LATERAL ( SELECT ref_0.c1 c1, subq_0.* FROM (SELECT ref_0.c2, ref_1.c3 FROM ft1 AS ref_1) AS subq_0 RIGHT JOIN ft2 AS ref_3 ON (subq_0.c3 = ref_3.c3) ) AS subq_1 WHERE ref_0.c1 < 10 AND subq_1.c3 = '00001' ORDER BY ref_0.c1; 2. Crash psql: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. **Expected behavior** QUERY PLAN -------------------------------------------------------------------------------------------- Nested Loop Output: ref_0.c2, ref_0.c1, (ref_0.c2), ref_1.c3, ref_0.c1 -> Nested Loop Output: ref_0.c2, ref_0.c1, ref_1.c3, (ref_0.c2) -> Foreign Scan on "S 1"."T1" ref_0 Output: ref_0.c1, ref_0.c2, ref_0.c3, ref_0.c5, ref_0.c6, ref_0.c7, ref_0.c8 Filter: (ref_0.c1 < 10) Reader: Single File Row groups: 1 -> Foreign Scan on public.ft1 ref_1 Output: ref_1.c3, ref_0.c2 Filter: (ref_1.c3 = '00001'::text) Reader: Single File Row groups: 1 -> Materialize Output: ref_3.c3 -> Foreign Scan on public.ft2 ref_3 Output: ref_3.c3 Filter: (ref_3.c3 = '00001'::text) Reader: Single File Row groups: 1 (21 rows) --- src/parquet_impl.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 14f332f..30f59c6 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -1265,7 +1265,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, NULL, /* no pathkeys */ - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) fdw_private); if (!enable_multifile && is_multi) @@ -1291,7 +1291,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, pathkeys, - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) private_sort); @@ -1330,7 +1330,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, use_pathkeys ? pathkeys : NULL, - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) private_parallel); @@ -1365,7 +1365,7 @@ parquetGetForeignPaths(PlannerInfo *root, startup_cost, total_cost, pathkeys, - NULL, /* no outer rel either */ + baserel->lateral_relids, NULL, /* no extra plan */ (List *) private_parallel_merge); From f040cc446543567025763dbe302e629cc02fda10 Mon Sep 17 00:00:00 2001 From: LeAnhMinh Date: Thu, 10 Mar 2022 14:23:23 +0700 Subject: [PATCH 5/5] Use FdwScanPrivateIndex: Indexes of FDW-private information stored in fdw_private lists. Follow postgres style: https://github.com/postgres/postgres/blob/master/contrib/postgres_fdw/postgres_fdw.c --- src/parquet_impl.cpp | 47 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/parquet_impl.cpp b/src/parquet_impl.cpp index 30f59c6..dca6fe1 100644 --- a/src/parquet_impl.cpp +++ b/src/parquet_impl.cpp @@ -110,6 +110,33 @@ struct RowGroupFilter int strategy; }; +/* + * Indexes of FDW-private information stored in fdw_private lists. + * + * These items are indexed with the enum FdwScanPrivateIndex, so an item + * can be fetched with list_nth(). For example, to get the filenames: + * sql = strVal(list_nth(fdw_private, FdwScanPrivateFileNames)); + */ +enum FdwScanPrivateIndex +{ + /* List of paths to Parquet files */ + FdwScanPrivateFileNames, + /* List of Attributes actually used in query */ + FdwScanPrivateAttributesUsed, + /* List of columns that Parquet files are presorted by */ + FdwScanPrivateAttributesSorted, + /* use_mmap flag (as an integer Value node) */ + FdwScanPrivateUseMmap, + /* use_threads flag (as an integer Value node) */ + FdwScanPrivateUse_Threads, + /* ReaderType of Parquet files */ + FdwScanPrivateType, + /* The limit for the number of Parquet files open simultaneously. */ + FdwScanPrivateMaxOpenFiles, + /* List of Lists (per filename) */ + FdwScanPrivateRowGroups, +}; + /* * Plain C struct for fdw_state */ @@ -1475,30 +1502,30 @@ parquetBeginForeignScan(ForeignScanState *node, int /* eflags */) { switch(i) { - case 0: + case FdwScanPrivateFileNames: filenames = (List *) lfirst(lc); break; - case 1: + case FdwScanPrivateAttributesUsed: attrs_list = (List *) lfirst(lc); foreach (lc2, attrs_list) attrs_used.insert(lfirst_int(lc2)); break; - case 2: + case FdwScanPrivateAttributesSorted: attrs_sorted = (List *) lfirst(lc); break; - case 3: + case FdwScanPrivateUseMmap: use_mmap = (bool) intVal((Value *) lfirst(lc)); break; - case 4: + case FdwScanPrivateUse_Threads: use_threads = (bool) intVal((Value *) lfirst(lc)); break; - case 5: + case FdwScanPrivateType: reader_type = (ReaderType) intVal((Value *) lfirst(lc)); break; - case 6: + case FdwScanPrivateMaxOpenFiles: max_open_files = intVal((Value *) lfirst(lc)); break; - case 7: + case FdwScanPrivateRowGroups: rowgroups_list = (List *) lfirst(lc); break; } @@ -1794,8 +1821,8 @@ parquetExplainForeignScan(ForeignScanState *node, ExplainState *es) fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; filenames = (List *) linitial(fdw_private); - reader_type = (ReaderType) intVal((Value *) list_nth(fdw_private, 5)); - rowgroups_list = (List *) llast(fdw_private); + reader_type = (ReaderType) intVal((Value *) list_nth(fdw_private, FdwScanPrivateType)); + rowgroups_list = (List *) list_nth(fdw_private, FdwScanPrivateRowGroups); switch (reader_type) {