Skip to content

Commit

Permalink
Don't overwrite udfs (#784)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Nov 12, 2024
1 parent 1ff9c60 commit bd3acb3
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions crates/arroyo-operator/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::context::{ArrowContext, BatchReceiver};
use crate::inq_reader::InQReader;
use crate::udfs::{ArroyoUdaf, UdafArg};
use crate::{CheckpointCounter, ControlOutcome, SourceFinishType};
use anyhow::anyhow;
use anyhow::{anyhow, bail};
use arrow::array::RecordBatch;
use arrow::datatypes::DataType;
use arrow::datatypes::Schema;
Expand Down Expand Up @@ -31,10 +31,13 @@ use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::io::ErrorKind;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::sync::Barrier;
use tokio_stream::StreamExt;
use tracing::{debug, error, info, trace, warn, Instrument};
Expand Down Expand Up @@ -638,7 +641,6 @@ impl Registry {
)
});

// write the dylib to a local file
let local_udfs_dir = "/tmp/arroyo/local_udfs";
tokio::fs::create_dir_all(local_udfs_dir)
.await
Expand All @@ -649,9 +651,28 @@ impl Registry {
.ok_or_else(|| anyhow!("Invalid dylib path: {}", config.dylib_path))?;
let local_dylib_path = Path::new(local_udfs_dir).join(dylib_file_name);

tokio::fs::write(&local_dylib_path, udf)
// write the dylib to a local file if it's not already present
match OpenOptions::new()
.write(true)
.create_new(true)
.open(&local_dylib_path)
.await
.map_err(|e| anyhow!("unable to write dylib to file: {:?}", e))?;
{
Ok(mut file) => {
file.write_all(&udf).await?;
file.sync_all().await?;
}
Err(e) if e.kind() == ErrorKind::AlreadyExists => {
// nothing to do, UDF already written
}
Err(e) => {
bail!(
"Failed to write UDF dylib to {}: {}",
local_dylib_path.to_string_lossy(),
e
);
}
};

let interface = if config.is_async {
UdfInterface::Async(Arc::new(ContainerOrLocal::Container(unsafe {
Expand Down

0 comments on commit bd3acb3

Please sign in to comment.