Fix record batch memory size double counting (apache#13377)
2010YOUY01 authored Nov 15, 2024
1 parent 8c35270 commit 172cf8d
Showing 4 changed files with 232 additions and 17 deletions.
11 changes: 8 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_plan::spill::get_record_batch_memory_size;
use futures::StreamExt;
use std::any::Any;
use std::num::NonZeroUsize;
@@ -265,14 +266,18 @@ async fn sort_spill_reservation() {
    // This test case shows how sort_spill_reservation works by
    // purposely sorting data that requires non-trivial memory to
    // sort/merge.

+    // The merge operation needs extra memory to do the row conversion, so
+    // make the memory limit larger.
+    let mem_limit = partition_size * 2;
let test = TestCase::new()
        // This query uses a different order than the input table to
        // force a sort. It also needs multiple columns to force the
        // RowFormat / interner, which makes the merge require
        // substantial memory.
.with_query("select * from t ORDER BY a , b DESC")
// enough memory to sort if we don't try to merge it all at once
-        .with_memory_limit(partition_size)
+        .with_memory_limit(mem_limit)
// use a single partition so only a sort is needed
.with_scenario(scenario)
.with_disk_manager_config(DiskManagerConfig::NewOs)
Expand Down Expand Up @@ -311,7 +316,7 @@ async fn sort_spill_reservation() {
        // reserve sufficient space up front for the merge; this time the
        // spills will be forced to happen with less buffered input,
        // leaving enough memory to perform the merge.
-        .with_sort_spill_reservation_bytes(partition_size / 2);
+        .with_sort_spill_reservation_bytes(mem_limit / 2);

test.with_config(config).with_expected_success().run().await;
}
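For context, here is a minimal sketch (not part of this commit) of how the same two knobs are wired up in user code. The 10 MiB limit is illustrative, and builder names may differ slightly between DataFusion versions:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionConfig;

fn context_with_sort_spill_reservation() -> Result<SessionContext> {
    // Hypothetical 10 MiB memory pool for the whole session
    let mem_limit = 10 * 1024 * 1024;
    let runtime = Arc::new(RuntimeEnv::new(
        RuntimeConfig::new().with_memory_limit(mem_limit, 1.0),
    )?);
    // Pre-reserve half of the pool for the merge phase, mirroring the test above
    let config =
        SessionConfig::new().with_sort_spill_reservation_bytes(mem_limit / 2);
    Ok(SessionContext::new_with_config_rt(config, runtime))
}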
@@ -774,7 +779,7 @@ fn make_dict_batches() -> Vec<RecordBatch> {

// How many bytes of memory do the dict_batches consume?
fn batches_byte_size(batches: &[RecordBatch]) -> usize {
-    batches.iter().map(|b| b.get_array_memory_size()).sum()
+    batches.iter().map(get_record_batch_memory_size).sum()
}

#[derive(Debug)]
6 changes: 4 additions & 2 deletions datafusion/physical-plan/src/sorts/builder.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

+use crate::spill::get_record_batch_memory_size;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
@@ -69,7 +70,8 @@ impl BatchBuilder {

/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
-        self.reservation.try_grow(batch.get_array_memory_size())?;
+        self.reservation
+            .try_grow(get_record_batch_memory_size(&batch))?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
@@ -141,7 +143,7 @@ impl BatchBuilder {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
-                self.reservation.shrink(batch.get_array_memory_size());
+                self.reservation.shrink(get_record_batch_memory_size(batch));
}
retain
});
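The builder change above keeps the accounting symmetric: `push_batch` grows the reservation and the drain path shrinks it using the same deduplicated measure. A minimal sketch of that pattern (illustrative only; the pool size and consumer name are invented):

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use datafusion_physical_plan::spill::get_record_batch_memory_size;

fn track_batch(batch: RecordBatch) -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let mut reservation = MemoryConsumer::new("batch_builder_demo").register(&pool);

    // Grow with the deduplicated size when the batch is buffered...
    let size = get_record_batch_memory_size(&batch);
    reservation.try_grow(size)?;

    // ...and shrink with the exact same measure when it is released, so the
    // reservation never drifts (mixing the two measures could underflow).
    reservation.shrink(size);
    Ok(())
}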
24 changes: 15 additions & 9 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -31,7 +31,9 @@ use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
-use crate::spill::{read_spill_as_stream, spill_record_batches};
+use crate::spill::{
+    get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
+};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
@@ -286,10 +288,12 @@ impl ExternalSorter {
}
self.reserve_memory_for_merge()?;

-        let size = input.get_array_memory_size();
+        let size = get_record_batch_memory_size(&input);

if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
self.in_mem_sort().await?;

// Sorting may have freed memory, especially if fetch is `Some`
//
// As such we check again, and if the memory usage has dropped by
@@ -426,7 +430,7 @@ impl ExternalSorter {
let size: usize = self
.in_mem_batches
.iter()
-            .map(|x| x.get_array_memory_size())
+            .map(get_record_batch_memory_size)
.sum();

// Reserve headroom for next sort/merge
@@ -521,7 +525,8 @@ impl ExternalSorter {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
self.in_mem_batches.clear();
-            self.reservation.try_resize(batch.get_array_memory_size())?;
+            self.reservation
+                .try_resize(get_record_batch_memory_size(&batch))?;
let reservation = self.reservation.take();
return self.sort_batch_stream(batch, metrics, reservation);
}
@@ -530,7 +535,8 @@
.into_iter()
.map(|batch| {
let metrics = self.metrics.baseline.intermediate();
-                let reservation = self.reservation.split(batch.get_array_memory_size());
+                let reservation =
+                    self.reservation.split(get_record_batch_memory_size(&batch));
let input = self.sort_batch_stream(batch, metrics, reservation)?;
Ok(spawn_buffered(input, 1))
})
@@ -559,7 +565,7 @@
metrics: BaselineMetrics,
reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
-        assert_eq!(batch.get_array_memory_size(), reservation.size());
+        assert_eq!(get_record_batch_memory_size(&batch), reservation.size());
let schema = batch.schema();

let fetch = self.fetch;
@@ -1185,9 +1191,9 @@ mod tests {

assert_eq!(metrics.output_rows().unwrap(), 10000);
assert!(metrics.elapsed_compute().unwrap() > 0);
-        assert_eq!(metrics.spill_count().unwrap(), 4);
-        assert_eq!(metrics.spilled_bytes().unwrap(), 38784);
-        assert_eq!(metrics.spilled_rows().unwrap(), 9600);
+        assert_eq!(metrics.spill_count().unwrap(), 3);
+        assert_eq!(metrics.spilled_bytes().unwrap(), 36000);
+        assert_eq!(metrics.spilled_rows().unwrap(), 9000);

let columns = result[0].columns();

208 changes: 205 additions & 3 deletions datafusion/physical-plan/src/spill.rs
@@ -20,14 +20,16 @@
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
+use std::ptr::NonNull;

+use arrow::array::ArrayData;
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;

-use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_common::{exec_datafusion_err, HashSet, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;
@@ -109,10 +111,83 @@ pub fn spill_record_batch_by_size(
Ok(())
}

/// Calculate the total used memory of this batch.
///
/// This function is used to estimate the physical memory usage of the
/// `RecordBatch`. It only counts the memory of large data `Buffer`s, and
/// ignores metadata like types and pointers.
/// The implementation sums up the memory sizes of all unique `Buffer`s,
/// because:
/// - The data pointers inside `Buffer`s are memory regions returned by the
///   global memory allocator, so those regions cannot overlap.
/// - The actually used ranges of the `ArrayRef`s inside a `RecordBatch` can
///   overlap or reuse the same `Buffer`, for example when taking a slice of
///   an `Array`.
///
/// Example:
/// For a `RecordBatch` with two columns `col1` and `col2`, both pointing to
/// sub-regions of the same buffer:
///
/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
///       ^    ^  ^    ^
///       |    |  |    |
/// col1->{    }  |    |
/// col2--------->{    }
///
/// In this case, `get_record_batch_memory_size` returns the size of the
/// buffer, rather than the sum of `col1`'s and `col2`'s actual memory sizes.
///
/// Note: the current `RecordBatch::get_array_memory_size()` double counts a
/// buffer's memory size if multiple arrays within the batch share the same
/// `Buffer`. This method is a temporary fix until the issue is resolved:
/// <https://github.com/apache/arrow-rs/issues/6439>
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
    // Store each `Buffer`'s start memory address (rather than the pointer to
    // the data region currently used by the `Array`)
let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
let mut total_size = 0;

for array in batch.columns() {
let array_data = array.to_data();
count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
}

total_size
}

/// Count the memory usage of `array_data` and its children recursively.
fn count_array_data_memory_size(
array_data: &ArrayData,
counted_buffers: &mut HashSet<NonNull<u8>>,
total_size: &mut usize,
) {
// Count memory usage for `array_data`
for buffer in array_data.buffers() {
if counted_buffers.insert(buffer.data_ptr()) {
*total_size += buffer.capacity();
} // Otherwise the buffer's memory is already counted
}

if let Some(null_buffer) = array_data.nulls() {
if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
*total_size += null_buffer.inner().inner().capacity();
}
}

// Count all children `ArrayData` recursively
for child in array_data.child_data() {
count_array_data_memory_size(child, counted_buffers, total_size);
}
}
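To see the difference between the two measures, a small standalone sketch (not from the commit; it imports the function above rather than assuming module scope): two columns sliced from one array share a single 20-byte buffer, which `RecordBatch::get_array_memory_size` counts once per column, while the deduplicating measure counts it once:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_physical_plan::spill::get_record_batch_memory_size;

fn demo_double_counting() {
    let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));
    // Both columns are slices into the same underlying buffer.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(original.slice(0, 3)), Arc::new(original.slice(2, 3))],
    )
    .unwrap();

    // The per-array sum counts the shared buffer once per column; the
    // deduplicated measure, keyed on buffer start address, counts it once.
    assert!(batch.get_array_memory_size() > get_record_batch_memory_size(&batch));
}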

#[cfg(test)]
mod tests {
use super::*;
use crate::spill::{spill_record_batch_by_size, spill_record_batches};
use crate::test::build_table_i32;
use arrow::array::{Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::ListArray;
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::DiskManager;
@@ -147,7 +222,7 @@ mod tests {
assert_eq!(cnt.unwrap(), num_rows);

let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 2);
assert_eq!(reader.schema(), schema);
@@ -175,11 +250,138 @@
)?;

let file = BufReader::new(File::open(spill_file.path())?);
-        let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
+        let reader = FileReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 4);
assert_eq!(reader.schema(), schema);

Ok(())
}

#[test]
fn test_get_record_batch_memory_size() {
// Create a simple record batch with two columns
let schema = Arc::new(Schema::new(vec![
Field::new("ints", DataType::Int32, true),
Field::new("float64", DataType::Float64, false),
]));

let int_array =
Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_array), Arc::new(float64_array)],
)
.unwrap();

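        // Expected: 5 Int32 values (20 bytes) + 5 Float64 values (40 bytes).
        // `Vec`-backed buffers keep their exact capacity, so no alignment
        // padding is counted here.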
let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 60);
}

#[test]
fn test_get_record_batch_memory_size_with_null() {
// Create a simple record batch with two columns
let schema = Arc::new(Schema::new(vec![
Field::new("ints", DataType::Int32, true),
Field::new("float64", DataType::Float64, false),
]));

let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_array), Arc::new(float64_array)],
)
.unwrap();

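        // Expected: 3 Int32 values (12 bytes) + 3 Float64 values (24 bytes),
        // plus the null bitmap, which arrow allocates as a 64-byte-aligned
        // buffer: 12 + 24 + 64 = 100 bytes.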
let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 100);
}

#[test]
fn test_get_record_batch_memory_size_empty() {
// Test with empty record batch
let schema = Arc::new(Schema::new(vec![Field::new(
"ints",
DataType::Int32,
false,
)]));

let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 0, "Empty batch should have 0 memory size");
}

#[test]
fn test_get_record_batch_memory_size_shared_buffer() {
// Test with slices that share the same underlying buffer
let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
let slice1 = original.slice(0, 3);
let slice2 = original.slice(2, 3);

// `RecordBatch` with `original` array
// ----
let schema_origin = Arc::new(Schema::new(vec![Field::new(
"origin_col",
DataType::Int32,
false,
)]));
let batch_origin =
RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();

        // `RecordBatch` whose columns are all references into the `original` array
// ----
let schema = Arc::new(Schema::new(vec![
Field::new("slice1", DataType::Int32, false),
Field::new("slice2", DataType::Int32, false),
]));

let batch_sliced =
RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
.unwrap();

        // Both sizes should count only the buffer backing the `original` array
let size_origin = get_record_batch_memory_size(&batch_origin);
let size_sliced = get_record_batch_memory_size(&batch_sliced);

assert_eq!(size_origin, size_sliced);
}

#[test]
fn test_get_record_batch_memory_size_nested_array() {
let schema = Arc::new(Schema::new(vec![
Field::new(
"nested_int",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false,
),
Field::new(
"nested_int2",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false,
),
]));

let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2), Some(3)]),
]);

let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(4), Some(5), Some(6)]),
]);

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
)
.unwrap();

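        // The expected value is likely dominated by the list builders'
        // pre-allocated buffer capacities rather than the 24 bytes of logical
        // values, so this constant tracks arrow's allocation strategy.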
let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 8320);
}
}
