Trim down BallistaConfig #1108

Merged
merged 19 commits into from
Nov 13, 2024

milenkovicm
Contributor

Which issue does this PR close?

Closes #1104.

Rationale for this change

What changes are included in this PR?

There are two major changes proposed with this PR, plus one example:

  1. BallistaConfig has been trimmed down, removing configuration options which map one-to-one to DataFusion options.
  2. BallistaConfig has been integrated with SessionConfig and is propagated across the cluster, from client to scheduler to executors. Users can provide their own configuration, which will be propagated as well.
  3. An example of how to configure ObjectStoreRegistry has been added.

A specific BallistaConfig can be set/accessed with methods provided by SessionConfigExt:

let session_config = SessionConfig::new_with_ballista()
    .with_information_schema(true)
    .with_ballista_job_name("Super Cool Ballista App");

As BallistaConfig has been integrated with the SessionConfig infrastructure, it can also be changed with SQL SET:

ctx.sql("SET ballista.job.name = 'Super Cool Ballista App'").await?.show().await?;    

BallistaConfig is propagated around the cluster automatically. A full example of how to propagate user-specific configuration is provided in tests [...]

Short overview:

// Setting up configuration producer
//
// the configuration producer registers the user-defined config extension
// S3Options with the relevant S3 configuration
let config_producer = Arc::new(|| {
    SessionConfig::new_with_ballista()
        .with_information_schema(true)
        .with_option_extension(S3Options::default())
});
// Setting up runtime producer
//
// The runtime producer creates an object store registry
// which can create an object store connector based on
// the S3Options configuration.
let runtime_producer: RuntimeProducer =
    Arc::new(|session_config: &SessionConfig| {
        let s3options = session_config
            .options()
            .extensions
            .get::<S3Options>()
            .ok_or(DataFusionError::Configuration(
                "S3 Options not set".to_string(),
            ))?;

        let config = RuntimeConfig::new().with_object_store_registry(Arc::new(
            // our custom ObjectStoreRegistry will use shared configuration
            CustomObjectStoreRegistry::new(s3options.clone()),
        ));

        Ok(Arc::new(RuntimeEnv::new(config)?))
    });

// Session builder creates SessionState
//
// which is configured using runtime and configuration producer,
// producing same runtime environment, and providing same
// object store registry.

let session_builder = Arc::new(produce_state);
let state = session_builder(config_producer());

// setting up ballista cluster with new runtime, configuration, and session state producers
//
// this step is important as it sets up the scheduler and executor to handle
// custom configuration
let (host, port) = crate::common::setup_test_cluster_with_builders(
    config_producer,
    runtime_producer,
    session_builder,
)
.await;

// establishing cluster connection, using the address of the test cluster started above
let url = format!("df://{host}:{port}");
let ctx: SessionContext = SessionContext::remote_with_state(&url, state).await?;

// setting up relevant S3 options
ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID))
    .await?
    .show()
    .await?;
ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY))
    .await?
    .show()
    .await?;

where S3Options is defined like:

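// Clone is needed by `cloned()` below, Default by `S3Options::default()` above
#[derive(Debug, Clone, Default)]
pub struct S3Options {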
    config: Arc<RwLock<S3RegistryConfiguration>>,
}

impl ExtensionOptions for S3Options {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }

    fn cloned(&self) -> Box<dyn ExtensionOptions> {
        Box::new(self.clone())
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // omitted
        Ok(())
    }

    fn entries(&self) -> Vec<ConfigEntry> {
        // omitted 
    }
}

impl ConfigExtension for S3Options {
    const PREFIX: &'static str = "s3";
}

#[derive(Default, Debug, Clone)]
pub struct S3RegistryConfiguration {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
    pub region: Option<String>,
    pub endpoint: Option<String>,
    pub allow_http: Option<bool>,
}
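
The bodies of `set` and `entries` are omitted above. As a rough illustration only, here is a minimal std-only sketch of what they might do. It does not implement the real `ExtensionOptions` trait: the `Result<(), String>` error type and the `(String, Option<String>)` entry tuples are stand-ins for DataFusion's `Result` and `ConfigEntry`, and it assumes the key reaches `set` with the `s3.` prefix already stripped.

```rust
use std::sync::{Arc, RwLock};

/// Same fields as the `S3RegistryConfiguration` shown above.
#[derive(Default, Debug, Clone)]
pub struct S3RegistryConfiguration {
    pub access_key_id: Option<String>,
    pub secret_access_key: Option<String>,
    pub session_token: Option<String>,
    pub region: Option<String>,
    pub endpoint: Option<String>,
    pub allow_http: Option<bool>,
}

#[derive(Default, Debug, Clone)]
pub struct S3Options {
    config: Arc<RwLock<S3RegistryConfiguration>>,
}

impl S3Options {
    /// Hypothetical `set` body: route a key (assumed to arrive without
    /// the `s3.` prefix) to the matching field behind the lock.
    pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
        let mut config = self.config.write().map_err(|e| e.to_string())?;
        match key {
            "access_key_id" => config.access_key_id = Some(value.to_string()),
            "secret_access_key" => config.secret_access_key = Some(value.to_string()),
            "session_token" => config.session_token = Some(value.to_string()),
            "region" => config.region = Some(value.to_string()),
            "endpoint" => config.endpoint = Some(value.to_string()),
            "allow_http" => {
                let parsed = value
                    .parse::<bool>()
                    .map_err(|e| format!("invalid value for allow_http: {e}"))?;
                config.allow_http = Some(parsed);
            }
            other => return Err(format!("unsupported s3 option: {other}")),
        }
        Ok(())
    }

    /// Hypothetical `entries` body: list every option under the `s3.`
    /// prefix, with `None` for values that have not been set.
    pub fn entries(&self) -> Vec<(String, Option<String>)> {
        let config = self.config.read().expect("s3 config lock poisoned");
        vec![
            ("s3.access_key_id".to_string(), config.access_key_id.clone()),
            ("s3.secret_access_key".to_string(), config.secret_access_key.clone()),
            ("s3.session_token".to_string(), config.session_token.clone()),
            ("s3.region".to_string(), config.region.clone()),
            ("s3.endpoint".to_string(), config.endpoint.clone()),
            ("s3.allow_http".to_string(), config.allow_http.map(|b| b.to_string())),
        ]
    }
}
```

Rejecting unknown keys and unparsable booleans in `set` is a design choice of this sketch; the implementation in the tests may handle them differently.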

Configuration values can be listed with:

ctx.sql("select name, value from information_schema.df_settings where name like 's3.%'").await?.show().await?;

which should return something like:

+----------------------+------------------------+
| name                 | value                  |
+----------------------+------------------------+
| s3.access_key_id     | MINIO                  |
| s3.secret_access_key | MINIOMINIO             |
| s3.session_token     |                        |
| s3.region            |                        |
| s3.endpoint          | http://localhost:55001 |
| s3.allow_http        | true                   |
+----------------------+------------------------+
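
In the overview, the registry receives `s3options.clone()` before any `SET s3.*` statement runs, yet it is described as using shared configuration. Presumably this works because `S3Options` wraps its state in `Arc<RwLock<...>>`, so a clone points at the same underlying values. The std-only sketch below illustrates that idea; `SharedOptions`, `Registry`, and their methods are hypothetical names for this illustration, not part of Ballista or DataFusion.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical options holder: cloning it copies the `Arc`, not the data.
#[derive(Default, Debug, Clone)]
pub struct SharedOptions {
    endpoint: Arc<RwLock<Option<String>>>,
}

impl SharedOptions {
    /// Update the shared value; every clone observes the change.
    pub fn set_endpoint(&self, value: &str) {
        *self.endpoint.write().expect("lock poisoned") = Some(value.to_string());
    }

    /// Read the current shared value.
    pub fn endpoint(&self) -> Option<String> {
        self.endpoint.read().expect("lock poisoned").clone()
    }
}

/// Hypothetical stand-in for a custom object store registry that reads
/// its settings lazily, at the moment a store is requested.
pub struct Registry {
    options: SharedOptions,
}

impl Registry {
    pub fn new(options: SharedOptions) -> Self {
        Self { options }
    }

    /// Fails until the endpoint has been configured through any clone.
    pub fn resolve_endpoint(&self) -> Result<String, String> {
        self.options
            .endpoint()
            .ok_or_else(|| "endpoint not set".to_string())
    }
}
```

A registry built from `options.clone()` returns an error from `resolve_endpoint()` until `options.set_endpoint(...)` is called on the original handle, after which the same registry resolves the new value without being rebuilt.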

For follow up:

  • further cleanup of BallistaConfig is needed once we remove BallistaContext
  • this change makes the object_store module and features obsolete; they will be removed once we remove BallistaContext

Are there any user-facing changes?

BallistaConfig loses most of its configuration options.

@milenkovicm milenkovicm force-pushed the scheduler_configuration_propagation branch from ddb9fd9 to 30c2863 Compare November 7, 2024 09:25
@milenkovicm
Contributor Author

Conflicts have been resolved.
@andygrove, it would be great if you could have a look when you get a chance.

Member

@andygrove andygrove left a comment

I checked out this PR and ran benchmarks and did not see any issues. I have not reviewed extensively, but happy to approve to keep things moving along.

@andygrove andygrove merged commit a542608 into apache:main Nov 13, 2024
15 checks passed
@milenkovicm milenkovicm deleted the scheduler_configuration_propagation branch November 13, 2024 07:46
Successfully merging this pull request may close these issues.

Trim down BallistaConfig
2 participants