diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto index eab1d801a..a40e6f2d2 100644 --- a/ballista/core/proto/ballista.proto +++ b/ballista/core/proto/ballista.proto @@ -289,14 +289,11 @@ message ExecutorMetadata { } -// Used by grpc +// Used for scheduler-executor +// communication message ExecutorRegistration { string id = 1; - // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455) - // this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3) - oneof optional_host { - string host = 2; - } + optional string host = 2; uint32 port = 3; uint32 grpc_port = 4; ExecutorSpecification specification = 5; @@ -527,7 +524,7 @@ message UpdateTaskStatusResult { message ExecuteQueryParams { oneof query { bytes logical_plan = 1; - string sql = 2; + string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql` } oneof optional_session_id { string session_id = 3; @@ -629,15 +626,6 @@ message GetJobStatusResult { JobStatus status = 1; } -message GetFileMetadataParams { - string path = 1; - string file_type = 2; -} - -message GetFileMetadataResult { - datafusion_common.Schema schema = 1; -} - message FilePartitionMetadata { repeated string filename = 1; } @@ -713,8 +701,6 @@ service SchedulerGrpc { rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {} - rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {} - rpc CreateSession (CreateSessionParams) returns (CreateSessionResult) {} rpc UpdateSession (UpdateSessionParams) returns (UpdateSessionResult) {} diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index 51a7b80be..d61ef331e 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -435,31 +435,20 @@ pub struct ExecutorMetadata { #[prost(message, optional, tag = "5")] pub specification: ::core::option::Option, } -/// Used by grpc +/// Used for scheduler-executor +/// communication #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorRegistration { #[prost(string, tag = "1")] pub id: ::prost::alloc::string::String, + #[prost(string, optional, tag = "2")] + pub host: ::core::option::Option<::prost::alloc::string::String>, #[prost(uint32, tag = "3")] pub port: u32, #[prost(uint32, tag = "4")] pub grpc_port: u32, #[prost(message, optional, tag = "5")] pub specification: ::core::option::Option, - /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see and ) - /// this syntax is ugly but is binary compatible with the "optional" keyword (see ) - #[prost(oneof = "executor_registration::OptionalHost", tags = "2")] - pub optional_host: ::core::option::Option, -} -/// Nested message and enum types in `ExecutorRegistration`. -pub mod executor_registration { - /// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see and ) - /// this syntax is ugly but is binary compatible with the "optional" keyword (see ) - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum OptionalHost { - #[prost(string, tag = "2")] - Host(::prost::alloc::string::String), - } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecutorHeartbeat { @@ -815,6 +804,7 @@ pub mod execute_query_params { pub enum Query { #[prost(bytes, tag = "1")] LogicalPlan(::prost::alloc::vec::Vec), + /// I'd suggest to remove this, if SQL needed use `flight-sql` #[prost(string, tag = "2")] Sql(::prost::alloc::string::String), } @@ -971,18 +961,6 @@ pub struct GetJobStatusResult { pub status: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetFileMetadataParams { - #[prost(string, tag = "1")] - pub path: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub file_type: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetFileMetadataResult { - #[prost(message, optional, tag = "1")] - pub schema: ::core::option::Option<::datafusion_proto_common::Schema>, -} -#[derive(Clone, PartialEq, ::prost::Message)] pub struct FilePartitionMetadata { #[prost(string, repeated, tag = "1")] pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, @@ -1262,32 +1240,6 @@ pub mod scheduler_grpc_client { ); self.inner.unary(req, path, codec).await } - pub async fn get_file_metadata( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::unknown( - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/ballista.protobuf.SchedulerGrpc/GetFileMetadata", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "GetFileMetadata"), - ); - self.inner.unary(req, path, codec).await - } pub async fn create_session( &mut self, request: impl tonic::IntoRequest, @@ -1756,13 +1708,6 @@ pub mod scheduler_grpc_server { tonic::Response, tonic::Status, >; - async fn get_file_metadata( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; async fn create_session( &self, request: tonic::Request, @@ -2080,52 +2025,6 @@ pub mod scheduler_grpc_server { }; Box::pin(fut) } - "/ballista.protobuf.SchedulerGrpc/GetFileMetadata" => { - #[allow(non_camel_case_types)] - struct GetFileMetadataSvc(pub Arc); - impl< - T: SchedulerGrpc, - > tonic::server::UnaryService - for GetFileMetadataSvc { - type Response = super::GetFileMetadataResult; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get_file_metadata(&inner, request) - .await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let method = GetFileMetadataSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/ballista.protobuf.SchedulerGrpc/CreateSession" => { #[allow(non_camel_case_types)] struct CreateSessionSvc(pub Arc); diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs index 53a368555..a799b85c8 100644 --- a/ballista/executor/src/executor.rs +++ b/ballista/executor/src/executor.rs @@ -359,7 +359,7 @@ mod test { port: 0, grpc_port: 0, specification: None, - optional_host: None, + host: None, }; let config_producer = Arc::new(|| { SessionConfig::new().with_option_extension(BallistaConfig::new().unwrap()) diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index a15bfadbd..fe57ec378 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -50,9 +50,8 @@ use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::executor_resource::Resource; use ballista_core::serde::protobuf::executor_status::Status; use ballista_core::serde::protobuf::{ - executor_registration, scheduler_grpc_client::SchedulerGrpcClient, - ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus, - ExecutorStoppedParams, HeartBeatParams, + scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration, ExecutorResource, + ExecutorSpecification, ExecutorStatus, ExecutorStoppedParams, HeartBeatParams, }; use ballista_core::serde::{ BallistaCodec, BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec, @@ -184,10 +183,7 @@ pub async fn start_executor_process(opt: Arc) -> Result<( let executor_id = Uuid::new_v4().to_string(); let executor_meta = ExecutorRegistration { id: executor_id.clone(), - optional_host: opt - .external_host - .clone() - .map(executor_registration::OptionalHost::Host), + host: opt.external_host.clone(), port: opt.port as u32, grpc_port: opt.grpc_port as u32, specification: Some(ExecutorSpecification { @@ -392,10 +388,7 @@ pub async fn start_executor_process(opt: Arc) -> Result<( }), metadata: Some(ExecutorRegistration { id: executor_id.clone(), - optional_host: opt - .external_host - .clone() - .map(executor_registration::OptionalHost::Host), + host: opt.external_host.clone(), port: opt.port as u32, grpc_port: opt.grpc_port as u32, specification: Some(ExecutorSpecification { diff --git a/ballista/executor/src/standalone.rs b/ballista/executor/src/standalone.rs index 628de96f4..28efe70fa 100644 --- a/ballista/executor/src/standalone.rs +++ b/ballista/executor/src/standalone.rs @@ -23,7 +23,6 @@ use ballista_core::utils::SessionConfigExt; use ballista_core::{ error::Result, object_store_registry::with_object_store_registry, - serde::protobuf::executor_registration::OptionalHost, serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration}, serde::scheduler::ExecutorSpecification, serde::BallistaCodec, @@ -73,7 +72,7 @@ pub async fn new_standalone_executor_from_state< let executor_meta = ExecutorRegistration { id: Uuid::new_v4().to_string(), // assign this executor a unique ID - optional_host: Some(OptionalHost::Host("localhost".to_string())), + host: Some("localhost".to_string()), port: addr.port() as u32, // TODO Make it configurable grpc_port: 50020, @@ -145,7 +144,7 @@ pub async fn new_standalone_executor< let executor_meta = ExecutorRegistration { id: Uuid::new_v4().to_string(), // assign this executor a unique ID - optional_host: Some(OptionalHost::Host("localhost".to_string())), + host: Some("localhost".to_string()), port: addr.port() as u32, // TODO Make it configurable grpc_port: 50020, diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index e475e438a..1758dfd87 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -19,40 +19,30 @@ use axum::extract::ConnectInfo; use ballista_core::config::{BallistaConfig, BALLISTA_JOB_NAME}; use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query}; use std::collections::HashMap; -use std::convert::TryInto; use std::net::SocketAddr; -use ballista_core::serde::protobuf::executor_registration::OptionalHost; use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc; use ballista_core::serde::protobuf::{ execute_query_failure_result, execute_query_result, AvailableTaskSlots, CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult, CreateSessionParams, CreateSessionResult, ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat, - ExecutorStoppedParams, ExecutorStoppedResult, GetFileMetadataParams, - GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, HeartBeatParams, - HeartBeatResult, PollWorkParams, PollWorkResult, RegisterExecutorParams, - RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult, - UpdateSessionParams, UpdateSessionResult, UpdateTaskStatusParams, - UpdateTaskStatusResult, + ExecutorStoppedParams, ExecutorStoppedResult, GetJobStatusParams, GetJobStatusResult, + HeartBeatParams, HeartBeatResult, PollWorkParams, PollWorkResult, + RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams, + RemoveSessionResult, UpdateSessionParams, UpdateSessionResult, + UpdateTaskStatusParams, UpdateTaskStatusResult, }; use ballista_core::serde::scheduler::ExecutorMetadata; - -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::file_format::FileFormat; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; -use futures::TryStreamExt; use log::{debug, error, info, trace, warn}; -use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; use std::ops::Deref; -use std::sync::Arc; use crate::cluster::{bind_task_bias, bind_task_round_robin}; use crate::config::TaskDistributionPolicy; use crate::scheduler_server::event::QueryStageSchedulerEvent; -use datafusion::prelude::SessionContext; use std::time::{SystemTime, UNIX_EPOCH}; use tonic::{Request, Response, Status}; @@ -88,10 +78,7 @@ impl SchedulerGrpc let metadata = ExecutorMetadata { id: metadata.id, host: metadata - .optional_host - .map(|h| match h { - OptionalHost::Host(host) => host, - }) + .host .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()), port: metadata.port as u16, grpc_port: metadata.grpc_port as u16, @@ -166,10 +153,7 @@ impl SchedulerGrpc let metadata = ExecutorMetadata { id: metadata.id, host: metadata - .optional_host - .map(|h| match h { - OptionalHost::Host(host) => host, - }) + .host .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()), port: metadata.port as u16, grpc_port: metadata.grpc_port as u16, @@ -214,10 +198,7 @@ impl SchedulerGrpc let metadata = ExecutorMetadata { id: metadata.id, host: metadata - .optional_host - .map(|h| match h { - OptionalHost::Host(host) => host, - }) + .host .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()), port: metadata.port as u16, grpc_port: metadata.grpc_port as u16, @@ -286,56 +267,6 @@ impl SchedulerGrpc Ok(Response::new(UpdateTaskStatusResult { success: true })) } - async fn get_file_metadata( - &self, - request: Request, - ) -> Result, Status> { - // Here, we use the default config, since we don't know the session id - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); - - // TODO support multiple object stores - let obj_store: Arc = Arc::new(LocalFileSystem::new()); - // TODO shouldn't this take a ListingOption object as input? - - let GetFileMetadataParams { path, file_type } = request.into_inner(); - let file_format: Arc = match file_type.as_str() { - "parquet" => Ok(Arc::new(ParquetFormat::default())), - // TODO implement for CSV - _ => Err(tonic::Status::unimplemented( - "get_file_metadata unsupported file type", - )), - }?; - - let path = Path::from(path.as_str()); - let file_metas: Vec<_> = obj_store - .list(Some(&path)) - .try_collect() - .await - .map_err(|e| { - let msg = format!("Error listing files: {e}"); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - - let schema = file_format - .infer_schema(&state, &obj_store, &file_metas) - .await - .map_err(|e| { - let msg = format!("Error inferring schema: {e}"); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - - Ok(Response::new(GetFileMetadataResult { - schema: Some(schema.as_ref().try_into().map_err(|e| { - let msg = format!("Error inferring schema: {e}"); - error!("{}", msg); - tonic::Status::internal(msg) - })?), - })) - } - async fn create_session( &self, request: Request, @@ -661,9 +592,8 @@ mod test { use crate::metrics::default_metrics_collector; use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::{ - executor_registration::OptionalHost, executor_status, ExecutorRegistration, - ExecutorStatus, ExecutorStoppedParams, HeartBeatParams, PollWorkParams, - RegisterExecutorParams, + executor_status, ExecutorRegistration, ExecutorStatus, ExecutorStoppedParams, + HeartBeatParams, PollWorkParams, RegisterExecutorParams, }; use ballista_core::serde::scheduler::ExecutorSpecification; use ballista_core::serde::BallistaCodec; @@ -690,7 +620,7 @@ mod test { scheduler.init().await?; let exec_meta = ExecutorRegistration { id: "abc".to_owned(), - optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())), + host: Some("http://localhost:8080".to_owned()), port: 0, grpc_port: 0, specification: Some(ExecutorSpecification { task_slots: 2 }.into()), @@ -778,7 +708,7 @@ mod test { let exec_meta = ExecutorRegistration { id: "abc".to_owned(), - optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())), + host: Some("http://localhost:8080".to_owned()), port: 0, grpc_port: 0, specification: Some(ExecutorSpecification { task_slots: 2 }.into()), @@ -863,7 +793,7 @@ mod test { let exec_meta = ExecutorRegistration { id: "abc".to_owned(), - optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())), + host: Some("http://localhost:8080".to_owned()), port: 0, grpc_port: 0, specification: Some(ExecutorSpecification { task_slots: 2 }.into()), @@ -916,7 +846,7 @@ mod test { let exec_meta = ExecutorRegistration { id: "abc".to_owned(), - optional_host: Some(OptionalHost::Host("http://localhost:8080".to_owned())), + host: Some("http://localhost:8080".to_owned()), port: 0, grpc_port: 0, specification: Some(ExecutorSpecification { task_slots: 2 }.into()), diff --git a/docs/developer/architecture.md b/docs/developer/architecture.md index 9ae20d9db..d49e50130 100644 --- a/docs/developer/architecture.md +++ b/docs/developer/architecture.md @@ -57,7 +57,6 @@ The scheduler process implements a gRPC interface (defined in | -------------------- | -------------------------------------------------------------------- | | ExecuteQuery | Submit a logical query plan or SQL query for execution | | GetExecutorsMetadata | Retrieves a list of executors that have registered with a scheduler | -| GetFileMetadata | Retrieve metadata about files available in the cluster file system | | GetJobStatus | Get the status of a submitted query | | RegisterExecutor | Executors call this method to register themselves with the scheduler |