Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor replay phase for standalone and leader #2086

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 55 additions & 48 deletions src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -629,37 +629,71 @@ i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) {
LOG_INFO("Start Wal Replay");
// log the wal files.

TxnTimeStamp max_commit_ts = 0; // the max commit ts that has be checkpointed
Vector<SharedPtr<WalEntry>> replay_entries;
String catalog_dir = "";
TxnTimeStamp last_commit_ts = 0; // last wal commit ts
TxnTimeStamp max_checkpoint_ts = 0; // the max commit ts that has be checkpointed
TxnTimeStamp last_commit_ts = 0; // last wal commit ts
bool found_full_checkpoint = false;

{ // if no checkpoint, max_commit_ts is 0
Vector<String> delta_ckp_path_array;
String full_ckp_path;
{
// Collect all checkpoint paths and the latest checkpoint commit timestamp
WalListIterator iterator(wal_list);
// phase 1: find the max commit ts and catalog path
LOG_INFO("Replay phase 1: find the max commit ts and catalog path");

while (iterator.HasNext()) {
auto wal_entry = iterator.Next();
if (wal_entry.get() == nullptr) {
String error_message = "Found unexpected bad wal entry";
UnrecoverableError(error_message);
}
// LOG_TRACE(wal_entry->ToString());

WalCmdCheckpoint *checkpoint_cmd = nullptr;
if (wal_entry->IsCheckPoint(checkpoint_cmd)) {
max_commit_ts = checkpoint_cmd->max_commit_ts_;
std::string catalog_path = fmt::format("{}/{}", data_path_, "catalog");
catalog_dir = Path(fmt::format("{}/{}", catalog_path, checkpoint_cmd->catalog_name_)).parent_path().string();
last_commit_ts = wal_entry->commit_ts_;
break;
max_checkpoint_ts = checkpoint_cmd->max_commit_ts_;
std::string catalog_path = fmt::format("{}/{}/{}", data_path_, "catalog", checkpoint_cmd->catalog_name_);
if (checkpoint_cmd->is_full_checkpoint_) {
// Full checkpoint, OK
found_full_checkpoint = true;
full_ckp_path = catalog_path;
break;
}
delta_ckp_path_array.emplace_back(catalog_path);
}
replay_entries.push_back(wal_entry);
}
LOG_INFO(fmt::format("Find checkpoint max commit ts: {}", max_commit_ts));
}

// phase 2: by the max commit ts, find the entries to replay
LOG_INFO("Replay phase 2: by the max commit ts, find the entries to replay");
switch (targe_storage_mode) {
case StorageMode::kWritable: {
if (!found_full_checkpoint) {
UnrecoverableError("No full checkpoint found in leader or standalone mode");
}
break;
}
case StorageMode::kReadable:
if (delta_ckp_path_array.empty() && full_ckp_path.empty()) {
// At the very beginning, there are no data or logs yet
return 0;
}
if (!found_full_checkpoint) {
UnrecoverableError("No full checkpoint found in follower or learner mode");
}
break;
default: {
String error_message = "Unreachable branch";
UnrecoverableError(error_message);
}
}

// Reverse the delta checkpoint paths from new -> old order to old -> new order
std::reverse(delta_ckp_path_array.begin(), delta_ckp_path_array.end());
storage_->LoadFullCheckpoint(full_ckp_path);
for(const String& delta_ckp_path: delta_ckp_path_array) {
storage_->AttachDeltaCheckpoint(delta_ckp_path);
}

Vector<SharedPtr<WalEntry>> replay_entries;
{
// Find all WAL entries whose commit ts is larger than max_checkpoint_ts
WalListIterator iterator(wal_list);
while (iterator.HasNext()) {
auto wal_entry = iterator.Next();
if (wal_entry.get() == nullptr) {
Expand All @@ -669,52 +703,25 @@ i64 WalManager::ReplayWalFile(StorageMode targe_storage_mode) {
// replay_entries.clear();
break;
}
// LOG_TRACE(wal_entry->ToString());

if (wal_entry->commit_ts_ > max_commit_ts) {
if (wal_entry->commit_ts_ > max_checkpoint_ts) {
replay_entries.push_back(wal_entry);
} else {
break;
}
}
}

if (last_commit_ts == 0) {
switch (targe_storage_mode) {
case StorageMode::kWritable: {
// once wal is not empty, a checkpoint should always be found in leader or standalone mode.
String error_message = "WAL replay: No checkpoint found in wal";
UnrecoverableError(error_message);
break;
}
case StorageMode::kReadable: {
return 0;
}
default: {
String error_message = "Unreachable branch";
UnrecoverableError(error_message);
}
}
}
LOG_INFO(fmt::format("Checkpoint found, replay the catalog"));
auto catalog_fileinfo = CatalogFile::ParseValidCheckpointFilenames(catalog_dir, max_commit_ts);
if (!catalog_fileinfo.has_value()) {
String error_message = fmt::format("Wal Replay: Parse catalog file failed, catalog_dir: {}", catalog_dir);
UnrecoverableError(error_message);
}
auto &[full_catalog_fileinfo, delta_catalog_fileinfo_array] = catalog_fileinfo.value();
storage_->AttachCatalog(full_catalog_fileinfo, delta_catalog_fileinfo_array);

// phase 3: replay the entries
LOG_INFO(fmt::format("Replay phase 3: replay {} entries", replay_entries.size()));
std::reverse(replay_entries.begin(), replay_entries.end());
TransactionID last_txn_id = 0;

for (SizeT replay_count = 0; replay_count < replay_entries.size(); ++replay_count) {
if (replay_entries[replay_count]->commit_ts_ < max_commit_ts) {
if (replay_entries[replay_count]->commit_ts_ < max_checkpoint_ts) {
String error_message = fmt::format("Wal Replay: Commit ts should be greater than max commit ts, commit_ts: {}, max_commit: {}",
replay_entries[replay_count]->commit_ts_,
max_commit_ts);
max_checkpoint_ts);
UnrecoverableError(error_message);
}
last_commit_ts = replay_entries[replay_count]->commit_ts_;
Expand Down Expand Up @@ -931,7 +938,7 @@ void WalManager::ReplayWalEntry(const WalEntry &entry, bool on_startup) {
// entry.commit_ts_);
// break;
case WalCommandType::CHECKPOINT: {
if(on_startup) {
if (on_startup) {
if (storage_->GetStorageMode() == StorageMode::kReadable) {
WalCmdCheckpointReplay(*static_cast<const WalCmdCheckpoint *>(cmd.get()), entry.txn_id_, entry.commit_ts_);
} else {
Expand Down
Loading