Skip to content

Commit

Permalink
Ballista proto cleanup (#1110)
Browse files Browse the repository at this point in the history
* remove `GetFileMetadata` from proto ...

... as it was not finished. There is
no documentation what was the intention of it.

* host is optional instead on one off ...

... as we upgraded to latest protoc since then.

* proposal to deprecate `ExecuteQueryParams.sql`
  • Loading branch information
milenkovicm authored Nov 6, 2024
1 parent 626e9fa commit 5bb69a7
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 224 deletions.
22 changes: 4 additions & 18 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,11 @@ message ExecutorMetadata {
}


// Used by grpc
// Used for scheduler-executor
// communication
message ExecutorRegistration {
string id = 1;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
oneof optional_host {
string host = 2;
}
optional string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
Expand Down Expand Up @@ -527,7 +524,7 @@ message UpdateTaskStatusResult {
message ExecuteQueryParams {
oneof query {
bytes logical_plan = 1;
string sql = 2;
string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
}
oneof optional_session_id {
string session_id = 3;
Expand Down Expand Up @@ -629,15 +626,6 @@ message GetJobStatusResult {
JobStatus status = 1;
}

message GetFileMetadataParams {
string path = 1;
string file_type = 2;
}

message GetFileMetadataResult {
datafusion_common.Schema schema = 1;
}

message FilePartitionMetadata {
repeated string filename = 1;
}
Expand Down Expand Up @@ -713,8 +701,6 @@ service SchedulerGrpc {

rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}

rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {}

rpc CreateSession (CreateSessionParams) returns (CreateSessionResult) {}

rpc UpdateSession (UpdateSessionParams) returns (UpdateSessionResult) {}
Expand Down
111 changes: 5 additions & 106 deletions ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,31 +435,20 @@ pub struct ExecutorMetadata {
#[prost(message, optional, tag = "5")]
pub specification: ::core::option::Option<ExecutorSpecification>,
}
/// Used by grpc
/// Used for scheduler-executor
/// communication
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorRegistration {
#[prost(string, tag = "1")]
pub id: ::prost::alloc::string::String,
#[prost(string, optional, tag = "2")]
pub host: ::core::option::Option<::prost::alloc::string::String>,
#[prost(uint32, tag = "3")]
pub port: u32,
#[prost(uint32, tag = "4")]
pub grpc_port: u32,
#[prost(message, optional, tag = "5")]
pub specification: ::core::option::Option<ExecutorSpecification>,
/// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see <https://github.com/tokio-rs/prost/issues/430> and <https://github.com/tokio-rs/prost/pull/455>)
/// this syntax is ugly but is binary compatible with the "optional" keyword (see <https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
#[prost(oneof = "executor_registration::OptionalHost", tags = "2")]
pub optional_host: ::core::option::Option<executor_registration::OptionalHost>,
}
/// Nested message and enum types in `ExecutorRegistration`.
pub mod executor_registration {
/// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see <https://github.com/tokio-rs/prost/issues/430> and <https://github.com/tokio-rs/prost/pull/455>)
/// this syntax is ugly but is binary compatible with the "optional" keyword (see <https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3>)
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalHost {
#[prost(string, tag = "2")]
Host(::prost::alloc::string::String),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorHeartbeat {
Expand Down Expand Up @@ -815,6 +804,7 @@ pub mod execute_query_params {
pub enum Query {
#[prost(bytes, tag = "1")]
LogicalPlan(::prost::alloc::vec::Vec<u8>),
/// I'd suggest to remove this, if SQL needed use `flight-sql`
#[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
Expand Down Expand Up @@ -971,18 +961,6 @@ pub struct GetJobStatusResult {
pub status: ::core::option::Option<JobStatus>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataParams {
#[prost(string, tag = "1")]
pub path: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub file_type: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetFileMetadataResult {
#[prost(message, optional, tag = "1")]
pub schema: ::core::option::Option<::datafusion_proto_common::Schema>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FilePartitionMetadata {
#[prost(string, repeated, tag = "1")]
pub filename: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
Expand Down Expand Up @@ -1262,32 +1240,6 @@ pub mod scheduler_grpc_client {
);
self.inner.unary(req, path, codec).await
}
pub async fn get_file_metadata(
&mut self,
request: impl tonic::IntoRequest<super::GetFileMetadataParams>,
) -> std::result::Result<
tonic::Response<super::GetFileMetadataResult>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/ballista.protobuf.SchedulerGrpc/GetFileMetadata",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "GetFileMetadata"),
);
self.inner.unary(req, path, codec).await
}
pub async fn create_session(
&mut self,
request: impl tonic::IntoRequest<super::CreateSessionParams>,
Expand Down Expand Up @@ -1756,13 +1708,6 @@ pub mod scheduler_grpc_server {
tonic::Response<super::UpdateTaskStatusResult>,
tonic::Status,
>;
async fn get_file_metadata(
&self,
request: tonic::Request<super::GetFileMetadataParams>,
) -> std::result::Result<
tonic::Response<super::GetFileMetadataResult>,
tonic::Status,
>;
async fn create_session(
&self,
request: tonic::Request<super::CreateSessionParams>,
Expand Down Expand Up @@ -2080,52 +2025,6 @@ pub mod scheduler_grpc_server {
};
Box::pin(fut)
}
"/ballista.protobuf.SchedulerGrpc/GetFileMetadata" => {
#[allow(non_camel_case_types)]
struct GetFileMetadataSvc<T: SchedulerGrpc>(pub Arc<T>);
impl<
T: SchedulerGrpc,
> tonic::server::UnaryService<super::GetFileMetadataParams>
for GetFileMetadataSvc<T> {
type Response = super::GetFileMetadataResult;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetFileMetadataParams>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as SchedulerGrpc>::get_file_metadata(&inner, request)
.await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = GetFileMetadataSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/ballista.protobuf.SchedulerGrpc/CreateSession" => {
#[allow(non_camel_case_types)]
struct CreateSessionSvc<T: SchedulerGrpc>(pub Arc<T>);
Expand Down
2 changes: 1 addition & 1 deletion ballista/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ mod test {
port: 0,
grpc_port: 0,
specification: None,
optional_host: None,
host: None,
};
let config_producer = Arc::new(|| {
SessionConfig::new().with_option_extension(BallistaConfig::new().unwrap())
Expand Down
15 changes: 4 additions & 11 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,8 @@ use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::executor_resource::Resource;
use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
ExecutorStoppedParams, HeartBeatParams,
scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration, ExecutorResource,
ExecutorSpecification, ExecutorStatus, ExecutorStoppedParams, HeartBeatParams,
};
use ballista_core::serde::{
BallistaCodec, BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec,
Expand Down Expand Up @@ -184,10 +183,7 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
let executor_id = Uuid::new_v4().to_string();
let executor_meta = ExecutorRegistration {
id: executor_id.clone(),
optional_host: opt
.external_host
.clone()
.map(executor_registration::OptionalHost::Host),
host: opt.external_host.clone(),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
specification: Some(ExecutorSpecification {
Expand Down Expand Up @@ -392,10 +388,7 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
}),
metadata: Some(ExecutorRegistration {
id: executor_id.clone(),
optional_host: opt
.external_host
.clone()
.map(executor_registration::OptionalHost::Host),
host: opt.external_host.clone(),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
specification: Some(ExecutorSpecification {
Expand Down
5 changes: 2 additions & 3 deletions ballista/executor/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use ballista_core::utils::SessionConfigExt;
use ballista_core::{
error::Result,
object_store_registry::with_object_store_registry,
serde::protobuf::executor_registration::OptionalHost,
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration},
serde::scheduler::ExecutorSpecification,
serde::BallistaCodec,
Expand Down Expand Up @@ -73,7 +72,7 @@ pub async fn new_standalone_executor_from_state<

let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
optional_host: Some(OptionalHost::Host("localhost".to_string())),
host: Some("localhost".to_string()),
port: addr.port() as u32,
// TODO Make it configurable
grpc_port: 50020,
Expand Down Expand Up @@ -145,7 +144,7 @@ pub async fn new_standalone_executor<

let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
optional_host: Some(OptionalHost::Host("localhost".to_string())),
host: Some("localhost".to_string()),
port: addr.port() as u32,
// TODO Make it configurable
grpc_port: 50020,
Expand Down
Loading

0 comments on commit 5bb69a7

Please sign in to comment.