Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep synchronizing slots when others are lagging on primary. #31

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 146 additions & 76 deletions pg_failover_slots.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ char *pg_failover_maintenance_db;
/* Slots to sync */
char *pg_failover_slots_dsn;
char *pg_failover_slot_names;
int pg_failover_slots_sync_timeout;
static char *pg_failover_slot_names_str = NULL;
static List *pg_failover_slot_names_list = NIL;
static bool pg_failover_slots_drop = true;
Expand All @@ -119,6 +120,12 @@ char *pg_failover_slots_version_str;
void _PG_init(void);
PGDLLEXPORT void pg_failover_slots_main(Datum main_arg);

typedef enum SlotCatchupState {
CatchupSlotDrop = 0,
CatchupSlotSucceeded = 1,
CatchupSlotDirty = 2,
} SlotCatchupState;

static bool
check_failover_slot_names(char **newval, void **extra, GucSource source)
{
Expand Down Expand Up @@ -534,14 +541,16 @@ remote_connect(const char *connstr, const char *appname)
* relies on us having already reserved the WAL for the old position of
* `remote_slot` so `slot` can't continue to advance.
*/
static bool
static SlotCatchupState
wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
{
List *slots;
PGconn *conn;
StringInfoData connstr;
TimestampTz cb_wait_start =
0; /* first invocation should happen immediately */
TimestampTz wait_start = GetCurrentTimestamp();
TimestampTz now = 0;

elog(
LOG,
Expand Down Expand Up @@ -584,7 +593,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
"replication slot sync wait for slot %s interrupted by promotion",
remote_slot->name)));
PQfinish(conn);
return false;
return CatchupSlotDrop;
}

filter->key = FAILOVERSLOT_FILTER_NAME;
Expand All @@ -595,7 +604,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
{
/* Slot on provider vanished */
PQfinish(conn);
return false;
return CatchupSlotDrop;
}

receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
Expand All @@ -616,14 +625,30 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
remote_slot->confirmed_lsn = new_slot->confirmed_lsn;
remote_slot->catalog_xmin = new_slot->catalog_xmin;
PQfinish(conn);
return true;
return CatchupSlotSucceeded;
}

/*
* Invoke any callbacks that will help move the slots along
*/
now = GetCurrentTimestamp();
if (pg_failover_slots_sync_timeout >= 0 && TimestampDifferenceExceeds(
wait_start, now, pg_failover_slots_sync_timeout))
{
elog(
LOG,
"Give up on waiting for remote slot %s lsn (%X/%X) and catalog xmin (%u) to pass local slot lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (new_slot->restart_lsn >> 32),
(uint32) (new_slot->restart_lsn), new_slot->catalog_xmin,
(uint32) (slot->data.restart_lsn >> 32),
(uint32) (slot->data.restart_lsn),
slot->data.catalog_xmin);
PQfinish(conn);
return CatchupSlotDirty;
}

if (TimestampDifferenceExceeds(
cb_wait_start, GetCurrentTimestamp(),
cb_wait_start, now,
Min(wal_retrieve_retry_interval * 5, PG_WAIT_EXTENSION)))
{
if (cb_wait_start > 0)
Expand All @@ -639,6 +664,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
cb_wait_start = GetCurrentTimestamp();
}


rc =
WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
wal_retrieve_retry_interval, PG_WAIT_EXTENSION);
Expand All @@ -648,6 +674,16 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)


ResetLatch(MyLatch);

/*
* The user may change pg_failover_slots_sync_timeout, so update it if needed.
*/
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}

}
}

Expand All @@ -669,6 +705,7 @@ synchronize_one_slot(RemoteSlot *remote_slot)
{
int i;
bool found = false;
SlotCatchupState slot_state = CatchupSlotDirty;

if (!RecoveryInProgress())
{
Expand Down Expand Up @@ -714,42 +751,10 @@ synchronize_one_slot(RemoteSlot *remote_slot)
if (found)
{
ReplicationSlotAcquire(remote_slot->name, true);

/*
* We can't satisfy this remote slot's requirements with our known-safe
* local restart_lsn, catalog_xmin and xmin.
*
* This shouldn't happen for existing slots unless someone else messed
* with our physical replication slot on the master.
*/
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
elog(
WARNING,
"not synchronizing slot %s; synchronization would move it backward",
remote_slot->name);

ReplicationSlotRelease();
PopActiveSnapshot();
CommitTransactionCommand();
return;
}

LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();
ReplicationSlotSave();

elog(
DEBUG2,
"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
if (MyReplicationSlot->data.persistency == RS_PERSISTENT)
slot_state = CatchupSlotSucceeded;
else
slot_state = CatchupSlotDirty;
}
/*
* Otherwise create the local slot and initialize it to the state of the
Expand All @@ -767,13 +772,13 @@ synchronize_one_slot(RemoteSlot *remote_slot)
* don't want it to persist if we fail.
*/
#if PG_VERSION_NUM >= 170000
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase, false, false);
#elif PG_VERSION_NUM >= 140000
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase);
#else
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL);
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY);
#endif
slot = MyReplicationSlot;

Expand All @@ -796,56 +801,83 @@ synchronize_one_slot(RemoteSlot *remote_slot)
slot->data.catalog_xmin = xmin_horizon;
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
slot_state = CatchupSlotDirty;
}

