-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.rs
113 lines (96 loc) · 4.21 KB
/
server.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::env;
use std::str::FromStr;
use std::time::Duration;
use anyhow::Result;
use clap::Parser;
use tonic::transport::Server;
use configmanager::ConfigManager;
use indexengine::index::Index;
use indexengine::no_index::NoIndex;
use crate::key_value_service_server::key_value_store::key_value_service_server::{KeyValueServiceServer};
mod logging_middleware;
mod key_value_service_server;
mod error;
#[derive(Debug, Clone)]
enum IndexEngine {
BTree,
LSMTree,
NoIndex,
HashMap,
}
impl FromStr for IndexEngine {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"BTree" => Ok(IndexEngine::BTree),
"LSMTree" => Ok(IndexEngine::LSMTree),
"NoIndex" => Ok(IndexEngine::NoIndex),
"HashMap" => Ok(IndexEngine::HashMap),
_ => Err("no match"),
}
}
}
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(long, default_value = "patrick.db")]
storage_file_name: String,
#[arg(long, default_value = "localhost:2181,localhost:2182,localhost:2183")]
zookeeper_servers: String,
#[arg(long, default_value = "[::1]:50052")]
server_address: String,
#[arg(long, default_value = "http://[::1]:50052")]
server_url: String,
#[arg(long, default_value = "/latch")]
leader_election_path: String,
#[arg(long, default_value = "/services")]
service_registry_path: String,
#[arg(long, default_value = "BTree")]
index_engine: IndexEngine,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let args = Args::parse();
let server_address = env::var("SERVER_ADDRESS").ok().unwrap_or(args.server_address);
let server_url = env::var("SERVER_URL").ok().unwrap_or(args.server_url);
let zookeeper_servers = env::var("ZOOKEEPER_SERVERS").ok().unwrap_or(args.zookeeper_servers);
let leader_election_path = env::var("LEADER_ELECTION_PATH").ok().unwrap_or(args.leader_election_path);
let service_registry_path = env::var("SERVICE_REGISTRY_PATH").ok().unwrap_or(args.service_registry_path);
log::info!("start zookeeper config manager");
let config_manager = configmanager::ZooKeeperConfigManager::new(
service_registry_path.as_str(),
leader_election_path.as_str(),
server_url.as_str(),
zookeeper_servers.as_str(),
)?;
log::info!("follower addresses: {:?}", config_manager.get_follower_addresses()?);
log::info!("leader address: {:?}", config_manager.get_leader_address()?);
log::info!("finished starting zookeeper config manager");
log::info!("init storage engine");
let storage_file_name = env::var("STORAGE_FILE_NAME").ok().unwrap_or(args.storage_file_name);
let file_handler = storageengine::file_handler::FileHandlerImpl::new(&storage_file_name)?;
let operations = storageengine::operations::DbOperationsImpl::new(Box::new(file_handler));
let index_engine: Box<dyn Index<Vec<u8>, Vec<u8>>> = match args.index_engine {
IndexEngine::BTree => indexengine::new_index_engine(indexengine::IndexEngine::BTree, Box::new(operations)).expect("failed to create btree"),
IndexEngine::LSMTree => indexengine::new_index_engine(indexengine::IndexEngine::LSM, Box::new(operations)).expect("failed to create lsm"),
IndexEngine::NoIndex => Box::new(NoIndex::new(Box::new(operations))),
IndexEngine::HashMap => indexengine::new_index_engine(indexengine::IndexEngine::HashMap, Box::new(operations)).expect("failed to create hashmap"),
};
log::info!("finished init storage engine");
let addr = server_address.parse()?;
let server = key_value_service_server::KeyValueStoreImpl::new(index_engine, Box::new(config_manager)).await;
let layer = tower::ServiceBuilder::new()
// Apply middleware from tower
.timeout(Duration::from_secs(30))
// Apply our own middleware
.layer(logging_middleware::LoggingInterceptorLayer)
.into_inner();
log::info!("start server");
Server::builder()
.layer(layer)
.add_service(KeyValueServiceServer::new(server))
.serve(addr)
.await?;
Ok(())
}