From 0045bbdc35c69e788a5f8390e0a4f5a7e3512178 Mon Sep 17 00:00:00 2001 From: coastalwhite Date: Mon, 20 Jan 2025 17:13:05 +0100 Subject: [PATCH] feat: Add NDJson sink for the new streaming engine Little bonus. ping @orlp. --- .../polars-stream/src/nodes/io_sinks/json.rs | 123 ++++++++++++++++++ .../polars-stream/src/nodes/io_sinks/mod.rs | 2 + .../src/physical_plan/lower_ir.rs | 9 ++ .../src/physical_plan/to_graph.rs | 5 + 4 files changed, 139 insertions(+) create mode 100644 crates/polars-stream/src/nodes/io_sinks/json.rs diff --git a/crates/polars-stream/src/nodes/io_sinks/json.rs b/crates/polars-stream/src/nodes/io_sinks/json.rs new file mode 100644 index 000000000000..e9c905332122 --- /dev/null +++ b/crates/polars-stream/src/nodes/io_sinks/json.rs @@ -0,0 +1,123 @@ +use std::cmp::Reverse; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use polars_error::PolarsResult; +use polars_expr::state::ExecutionState; +use polars_io::json::BatchedWriter; +use polars_utils::priority::Priority; + +use crate::async_primitives::linearizer::Linearizer; +use crate::nodes::{ComputeNode, JoinHandle, MorselSeq, PortState, TaskPriority, TaskScope}; +use crate::pipe::{RecvPort, SendPort}; +use crate::DEFAULT_LINEARIZER_BUFFER_SIZE; + +pub struct NDJsonSinkNode { + path: PathBuf, +} + +impl NDJsonSinkNode { + pub fn new(path: &Path) -> PolarsResult { + Ok(Self { + path: path.to_path_buf(), + }) + } +} + +impl ComputeNode for NDJsonSinkNode { + fn name(&self) -> &str { + "ndjson_sink" + } + + fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> { + assert!(send.is_empty()); + assert!(recv.len() == 1); + + // We are always ready to receive, unless the sender is done, then we're + // also done. 
+ if recv[0] != PortState::Done { + recv[0] = PortState::Ready; + } + + Ok(()) + } + + fn spawn<'env, 's>( + &'env mut self, + scope: &'s TaskScope<'s, 'env>, + recv_ports: &mut [Option>], + send_ports: &mut [Option>], + _state: &'s ExecutionState, + join_handles: &mut Vec>>, + ) { + assert!(recv_ports.len() == 1); + assert!(send_ports.is_empty()); + + // .. -> Encode task + let receivers = recv_ports[0].take().unwrap().parallel(); + // Encode tasks -> IO task + let (mut linearizer, senders) = Linearizer::, Vec>>::new( + receivers.len(), + DEFAULT_LINEARIZER_BUFFER_SIZE, + ); + + let slf = &*self; + + // 16 MiB + const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24; + + // Encode task. + // + // Task encodes the columns into their corresponding NDJSON encoding. + for (mut receiver, mut sender) in receivers.into_iter().zip(senders) { + join_handles.push(scope.spawn_task(TaskPriority::High, async move { + // Amortize the allocations over time. If we see that we need to do way larger + // allocations, we adjust to that over time. + let mut allocation_size = DEFAULT_ALLOCATION_SIZE; + + while let Ok(morsel) = receiver.recv().await { + let (df, seq, _, _) = morsel.into_inner(); + + let mut buffer = Vec::with_capacity(allocation_size); + let mut writer = BatchedWriter::new(&mut buffer); + + writer.write_batch(&df)?; + + allocation_size = allocation_size.max(buffer.len()); + sender.insert(Priority(Reverse(seq), buffer)).await.unwrap(); + } + + PolarsResult::Ok(()) + })); + } + + // IO task. + // + // Task that actually writes to the target file. 
+ let io_runtime = polars_io::pl_async::get_runtime(); + + let path = slf.path.clone(); + let io_task = io_runtime.spawn(async move { + use tokio::fs::OpenOptions; + + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path.as_path()) + .await + .map_err(|err| polars_utils::_limit_path_len_io_err(path.as_path(), err))?; + + let mut file = file.into_std().await; + + // Write every encoded buffer handed out by the linearizer + while let Some(Priority(_, buffer)) = linearizer.get().await { + file.write_all(&buffer)?; + } + + PolarsResult::Ok(()) + }); + join_handles + .push(scope.spawn_task(TaskPriority::Low, async move { io_task.await.unwrap() })); + } +} diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs index 65eb98c04035..730a75670d6e 100644 --- a/crates/polars-stream/src/nodes/io_sinks/mod.rs +++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs @@ -1,4 +1,6 @@ #[cfg(feature = "ipc")] pub mod ipc; +#[cfg(feature = "json")] +pub mod json; #[cfg(feature = "parquet")] pub mod parquet; diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs index 66dae935db46..ab987bec86d0 100644 --- a/crates/polars-stream/src/physical_plan/lower_ir.rs +++ b/crates/polars-stream/src/physical_plan/lower_ir.rs @@ -236,6 +236,15 @@ pub fn lower_ir( input: phys_input, } }, + #[cfg(feature = "json")] + FileType::Json(_) => { + let phys_input = lower_ir!(*input)?; + PhysNodeKind::FileSink { + path, + file_type, + input: phys_input, + } + }, _ => todo!(), } }, diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 02d66968c0b7..1369293e0abe 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -221,6 +221,11 @@ fn to_graph_rec<'a>( nodes::io_sinks::ipc::IpcSinkNode::new(input_schema, path, ipc_writer_options)?, [(input_key, input.port)], ), 
#[cfg(feature = "json")] + FileType::Json(_) => ctx.graph.add_node( + nodes::io_sinks::json::NDJsonSinkNode::new(path)?, + [(input_key, input.port)], + ), #[cfg(feature = "ipc")] FileType::Parquet(ipc_writer_options) => ctx.graph.add_node( nodes::io_sinks::parquet::ParquetSinkNode::new(