Skip to content

Commit

Permalink
Test new object_store
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Oct 24, 2024
1 parent 843e327 commit b0a8655
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 8 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,7 @@ datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafu
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}

object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'gcp_bugs' }

cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
2 changes: 1 addition & 1 deletion crates/arroyo-controller/src/job_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ impl JobController {
self.cleanup_task = Some(self.start_cleanup(new_epoch));
}
}

// check on checkpointing
if self.model.checkpoint_state.is_some() {
self.model.finish_checkpoint_if_done(&self.db).await?;
Expand Down
7 changes: 2 additions & 5 deletions crates/arroyo-state/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet};
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;
use tracing::{debug, info};

pub const FULL_KEY_RANGE: RangeInclusive<u64> = 0..=u64::MAX;
Expand Down Expand Up @@ -122,7 +121,7 @@ impl BackingStore for ParquetBackend {
})
.collect();

let storage_client = Mutex::new(get_storage_provider().await?);
let storage_client = get_storage_provider().await?;

// wait for all of the futures to complete
while let Some(result) = futures.next().await {
Expand All @@ -134,7 +133,7 @@ impl BackingStore for ParquetBackend {
epoch_to_remove,
&operator_id,
));
storage_client.lock().await.delete_if_present(path).await?;
storage_client.delete_if_present(path).await?;
}
debug!(
message = "Finished cleaning operator",
Expand All @@ -146,8 +145,6 @@ impl BackingStore for ParquetBackend {

for epoch_to_remove in old_min_epoch..min_epoch {
storage_client
.lock()
.await
.delete_if_present(metadata_path(&base_path(&metadata.job_id, epoch_to_remove)))
.await?;
}
Expand Down

0 comments on commit b0a8655

Please sign in to comment.