From 2f3ec2ac46487a95237225417acc392c2365ffa5 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Tue, 30 Oct 2018 10:51:33 +0800 Subject: [PATCH 1/2] Extend colfilter and repset tests --- expected/column_filter.out | 223 ++++++++++++++++++++++++++++++++++- expected/replication_set.out | 48 +++++++- expected/row_filter.out | 38 +++++- pglogical_dependency.c | 2 +- pglogical_functions.c | 5 +- sql/column_filter.sql | 113 +++++++++++++++++- sql/replication_set.sql | 27 +++++ sql/row_filter.sql | 18 +++ 8 files changed, 466 insertions(+), 8 deletions(-) diff --git a/expected/column_filter.out b/expected/column_filter.out index d1df3ce..a5f71e5 100644 --- a/expected/column_filter.out +++ b/expected/column_filter.out @@ -9,6 +9,13 @@ CREATE TABLE public.basic_dml ( data text, something interval ); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + INSERT INTO basic_dml(other, data, something) VALUES (5, 'foo', '1 minute'::interval), (4, 'bar', '12 weeks'::interval), @@ -17,7 +24,8 @@ VALUES (5, 'foo', '1 minute'::interval), (1, NULL, NULL); \c :subscriber_dsn -- create table on subscriber to receive replicated filtered data from provider --- there are some extra columns too. +-- there are some extra columns too, and we omit 'other' as a non-replicated +-- table on upstream only. CREATE TABLE public.basic_dml ( id serial primary key, data text, @@ -25,13 +33,40 @@ CREATE TABLE public.basic_dml ( subonly integer, subonly_def integer DEFAULT 99 ); -SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml', ARRAY['default']); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); nspname | relname | att_list | has_row_filter ---------+-----------+-----------------------------------------+---------------- public | basic_dml | {id,data,something,subonly,subonly_def} | f (1 row) +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + \c :provider_dsn +-- Fails: the column filter list must include the key +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something}'); +ERROR: REPLICA IDENTITY columns must be replicated +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + +-- Fails: the column filter list may not include cols that are not in the table +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something, nosuchcol}'); +ERROR: table public.basic_dml does not have column nosuchcol +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + -- At provider, add table to replication set, with filtered columns SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data, something}'); replication_set_add_table @@ -39,6 +74,19 @@ SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchr t (1 row) +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+-----------+---------------------+---------------- + public | basic_dml | {id,data,something} | f +(1 row) + SELECT id, data, something FROM basic_dml ORDER BY id; id | data | something ----+------+------------------ @@ -71,6 +119,13 @@ SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_tab public | basic_dml | {id,data,something,subonly,subonly_def} | f (1 row) +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | +(1 row) + -- data should get replicated to subscriber SELECT id, data, something FROM basic_dml ORDER BY id; id | data | something @@ -90,14 +145,49 @@ CREATE TABLE public.basic_oids_dml ( data text, something interval ) with oids ; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------------+---------------- + public | basic_oids_dml | {id,other,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + nspname | relname | set_name +---------+----------------+---------- + public | basic_oids_dml | +(1 row) + +-- Fails: cannot use system column 'oid' explicitly SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{oid, id, data, something}'); ERROR: table public.basic_oids_dml does not have column oid +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + nspname | relname | set_name +---------+----------------+---------- + public | basic_oids_dml | +(1 row) + +-- WITH OIDS table OK SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{id, data, something}'); replication_set_add_table --------------------------- t (1 row) +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + nspname | relname | set_name +---------+----------------+---------- + public | basic_oids_dml | default +(1 row) + SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); wait_slot_confirm_lsn ----------------------- @@ -120,6 +210,12 @@ VALUES (5, 'foo', '1 minute'::interval), (3, 'baz', '2 years 1 hour'::interval), (2, 'qux', '8 months 2 days'::interval), (1, NULL, NULL); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + UPDATE basic_oids_dml SET other = '40', data = NULL, something = '3 days'::interval WHERE id = 4; SELECT * from basic_oids_dml ORDER BY id; id | other | data | something @@ -149,6 +245,129 @@ SELECT id, data, something FROM basic_oids_dml ORDER BY id; (5 rows) \c :provider_dsn +-- Adding a table that's already selectively replicated fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true); +ERROR: duplicate key value violates unique constraint "replication_set_table_pkey" +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +-- So does trying to re-add to change the column set +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data}'); +ERROR: duplicate key value violates unique constraint "replication_set_table_pkey" +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +-- Shouldn't be able to drop a replicated col in a rel +-- but due to RM#5916 you can +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +ROLLBACK; +-- Even when wrapped (RM#5916) +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data; +$$); + replicate_ddl_command +----------------------- + t +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +ROLLBACK; +-- CASCADE should be allowed though +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +ROLLBACK; +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +$$); + replicate_ddl_command +----------------------- + t +(1 row) + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + nspname | relname | att_list | has_row_filter +---------+----------------+---------------------+---------------- + public | basic_oids_dml | {id,data,something} | f +(1 row) + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + nspname | relname | set_name +---------+-----------+---------- + public | basic_dml | default +(1 row) + +ROLLBACK; +-- We can drop a non-replicated col. We must not replicate this DDL because in +-- this case the downstream doesn't have the 'other' column and apply will +-- fail. +ALTER TABLE public.basic_dml DROP COLUMN other; +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.basic_dml CASCADE; diff --git a/expected/replication_set.out b/expected/replication_set.out index 7fcec6e..a61e7a4 100644 --- a/expected/replication_set.out +++ b/expected/replication_set.out @@ -123,6 +123,11 @@ SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', repli ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_delete := true); ERROR: replication set repset_replicate_instrunc cannot be altered to replicate UPDATEs or DELETEs because it contains tables without PRIMARY KEY +-- Adding already-added fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('repset_replicate_all', 'public.test_publicschema'); +ERROR: duplicate key value violates unique constraint "replication_set_table_pkey" +\set VERBOSITY default -- check the replication sets SELECT nspname, relname, set_name FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3; @@ -157,6 +162,48 @@ SELECT nspname, relname, set_name FROM pglogical.tables --too short SELECT pglogical.create_replication_set(''); ERROR: replication set name cannot be empty +-- Can't drop table while it's in a repset +DROP TABLE public.test_publicschema; +ERROR: cannot drop table test_publicschema because other objects depend on it +DETAIL: table test_publicschema membership in replication set default_insert_only depends on table test_publicschema +table test_publicschema membership in replication set repset_replicate_all depends on table test_publicschema +HINT: Use DROP ... CASCADE to drop the dependent objects too. +-- Can't drop table while it's in a repset +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +DROP TABLE public.test_publicschema; +$$); +ERROR: cannot drop table public.test_publicschema because other objects depend on it +DETAIL: table public.test_publicschema membership in replication set default_insert_only depends on table public.test_publicschema +table public.test_publicschema membership in replication set repset_replicate_all depends on table public.test_publicschema +HINT: Use DROP ... CASCADE to drop the dependent objects too. +CONTEXT: during execution of queued SQL statement: +DROP TABLE public.test_publicschema; + +ROLLBACK; +-- Can CASCADE though, even outside ddlrep +BEGIN; +DROP TABLE public.test_publicschema CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table test_publicschema membership in replication set default_insert_only +drop cascades to table test_publicschema membership in replication set repset_replicate_all +ROLLBACK; +-- ... and can drop after repset removal +SELECT pglogical.replication_set_remove_table('repset_replicate_all', 'public.test_publicschema'); + replication_set_remove_table +------------------------------ + t +(1 row) + +SELECT pglogical.replication_set_remove_table('default_insert_only', 'public.test_publicschema'); + replication_set_remove_table +------------------------------ + t +(1 row) + +BEGIN; +DROP TABLE public.test_publicschema; +ROLLBACK; \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.test_publicschema CASCADE; @@ -165,7 +212,6 @@ SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.test_nopkey CASCADE; DROP TABLE public.test_unlogged CASCADE; $$); -NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to table normalschema.test_normalschema NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to table "strange.schema-IS".test_strangeschema diff --git a/expected/row_filter.out b/expected/row_filter.out index 41a72a5..3216ab4 100644 --- a/expected/row_filter.out +++ b/expected/row_filter.out @@ -125,7 +125,7 @@ SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', true, -- fail, the membership in repset depends on data column \set VERBOSITY terse ALTER TABLE basic_dml DROP COLUMN data; -ERROR: cannot drop column data of table basic_dml because other objects depend on it +ERROR: cannot drop table basic_dml column data because other objects depend on it \set VERBOSITY default SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); wait_slot_confirm_lsn @@ -399,6 +399,42 @@ SELECT * FROM test_jsonb ORDER BY json_type; scalar | "a scalar" (2 rows) +\c :provider_dsn +-- Filter may refer to not-replicated columns +SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml'); + replication_set_remove_table +------------------------------ + t +(1 row) + +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false, columns := ARRAY['id', 'data'], row_filter := $rf$other = 2$rf$); + replication_set_add_table +--------------------------- + t +(1 row) + +INSERT INTO basic_dml(other, data, "SomeThing") VALUES (2, 'itstwo', '1 second'::interval); +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + other | data | SomeThing +-------+--------+----------- + 2 | itstwo | @ 1 sec +(1 row) + +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- 'other' will be NULL as it wasn't in the repset +-- even though we filtered on it. So will SomeThing. +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + other | data | SomeThing +-------+--------+----------- + | itstwo | +(1 row) + \c :provider_dsn \set VERBOSITY terse DROP FUNCTION funcn_add(integer, integer); diff --git a/pglogical_dependency.c b/pglogical_dependency.c index 2dbf616..cc3bd7f 100644 --- a/pglogical_dependency.c +++ b/pglogical_dependency.c @@ -2051,7 +2051,7 @@ doDeletion(const ObjectAddress *object) drop_replication_set(object->objectId); else if (object->classId == get_replication_set_table_rel_oid()) replication_set_remove_table(object->objectId, object->objectSubId, - true); + true); else if (object->classId == get_replication_set_seq_rel_oid()) replication_set_remove_seq(object->objectId, object->objectSubId, true); diff --git a/pglogical_functions.c b/pglogical_functions.c index c0f4c0f..afed6f9 100644 --- a/pglogical_functions.c +++ b/pglogical_functions.c @@ -1784,6 +1784,8 @@ pglogical_replicate_ddl_command(PG_FUNCTION_ARGS) } PG_END_TRY(); + in_pglogical_replicate_ddl_command = false; + /* * Restore the GUC variables we set above. */ @@ -1889,7 +1891,8 @@ pglogical_node_info(PG_FUNCTION_ARGS) * Get replication info about table. * * This is called by downstream sync worker on the upstream to obtain - * info needed to do initial synchronization correctly. + * info needed to do initial synchronization correctly. Be careful + * about changing it, as it must be upward- and downward-compatible. */ Datum pglogical_show_repset_table_info(PG_FUNCTION_ARGS) diff --git a/sql/column_filter.sql b/sql/column_filter.sql index 826ed05..9b138f6 100644 --- a/sql/column_filter.sql +++ b/sql/column_filter.sql @@ -10,6 +10,10 @@ CREATE TABLE public.basic_dml ( data text, something interval ); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + INSERT INTO basic_dml(other, data, something) VALUES (5, 'foo', '1 minute'::interval), (4, 'bar', '12 weeks'::interval), @@ -19,7 +23,8 @@ VALUES (5, 'foo', '1 minute'::interval), \c :subscriber_dsn -- create table on subscriber to receive replicated filtered data from provider --- there are some extra columns too. +-- there are some extra columns too, and we omit 'other' as a non-replicated +-- table on upstream only. CREATE TABLE public.basic_dml ( id serial primary key, data text, @@ -27,11 +32,34 @@ CREATE TABLE public.basic_dml ( subonly integer, subonly_def integer DEFAULT 99 ); -SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml', ARRAY['default']); + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; \c :provider_dsn + +-- Fails: the column filter list must include the key +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something}'); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +-- Fails: the column filter list may not include cols that are not in the table +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{data, something, nosuchcol}'); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + -- At provider, add table to replication set, with filtered columns SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data, something}'); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + SELECT id, data, something FROM basic_dml ORDER BY id; SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); @@ -44,6 +72,10 @@ SELECT pglogical.wait_for_table_sync_complete('test_subscription', 'basic_dml'); COMMIT; SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + -- data should get replicated to subscriber SELECT id, data, something FROM basic_dml ORDER BY id; @@ -56,10 +88,25 @@ CREATE TABLE public.basic_oids_dml ( something interval ) with oids ; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + +-- Fails: cannot use system column 'oid' explicitly SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{oid, id, data, something}'); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + +-- WITH OIDS table OK SELECT * FROM pglogical.replication_set_add_table('default', 'basic_oids_dml', columns := '{id, data, something}'); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_oids_dml'::regclass; + SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); \c :subscriber_dsn @@ -82,6 +129,8 @@ VALUES (5, 'foo', '1 minute'::interval), (2, 'qux', '8 months 2 days'::interval), (1, NULL, NULL); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); + UPDATE basic_oids_dml SET other = '40', data = NULL, something = '3 days'::interval WHERE id = 4; SELECT * from basic_oids_dml ORDER BY id; @@ -92,6 +141,66 @@ SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); SELECT id, data, something FROM basic_oids_dml ORDER BY id; \c :provider_dsn + +-- Adding a table that's already selectively replicated fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true); +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +-- So does trying to re-add to change the column set +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', synchronize_data := true, columns := '{id, data}'); +\set VERBOSITY default +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; + +-- Shouldn't be able to drop a replicated col in a rel +-- but due to RM#5916 you can +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +ROLLBACK; + +-- Even when wrapped (RM#5916) +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data; +$$); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +ROLLBACK; + +-- CASCADE should be allowed though +BEGIN; +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +ROLLBACK; + +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +ALTER TABLE public.basic_dml DROP COLUMN data CASCADE; +$$); +SELECT nspname, relname, att_list, has_row_filter FROM pglogical.show_repset_table_info('basic_oids_dml'::regclass, ARRAY['default']); +SELECT nspname, relname, set_name FROM pglogical.tables +WHERE relid = 'public.basic_dml'::regclass; +ROLLBACK; + +-- We can drop a non-replicated col. We must not replicate this DDL because in +-- this case the downstream doesn't have the 'other' column and apply will +-- fail. +ALTER TABLE public.basic_dml DROP COLUMN other; + +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.basic_dml CASCADE; diff --git a/sql/replication_set.sql b/sql/replication_set.sql index 7fab2a1..1a0fc4b 100644 --- a/sql/replication_set.sql +++ b/sql/replication_set.sql @@ -49,6 +49,11 @@ SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}'); SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_update := true); SELECT * FROM pglogical.alter_replication_set('repset_replicate_instrunc', replicate_delete := true); +-- Adding already-added fails +\set VERBOSITY terse +SELECT * FROM pglogical.replication_set_add_table('repset_replicate_all', 'public.test_publicschema'); +\set VERBOSITY default + -- check the replication sets SELECT nspname, relname, set_name FROM pglogical.tables WHERE relname IN ('test_publicschema', 'test_normalschema', 'test_strangeschema', 'test_nopkey') ORDER BY 1,2,3; @@ -61,6 +66,28 @@ SELECT nspname, relname, set_name FROM pglogical.tables --too short SELECT pglogical.create_replication_set(''); +-- Can't drop table while it's in a repset +DROP TABLE public.test_publicschema; + +-- Can't drop table while it's in a repset +BEGIN; +SELECT pglogical.replicate_ddl_command($$ +DROP TABLE public.test_publicschema; +$$); +ROLLBACK; + +-- Can CASCADE though, even outside ddlrep +BEGIN; +DROP TABLE public.test_publicschema CASCADE; +ROLLBACK; + +-- ... and can drop after repset removal +SELECT pglogical.replication_set_remove_table('repset_replicate_all', 'public.test_publicschema'); +SELECT pglogical.replication_set_remove_table('default_insert_only', 'public.test_publicschema'); +BEGIN; +DROP TABLE public.test_publicschema; +ROLLBACK; + \set VERBOSITY terse SELECT pglogical.replicate_ddl_command($$ DROP TABLE public.test_publicschema CASCADE; diff --git a/sql/row_filter.sql b/sql/row_filter.sql index 5574d19..ad847a0 100644 --- a/sql/row_filter.sql +++ b/sql/row_filter.sql @@ -196,6 +196,24 @@ SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); SELECT * FROM test_jsonb ORDER BY json_type; +\c :provider_dsn + +-- Filter may refer to not-replicated columns +SELECT * FROM pglogical.replication_set_remove_table('default', 'basic_dml'); +SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml', false, columns := ARRAY['id', 'data'], row_filter := $rf$other = 2$rf$); + +INSERT INTO basic_dml(other, data, "SomeThing") VALUES (2, 'itstwo', '1 second'::interval); + +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + +SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- 'other' will be NULL as it wasn't in the repset +-- even though we filtered on it. So will SomeThing. +SELECT other, data, "SomeThing" FROM basic_dml WHERE data = 'itstwo'; + \c :provider_dsn \set VERBOSITY terse DROP FUNCTION funcn_add(integer, integer); From fd4135a81837861c0715635961bedf5041b6ff28 Mon Sep 17 00:00:00 2001 From: Mohamed Insaf K Date: Wed, 28 Nov 2018 15:28:18 +0530 Subject: [PATCH 2/2] Made initial table synchronization parallel. Initial table synchronization is done parallelly by multiple sync workers. Some more improvements were made which is explained below. If no worker slots are available to launch bgworker, the process just throws WARNING, and retries again after some time, rather than throwing ERROR and exiting. Up to max_sync_workers_per_subscription sync workers will be created per subscription, and that many tables will be sync-ed parallelly. During initial table data copying, we used to create one more backend connection to the target database, with the backend in COPY FROM stdin mode, and the bgworker used to route CopyData from "COPY TO" output from publisher to this backend. Now, this extra backend connection code has been removed. Instead, the sync worker directly writes into the underlying target database. This sync worker is also in replication mode, meaning that FK violation constraint triggers won't get called, and this allows us to sync each table in any arbitrary order. Now the subscription init code just gets the list of tables in the subscribed replication_set and add those table's meta in INIT state. Then the apply process spawns sync workers for each non-READY tables. In contrast, in the old code, the subscription init process used to get the list of tables and copied them sequentially. In the previous code, sync worker in CATCHUP state doesn't exit even if it has replayed up to the required LSN. It exits only after receiving at least one logical change (WAL) from the publisher, because the exit code is written only in handle_commit() function. Now the same code(after little modification) from handle_commit() is copied into the process_syncing_tables() as well. And this process_syncing_tables() will get called periodically, and within that, the sync process exits if it has caught up with the apply process. --- pglogical_apply.c | 55 ++++++++- pglogical_sync.c | 289 ++++++++++++++++++++++++++++++++++----------- pglogical_worker.c | 56 ++++++++- pglogical_worker.h | 3 + 4 files changed, 333 insertions(+), 70 deletions(-) diff --git a/pglogical_apply.c b/pglogical_apply.c index aea2ef7..a94e9a3 100644 --- a/pglogical_apply.c +++ b/pglogical_apply.c @@ -1788,7 +1788,11 @@ process_syncing_tables(XLogRecPtr end_lsn) { PGLogicalWorker *worker = (PGLogicalWorker *) lfirst(wlc); - if (pglogical_worker_running(worker)) + /* Is any sync worker running for the given table */ + if (pglogical_worker_running(worker) + && strcmp(NameStr(worker->worker.sync.nspname), NameStr(sync->nspname)) == 0 + && strcmp(NameStr(worker->worker.sync.relname), NameStr(sync->relname)) == 0 + ) nworkers++; } LWLockRelease(PGLogicalCtx->lock); @@ -1800,6 +1804,55 @@ process_syncing_tables(XLogRecPtr end_lsn) } } + //Following codes are modified version of the code from handle_commit() function + if (MyPGLogicalWorker->worker_type == PGLOGICAL_WORKER_SYNC) + { + /* + * Stop replay if we're doing limited replay and we've replayed up to the + * last record we're supposed to process. + */ + if (MyApplyWorker->replay_stop_lsn != InvalidXLogRecPtr + && MyApplyWorker->replay_stop_lsn <= end_lsn) + { + ereport(LOG, + (errmsg("pglogical sync finished processing; replayed to %X/%X of required %X/%X", + (uint32)(end_lsn>>32), (uint32)end_lsn, + (uint32)(MyApplyWorker->replay_stop_lsn >>32), + (uint32)MyApplyWorker->replay_stop_lsn))); + + /* + * If this is sync worker, update syncing table state to done. + */ + StartTransactionCommand(); + set_table_sync_status(MyApplyWorker->subid, + NameStr(MyPGLogicalWorker->worker.sync.nspname), + NameStr(MyPGLogicalWorker->worker.sync.relname), + SYNC_STATUS_SYNCDONE, end_lsn); + CommitTransactionCommand(); + + /* + * Flush all writes so the latest position can be reported back to the + * sender. + */ + XLogFlush(GetXLogWriteRecPtr()); + + /* + * Disconnect. + * + * This needs to happen before the pglogical_sync_worker_finish() + * call otherwise slot drop will fail. + */ + PQfinish(applyconn); + + pglogical_sync_worker_finish(); + + /* Stop gracefully */ + proc_exit(0); + } + + } + //Copied code ends + Assert(CurrentMemoryContext == MessageContext); } diff --git a/pglogical_sync.c b/pglogical_sync.c index eb42dd9..a33bf78 100644 --- a/pglogical_sync.c +++ b/pglogical_sync.c @@ -31,6 +31,7 @@ #include "commands/dbcommands.h" #include "commands/tablecmds.h" +#include "commands/copy.h" #include "lib/stringinfo.h" @@ -39,6 +40,9 @@ #include "nodes/makefuncs.h" #include "nodes/parsenodes.h" +#include "parser/parse_node.h" +#include "parser/parse_relation.h" + #include "pgstat.h" #include "replication/origin.h" @@ -84,6 +88,11 @@ void pglogical_sync_main(Datum main_arg); static PGLogicalSyncWorker *MySyncWorker = NULL; +static StringInfo copybuf = NULL; +static PGconn *source_copy_conn = NULL; + +static int copy_read_data(void *outbuf, int minread, int maxread); +static int libpqrcv_receive_pglogical(PGconn *conn, char **buffer,pgsocket *wait_fd); static void dump_structure(PGLogicalSubscription *sub, const char *destfile, @@ -393,13 +402,11 @@ make_copy_attnamelist(PGLogicalRelation *rel) * COPY single table over wire. */ static void -copy_table_data(PGconn *origin_conn, PGconn *target_conn, +copy_table_data(PGconn *origin_conn, PGLogicalRemoteRel *remoterel, List *replication_sets) { PGLogicalRelation *rel; PGresult *res; - int bytes; - char *copybuf; List *attnamelist; ListCell *lc; bool first; @@ -407,6 +414,9 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, StringInfoData attlist; MemoryContext curctx = CurrentMemoryContext, oldctx; + CopyState cstate; + ParseState *pstate; + StringInfoData stringinfodata = {0}; /* Build the relation map. */ StartTransactionCommand(); @@ -501,59 +511,39 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, PQerrorMessage(origin_conn)))); } - /* Build COPY FROM query. */ - resetStringInfo(&query); - appendStringInfo(&query, "COPY %s.%s ", - PQescapeIdentifier(origin_conn, remoterel->nspname, - strlen(remoterel->nspname)), - PQescapeIdentifier(origin_conn, remoterel->relname, - strlen(remoterel->relname))); - if (list_length(attnamelist)) - appendStringInfo(&query, "(%s) ", attlist.data); - appendStringInfoString(&query, "FROM stdin"); - - /* Execute COPY FROM. */ - res = PQexec(target_conn, query.data); - if (PQresultStatus(res) != PGRES_COPY_IN) - { - ereport(ERROR, - (errmsg("table copy failed"), - errdetail("Query '%s': %s", query.data, - PQerrorMessage(origin_conn)))); - } + PQclear(res); - while ((bytes = PQgetCopyData(origin_conn, ©buf, false)) > 0) - { - if (PQputCopyData(target_conn, copybuf, bytes) != 1) - { - ereport(ERROR, - (errmsg("writing to target table failed"), - errdetail("destination connection reported: %s", - PQerrorMessage(target_conn)))); - } - PQfreemem(copybuf); + /* + * Instead of creating another libpq connection in to the target database, we + * use the same sync worker to write to the target database. + * + * This connection is in replica mode, so foreign key constraints wont be + * checked, just like pg's logical replication sync worker does. This means + * that tables can be copied in any order without triggering FK violation. + */ - CHECK_FOR_INTERRUPTS(); - } + copybuf = &stringinfodata; + StartTransactionCommand(); + rel = pglogical_relation_open(remoterel->relid, RowExclusiveLock); + pstate = make_parsestate(NULL); + addRangeTableEntryForRelation(pstate, rel->rel, NULL, false, false); - if (bytes != -1) - { - ereport(ERROR, - (errmsg("reading from origin table failed"), - errdetail("source connection returned %d: %s", - bytes, PQerrorMessage(origin_conn)))); - } + source_copy_conn = origin_conn; - /* Send local finish */ - if (PQputCopyEnd(target_conn, NULL) != 1) - { - ereport(ERROR, - (errmsg("sending copy-completion to destination connection failed"), - errdetail("destination connection reported: %s", - PQerrorMessage(target_conn)))); - } + cstate = BeginCopyFrom(pstate, rel->rel, NULL, false, copy_read_data, attnamelist, NIL); - PQclear(res); + /* Do the copy */ + (void) CopyFrom(cstate); + + source_copy_conn = NULL; + + if(copybuf->data) + PQfreemem(copybuf->data); // malloc-ed by libpq + + copybuf = NULL; + + pglogical_relation_close(rel, RowExclusiveLock); + CommitTransactionCommand(); } /* @@ -568,16 +558,12 @@ copy_tables_data(char *sub_name, const char *origin_dsn, const char *origin_name) { PGconn *origin_conn; - PGconn *target_conn; ListCell *lc; /* Connect to origin node. */ origin_conn = pglogical_connect(origin_dsn, sub_name, "copy"); start_copy_origin_tx(origin_conn, origin_snapshot); - /* Connect to target node. */ - target_conn = pglogical_connect(target_dsn, sub_name, "copy"); - start_copy_target_tx(target_conn, origin_name); /* Copy every table. */ foreach (lc, tables) @@ -588,14 +574,13 @@ copy_tables_data(char *sub_name, const char *origin_dsn, remoterel = pg_logical_get_remote_repset_table(origin_conn, rv, replication_sets); - copy_table_data(origin_conn, target_conn, remoterel, replication_sets); + copy_table_data(origin_conn, remoterel, replication_sets); CHECK_FOR_INTERRUPTS(); } /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); - finish_copy_target_tx(target_conn); } /* @@ -616,7 +601,6 @@ copy_replication_sets_data(char *sub_name, const char *origin_dsn, PGconn *origin_conn; PGconn *target_conn; List *tables; - ListCell *lc; /* Connect to origin node. */ origin_conn = pglogical_connect(origin_dsn, sub_name, "copy"); @@ -630,15 +614,20 @@ copy_replication_sets_data(char *sub_name, const char *origin_dsn, target_conn = pglogical_connect(target_dsn, sub_name, "copy"); start_copy_target_tx(target_conn, origin_name); - /* Copy every table. */ - foreach (lc, tables) - { - PGLogicalRemoteRel *remoterel = lfirst(lc); - copy_table_data(origin_conn, target_conn, remoterel, replication_sets); + /* + * We don't copy the table data here. Instead a sync worker + * is spawned for each table and it does the initial copying. + */ + // /* Copy every table. */ + // foreach (lc, tables) + // { + // PGLogicalRemoteRel *remoterel = lfirst(lc); + + // copy_table_data(origin_conn, target_conn, remoterel, replication_sets); - CHECK_FOR_INTERRUPTS(); - } + // CHECK_FOR_INTERRUPTS(); + // } /* Finish the transactions and disconnect. */ finish_copy_origin_tx(origin_conn); @@ -647,6 +636,172 @@ copy_replication_sets_data(char *sub_name, const char *origin_dsn, return tables; } +/* + * Data source callback for the COPY FROM, which reads from the remote + * connection and passes the data back to our local COPY. + * Modified version of copy_read_data() from pg/src/backend/replication/logical/tablesync.c + */ +static int +copy_read_data(void *outbuf, int minread, int maxread) +{ + int bytesread = 0; + int avail; + + /* If there are some leftover data from previous read, use it. */ + avail = copybuf->len - copybuf->cursor; + if (avail) + { + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + while (maxread > 0 && bytesread < minread) + { + pgsocket fd = PGINVALID_SOCKET; + int rc; + int len; + char *buf = NULL; + + for (;;) + { + /* Try read the data. */ + len = libpqrcv_receive_pglogical(source_copy_conn, &buf, &fd); + + CHECK_FOR_INTERRUPTS(); + + if (len == 0) + break; + else if (len < 0) + return bytesread; + else + { + /* Process the data */ + if(copybuf->data) + PQfreemem(copybuf->data); // malloc-ed by libpq + copybuf->data = buf; + copybuf->len = len; + copybuf->cursor = 0; + + avail = copybuf->len - copybuf->cursor; + if (avail > maxread) + avail = maxread; + memcpy(outbuf, ©buf->data[copybuf->cursor], avail); + outbuf = (void *) ((char *) outbuf + avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + if (maxread <= 0 || bytesread >= minread) + return bytesread; + } + + /* + * Wait for more data or latch. + */ + rc = WaitLatchOrSocket(MyLatch, + WL_SOCKET_READABLE | WL_LATCH_SET | + WL_TIMEOUT | WL_POSTMASTER_DEATH, + fd, 1000L/* , WAIT_EVENT_LOGICAL_SYNC_DATA */); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(MyLatch); + } + + return bytesread; +} + +/* + * Modified version of libpqrcv_receive from + * pg/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c + */ +static int libpqrcv_receive_pglogical(PGconn *conn, char **buffer, + pgsocket *wait_fd) +{ + int rawlen; + char *recvBuf = NULL; + + *buffer = NULL; + + /* Try to receive a CopyData message */ + rawlen = PQgetCopyData(conn, &recvBuf, 1); + if (rawlen == 0) + { + /* Try consuming some data. */ + if (PQconsumeInput(conn) == 0) + ereport(ERROR, + (errmsg("could not receive data from WAL stream: %s", + pchomp(PQerrorMessage(conn))))); + + /* Now that we've consumed some input, try again */ + rawlen = PQgetCopyData(conn, &recvBuf, 1); + if (rawlen == 0) + { + /* Tell caller to try again when our socket is ready. */ + *wait_fd = PQsocket(conn); + return 0; + } + } + if (rawlen == -1) /* end-of-streaming or error */ + { + PGresult *res; + + res = PQgetResult(conn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + + /* Verify that there are no more results. */ + res = PQgetResult(conn); + if (res != NULL) + { + PQclear(res); + + /* + * If the other side closed the connection orderly (otherwise + * we'd seen an error, or PGRES_COPY_IN) don't report an error + * here, but let callers deal with it. + */ + if (PQstatus(conn) == CONNECTION_BAD) + return -1; + + ereport(ERROR, + (errmsg("unexpected result after CommandComplete: %s", + PQerrorMessage(conn)))); + } + + return -1; + } + else if (PQresultStatus(res) == PGRES_COPY_IN) + { + PQclear(res); + return -1; + } + else + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive data from WAL stream: %s", + pchomp(PQerrorMessage(conn))))); + } + } + if (rawlen < -1) + ereport(ERROR, + (errmsg("could not receive data from WAL stream: %s", + pchomp(PQerrorMessage(conn))))); + + /* Return received messages to caller */ + *buffer = recvBuf; + return rawlen; +} + static void pglogical_sync_worker_cleanup(PGLogicalSubscription *sub) { @@ -851,7 +1006,7 @@ pglogical_sync_subscription(PGLogicalSubscription *sub) { set_table_sync_status(sub->id, remoterel->nspname, remoterel->relname, - SYNC_STATUS_READY, + SYNC_STATUS_INIT, lsn); } else @@ -862,7 +1017,7 @@ pglogical_sync_subscription(PGLogicalSubscription *sub) newsync.subid = sub->id; namestrcpy(&newsync.nspname, remoterel->nspname); namestrcpy(&newsync.relname, remoterel->relname); - newsync.status = SYNC_STATUS_READY; + newsync.status = SYNC_STATUS_INIT; newsync.statuslsn = lsn; create_local_sync_status(&newsync); } diff --git a/pglogical_worker.c b/pglogical_worker.c index bc40564..7649390 100644 --- a/pglogical_worker.c +++ b/pglogical_worker.c @@ -29,6 +29,8 @@ #include "utils/memutils.h" #include "utils/timestamp.h" +#include "replication/logicallauncher.h" + #include "pgstat.h" #include "pglogical_sync.h" @@ -115,11 +117,34 @@ pglogical_worker_register(PGLogicalWorker *worker) LWLockAcquire(PGLogicalCtx->lock, LW_EXCLUSIVE); + /* + * Limit sync workers per subscription upto the + * GUC max_sync_workers_per_subscprition + */ + if(worker->worker_type == PGLOGICAL_WORKER_SYNC) + { + int nsyncWorkers; + + nsyncWorkers = num_of_sync_workers(worker->dboid, worker->worker.sync.apply.subid); + + if(nsyncWorkers >= max_sync_workers_per_subscription) + { + LWLockRelease(PGLogicalCtx->lock); + return -1; + } + } + slot = find_empty_worker_slot(worker->dboid); if (slot == -1) { LWLockRelease(PGLogicalCtx->lock); - elog(ERROR, "could not register pglogical worker: all background worker slots are already used"); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of worker slots for pglogical workers"), + errhint("You might need to increase max_worker_processes."))); + + return -1; } worker_shm = &PGLogicalCtx->workers[slot]; @@ -179,7 +204,7 @@ pglogical_worker_register(PGLogicalWorker *worker) if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) { worker_shm->crashed_at = GetCurrentTimestamp(); - ereport(ERROR, + ereport(WARNING, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("worker registration failed, you might want to increase max_worker_processes setting"))); } @@ -737,3 +762,30 @@ pglogical_worker_type_name(PGLogicalWorkerType type) default: Assert(false); return NULL; } } + + +/* + * Find number of sync workers for the given subscriptions. + */ +int +num_of_sync_workers(Oid dboid, Oid subid) +{ + int i; + int nsync = 0; + + Assert(LWLockHeldByMe(PGLogicalCtx->lock)); + + for (i = 0; i < PGLogicalCtx->total_workers; i++) + { + /* + * Find num of sync workers for the given subscription which are not crashed + */ + if(PGLogicalCtx->workers[i].worker_type == PGLOGICAL_WORKER_SYNC + && PGLogicalCtx->workers[i].dboid == dboid + && PGLogicalCtx->workers[i].worker.sync.apply.subid == subid + && PGLogicalCtx->workers[i].crashed_at == 0 ) + nsync++; + } + + return nsync; +} diff --git a/pglogical_worker.h b/pglogical_worker.h index 663d0ba..99dbe36 100644 --- a/pglogical_worker.h +++ b/pglogical_worker.h @@ -108,4 +108,7 @@ extern void pglogical_worker_kill(PGLogicalWorker *worker); extern const char * pglogical_worker_type_name(PGLogicalWorkerType type); +extern int num_of_sync_workers(Oid dboid, Oid subid); + + #endif /* PGLOGICAL_WORKER_H */