diff --git a/Cargo.lock b/Cargo.lock index 11a5e9f58..e5655eb64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5873,8 +5873,7 @@ dependencies = [ [[package]] name = "object_store" version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6da452820c715ce78221e8202ccc599b4a52f3e1eb3eedb487b680c81a8e3f3" +source = "git+http://github.com/ArroyoSystems/arrow-rs?branch=gcp_bugs#ce6b416df5c62799276e4d528fdd3f14d338b04f" dependencies = [ "async-trait", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index 5f74b81cd..8f8bf0e56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,5 +80,7 @@ datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafu datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'} datafusion-functions = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'} +object_store = { git = 'http://github.com/ArroyoSystems/arrow-rs', branch = 'gcp_bugs' } + cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" } cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" } diff --git a/crates/arroyo-controller/src/job_controller/mod.rs b/crates/arroyo-controller/src/job_controller/mod.rs index 110145447..21f10e502 100644 --- a/crates/arroyo-controller/src/job_controller/mod.rs +++ b/crates/arroyo-controller/src/job_controller/mod.rs @@ -759,7 +759,7 @@ impl JobController { self.cleanup_task = Some(self.start_cleanup(new_epoch)); } } - + // check on checkpointing if self.model.checkpoint_state.is_some() { self.model.finish_checkpoint_if_done(&self.db).await?; diff --git a/crates/arroyo-state/src/parquet.rs b/crates/arroyo-state/src/parquet.rs index 96813e249..ca772dd01 100644 --- a/crates/arroyo-state/src/parquet.rs +++ b/crates/arroyo-state/src/parquet.rs @@ -16,7 +16,6 @@ use std::collections::{HashMap, HashSet}; use std::ops::RangeInclusive; use std::sync::Arc; use std::time::SystemTime; -use tokio::sync::Mutex; use tracing::{debug, info}; pub const FULL_KEY_RANGE: RangeInclusive = 0..=u64::MAX; @@ -122,7 +121,7 @@ impl BackingStore for ParquetBackend { }) .collect(); - let storage_client = Mutex::new(get_storage_provider().await?); + let storage_client = get_storage_provider().await?; // wait for all of the futures to complete while let Some(result) = futures.next().await { @@ -134,7 +133,7 @@ impl BackingStore for ParquetBackend { epoch_to_remove, &operator_id, )); - storage_client.lock().await.delete_if_present(path).await?; + storage_client.delete_if_present(path).await?; } debug!( message = "Finished cleaning operator", @@ -146,8 +145,6 @@ impl BackingStore for ParquetBackend { for epoch_to_remove in old_min_epoch..min_epoch { storage_client - .lock() - .await .delete_if_present(metadata_path(&base_path(&metadata.job_id, epoch_to_remove))) .await?; }