Skip to content

Commit

Permalink
use serialize range in CompactRowSerializer
Browse files: browse the repository at this point in the history
  • Loading branch information
marin-ma committed Sep 5, 2024
1 parent 563423a commit 8cf9751
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
32 changes: 19 additions & 13 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch) override {
size_t totalSize = 0;
std::vector<std::vector<vector_size_t>> rowSize(ranges.size());
row::CompactRow row(vector);
if (auto fixedRowSize =
row::CompactRow::fixedRowSize(asRowType(vector->type()))) {
for (const auto& range : ranges) {
totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * range.size;
for (auto i = 0; i < ranges.size(); ++i) {
totalSize += (fixedRowSize.value() + sizeof(TRowSize)) * ranges[i].size;
rowSize[i].resize(ranges[i].size, fixedRowSize.value());
}

} else {
for (const auto& range : ranges) {
for (auto i = range.begin; i < range.begin + range.size; ++i) {
totalSize += row.rowSize(i) + sizeof(TRowSize);
for (auto i = 0; i < ranges.size(); ++i) {
const auto& range = ranges[i];
rowSize[i].resize(range.size);
for (auto j = 0; j < range.size; ++j) {
rowSize[i][j] = row.rowSize(range.begin + j);
totalSize += rowSize[i][j] + sizeof(TRowSize);
}
}
}
Expand All @@ -64,15 +68,17 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
buffers_.push_back(std::move(buffer));

size_t offset = 0;
for (auto& range : ranges) {
for (auto i = range.begin; i < range.begin + range.size; ++i) {
// Write row data.
TRowSize size = row.serialize(i, rawBuffer + offset + sizeof(TRowSize));

for (auto i = 0; i < ranges.size(); ++i) {
const auto& range = ranges[i];
std::vector<size_t> offsets(range.size);
for (auto j = 0; j < range.size; ++j) {
// Write raw size. Needs to be in big endian order.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(size);
offset += sizeof(TRowSize) + size;
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[i][j]);
offsets[j] = offset + sizeof(TRowSize);
offset += rowSize[i][j] + sizeof(TRowSize);
}
// Write row data for all rows in range.
row.serialize(range.begin, range.size, rawBuffer, offsets);
}
}

Expand Down
8 changes: 3 additions & 5 deletions velox/serializers/tests/CompactRowSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,16 @@ class CompactRowSerializerTest : public ::testing::Test,
void serialize(RowVectorPtr rowVector, std::ostream* output) {
auto numRows = rowVector->size();

std::vector<IndexRange> rows(numRows);
for (int i = 0; i < numRows; i++) {
rows[i] = IndexRange{i, 1};
}
std::vector<IndexRange> rows;
rows.push_back(IndexRange{0, numRows});

auto arena = std::make_unique<StreamArena>(pool_.get());
auto rowType = asRowType(rowVector->type());
auto serializer =
serde_->createIterativeSerializer(rowType, numRows, arena.get());

Scratch scratch;
serializer->append(rowVector, folly::Range(rows.data(), numRows), scratch);
serializer->append(rowVector, folly::Range(rows.data(), 1), scratch);
auto size = serializer->maxSerializedSize();
OStreamOutputStream out(output);
serializer->flush(&out);
Expand Down

0 comments on commit 8cf9751

Please sign in to comment.