diff --git a/Cargo.lock b/Cargo.lock index 8708699f..d1b93638 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,7 +66,7 @@ name = "bindgen" version = "0.47.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "bitflags 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "cexpr 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "clang-sys 0.26.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -89,7 +89,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "bitflags" -version = "1.0.5" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -135,7 +135,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", - "bitflags 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -147,7 +147,7 @@ name = "cloudabi" version = "0.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "bitflags 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -244,6 +244,11 @@ name = "fuchsia-cprng" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures" +version = "0.1.27" +source = 
"registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "glob" version = "0.2.11" @@ -663,11 +668,13 @@ dependencies = [ name = "sonic-server" version = "1.2.0" dependencies = [ + "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "fst 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "fst-levenshtein 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "fst-regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)", "graceful 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "hashbrown 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "jemallocator 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -676,6 +683,7 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "radix 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "regex-syntax 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.90 (registry+https://github.com/rust-lang/crates.io-index)", @@ -865,7 +873,7 @@ dependencies = [ "checksum backtrace-sys 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)" = "797c830ac25ccc92a7f8a7b9862bde440715531514594a6154e3d4a54dd769b6" "checksum bindgen 0.47.3 (registry+https://github.com/rust-lang/crates.io-index)" = "df683a55b54b41d5ea8ebfaebb5aa7e6b84e3f3006a78f010dadc9ca88469260" "checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3" -"checksum 
bitflags 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "bd1fa8ad26490b0a5cfec99089952250301b6716cdeaa7c9ab229598fb82ab66" +"checksum bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "228047a76f468627ca71776ecdebd732a3423081fcf5125585bcd7c49886ce12" "checksum byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a019b10a2a7cdeb292db131fc8113e57ea2a908f6e7894b0c3c671893b65dbeb" "checksum cc 1.0.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a0c56216487bb80eec9c4516337b2588a4f2a2290d72a1416d930e4dcdb0c90d" "checksum cexpr 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "a7fa24eb00d5ffab90eaeaf1092ac85c04c64aaf358ea6f84505b8116d24c6af" @@ -884,6 +892,7 @@ dependencies = [ "checksum fst-levenshtein 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9e88429a6c230ef3bedcc8bc7c063252f7e82e8192571aebb56b094240a0b5e8" "checksum fst-regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "87aca1d91eed3c128132cee31d291fd4e8492df0b742a5b1453857a4c7cedd88" "checksum fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" +"checksum futures 0.1.27 (registry+https://github.com/rust-lang/crates.io-index)" = "a2037ec1c6c1c4f79557762eab1f7eae1f64f6cb418ace90fae88f0942b60139" "checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb" "checksum graceful 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1ca5313c7c751c3e64c789d0c0abfc8f6e782d4e93d88da070012d434a215ecc" "checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da" diff --git a/Cargo.toml b/Cargo.toml index 9f41de30..6fb68dae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,9 @@ byteorder = "1.3" hashbrown = 
"0.3" linked_hash_set = "0.1" whatlang = "0.7" +rayon = "1.0.3" +futures = "0.1.27" +bitflags = "=1.0.4" [features] default = ["alloc-jemalloc"] diff --git a/src/channel/command.rs b/src/channel/command.rs index a9bd453b..15cbed51 100644 --- a/src/channel/command.rs +++ b/src/channel/command.rs @@ -4,12 +4,15 @@ // Copyright: 2019, Valerian Saliou // License: Mozilla Public License v2.0 (MPL v2.0) +use futures::future; +use futures::future::FutureResult; use hashbrown::HashMap; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::path::Path; use std::str::{self, SplitWhitespace}; use std::vec::Vec; +use std::boxed::FnBox; use super::format::unescape; use super::statistics::ChannelStatistics; @@ -20,7 +23,7 @@ use crate::store::kv::StoreKVPool; use crate::store::operation::StoreOperationDispatch; use crate::APP_CONF; -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub enum ChannelCommandError { UnknownCommand, NotFound, @@ -33,7 +36,7 @@ pub enum ChannelCommandError { InvalidMetaValue((String, String)), } -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub enum ChannelCommandResponse { Void, Ok, @@ -52,7 +55,21 @@ pub struct ChannelCommandControl; pub type ChannelCommandResponseArgs = (&'static str, Option>); -type ChannelResult = Result, ChannelCommandError>; +pub enum ChannelResult<'a> { + Sync(ChannelResultSync), + Async(ChannelResultAsync<'a>), +} + +pub type ChannelResultSync = Result; +pub type ChannelResultAsync<'a> = Result< + ( + ChannelCommandResponse, + ChannelResultAsyncFuture<'a>, + ), + ChannelCommandError, +>; +pub type ChannelResultAsyncFuture<'a> = Box FutureResult + Send + 'a>; + type MetaPartsResult<'a> = Result<(&'a str, &'a str), (&'a str, &'a str)>; pub const EVENT_ID_SIZE: usize = 8; @@ -132,52 +149,52 @@ impl ChannelCommandResponse { } impl ChannelCommandBase { - pub fn dispatch_ping(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_ping<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match 
parts.next() { - None => Ok(vec![ChannelCommandResponse::Pong]), - _ => Err(ChannelCommandError::InvalidFormat("PING")), + None => ChannelResult::Sync(Ok(ChannelCommandResponse::Pong)), + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat("PING"))), } } - pub fn dispatch_quit(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_quit<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match parts.next() { - None => Ok(vec![ChannelCommandResponse::Ended("quit")]), - _ => Err(ChannelCommandError::InvalidFormat("QUIT")), + None => ChannelResult::Sync(Ok(ChannelCommandResponse::Ended("quit"))), + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat("QUIT"))), } } - pub fn generic_dispatch_help( + pub fn generic_dispatch_help<'a>( mut parts: SplitWhitespace, manuals: &HashMap<&str, &Vec<&str>>, - ) -> ChannelResult { + ) -> ChannelResult<'a> { match (parts.next(), parts.next()) { (None, _) => { let manual_list = manuals.keys().map(|k| k.to_owned()).collect::>(); - Ok(vec![ChannelCommandResponse::Result(format!( + ChannelResult::Sync(Ok(ChannelCommandResponse::Result(format!( "manuals({})", manual_list.join(", ") - ))]) + )))) } (Some(manual_key), next_part) => { if next_part.is_none() { if let Some(manual_data) = manuals.get(manual_key) { - Ok(vec![ChannelCommandResponse::Result(format!( + ChannelResult::Sync(Ok(ChannelCommandResponse::Result(format!( "{}({})", manual_key, manual_data.join(", ") - ))]) + )))) } else { - Err(ChannelCommandError::NotFound) + ChannelResult::Sync(Err(ChannelCommandError::NotFound)) } } else { - Err(ChannelCommandError::InvalidFormat("HELP []?")) + ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat("HELP []?"))) } } } } - pub fn parse_text_parts(parts: &mut SplitWhitespace) -> Option { + pub fn parse_text_parts<'a>(parts: &mut SplitWhitespace) -> Option { // Parse text parts and nest them together let mut text_raw = String::new(); @@ -313,20 +330,20 @@ impl ChannelCommandBase { 
ChannelCommandError::InvalidMetaValue((meta_key.to_owned(), meta_value.to_owned())) } - pub fn commit_ok_operation(query_builder: QueryBuilderResult) -> ChannelResult { + pub fn commit_ok_operation(query_builder: QueryBuilderResult) -> ChannelResultSync { query_builder .and_then(|query| StoreOperationDispatch::dispatch(query)) - .and_then(|_| Ok(vec![ChannelCommandResponse::Ok])) + .and_then(|_| Ok(ChannelCommandResponse::Ok)) .or(Err(ChannelCommandError::QueryError)) } - pub fn commit_result_operation(query_builder: QueryBuilderResult) -> ChannelResult { + pub fn commit_result_operation(query_builder: QueryBuilderResult) -> ChannelResultSync { query_builder .and_then(|query| StoreOperationDispatch::dispatch(query)) .or(Err(ChannelCommandError::QueryError)) .and_then(|result| { if let Some(result_inner) = result { - Ok(vec![ChannelCommandResponse::Result(result_inner)]) + Ok(ChannelCommandResponse::Result(result_inner)) } else { Err(ChannelCommandError::InternalError) } @@ -334,32 +351,21 @@ impl ChannelCommandBase { } pub fn commit_pending_operation<'a>( + query_id: &'static str, query_type: &'static str, - query_id: &str, query_builder: QueryBuilderResult<'a>, - ) -> ChannelResult { - // Idea: this could be made asynchronous in the future, if there are some latency issues \ - // on large Sonic deployments. The idea would be to have a number of worker threads for \ - // the whole running daemon, and channel threads dispatching work to those threads. This \ - // way Sonic can be up-scaled to N CPUs instead of 1 CPU per channel connection. Now on, \ - // the only way to scale Sonic executors to multiple CPUs is opening multiple parallel \ - // Sonic Channel connections and dispatching work evenly to each connection. It does not \ - // prevent scaling Sonic vertically, but could be made simpler for the Sonic Channel \ - // consumer via a worker thread pool. 
- - query_builder - .and_then(|query| StoreOperationDispatch::dispatch(query)) - .and_then(|results| { - Ok(vec![ - ChannelCommandResponse::Pending(query_id.to_string()), - ChannelCommandResponse::Event( - query_type, - query_id.to_string(), - results.unwrap_or(String::new()), - ), - ]) - }) - .or(Err(ChannelCommandError::QueryError)) + ) -> ChannelResult<'a> { + ChannelResult::Async(Ok(( + ChannelCommandResponse::Pending(query_id.to_string()), + Box::new(move || match query_builder.and_then(|query| StoreOperationDispatch::dispatch(query)) { + Ok(results) => future::ok(ChannelCommandResponse::Event( + query_type, + query_id.to_string(), + results.unwrap_or(String::new()), + )), + Err(_) => future::err(ChannelCommandError::QueryError), + }, + )))) } pub fn generate_event_id() -> String { @@ -371,7 +377,7 @@ impl ChannelCommandBase { } impl ChannelCommandSearch { - pub fn dispatch_query(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_query<'a>(mut parts: SplitWhitespace<'a>) -> ChannelResult<'a> { match ( parts.next(), parts.next(), @@ -411,28 +417,32 @@ impl ChannelCommandSearch { } if let Some(err) = last_meta_err { - Err(err) + ChannelResult::Sync(Err(err)) } else if query_limit < 1 || query_limit > APP_CONF.channel.search.query_limit_maximum { - Err(ChannelCommandError::PolicyReject( + ChannelResult::Sync(Err(ChannelCommandError::PolicyReject( "LIMIT out of minimum/maximum bounds", - )) + ))) } else { debug!( "will search for #{} with text: {}, limit: {}, offset: {}, locale: <{:?}>", event_id, text, query_limit, query_offset, query_lang ); + + let query_id = Box::leak(event_id.into_boxed_str()); + let txt = Box::leak(text.into_boxed_str()); + // Commit 'search' query ChannelCommandBase::commit_pending_operation( + query_id, "QUERY", - &event_id, QueryBuilder::search( - &event_id, + query_id, collection, bucket, - &text, + txt, query_limit, query_offset, query_lang, @@ -440,14 +450,14 @@ impl ChannelCommandSearch { ) } } - _ => 
Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "QUERY \"\" [LIMIT()]? [OFFSET()]? \ [LANG()]?", - )), + ))), } } - pub fn dispatch_suggest(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_suggest<'a>(mut parts: SplitWhitespace<'a>) -> ChannelResult<'a> { match ( parts.next(), parts.next(), @@ -478,34 +488,43 @@ impl ChannelCommandSearch { } if let Some(err) = last_meta_err { - Err(err) + ChannelResult::Sync(Err(err)) } else if suggest_limit < 1 || suggest_limit > APP_CONF.channel.search.suggest_limit_maximum { - Err(ChannelCommandError::PolicyReject( + ChannelResult::Sync(Err(ChannelCommandError::PolicyReject( "LIMIT out of minimum/maximum bounds", - )) + ))) } else { debug!( "will suggest for #{} with text: {}, limit: {}", event_id, text, suggest_limit ); + let query_id = Box::leak(event_id.into_boxed_str()); + let txt = Box::leak(text.into_boxed_str()); + // Commit 'suggest' query ChannelCommandBase::commit_pending_operation( + query_id, "SUGGEST", - &event_id, - QueryBuilder::suggest(&event_id, collection, bucket, &text, suggest_limit), + QueryBuilder::suggest( + query_id, + collection, + bucket, + txt, + suggest_limit, + ), ) } } - _ => Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "SUGGEST \"\" [LIMIT()]?", - )), + ))), } } - pub fn dispatch_help(parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_help<'a>(parts: SplitWhitespace) -> ChannelResult { ChannelCommandBase::generic_dispatch_help(parts, &*MANUAL_MODE_SEARCH) } @@ -602,7 +621,7 @@ impl ChannelCommandSearch { } impl ChannelCommandIngest { - pub fn dispatch_push(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_push<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match ( parts.next(), parts.next(), @@ -632,7 +651,7 @@ impl ChannelCommandIngest { } if let Some(err) = last_meta_err { - Err(err) + ChannelResult::Sync(Err(err)) } else { 
debug!( "will push for text: {} with hinted locale: <{:?}>", @@ -640,18 +659,18 @@ impl ChannelCommandIngest { ); // Commit 'push' query - ChannelCommandBase::commit_ok_operation(QueryBuilder::push( - collection, bucket, object, &text, push_lang, + ChannelResult::Sync(ChannelCommandBase::commit_ok_operation( + QueryBuilder::push(collection, bucket, object, &text, push_lang), )) } } - _ => Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "PUSH \"\" [LANG()]?", - )), + ))), } } - pub fn dispatch_pop(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_pop<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match ( parts.next(), parts.next(), @@ -667,35 +686,33 @@ impl ChannelCommandIngest { debug!("ingest pop has text: {}", text); // Make 'pop' query - ChannelCommandBase::commit_result_operation(QueryBuilder::pop( - collection, bucket, object, &text, + ChannelResult::Sync(ChannelCommandBase::commit_result_operation( + QueryBuilder::pop(collection, bucket, object, &text), )) } - _ => Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "POP \"\"", - )), + ))), } } - pub fn dispatch_count(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_count<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match (parts.next(), parts.next(), parts.next(), parts.next()) { (Some(collection), bucket_part, object_part, None) => { debug!("dispatching ingest count in collection: {}", collection); // Make 'count' query - ChannelCommandBase::commit_result_operation(QueryBuilder::count( - collection, - bucket_part, - object_part, + ChannelResult::Sync(ChannelCommandBase::commit_result_operation( + QueryBuilder::count(collection, bucket_part, object_part), )) } - _ => Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "COUNT [ []?]?", - )), + ))), } } - pub fn dispatch_flushc(mut parts: 
SplitWhitespace) -> ChannelResult { + pub fn dispatch_flushc<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match (parts.next(), parts.next()) { (Some(collection), None) => { debug!( @@ -704,9 +721,13 @@ impl ChannelCommandIngest { ); // Make 'flushc' query - ChannelCommandBase::commit_result_operation(QueryBuilder::flushc(collection)) + ChannelResult::Sync(ChannelCommandBase::commit_result_operation( + QueryBuilder::flushc(collection), + )) } - _ => Err(ChannelCommandError::InvalidFormat("FLUSHC ")), + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( + "FLUSHC ", + ))), } } @@ -719,17 +740,17 @@ impl ChannelCommandIngest { ); // Make 'flushb' query - ChannelCommandBase::commit_result_operation(QueryBuilder::flushb( - collection, bucket, + ChannelResult::Sync(ChannelCommandBase::commit_result_operation( + QueryBuilder::flushb(collection, bucket), )) } - _ => Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "FLUSHB ", - )), + ))), } } - pub fn dispatch_flusho(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_flusho<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match (parts.next(), parts.next(), parts.next(), parts.next()) { (Some(collection), Some(bucket), Some(object), None) => { debug!( @@ -738,17 +759,17 @@ impl ChannelCommandIngest { ); // Make 'flusho' query - ChannelCommandBase::commit_result_operation(QueryBuilder::flusho( - collection, bucket, object, + ChannelResult::Sync(ChannelCommandBase::commit_result_operation( + QueryBuilder::flusho(collection, bucket, object), )) } - _ => Err(ChannelCommandError::InvalidFormat( + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( "FLUSHO ", - )), + ))), } } - pub fn dispatch_help(parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_help<'a>(parts: SplitWhitespace) -> ChannelResult<'a> { ChannelCommandBase::generic_dispatch_help(parts, &*MANUAL_MODE_INGEST) } @@ -785,12 +806,12 @@ impl 
ChannelCommandIngest { } impl ChannelCommandControl { - pub fn dispatch_trigger(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_trigger<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match (parts.next(), parts.next(), parts.next()) { - (None, _, _) => Ok(vec![ChannelCommandResponse::Result(format!( + (None, _, _) => ChannelResult::Sync(Ok(ChannelCommandResponse::Result(format!( "actions({})", CONTROL_TRIGGER_ACTIONS.join(", ") - ))]), + )))), (Some(action_key), data_part, last_part) => { let action_key_lower = action_key.to_lowercase(); @@ -800,9 +821,11 @@ impl ChannelCommandControl { // Force a FST consolidate StoreFSTPool::consolidate(true); - Ok(vec![ChannelCommandResponse::Ok]) + ChannelResult::Sync(Ok(ChannelCommandResponse::Ok)) } else { - Err(ChannelCommandError::InvalidFormat("TRIGGER consolidate")) + ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( + "TRIGGER consolidate", + ))) } } "backup" => { @@ -814,12 +837,14 @@ impl ChannelCommandControl { if StoreKVPool::backup(&path.join(BACKUP_KV_PATH)).is_ok() && StoreFSTPool::backup(&path.join(BACKUP_FST_PATH)).is_ok() { - Ok(vec![ChannelCommandResponse::Ok]) + ChannelResult::Sync(Ok(ChannelCommandResponse::Ok)) } else { - Err(ChannelCommandError::InternalError) + ChannelResult::Sync(Err(ChannelCommandError::InternalError)) } } - _ => Err(ChannelCommandError::InvalidFormat("TRIGGER backup ")), + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( + "TRIGGER backup ", + ))), } } "restore" => { @@ -831,26 +856,28 @@ impl ChannelCommandControl { if StoreKVPool::restore(&path.join(BACKUP_KV_PATH)).is_ok() && StoreFSTPool::restore(&path.join(BACKUP_FST_PATH)).is_ok() { - Ok(vec![ChannelCommandResponse::Ok]) + ChannelResult::Sync(Ok(ChannelCommandResponse::Ok)) } else { - Err(ChannelCommandError::InternalError) + ChannelResult::Sync(Err(ChannelCommandError::InternalError)) } } - _ => Err(ChannelCommandError::InvalidFormat("TRIGGER restore ")), + _ => 
ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat( + "TRIGGER restore ", + ))), } } - _ => Err(ChannelCommandError::NotFound), + _ => ChannelResult::Sync(Err(ChannelCommandError::NotFound)), } } } } - pub fn dispatch_info(mut parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_info<'a>(mut parts: SplitWhitespace) -> ChannelResult<'a> { match parts.next() { None => { let statistics = ChannelStatistics::gather(); - Ok(vec![ChannelCommandResponse::Result(format!( + ChannelResult::Sync(Ok(ChannelCommandResponse::Result(format!( "uptime({}) clients_connected({}) commands_total({}) \ command_latency_best({}) command_latency_worst({}) \ kv_open_count({}) fst_open_count({}) fst_consolidate_count({})", @@ -862,13 +889,13 @@ impl ChannelCommandControl { statistics.kv_open_count, statistics.fst_open_count, statistics.fst_consolidate_count - ))]) + )))) } - _ => Err(ChannelCommandError::InvalidFormat("INFO")), + _ => ChannelResult::Sync(Err(ChannelCommandError::InvalidFormat("INFO"))), } } - pub fn dispatch_help(parts: SplitWhitespace) -> ChannelResult { + pub fn dispatch_help<'a>(parts: SplitWhitespace) -> ChannelResult<'a> { ChannelCommandBase::generic_dispatch_help(parts, &*MANUAL_MODE_CONTROL) } } diff --git a/src/channel/command_pool.rs b/src/channel/command_pool.rs new file mode 100644 index 00000000..68790f06 --- /dev/null +++ b/src/channel/command_pool.rs @@ -0,0 +1,91 @@ +use crate::APP_CONF; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::sync::{Arc, Mutex}; + +lazy_static! 
{ + pub static ref COMMAND_POOL: CommandPool = CommandPool::new(); +} + +pub struct CommandPool { + pub thread_pool: Arc>, +} + +impl CommandPool { + pub fn new() -> Self { + let num_threads = APP_CONF.channel.command_pool_num_threads; + debug!("initializing command pool, num threads: {}", num_threads); + Self { + thread_pool: Arc::new(Mutex::new( + ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(), + )), + } + } + + pub fn enqueue<'a, F>(&self, func: F) + where + F: FnOnce() + Send + 'a, + { + self.thread_pool.lock().unwrap().scope(|s| { + s.spawn(move |_| func()); + }); + } +} + +// tests + +#[cfg(test)] +mod tests { + use super::*; + // use std::thread::sleep; + // use std::time::Duration; + + #[test] + // FIXME: threadpool is not async so it's disabled. + // fn checks_if_enqueued_functions_working_async() { + // let index: Arc> = Arc::new(Mutex::new(0)); + // COMMAND_POOL.enqueue(|| { + // let mut index = index.lock().unwrap(); + // sleep(Duration::from_millis(10000)); + // *index += 1_usize; + // sleep(Duration::from_millis(50)); + // *index += 1_usize; + // }); + // COMMAND_POOL.enqueue(|| { + // let mut index = index.lock().unwrap(); + // sleep(Duration::from_millis(100)); + // *index += 1_usize; + // sleep(Duration::from_millis(70)); + // *index += 1_usize; + // }); + // COMMAND_POOL.enqueue(|| { + // let mut index = index.lock().unwrap(); + // sleep(Duration::from_millis(100)); + // *index += 1_usize; + // sleep(Duration::from_millis(90)); + // // *index += 1_usize; + // }); + // assert_eq!(*index.lock().unwrap(), 0_usize); + // // sleep(Duration::from_millis(20)); + // // assert_eq!(*index.lock().unwrap(), 3_usize); + // // sleep(Duration::from_millis(60)); + // // assert_eq!(*index.lock().unwrap(), 4_usize); + // // sleep(Duration::from_millis(80)); + // // assert_eq!(*index.lock().unwrap(), 5_usize); + // // sleep(Duration::from_millis(100)); + // // assert_eq!(*index.lock().unwrap(), 6_usize); + // } + #[test] + fn
num_thread_should_be_default() { + assert_eq!( + COMMAND_POOL + .thread_pool + .lock() + .unwrap() + .current_num_threads(), + APP_CONF.channel.command_pool_num_threads + ); + } +} diff --git a/src/channel/handle.rs b/src/channel/handle.rs index 9b5d11f5..f3f82d21 100644 --- a/src/channel/handle.rs +++ b/src/channel/handle.rs @@ -160,7 +160,7 @@ impl ChannelHandle { while let Some(byte) = buffer.pop_front() { // Commit line and start a new one? if byte == BUFFER_LINE_SEPARATOR { - if Self::on_message(&mode, &stream, &processed_line) + if Self::on_message(&mode, &mut stream, &processed_line) == ChannelMessageResult::Close { // Should close? @@ -251,18 +251,19 @@ impl ChannelHandle { fn on_message( mode: &ChannelMode, - stream: &TcpStream, + stream: &mut TcpStream, message_slice: &[u8], ) -> ChannelMessageResult { + let mut channel_message = ChannelMessage::new(stream, Some(message_slice)); match mode { ChannelMode::Search => { - ChannelMessage::on::(stream, message_slice) + channel_message.handle::() } ChannelMode::Ingest => { - ChannelMessage::on::(stream, message_slice) + channel_message.handle::() } ChannelMode::Control => { - ChannelMessage::on::(stream, message_slice) + channel_message.handle::() } } } diff --git a/src/channel/macros.rs b/src/channel/macros.rs index 720a4cc0..08691560 100644 --- a/src/channel/macros.rs +++ b/src/channel/macros.rs @@ -7,24 +7,27 @@ #[macro_export] macro_rules! 
gen_channel_message_mode_handle { ($message:ident, $commands:ident, { $($external:expr => $internal:expr),+, }) => {{ - let (command, parts) = ChannelMessage::extract($message); + let mut parts: SplitWhitespace = $message.split_whitespace(); + let command = parts.next().unwrap_or("").to_uppercase(); + + debug!("will dispatch search command: {}", command); if command.is_empty() == true || $commands.contains(&command.as_str()) == true { match command.as_str() { - "" => Ok(vec![ChannelCommandResponse::Void]), + "" => ChannelResult::Sync(Ok(ChannelCommandResponse::Void)), $( $external => $internal(parts), )+ "PING" => ChannelCommandBase::dispatch_ping(parts), "QUIT" => ChannelCommandBase::dispatch_quit(parts), - _ => Ok(vec![ChannelCommandResponse::Err( + _ => ChannelResult::Sync(Ok(ChannelCommandResponse::Err( ChannelCommandError::InternalError, - )]), + ))), } } else { - Ok(vec![ChannelCommandResponse::Err( + ChannelResult::Sync(Ok(ChannelCommandResponse::Err( ChannelCommandError::UnknownCommand, - )]) + ))) } }}; } diff --git a/src/channel/message.rs b/src/channel/message.rs index 6adffe61..4e05e8dd 100644 --- a/src/channel/message.rs +++ b/src/channel/message.rs @@ -8,101 +8,199 @@ use std::io::Write; use std::net::TcpStream; use std::str::{self, SplitWhitespace}; use std::time::Instant; +use futures::future::Future; use super::command::{ - ChannelCommandBase, ChannelCommandControl, ChannelCommandError, ChannelCommandIngest, - ChannelCommandResponse, ChannelCommandResponseArgs, ChannelCommandSearch, - COMMANDS_MODE_CONTROL, COMMANDS_MODE_INGEST, COMMANDS_MODE_SEARCH, + ChannelCommandBase, ChannelCommandControl, ChannelCommandError, ChannelCommandIngest, ChannelResultAsyncFuture, + ChannelCommandResponse, ChannelCommandResponseArgs, ChannelCommandSearch, ChannelResult, + ChannelResultAsync, ChannelResultSync, COMMANDS_MODE_CONTROL, COMMANDS_MODE_INGEST, + COMMANDS_MODE_SEARCH, }; +use super::command_pool::COMMAND_POOL; use super::listen::CHANNEL_AVAILABLE; use 
super::statistics::{COMMANDS_TOTAL, COMMAND_LATENCY_BEST, COMMAND_LATENCY_WORST}; use crate::LINE_FEED; -pub struct ChannelMessage; +pub struct ChannelMessage<'a> { + stream: &'a mut TcpStream, + message: String, + command_start: Instant, + result: ChannelMessageResult, + response_args: Option, + channel_available: bool, +} + pub struct ChannelMessageModeSearch; pub struct ChannelMessageModeIngest; pub struct ChannelMessageModeControl; const COMMAND_ELAPSED_MILLIS_SLOW_WARN: u128 = 50; -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub enum ChannelMessageResult { Continue, Close, } pub trait ChannelMessageMode { - fn handle(message: &str) -> Result, ChannelCommandError>; + fn handle<'b>(message: &'static str) -> ChannelResult<'b>; } -impl ChannelMessage { - pub fn on( - mut stream: &TcpStream, - message_slice: &[u8], - ) -> ChannelMessageResult { - let message = str::from_utf8(message_slice).unwrap_or(""); +impl<'a> ChannelMessage<'a> { + pub fn new(stream: &'a mut TcpStream, message_slice: Option<&[u8]>) -> Self { + let message = match message_slice { + Some(msg_slice) => String::from_utf8(msg_slice.to_vec()).unwrap_or(String::from("")), + None => String::new(), + }; + Self { + stream, + message, + command_start: Instant::now(), + result: ChannelMessageResult::Continue, + response_args: None, + channel_available: *CHANNEL_AVAILABLE.read().unwrap(), + } + } - debug!("got channel message: {}", message); + pub fn handle(&mut self) -> ChannelMessageResult { + self.print_command_received_msg(); - let command_start = Instant::now(); + if self.channel_availability() == false { + self.set_response_args( + ChannelCommandResponse::Err(ChannelCommandError::ShuttingDown).to_args(), + ); + self.send_reponse_message(); + return self.result.clone(); + } - let mut result = ChannelMessageResult::Continue; + self.execute_message::(); + self.result.clone() + } - // Process response for issued command - let response_args_groups: Vec; + fn print_command_received_msg(&self) { + 
debug!("received channel message: {}", self.message); + } - if *CHANNEL_AVAILABLE.read().unwrap() != true { - // Server going down, reject command - response_args_groups = - vec![ChannelCommandResponse::Err(ChannelCommandError::ShuttingDown).to_args()]; - } else { - // Handle response arguments to issued command - response_args_groups = match M::handle(&message) { - Ok(resp_groups) => resp_groups - .iter() - .map(|resp| match resp { - ChannelCommandResponse::Ok - | ChannelCommandResponse::Pong - | ChannelCommandResponse::Pending(_) - | ChannelCommandResponse::Result(_) - | ChannelCommandResponse::Event(_, _, _) - | ChannelCommandResponse::Void - | ChannelCommandResponse::Err(_) => resp.to_args(), - ChannelCommandResponse::Ended(_) => { - result = ChannelMessageResult::Close; - resp.to_args() - } - }) - .collect(), - Err(reason) => vec![ChannelCommandResponse::Err(reason).to_args()], + fn channel_availability(&self) -> bool { + self.channel_available + } + + // Handle response arguments to issued command + fn execute_message(&mut self) { + let channel_result = M::handle(Box::leak(self.message.clone().into_boxed_str())); + // let _self = self; + match channel_result { + ChannelResult::Sync(sync_res) => { + self.handle_sync_channel_result(sync_res); + }, + ChannelResult::Async(async_res) => { + self.handle_async_channel_result(async_res); + }, + }; + } + + fn handle_sync_channel_result(&mut self, sync_response: ChannelResultSync) { + match sync_response { + Ok(resp) => match resp { + ChannelCommandResponse::Ended(_) => { + self.result = ChannelMessageResult::Close; + self.set_response_args(resp.to_args()); + } + _ => self.set_response_args(resp.to_args()), + }, + Err(reason) => self.set_response_args(ChannelCommandResponse::Err(reason).to_args()), + }; + self.send_reponse_message(); + self.print_elapsed_time(); + self.update_statistics(); + } + + fn handle_async_channel_result<'b>(&mut self, async_response: ChannelResultAsync<'b>) { + match async_response { + Ok(resp) 
=> match resp.0 { + ChannelCommandResponse::Ended(_) => { + self.result = ChannelMessageResult::Close; + self.set_response_args(resp.0.to_args()); + } + _ => { + self.set_response_args(resp.0.to_args()); + self.send_reponse_message(); + self.enqueue_async_command(resp.1); + } + }, + Err(reason) => { + self.set_response_args(ChannelCommandResponse::Err(reason).to_args()); + } + }; + } + + fn enqueue_async_command<'b>( + &mut self, + future_operation: ChannelResultAsyncFuture<'b>, + ) { + let command_start = self.command_start; + let mut stream = self.stream.try_clone().expect("clone tcp stream failed..."); + COMMAND_POOL.enqueue(move || { + debug!("executing async command"); + let mut _self = ChannelMessage::new(&mut stream, None); + _self.command_start = command_start; + let response = future_operation().wait(); + match response { + Ok(resp) => match resp { + _ => _self.set_response_args(resp.to_args()), + }, + Err(reason) => { + _self.set_response_args(ChannelCommandResponse::Err(reason).to_args()) + } }; - } + _self.send_reponse_message(); + _self.print_elapsed_time(); + _self.update_statistics(); + }); + } - // Serve response messages on socket - for response_args in response_args_groups { - if !response_args.0.is_empty() { - if let Some(ref values) = response_args.1 { - let values_string = values.join(" "); + fn set_response_args(&mut self, args: ChannelCommandResponseArgs) { + self.response_args = Some(args); + } + + // Send response message on socket + fn send_reponse_message(&mut self) { + match &self.response_args { + Some(response_args) => { + if !response_args.0.is_empty() { + if let Some(ref values) = response_args.1 { + let values_string = values.join(" "); - write!(stream, "{} {}{}", response_args.0, values_string, LINE_FEED) + write!( + self.stream, + "{} {}{}", + response_args.0, values_string, LINE_FEED + ) .expect("write failed"); - debug!( - "wrote response with values: {} ({})", - response_args.0, values_string - ); - } else { - write!(stream, 
"{}{}", response_args.0, LINE_FEED).expect("write failed"); + debug!( + "wrote response with values: {} ({})", + response_args.0, values_string + ); + } else { + write!(self.stream, "{}{}", response_args.0, LINE_FEED) + .expect("write failed"); - debug!("wrote response with no values: {}", response_args.0); + debug!("wrote response with no values: {}", response_args.0); + } } } + None => { + debug!("try to send empty message"); + } } + } - // Measure and log time it took to execute command - // Notice: this is critical as to raise developer awareness on the performance bits when \ - // altering commands-related code, or when making changes to underlying store executors. - let command_took = command_start.elapsed(); + // Measure and log time it took to execute command + // Notice: this is critical as to raise developer awareness on the performance bits when \ + // altering commands-related code, or when making changes to underlying store executors. + fn print_elapsed_time(&self) { + let command_took = self.command_start.elapsed(); if command_took.as_millis() >= COMMAND_ELAPSED_MILLIS_SLOW_WARN { warn!( @@ -117,44 +215,31 @@ impl ChannelMessage { command_took.as_nanos(), ); } - - // Update command statistics - { - // Update performance measures - // Notice: commands that take 0ms are not accounted for there (ie. 
those are usually \ - // commands that do no work or I/O; they would make statistics less accurate) - let command_took_millis = command_took.as_millis() as u32; - - if command_took_millis > *COMMAND_LATENCY_WORST.read().unwrap() { - *COMMAND_LATENCY_WORST.write().unwrap() = command_took_millis; - } - if command_took_millis > 0 - && (*COMMAND_LATENCY_BEST.read().unwrap() == 0 - || command_took_millis < *COMMAND_LATENCY_BEST.read().unwrap()) - { - *COMMAND_LATENCY_BEST.write().unwrap() = command_took_millis; - } - - // Increment total commands - *COMMANDS_TOTAL.write().unwrap() += 1; - } - - result } - fn extract(message: &str) -> (String, SplitWhitespace) { - // Extract command name and arguments - let mut parts = message.split_whitespace(); - let command = parts.next().unwrap_or("").to_uppercase(); + // Update performance measures + // Notice: commands that take 0ms are not accounted for there (ie. those are usually \ + // commands that do no work or I/O; they would make statistics less accurate) + fn update_statistics(&self) { + let command_took_millis = self.command_start.elapsed().as_millis() as u32; - debug!("will dispatch search command: {}", command); + if command_took_millis > *COMMAND_LATENCY_WORST.read().unwrap() { + *COMMAND_LATENCY_WORST.write().unwrap() = command_took_millis; + } + if command_took_millis > 0 + && (*COMMAND_LATENCY_BEST.read().unwrap() == 0 + || command_took_millis < *COMMAND_LATENCY_BEST.read().unwrap()) + { + *COMMAND_LATENCY_BEST.write().unwrap() = command_took_millis; + } - (command, parts) + // Increment total commands + *COMMANDS_TOTAL.write().unwrap() += 1; } } impl ChannelMessageMode for ChannelMessageModeSearch { - fn handle(message: &str) -> Result, ChannelCommandError> { + fn handle<'b>(message: &'static str) -> ChannelResult<'b> { gen_channel_message_mode_handle!(message, COMMANDS_MODE_SEARCH, { "QUERY" => ChannelCommandSearch::dispatch_query, "SUGGEST" => ChannelCommandSearch::dispatch_suggest, @@ -164,7 +249,7 @@ impl 
ChannelMessageMode for ChannelMessageModeSearch { } impl ChannelMessageMode for ChannelMessageModeIngest { - fn handle(message: &str) -> Result, ChannelCommandError> { + fn handle<'b>(message: &'static str) -> ChannelResult<'b> { gen_channel_message_mode_handle!(message, COMMANDS_MODE_INGEST, { "PUSH" => ChannelCommandIngest::dispatch_push, "POP" => ChannelCommandIngest::dispatch_pop, @@ -178,7 +263,7 @@ impl ChannelMessageMode for ChannelMessageModeIngest { } impl ChannelMessageMode for ChannelMessageModeControl { - fn handle(message: &str) -> Result, ChannelCommandError> { + fn handle<'b>(message: &'static str) -> ChannelResult<'b> { gen_channel_message_mode_handle!(message, COMMANDS_MODE_CONTROL, { "TRIGGER" => ChannelCommandControl::dispatch_trigger, "INFO" => ChannelCommandControl::dispatch_info, @@ -186,3 +271,19 @@ impl ChannelMessageMode for ChannelMessageModeControl { }) } } + +// tests + +// #[cfg(test)] +// mod tests { +// use super::*; + +// #[test] +// fn channel_message_context_can_be_initialized() { +// let mut fake_tcp: Vec = vec![]; +// assert_eq!( +// ChannelMessage::new(&mut fake_tcp, &(b"a").clone()).message, +// String::from("a") +// ); +// } +// } diff --git a/src/channel/mod.rs b/src/channel/mod.rs index b93c4b67..f96d7f76 100644 --- a/src/channel/mod.rs +++ b/src/channel/mod.rs @@ -8,6 +8,7 @@ mod macros; mod command; +mod command_pool; mod format; mod handle; mod message; diff --git a/src/config/config.rs b/src/config/config.rs index 0767ba59..0822f458 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -32,6 +32,9 @@ pub struct ConfigChannel { pub auth_password: Option, pub search: ConfigChannelSearch, + + #[serde(default = "defaults::channel_command_pool_num_threads")] + pub command_pool_num_threads: usize, } #[derive(Deserialize)] diff --git a/src/config/defaults.rs b/src/config/defaults.rs index 5985c63a..df39a4b4 100644 --- a/src/config/defaults.rs +++ b/src/config/defaults.rs @@ -39,6 +39,10 @@ pub fn 
channel_search_suggest_limit_maximum() -> u16 { 20 } +pub fn channel_command_pool_num_threads() -> usize { + 8 +} + pub fn store_kv_path() -> PathBuf { PathBuf::from("./data/store/kv/") } diff --git a/src/main.rs b/src/main.rs index 327adfc4..c789e00e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ // License: Mozilla Public License v2.0 (MPL v2.0) #![cfg_attr(feature = "benchmark", feature(test))] +#![feature(fnbox)] #[macro_use] extern crate log; @@ -18,11 +19,13 @@ extern crate byteorder; extern crate fst; extern crate fst_levenshtein; extern crate fst_regex; +extern crate futures; extern crate graceful; extern crate hashbrown; extern crate linked_hash_set; extern crate radix; extern crate rand; +extern crate rayon; extern crate regex_syntax; extern crate rocksdb; extern crate toml;