diff --git a/pg_failover_slots.c b/pg_failover_slots.c index f209ef2..e3413af 100644 --- a/pg_failover_slots.c +++ b/pg_failover_slots.c @@ -110,6 +110,7 @@ char *pg_failover_maintenance_db; /* Slots to sync */ char *pg_failover_slots_dsn; char *pg_failover_slot_names; +int pg_failover_slots_sync_timeout; static char *pg_failover_slot_names_str = NULL; static List *pg_failover_slot_names_list = NIL; static bool pg_failover_slots_drop = true; @@ -119,6 +120,12 @@ char *pg_failover_slots_version_str; void _PG_init(void); PGDLLEXPORT void pg_failover_slots_main(Datum main_arg); +typedef enum SlotCatchupState { + CatchupSlotDrop = 0, + CatchupSlotSucceeded = 1, + CatchupSlotDirty = 2, +} SlotCatchupState; + static bool check_failover_slot_names(char **newval, void **extra, GucSource source) { @@ -534,7 +541,7 @@ remote_connect(const char *connstr, const char *appname) * relies on us having already reserved the WAL for the old position of * `remote_slot` so `slot` can't continue to advance. */ -static bool +static SlotCatchupState wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) { List *slots; @@ -542,6 +549,8 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) StringInfoData connstr; TimestampTz cb_wait_start = 0; /* first invocation should happen immediately */ + TimestampTz wait_start = GetCurrentTimestamp(); + TimestampTz now = 0; elog( LOG, @@ -584,7 +593,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) "replication slot sync wait for slot %s interrupted by promotion", remote_slot->name))); PQfinish(conn); - return false; + return CatchupSlotDrop; } filter->key = FAILOVERSLOT_FILTER_NAME; @@ -595,7 +604,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) { /* Slot on provider vanished */ PQfinish(conn); - return false; + return CatchupSlotDrop; } receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); @@ -616,14 +625,30 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) remote_slot->confirmed_lsn = new_slot->confirmed_lsn; remote_slot->catalog_xmin = new_slot->catalog_xmin; PQfinish(conn); - return true; + return CatchupSlotSucceeded; } /* * Invoke any callbacks that will help move the slots along */ + now = GetCurrentTimestamp(); + if (pg_failover_slots_sync_timeout >= 0 && TimestampDifferenceExceeds( + wait_start, now, pg_failover_slots_sync_timeout)) + { + elog( + LOG, + "Give up on waiting for remote slot %s lsn (%X/%X) and catalog xmin (%u) to pass local slot lsn (%X/%X) and catalog xmin (%u)", + remote_slot->name, (uint32) (new_slot->restart_lsn >> 32), + (uint32) (new_slot->restart_lsn), new_slot->catalog_xmin, + (uint32) (slot->data.restart_lsn >> 32), + (uint32) (slot->data.restart_lsn), + slot->data.catalog_xmin); + PQfinish(conn); + return CatchupSlotDirty; + } + if (TimestampDifferenceExceeds( - cb_wait_start, GetCurrentTimestamp(), + cb_wait_start, now, Min(wal_retrieve_retry_interval * 5, PG_WAIT_EXTENSION))) { if (cb_wait_start > 0) @@ -639,6 +664,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) cb_wait_start = GetCurrentTimestamp(); } + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, wal_retrieve_retry_interval, PG_WAIT_EXTENSION); @@ -648,6 +674,16 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) ResetLatch(MyLatch); + + /* + * The user may change pg_failover_slots_sync_timeout, so update it if needed. + */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + } } @@ -669,6 +705,7 @@ synchronize_one_slot(RemoteSlot *remote_slot) { int i; bool found = false; + SlotCatchupState slot_state = CatchupSlotDirty; if (!RecoveryInProgress()) { @@ -714,42 +751,10 @@ synchronize_one_slot(RemoteSlot *remote_slot) if (found) { ReplicationSlotAcquire(remote_slot->name, true); - - /* - * We can't satisfy this remote slot's requirements with our known-safe - * local restart_lsn, catalog_xmin and xmin. - * - * This shouldn't happen for existing slots unless someone else messed - * with our physical replication slot on the master. - */ - if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || - TransactionIdPrecedes(remote_slot->catalog_xmin, - MyReplicationSlot->data.catalog_xmin)) - { - elog( - WARNING, - "not synchronizing slot %s; synchronization would move it backward", - remote_slot->name); - - ReplicationSlotRelease(); - PopActiveSnapshot(); - CommitTransactionCommand(); - return; - } - - LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); - LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, - remote_slot->catalog_xmin); - LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, - remote_slot->restart_lsn); - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - - elog( - DEBUG2, - "synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)", - remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32), - (uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin); + if (MyReplicationSlot->data.persistency == RS_PERSISTENT) + slot_state = CatchupSlotSucceeded; + else + slot_state = CatchupSlotDirty; } /* * Otherwise create the local slot and initialize it to the state of the @@ -767,13 +772,13 @@ synchronize_one_slot(RemoteSlot *remote_slot) * don't want it to persist if we fail. */ #if PG_VERSION_NUM >= 170000 - ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, false, false); #elif PG_VERSION_NUM >= 140000 - ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase); #else - ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL); + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY); #endif slot = MyReplicationSlot; @@ -796,56 +801,83 @@ synchronize_one_slot(RemoteSlot *remote_slot) slot->data.catalog_xmin = xmin_horizon; ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + slot_state = CatchupSlotDirty; + } + if (slot_state == CatchupSlotSucceeded) + { /* - * Our xmin and/or catalog_xmin may be > that required by one or more - * of the slots we are trying to sync from the master, and/or we don't - * have enough retained WAL for the slot's restart_lsn. - * - * If we persist the slot locally in that state it'll make a false - * promise we can't satisfy. - * - * This can happen if this replica is fairly new or has only recently - * started failover slot sync. + * We can't satisfy this remote slot's requirements with our known-safe + * local restart_lsn, catalog_xmin and xmin. * - * TODO: Don't stop synchronization of other slots for this, we can't - * add timeout because that could result in some slots never being - * synchronized as they will always be behind the physical slot. + * This shouldn't happen for existing slots unless someone else messed + * with our physical replication slot on the master. */ if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || TransactionIdPrecedes(remote_slot->catalog_xmin, MyReplicationSlot->data.catalog_xmin)) { - if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot)) + elog( + WARNING, + "not synchronizing slot %s; synchronization would move it backward", + remote_slot->name); + ReplicationSlotRelease(); + PopActiveSnapshot(); + CommitTransactionCommand(); + return; + } + + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + elog( + DEBUG2, + "synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)", + remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32), + (uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin); + } else { + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + slot_state = wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot); + if (slot_state == CatchupSlotDrop) { /* Provider slot didn't catch up to locally reserved position */ ReplicationSlotRelease(); + ReplicationSlotDrop(remote_slot->name, false); PopActiveSnapshot(); CommitTransactionCommand(); return; } + } else { + slot_state = CatchupSlotSucceeded; } /* * We can locally satisfy requirements of remote slot's current * position now. Apply the new position if any and make it persistent. */ - LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); - LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, - remote_slot->catalog_xmin); - LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, - remote_slot->restart_lsn); - ReplicationSlotMarkDirty(); - - ReplicationSlotPersist(); - + if (slot_state == CatchupSlotSucceeded) + { + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + ReplicationSlotMarkDirty(); + ReplicationSlotPersist(); + } elog(DEBUG1, "synchronized new slot %s to lsn (%X/%X) and catalog xmin (%u)", remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32), (uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin); } - ReplicationSlotRelease(); PopActiveSnapshot(); CommitTransactionCommand(); @@ -944,7 +976,7 @@ synchronize_failover_slots(long sleep_time) bool active; bool found = false; - active = (s->active_pid != 0); + active = (s->active_pid != 0 && s->active_pid != MyProcPid); /* Only check inactive slots. */ if (!s->in_use || active) @@ -1053,13 +1085,6 @@ synchronize_failover_slots(long sleep_time) if (remote_slot->confirmed_lsn > receivePtr) remote_slot->confirmed_lsn = receivePtr; - /* - * For simplicity we always move restart_lsn of all slots to the - * restart_lsn needed by the furthest-behind master slot. - */ - if (remote_slot->restart_lsn > lsn) - remote_slot->restart_lsn = lsn; - synchronize_one_slot(remote_slot); } @@ -1073,6 +1098,41 @@ synchronize_failover_slots(long sleep_time) return sleep_time; } + +/* + * After a promotion, we need to clean up the unpersisted replication slots we created while in recovery. + * If they have never been persisted, it means they are in an incosistent state. + */ +static void +cleanup_failover_slots_after_promotion() +{ + int i; + for (;;) + { + char * dropslot = NULL; + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->active_pid == MyProcPid && s->data.persistency == RS_TEMPORARY) + { + dropslot = pstrdup(NameStr(s->data.name)); + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + if (dropslot) + { + elog(WARNING, "dropping inconsistent replication slot after promotion \"%s\"", dropslot); + ReplicationSlotDrop(dropslot, false); + pfree(dropslot); + } + else + break; + } +} + void pg_failover_slots_main(Datum main_arg) { @@ -1102,7 +1162,10 @@ pg_failover_slots_main(Datum main_arg) if (RecoveryInProgress()) sleep_time = synchronize_failover_slots(worker_nap_time); else + { + cleanup_failover_slots_after_promotion(); sleep_time = worker_nap_time * 10; + } rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, @@ -1493,6 +1556,13 @@ _PG_init(void) &pg_failover_maintenance_db, "postgres", PGC_SIGHUP, GUC_SUPERUSER_ONLY, NULL, NULL, NULL); + DefineCustomIntVariable( + "pg_failover_slots.sync_timeout", + "timeout when waiting for a slot to be persisted", + "If set to -1 (the default), we wait forever meaning we could hang up" + "up on one slot while other slots are ok to be synced.", + &pg_failover_slots_sync_timeout, -1, -1, INT_MAX, PGC_SIGHUP, + GUC_UNIT_MS, NULL, NULL, NULL); if (IsBinaryUpgrade) return; @@ -1505,7 +1575,7 @@ _PG_init(void) snprintf(bgw.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "pg_failover_slots_main"); snprintf(bgw.bgw_name, BGW_MAXLEN, "pg_failover_slots worker"); - bgw.bgw_restart_time = 60; + bgw.bgw_restart_time = 10; RegisterBackgroundWorker(&bgw);