Skip to content

Commit

Permalink
Refactor replay phase for standalone and leader
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN committed Oct 22, 2024
1 parent 2421289 commit cce4e00
Showing 1 changed file with 56 additions and 46 deletions.
102 changes: 56 additions & 46 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,37 +629,72 @@ i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) {
LOG_INFO("Start Wal Replay");
// log the wal files.

TxnTimeStamp max_commit_ts = 0; // the max commit ts that has be checkpointed
Vector<SharedPtr<WalEntry>> replay_entries;
String catalog_dir = "";
TxnTimeStamp last_commit_ts = 0; // last wal commit ts
TxnTimeStamp max_checkpoint_ts = 0; // the max commit ts that has be checkpointed
TxnTimeStamp last_commit_ts = 0; // last wal commit ts
bool found_full_checkpoint = false;

{ // if no checkpoint, max_commit_ts is 0
Vector<String> delta_ckp_path_array;
String full_ckp_path;
{
// Get all checkpoints path and latest checkpoint commit timestamp
WalListIterator iterator(wal_list);
// phase 1: find the max commit ts and catalog path
LOG_INFO("Replay phase 1: find the max commit ts and catalog path");

while (iterator.HasNext()) {
auto wal_entry = iterator.Next();
if (wal_entry.get() == nullptr) {
String error_message = "Found unexpected bad wal entry";
UnrecoverableError(error_message);
}
// LOG_TRACE(wal_entry->ToString());

WalCmdCheckpoint *checkpoint_cmd = nullptr;
if (wal_entry->IsCheckPoint(checkpoint_cmd)) {
max_commit_ts = checkpoint_cmd->max_commit_ts_;
max_checkpoint_ts = checkpoint_cmd->max_commit_ts_;
std::string catalog_path = fmt::format("{}/{}", data_path_, "catalog");
catalog_dir = Path(fmt::format("{}/{}", catalog_path, checkpoint_cmd->catalog_name_)).parent_path().string();
last_commit_ts = wal_entry->commit_ts_;
break;
String catalog_dir = Path(fmt::format("{}/{}", catalog_path, checkpoint_cmd->catalog_name_)).parent_path().string();
if (checkpoint_cmd->is_full_checkpoint_) {
// Full checkpoint, OK
found_full_checkpoint = true;
full_ckp_path = catalog_dir;
break;
}
delta_ckp_path_array.emplace_back(catalog_dir);
}
replay_entries.push_back(wal_entry);
}
LOG_INFO(fmt::format("Find checkpoint max commit ts: {}", max_commit_ts));
}

// phase 2: by the max commit ts, find the entries to replay
LOG_INFO("Replay phase 2: by the max commit ts, find the entries to replay");
switch (targe_storage_mode) {
case StorageMode::kWritable: {
if (!found_full_checkpoint) {
UnrecoverableError("No full checkpoint found in leader or standalone mode");
}
break;
}
case StorageMode::kReadable:
if (delta_ckp_path_array.empty() && full_ckp_path.empty()) {
// At the very beginning there is no data and there are no logs yet
return 0;
}
if (!found_full_checkpoint) {
UnrecoverableError("No full checkpoint found in follower or learner mode");
}
break;
default: {
String error_message = "Unreachable branch";
UnrecoverableError(error_message);
}
}

// From new -> old to old -> new
std::reverse(delta_ckp_path_array.begin(), delta_ckp_path_array.end());
storage_->LoadFullCheckpoint(full_ckp_path);
for(const String& delta_ckp_path: delta_ckp_path_array) {
storage_->AttachDeltaCheckpoint(delta_ckp_path);
}

Vector<SharedPtr<WalEntry>> replay_entries;
{
// Find all WAL entries whose commit ts is larger than max_checkpoint_ts
WalListIterator iterator(wal_list);
while (iterator.HasNext()) {
auto wal_entry = iterator.Next();
if (wal_entry.get() == nullptr) {
Expand All @@ -669,52 +704,27 @@ i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) {
// replay_entries.clear();
break;
}
// LOG_TRACE(wal_entry->ToString());

if (wal_entry->commit_ts_ > max_commit_ts) {
if (wal_entry->commit_ts_ > max_checkpoint_ts) {
replay_entries.push_back(wal_entry);
} else {
break;
}
}
}

if (last_commit_ts == 0) {
switch (targe_storage_mode) {
case StorageMode::kWritable: {
// once wal is not empty, a checkpoint should always be found in leader or standalone mode.
String error_message = "WAL replay: No checkpoint found in wal";
UnrecoverableError(error_message);
break;
}
case StorageMode::kReadable: {
return 0;
}
default: {
String error_message = "Unreachable branch";
UnrecoverableError(error_message);
}
}
}
LOG_INFO(fmt::format("Checkpoint found, replay the catalog"));
auto catalog_fileinfo = CatalogFile::ParseValidCheckpointFilenames(catalog_dir, max_commit_ts);
if (!catalog_fileinfo.has_value()) {
String error_message = fmt::format("Wal Replay: Parse catalog file failed, catalog_dir: {}", catalog_dir);
UnrecoverableError(error_message);
}
auto &[full_catalog_fileinfo, delta_catalog_fileinfo_array] = catalog_fileinfo.value();
storage_->AttachCatalog(full_catalog_fileinfo, delta_catalog_fileinfo_array);
std::reverse(replay_entries.begin(), replay_entries.end());

// phase 3: replay the entries
LOG_INFO(fmt::format("Replay phase 3: replay {} entries", replay_entries.size()));
std::reverse(replay_entries.begin(), replay_entries.end());
TransactionID last_txn_id = 0;

for (SizeT replay_count = 0; replay_count < replay_entries.size(); ++replay_count) {
if (replay_entries[replay_count]->commit_ts_ < max_commit_ts) {
if (replay_entries[replay_count]->commit_ts_ < max_checkpoint_ts) {
String error_message = fmt::format("Wal Replay: Commit ts should be greater than max commit ts, commit_ts: {}, max_commit: {}",
replay_entries[replay_count]->commit_ts_,
max_commit_ts);
max_checkpoint_ts);
UnrecoverableError(error_message);
}
last_commit_ts = replay_entries[replay_count]->commit_ts_;
Expand Down Expand Up @@ -931,7 +941,7 @@ void WalManager::ReplayWalEntry(const WalEntry &entry, bool on_startup) {
// entry.commit_ts_);
// break;
case WalCommandType::CHECKPOINT: {
if(on_startup) {
if (on_startup) {
if (storage_->GetStorageMode() == StorageMode::kReadable) {
WalCmdCheckpointReplay(*static_cast<const WalCmdCheckpoint *>(cmd.get()), entry.txn_id_, entry.commit_ts_);
} else {
Expand Down

0 comments on commit cce4e00

Please sign in to comment.