if (slot_state == CatchupSlotSucceeded)
{
/*
* Our xmin and/or catalog_xmin may be > that required by one or more
* of the slots we are trying to sync from the master, and/or we don't
* have enough retained WAL for the slot's restart_lsn.
*
* If we persist the slot locally in that state it'll make a false
* promise we can't satisfy.
*
* This can happen if this replica is fairly new or has only recently
* started failover slot sync.
* We can't satisfy this remote slot's requirements with our known-safe
* local restart_lsn, catalog_xmin and xmin.
*
* TODO: Don't stop synchronization of other slots for this, we can't
* add timeout because that could result in some slots never being
* synchronized as they will always be behind the physical slot.
* This shouldn't happen for existing slots unless someone else messed
* with our physical replication slot on the master.
*/
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot))
elog(
WARNING,
"not synchronizing slot %s; synchronization would move it backward",
remote_slot->name);
ReplicationSlotRelease();
PopActiveSnapshot();
CommitTransactionCommand();
return;
}

LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();
ReplicationSlotSave();
elog(
DEBUG2,
"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
} else {
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
MyReplicationSlot->data.catalog_xmin))
{
slot_state = wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot);
if (slot_state == CatchupSlotDrop)
{
/* Provider slot didn't catch up to locally reserved position
*/
ReplicationSlotRelease();
ReplicationSlotDrop(remote_slot->name, false);
PopActiveSnapshot();
CommitTransactionCommand();
return;
}
} else {
slot_state = CatchupSlotSucceeded;
}

/*
* We can locally satisfy requirements of remote slot's current
* position now. Apply the new position if any and make it persistent.
*/
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();

ReplicationSlotPersist();

if (slot_state == CatchupSlotSucceeded)
{
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
remote_slot->catalog_xmin);
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
remote_slot->restart_lsn);
ReplicationSlotMarkDirty();
ReplicationSlotPersist();
}
elog(DEBUG1,
"synchronized new slot %s to lsn (%X/%X) and catalog xmin (%u)",
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
}

ReplicationSlotRelease();
PopActiveSnapshot();
CommitTransactionCommand();
Expand Down Expand Up @@ -944,7 +976,7 @@ synchronize_failover_slots(long sleep_time)
bool active;
bool found = false;

active = (s->active_pid != 0);
active = (s->active_pid != 0 && s->active_pid != MyProcPid);

/* Only check inactive slots. */
if (!s->in_use || active)
Expand Down Expand Up @@ -1053,13 +1085,6 @@ synchronize_failover_slots(long sleep_time)
if (remote_slot->confirmed_lsn > receivePtr)
remote_slot->confirmed_lsn = receivePtr;

/*
* For simplicity we always move restart_lsn of all slots to the
* restart_lsn needed by the furthest-behind master slot.
*/
if (remote_slot->restart_lsn > lsn)
remote_slot->restart_lsn = lsn;

synchronize_one_slot(remote_slot);
}

Expand All @@ -1073,6 +1098,41 @@ synchronize_failover_slots(long sleep_time)
return sleep_time;
}


/*
* After a promotion, we need to clean up the unpersisted replication slots we created while in recovery.
* If they have never been persisted, it means they are in an incosistent state.
*/
static void
cleanup_failover_slots_after_promotion()
{
int i;
for (;;)
{
char * dropslot = NULL;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

if (s->active_pid == MyProcPid && s->data.persistency == RS_TEMPORARY)
{
dropslot = pstrdup(NameStr(s->data.name));
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
if (dropslot)
{
elog(WARNING, "dropping inconsistent replication slot after promotion \"%s\"", dropslot);
ReplicationSlotDrop(dropslot, false);
pfree(dropslot);
}
else
break;
}
}

void
pg_failover_slots_main(Datum main_arg)
{
Expand Down Expand Up @@ -1102,7 +1162,10 @@ pg_failover_slots_main(Datum main_arg)
if (RecoveryInProgress())
sleep_time = synchronize_failover_slots(worker_nap_time);
else
{
cleanup_failover_slots_after_promotion();
sleep_time = worker_nap_time * 10;
}

rc =
WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
Expand Down Expand Up @@ -1493,6 +1556,13 @@ _PG_init(void)
&pg_failover_maintenance_db, "postgres", PGC_SIGHUP, GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);

DefineCustomIntVariable(
"pg_failover_slots.sync_timeout",
"timeout when waiting for a slot to be persisted",
"If set to -1 (the default), we wait forever meaning we could hang up"
"up on one slot while other slots are ok to be synced.",
&pg_failover_slots_sync_timeout, -1, -1, INT_MAX, PGC_SIGHUP,
GUC_UNIT_MS, NULL, NULL, NULL);

if (IsBinaryUpgrade)
return;
Expand All @@ -1505,7 +1575,7 @@ _PG_init(void)
snprintf(bgw.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "pg_failover_slots_main");
snprintf(bgw.bgw_name, BGW_MAXLEN, "pg_failover_slots worker");
bgw.bgw_restart_time = 60;
bgw.bgw_restart_time = 10;

RegisterBackgroundWorker(&bgw);

Expand Down