Skip to content

Commit

Permalink
refactor(metactl): change to subcommands & support transfer-leader (#โ€ฆ
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Aug 15, 2024
1 parent 81deb35 commit 43de923
Show file tree
Hide file tree
Showing 9 changed files with 442 additions and 209 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/binaries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fastrace = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
Expand Down
92 changes: 92 additions & 0 deletions src/meta/binaries/metactl/admin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use reqwest::Client;
use serde::Deserialize;

pub struct MetaAdminClient {
client: Client,
endpoint: String,
}

impl MetaAdminClient {
pub fn new(addr: &str) -> Self {
let client = Client::new();
MetaAdminClient {
client,
endpoint: format!("http://{}", addr),
}
}

pub async fn status(&self) -> anyhow::Result<AdminStatusResponse> {
let resp = self
.client
.get(format!("{}/v1/cluster/status", self.endpoint))
.send()
.await?;
let status = resp.status();
if status.is_success() {
let result = resp.json::<AdminStatusResponse>().await?;
Ok(result)
} else {
let data = resp.bytes().await?;
let msg = String::from_utf8_lossy(&data);
Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
}
}

pub async fn transfer_leader(
&self,
target: Option<u64>,
) -> anyhow::Result<AdminTransferLeaderResponse> {
let resp = match target {
Some(to) => {
self.client
.get(format!(
"{}/v1/ctrl/trigger_transfer_leader?to={}",
self.endpoint, to
))
.send()
.await?
}
None => {
self.client
.get(format!("{}/v1/ctrl/trigger_transfer_leader", self.endpoint))
.send()
.await?
}
};
let status = resp.status();
if status.is_success() {
let result = resp.json::<AdminTransferLeaderResponse>().await?;
Ok(result)
} else {
let data = resp.bytes().await?;
let msg = String::from_utf8_lossy(&data);
Err(anyhow::anyhow!("status code: {}, msg: {}", status, msg))
}
}
}

#[derive(Deserialize, Debug)]
pub struct AdminStatusResponse {
pub name: String,
}

#[derive(Deserialize, Debug)]
pub struct AdminTransferLeaderResponse {
pub from: u64,
pub to: u64,
pub voter_ids: Vec<u64>,
}
15 changes: 7 additions & 8 deletions src/meta/binaries/metactl/export_from_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,29 @@ use databend_meta::store::StoreInner;
use futures::TryStreamExt;

use crate::upgrade;
use crate::Config;
use crate::ExportArgs;

/// Print the entire sled db.
///
/// The output encodes every key-value into one line:
/// `[sled_tree_name, {key_space: {key, value}}]`
/// E.g.:
/// `["state_machine/0",{"GenericKV":{"key":"wow","value":{"seq":3,"meta":null,"data":[119,111,119]}}}`
pub async fn export_from_dir(config: &Config) -> anyhow::Result<()> {
upgrade::upgrade(config).await?;
pub async fn export_from_dir(args: &ExportArgs) -> anyhow::Result<()> {
let raft_config: RaftConfig = args.clone().into();
upgrade::upgrade(&raft_config).await?;

eprintln!();
eprintln!("Export:");

let raft_config: RaftConfig = config.clone().into();

let sto_inn = StoreInner::open_create(&raft_config, Some(()), None).await?;
let mut lines = Arc::new(sto_inn).export();

eprintln!(" From: {}", raft_config.raft_dir);

let file: Option<File> = if !config.db.is_empty() {
eprintln!(" To: File: {}", config.db);
Some((File::create(&config.db))?)
let file: Option<File> = if !args.db.is_empty() {
eprintln!(" To: File: {}", args.db);
Some((File::create(&args.db))?)
} else {
eprintln!(" To: <stdout>");
None
Expand Down
23 changes: 9 additions & 14 deletions src/meta/binaries/metactl/export_from_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,19 @@ use databend_common_meta_types::protobuf;
use tokio::net::TcpSocket;
use tokio_stream::StreamExt;

use crate::Config;
use crate::ExportArgs;

/// Dump metasrv data, raft-log, state machine etc in json to stdout.
pub async fn export_from_running_node(config: &Config) -> Result<(), anyhow::Error> {
pub async fn export_from_running_node(args: &ExportArgs) -> Result<(), anyhow::Error> {
eprintln!();
eprintln!("Export:");
eprintln!(" From: online meta-service: {}", config.grpc_api_address);
eprintln!(" Export To: {}", config.db);
eprintln!(" Export Chunk Size: {:?}", config.export_chunk_size);

let grpc_api_addr = get_available_socket_addr(&config.grpc_api_address).await?;

export_from_grpc(
grpc_api_addr.to_string().as_str(),
config.db.clone(),
config.export_chunk_size,
)
.await?;
eprintln!(" From: online meta-service: {}", args.grpc_api_address);
eprintln!(" Export To: {}", args.db);
eprintln!(" Export Chunk Size: {:?}", args.chunk_size);

let grpc_api_addr = get_available_socket_addr(args.grpc_api_address.as_str()).await?;
let addr = grpc_api_addr.to_string();
export_from_grpc(addr.as_str(), args.db.clone(), args.chunk_size).await?;
Ok(())
}

Expand Down
59 changes: 29 additions & 30 deletions src/meta/binaries/metactl/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,38 +55,38 @@ use url::Url;

use crate::reading;
use crate::upgrade;
use crate::Config;
use crate::ImportArgs;

pub async fn import_data(config: &Config) -> anyhow::Result<()> {
let raft_dir = config.raft_dir.clone().unwrap_or_default();
pub async fn import_data(args: &ImportArgs) -> anyhow::Result<()> {
let raft_dir = args.raft_dir.clone().unwrap_or_default();

eprintln!();
eprintln!("Import:");
eprintln!(" Into Meta Dir: '{}'", raft_dir);
eprintln!(" Initialize Cluster with Id: {}, cluster: {{", config.id);
for peer in config.initial_cluster.clone() {
eprintln!(" Initialize Cluster with Id: {}, cluster: {{", args.id);
for peer in args.initial_cluster.clone() {
eprintln!(" Peer: {}", peer);
}
eprintln!(" }}");

let nodes = build_nodes(config.initial_cluster.clone(), config.id)?;
let nodes = build_nodes(args.initial_cluster.clone(), args.id)?;

init_sled_db(raft_dir.clone(), 64 * 1024 * 1024 * 1024);

clear(config)?;
let max_log_id = import_from_stdin_or_file(config).await?;
clear(args)?;
let max_log_id = import_from_stdin_or_file(args).await?;

if config.initial_cluster.is_empty() {
if args.initial_cluster.is_empty() {
return Ok(());
}

init_new_cluster(config, nodes, max_log_id, config.id).await?;
init_new_cluster(args, nodes, max_log_id).await?;
Ok(())
}

/// Import from lines of exported data and Return the max log id that is found.
async fn import_lines<B: BufRead + 'static>(
config: &Config,
raft_config: RaftConfig,
lines: Lines<B>,
) -> anyhow::Result<Option<LogId>> {
#[allow(clippy::useless_conversion)]
Expand All @@ -106,8 +106,8 @@ async fn import_lines<B: BufRead + 'static>(
please use an older version databend-metactl to import from V001"
));
}
DataVersion::V002 => import_v002(config, it).await?,
DataVersion::V003 => import_v003(config, it).await?,
DataVersion::V002 => import_v002(raft_config, it).await?,
DataVersion::V003 => import_v003(raft_config, it).await?,
};

Ok(max_log_id)
Expand All @@ -119,11 +119,11 @@ async fn import_lines<B: BufRead + 'static>(
///
/// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
async fn import_v002(
config: &Config,
raft_config: RaftConfig,
lines: impl IntoIterator<Item = Result<String, io::Error>>,
) -> anyhow::Result<Option<LogId>> {
// v002 and v003 share the same exported data format.
import_v003(config, lines).await
import_v003(raft_config, lines).await
}

/// Import serialized lines for `DataVersion::V003`
Expand All @@ -132,11 +132,9 @@ async fn import_v002(
///
/// It write logs and related entries to sled trees, and state_machine entries to a snapshot.
async fn import_v003(
config: &Config,
raft_config: RaftConfig,
lines: impl IntoIterator<Item = Result<String, io::Error>>,
) -> anyhow::Result<Option<LogId>> {
let raft_config: RaftConfig = config.clone().into();

let db = get_sled_db();

let mut n = 0;
Expand Down Expand Up @@ -221,24 +219,26 @@ async fn import_v003(
/// Insert them into sled db and flush.
///
/// Finally upgrade the data in raft_dir to the latest version.
async fn import_from_stdin_or_file(config: &Config) -> anyhow::Result<Option<LogId>> {
let restore = config.db.clone();
async fn import_from_stdin_or_file(args: &ImportArgs) -> anyhow::Result<Option<LogId>> {
let restore = args.db.clone();

let raft_config: RaftConfig = args.clone().into();
let max_log_id = if restore.is_empty() {
eprintln!(" From: <stdin>");
let lines = io::stdin().lines();

import_lines(config, lines).await?
import_lines(raft_config, lines).await?
} else {
eprintln!(" From: {}", config.db);
eprintln!(" From: {}", args.db);
let file = File::open(restore)?;
let reader = BufReader::new(file);
let lines = reader.lines();

import_lines(config, lines).await?
import_lines(raft_config, lines).await?
};

upgrade::upgrade(config).await?;
let raft_config: RaftConfig = args.clone().into();
upgrade::upgrade(&raft_config).await?;

Ok(max_log_id)
}
Expand Down Expand Up @@ -298,16 +298,15 @@ fn build_nodes(initial_cluster: Vec<String>, id: u64) -> anyhow::Result<BTreeMap

// initial_cluster format: node_id=endpoint,grpc_api_addr;
async fn init_new_cluster(
config: &Config,
args: &ImportArgs,
nodes: BTreeMap<NodeId, Node>,
max_log_id: Option<LogId>,
id: u64,
) -> anyhow::Result<()> {
eprintln!();
eprintln!("Initialize Cluster with: {:?}", nodes);

let db = get_sled_db();
let raft_config: RaftConfig = config.clone().into();
let raft_config: RaftConfig = args.clone().into();

let mut sto = RaftStore::open_create(&raft_config, Some(()), None).await?;

Expand Down Expand Up @@ -375,13 +374,13 @@ async fn init_new_cluster(

// Reset node id
let raft_state = RaftState::open_create(&db, &raft_config, Some(()), None).await?;
raft_state.set_node_id(id).await?;
raft_state.set_node_id(args.id).await?;

Ok(())
}

/// Clear all sled data and on-disk snapshot.
fn clear(config: &Config) -> anyhow::Result<()> {
fn clear(args: &ImportArgs) -> anyhow::Result<()> {
eprintln!();
eprintln!("Clear All Sled Trees Before Import:");
let db = get_sled_db();
Expand All @@ -394,7 +393,7 @@ fn clear(config: &Config) -> anyhow::Result<()> {
eprintln!(" Cleared sled tree: {}", name);
}

let df_meta_path = format!("{}/df_meta", config.raft_dir.clone().unwrap_or_default());
let df_meta_path = format!("{}/df_meta", args.raft_dir.clone().unwrap_or_default());
if Path::new(&df_meta_path).exists() {
remove_dir_all(&df_meta_path)?;
}
Expand Down
Loading

0 comments on commit 43de923

Please sign in to comment.