From 60644bba921b2099ed32ea5c39e1b07b355fbd93 Mon Sep 17 00:00:00 2001
From: Ronan Dunklau
Date: Thu, 21 Sep 2023 14:48:08 +0200
Subject: [PATCH] Keep synchronizing slots when others are lagging on primary.

Instead of blocking indefinitely until a replication slot becomes
syncable, introduce a new GUC pg_failover_slots.sync_timeout after
which we move on to the next slot.

To avoid restarting the wait from scratch, we create the replication
slots as temporary ones instead of ephemeral ones, allowing them to
keep their state between runs. When a slot is finally synced, we
persist it to disk.

Since we no longer block waiting, we need to clean up the inconsistent
slots after promotion.
---
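Notes:

    To try the new behavior on a standby, the setting below is the only
    new knob; the value is purely illustrative (the GUC is declared with
    GUC_UNIT_MS, so plain milliseconds or time units both work):

        # Give up on a lagging slot after 30 seconds and move on to the
        # next one; -1 (the default) keeps the old wait-forever behavior.
        pg_failover_slots.sync_timeout = '30s'
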
 pg_failover_slots.c | 222 +++++++++++++++++++++++++++++---------------
 1 file changed, 146 insertions(+), 76 deletions(-)

diff --git a/pg_failover_slots.c b/pg_failover_slots.c
index f209ef2..e3413af 100644
--- a/pg_failover_slots.c
+++ b/pg_failover_slots.c
@@ -110,6 +110,7 @@ char *pg_failover_maintenance_db;
 /* Slots to sync */
 char *pg_failover_slots_dsn;
 char *pg_failover_slot_names;
+int pg_failover_slots_sync_timeout;
 static char *pg_failover_slot_names_str = NULL;
 static List *pg_failover_slot_names_list = NIL;
 static bool pg_failover_slots_drop = true;
@@ -119,6 +120,12 @@ char *pg_failover_slots_version_str;
 void _PG_init(void);
 PGDLLEXPORT void pg_failover_slots_main(Datum main_arg);
 
+typedef enum SlotCatchupState {
+	CatchupSlotDrop = 0,
+	CatchupSlotSucceeded = 1,
+	CatchupSlotDirty = 2,
+} SlotCatchupState;
+
 static bool
 check_failover_slot_names(char **newval, void **extra, GucSource source)
 {
@@ -534,7 +541,7 @@ remote_connect(const char *connstr, const char *appname)
  * relies on us having already reserved the WAL for the old position of
  * `remote_slot` so `slot` can't continue to advance.
  */
-static bool
+static SlotCatchupState
 wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 {
 	List *slots;
@@ -542,6 +549,8 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 	StringInfoData connstr;
 	TimestampTz cb_wait_start =
 		0; /* first invocation should happen immediately */
+	TimestampTz wait_start = GetCurrentTimestamp();
+	TimestampTz now = 0;
 
 	elog(
 		LOG,
@@ -584,7 +593,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 				"replication slot sync wait for slot %s interrupted by promotion",
 				remote_slot->name)));
 			PQfinish(conn);
-			return false;
+			return CatchupSlotDrop;
 		}
 
 		filter->key = FAILOVERSLOT_FILTER_NAME;
@@ -595,7 +604,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 		{
 			/* Slot on provider vanished */
 			PQfinish(conn);
-			return false;
+			return CatchupSlotDrop;
 		}
 
 		receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
@@ -616,14 +625,30 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 			remote_slot->confirmed_lsn = new_slot->confirmed_lsn;
 			remote_slot->catalog_xmin = new_slot->catalog_xmin;
 			PQfinish(conn);
-			return true;
+			return CatchupSlotSucceeded;
 		}
 
 		/*
 		 * Invoke any callbacks that will help move the slots along
 		 */
+		now = GetCurrentTimestamp();
+		if (pg_failover_slots_sync_timeout >= 0 && TimestampDifferenceExceeds(
+				wait_start, now, pg_failover_slots_sync_timeout))
+		{
+			elog(
+				LOG,
+				"giving up waiting for remote slot %s lsn (%X/%X) and catalog xmin (%u) to pass local slot lsn (%X/%X) and catalog xmin (%u)",
+				remote_slot->name, (uint32) (new_slot->restart_lsn >> 32),
+				(uint32) (new_slot->restart_lsn), new_slot->catalog_xmin,
+				(uint32) (slot->data.restart_lsn >> 32),
+				(uint32) (slot->data.restart_lsn),
+				slot->data.catalog_xmin);
+			PQfinish(conn);
+			return CatchupSlotDirty;
+		}
+
 		if (TimestampDifferenceExceeds(
-				cb_wait_start, GetCurrentTimestamp(),
+				cb_wait_start, now,
 				Min(wal_retrieve_retry_interval * 5, PG_WAIT_EXTENSION)))
 		{
 			if (cb_wait_start > 0)
@@ -639,6 +664,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 			cb_wait_start = GetCurrentTimestamp();
 		}
 
+
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
 					   wal_retrieve_retry_interval, PG_WAIT_EXTENSION);
@@ -648,6 +674,16 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
 
 		ResetLatch(MyLatch);
 
+
+		/*
+		 * The user may change pg_failover_slots.sync_timeout, so reload the configuration if needed.
+		 */
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
 	}
 }
 
@@ -669,6 +705,7 @@ synchronize_one_slot(RemoteSlot *remote_slot)
 {
 	int i;
 	bool found = false;
+	SlotCatchupState slot_state = CatchupSlotDirty;
 
 	if (!RecoveryInProgress())
 	{
@@ -714,42 +751,10 @@ synchronize_one_slot(RemoteSlot *remote_slot)
 	if (found)
 	{
 		ReplicationSlotAcquire(remote_slot->name, true);
-
-		/*
-		 * We can't satisfy this remote slot's requirements with our known-safe
-		 * local restart_lsn, catalog_xmin and xmin.
-		 *
-		 * This shouldn't happen for existing slots unless someone else messed
-		 * with our physical replication slot on the master.
-		 */
-		if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
-			TransactionIdPrecedes(remote_slot->catalog_xmin,
-								  MyReplicationSlot->data.catalog_xmin))
-		{
-			elog(
-				WARNING,
-				"not synchronizing slot %s; synchronization would move it backward",
-				remote_slot->name);
-
-			ReplicationSlotRelease();
-			PopActiveSnapshot();
-			CommitTransactionCommand();
-			return;
-		}
-
-		LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
-		LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
-								   remote_slot->catalog_xmin);
-		LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
-											  remote_slot->restart_lsn);
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-
-		elog(
-			DEBUG2,
-			"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
-			remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
-			(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
+		if (MyReplicationSlot->data.persistency == RS_PERSISTENT)
+			slot_state = CatchupSlotSucceeded;
+		else
+			slot_state = CatchupSlotDirty;
 	}
 	/*
 	 * Otherwise create the local slot and initialize it to the state of the
@@ -767,13 +772,13 @@ synchronize_one_slot(RemoteSlot *remote_slot)
 	 * don't want it to persist if we fail.
 	 */
 #if PG_VERSION_NUM >= 170000
-	ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
+	ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 						  remote_slot->two_phase, false, false);
 #elif PG_VERSION_NUM >= 140000
-	ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
+	ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 						  remote_slot->two_phase);
 #else
-	ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL);
+	ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY);
 #endif
 
 	slot = MyReplicationSlot;
@@ -796,56 +801,83 @@ synchronize_one_slot(RemoteSlot *remote_slot)
 		slot->data.catalog_xmin = xmin_horizon;
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
+		slot_state = CatchupSlotDirty;
+	}
+	if (slot_state == CatchupSlotSucceeded)
+	{
 		/*
-	 * Our xmin and/or catalog_xmin may be > that required by one or more
-	 * of the slots we are trying to sync from the master, and/or we don't
-	 * have enough retained WAL for the slot's restart_lsn.
-	 *
-	 * If we persist the slot locally in that state it'll make a false
-	 * promise we can't satisfy.
-	 *
-	 * This can happen if this replica is fairly new or has only recently
-	 * started failover slot sync.
+		 * We can't satisfy this remote slot's requirements with our
+		 * known-safe local restart_lsn, catalog_xmin and xmin.
 		 *
-	 * TODO: Don't stop synchronization of other slots for this, we can't
-	 * add timeout because that could result in some slots never being
-	 * synchronized as they will always be behind the physical slot.
+		 * This shouldn't happen for existing slots unless someone else
+		 * messed with our physical replication slot on the master.
 		 */
 		if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
 			TransactionIdPrecedes(remote_slot->catalog_xmin,
 								  MyReplicationSlot->data.catalog_xmin))
 		{
-			if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot))
+			elog(
+				WARNING,
+				"not synchronizing slot %s; synchronization would move it backward",
+				remote_slot->name);
+			ReplicationSlotRelease();
+			PopActiveSnapshot();
+			CommitTransactionCommand();
+			return;
+		}
+
+		LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
+		LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
+								   remote_slot->catalog_xmin);
+		LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
+											  remote_slot->restart_lsn);
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+		elog(
+			DEBUG2,
+			"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
+			remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
+			(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
+	}
+	else
+	{
+		if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
+			TransactionIdPrecedes(remote_slot->catalog_xmin,
+								  MyReplicationSlot->data.catalog_xmin))
+		{
+			slot_state =
+				wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot);
+			if (slot_state == CatchupSlotDrop)
 			{
 				/* Provider slot didn't catch up to locally reserved position */
 				ReplicationSlotRelease();
+				ReplicationSlotDrop(remote_slot->name, false);
 				PopActiveSnapshot();
 				CommitTransactionCommand();
 				return;
 			}
+		}
+		else
+		{
+			slot_state = CatchupSlotSucceeded;
 		}
 
 		/*
 		 * We can locally satisfy requirements of remote slot's current
 		 * position now. Apply the new position if any and make it persistent.
 		 */
-		LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
-		LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
-								   remote_slot->catalog_xmin);
-		LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
-											  remote_slot->restart_lsn);
-		ReplicationSlotMarkDirty();
-
-		ReplicationSlotPersist();
-
+		if (slot_state == CatchupSlotSucceeded)
+		{
+			LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
+			LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
+									   remote_slot->catalog_xmin);
+			LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
												  remote_slot->restart_lsn);
+			ReplicationSlotMarkDirty();
+			ReplicationSlotPersist();
+		}
 		elog(DEBUG1,
 			 "synchronized new slot %s to lsn (%X/%X) and catalog xmin (%u)",
 			 remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
 			 (uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
 	}
-
 	ReplicationSlotRelease();
 	PopActiveSnapshot();
 	CommitTransactionCommand();
@@ -944,7 +976,7 @@ synchronize_failover_slots(long sleep_time)
 		bool active;
 		bool found = false;
 
-		active = (s->active_pid != 0);
+		active = (s->active_pid != 0 && s->active_pid != MyProcPid);
 
 		/* Only check inactive slots. */
 		if (!s->in_use || active)
@@ -1053,13 +1085,6 @@ synchronize_failover_slots(long sleep_time)
 		if (remote_slot->confirmed_lsn > receivePtr)
 			remote_slot->confirmed_lsn = receivePtr;
 
-		/*
-		 * For simplicity we always move restart_lsn of all slots to the
-		 * restart_lsn needed by the furthest-behind master slot.
-		 */
-		if (remote_slot->restart_lsn > lsn)
-			remote_slot->restart_lsn = lsn;
-
 		synchronize_one_slot(remote_slot);
 	}
 
@@ -1073,6 +1098,41 @@ synchronize_failover_slots(long sleep_time)
 	return sleep_time;
 }
 
+
+/*
+ * After a promotion, clean up the temporary replication slots we created
+ * while in recovery: a slot that was never persisted is still in an
+ * inconsistent state.
+ */
+static void
+cleanup_failover_slots_after_promotion(void)
+{
+	int i;
+
+	for (;;)
+	{
+		char *dropslot = NULL;
+
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		for (i = 0; i < max_replication_slots; i++)
+		{
+			ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+			if (s->active_pid == MyProcPid &&
+				s->data.persistency == RS_TEMPORARY)
+			{
+				dropslot = pstrdup(NameStr(s->data.name));
+				break;
+			}
+		}
+		LWLockRelease(ReplicationSlotControlLock);
+
+		if (dropslot)
+		{
+			elog(WARNING,
+				 "dropping inconsistent replication slot \"%s\" after promotion",
+				 dropslot);
+			ReplicationSlotDrop(dropslot, false);
+			pfree(dropslot);
+		}
+		else
+			break;
+	}
+}
+
 void
 pg_failover_slots_main(Datum main_arg)
 {
@@ -1102,7 +1162,10 @@ pg_failover_slots_main(Datum main_arg)
 		if (RecoveryInProgress())
 			sleep_time = synchronize_failover_slots(worker_nap_time);
 		else
+		{
+			cleanup_failover_slots_after_promotion();
 			sleep_time = worker_nap_time * 10;
+		}
 
 		rc = WaitLatch(MyLatch,
 					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -1493,6 +1556,13 @@ _PG_init(void)
 		&pg_failover_maintenance_db, "postgres", PGC_SIGHUP,
 		GUC_SUPERUSER_ONLY, NULL, NULL, NULL);
 
+	DefineCustomIntVariable(
+		"pg_failover_slots.sync_timeout",
+		"Timeout before giving up on syncing a lagging slot.",
+		"If set to -1 (the default), wait forever, which can hang "
+		"synchronization on one slot while other slots are ready to be synced.",
+		&pg_failover_slots_sync_timeout, -1, -1, INT_MAX, PGC_SIGHUP,
+		GUC_UNIT_MS, NULL, NULL, NULL);
+
 	if (IsBinaryUpgrade)
 		return;
 
@@ -1505,7 +1575,7 @@ _PG_init(void)
 	snprintf(bgw.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
 	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "pg_failover_slots_main");
 	snprintf(bgw.bgw_name, BGW_MAXLEN, "pg_failover_slots worker");
-	bgw.bgw_restart_time = 60;
+	bgw.bgw_restart_time = 10;
 
 	RegisterBackgroundWorker(&bgw);
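
Note: with this patch, a slot that is still waiting for the primary to
catch up remains a temporary slot on the standby until it can be
persisted, so the in-between state is observable from SQL. The query
below is illustrative and uses only stock pg_replication_slots columns,
nothing added by this patch:

    -- Still-dirty slots show temporary = true; once synced and
    -- persisted they flip to temporary = false.
    SELECT slot_name, temporary, restart_lsn, catalog_xmin
      FROM pg_replication_slots;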