Skip to content

Commit

Permalink
update serde
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Nov 12, 2024
1 parent 77f626c commit faaf61a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 20 deletions.
53 changes: 37 additions & 16 deletions velox/serializers/UnsafeRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,28 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
const folly::Range<const IndexRange*>& ranges,
Scratch& /*scratch*/) override {
size_t totalSize = 0;
const auto totalRows = std::accumulate(
ranges.begin(),
ranges.end(),
0,
[](vector_size_t sum, const auto& range) { return sum + range.size; });

if (totalRows == 0) {
return;
}

row::UnsafeRowFast unsafeRow(vector);
std::vector<vector_size_t> rowSize(totalRows);
if (auto fixedRowSize =
row::UnsafeRowFast::fixedRowSize(asRowType(vector->type()))) {
for (const auto& range : ranges) {
totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * range.size;
}

totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * totalRows;
std::fill(rowSize.begin(), rowSize.end(), fixedRowSize.value());
} else {
vector_size_t index = 0;
for (const auto& range : ranges) {
for (auto i = range.begin; i < range.begin + range.size; ++i) {
totalSize += unsafeRow.rowSize(i) + sizeof(TRowSize);
for (auto i = 0; i < range.size; ++i, ++index) {
rowSize[index] = unsafeRow.rowSize(range.begin + i);
totalSize += rowSize[index] + sizeof(TRowSize);
}
}
}
Expand All @@ -54,19 +65,29 @@ class UnsafeRowVectorSerializer : public IterativeVectorSerializer {
}

BufferPtr buffer = AlignedBuffer::allocate<char>(totalSize, pool_, 0);
auto rawBuffer = buffer->asMutable<char>();
auto* rawBuffer = buffer->asMutable<char>();
buffers_.push_back(std::move(buffer));

size_t offset = 0;
for (auto& range : ranges) {
for (auto i = range.begin; i < range.begin + range.size; ++i) {
// Write row data.
TRowSize size =
unsafeRow.serialize(i, rawBuffer + offset + sizeof(TRowSize));

// Write raw size. Needs to be in big endian order.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(size);
offset += sizeof(TRowSize) + size;
vector_size_t index = 0;
for (const auto& range : ranges) {
if (range.size == 1) {
// Fast path for single-row serialization.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]);
auto size = unsafeRow.serialize(
range.begin, rawBuffer + offset + sizeof(TRowSize));
offset += size + sizeof(TRowSize);
++index;
} else {
raw_vector<size_t> offsets(range.size);
for (auto i = 0; i < range.size; ++i, ++index) {
// Write raw size. Needs to be in big endian order.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]);
offsets[i] = offset + sizeof(TRowSize);
offset += rowSize[index] + sizeof(TRowSize);
}
// Write row data for all rows in range.
unsafeRow.serialize(range.begin, range.size, offsets.data(), rawBuffer);
}
}
}
Expand Down
25 changes: 25 additions & 0 deletions velox/serializers/benchmarks/RowSerializerBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <folly/init/Init.h>

#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/UnsafeRowSerializer.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

namespace facebook::velox::test {
Expand All @@ -31,6 +32,14 @@ class RowSerializerBenchmark {
deregisterVectorSerde();
}

// Runs the serialization benchmark using the Spark UnsafeRow serde.
//
// Registers serializer::spark::UnsafeRowVectorSerde as the vector serde,
// invokes serialize() with the given row type and range size, and then
// deregisters the serde again.
//
// @param rowType Row type forwarded unchanged to serialize().
// @param rangeSize Range size forwarded unchanged to serialize()
//   (presumably the number of rows per IndexRange -- confirm in serialize()).
void unsafeRowVectorSerde(
const RowTypePtr& rowType,
vector_size_t rangeSize) {
// Register the serde before serialize() runs.
serializer::spark::UnsafeRowVectorSerde::registerVectorSerde();
serialize(rowType, rangeSize);
// Remove the registration made above once serialization is done.
deregisterVectorSerde();
}

private:
void serialize(const RowTypePtr& rowType, vector_size_t rangeSize) {
folly::BenchmarkSuspender suspender;
Expand Down Expand Up @@ -72,17 +81,33 @@ class RowSerializerBenchmark {
RowSerializerBenchmark benchmark; \
benchmark.compactRowVectorSerde(rowType, 1); \
} \
BENCHMARK(unsafe_serialize_1_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.unsafeRowVectorSerde(rowType, 1); \
} \
BENCHMARK(compact_serialize_10_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.compactRowVectorSerde(rowType, 10); \
} \
BENCHMARK(unsafe_serialize_10_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.unsafeRowVectorSerde(rowType, 10); \
} \
BENCHMARK(compact_serialize_100_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.compactRowVectorSerde(rowType, 100); \
} \
BENCHMARK(unsafe_serialize_100_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.unsafeRowVectorSerde(rowType, 100); \
} \
BENCHMARK(compact_serialize_1000_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.compactRowVectorSerde(rowType, 1'000); \
} \
BENCHMARK(unsafe_serialize_1000_##name) { \
RowSerializerBenchmark benchmark; \
benchmark.unsafeRowVectorSerde(rowType, 1'000); \
}

VECTOR_SERDE_BENCHMARKS(
Expand Down
14 changes: 10 additions & 4 deletions velox/serializers/tests/UnsafeRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ class UnsafeRowSerializerTest : public ::testing::Test,
void serialize(RowVectorPtr rowVector, std::ostream* output) {
const auto numRows = rowVector->size();

std::vector<IndexRange> ranges(numRows);
for (int i = 0; i < numRows; i++) {
ranges[i] = IndexRange{i, 1};
// Serialize with different range size.
std::vector<IndexRange> ranges;
vector_size_t offset = 0;
vector_size_t rangeSize = 1;
while (offset < numRows) {
auto size = std::min<vector_size_t>(rangeSize, numRows - offset);
ranges.push_back(IndexRange{offset, size});
offset += size;
rangeSize = checkedMultiply<vector_size_t>(rangeSize, 2);
}

std::unique_ptr<row::UnsafeRowFast> unsafeRow;
Expand All @@ -79,7 +85,7 @@ class UnsafeRowSerializerTest : public ::testing::Test,
} else {
Scratch scratch;
serializer->append(
rowVector, folly::Range(ranges.data(), numRows), scratch);
rowVector, folly::Range(ranges.data(), ranges.size()), scratch);
}

auto size = serializer->maxSerializedSize();
Expand Down

0 comments on commit faaf61a

Please sign in to comment.