Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Oct 31, 2024
1 parent fa507d1 commit 67650d2
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 67 deletions.
81 changes: 40 additions & 41 deletions velox/row/CompactRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,50 @@ namespace {
constexpr size_t kSizeBytes = sizeof(int32_t);

void writeInt32(char* buffer, int32_t n) {
memcpy(buffer, &n, kSizeBytes);
::memcpy(buffer, &n, kSizeBytes);
}

int32_t readInt32(const char* buffer) {
int32_t n;
memcpy(&n, buffer, kSizeBytes);
::memcpy(&n, buffer, kSizeBytes);
return n;
}

FOLLY_ALWAYS_INLINE void writeFixedWidth(
char* buffer,
size_t& offset,
const char* rawData,
vector_size_t index,
size_t valueBytes) {
memcpy(buffer + offset, rawData + index * valueBytes, valueBytes);
size_t valueBytes,
char* buffer,
size_t& offset) {
::memcpy(buffer + offset, rawData + index * valueBytes, valueBytes);
offset += valueBytes;
}

FOLLY_ALWAYS_INLINE void
writeTimestamp(char* buffer, size_t& offset, const Timestamp& timestamp) {
writeTimestamp(const Timestamp& timestamp, char* buffer, size_t& offset) {
// Write micros(int64_t) for timestamp value.
const auto micros = timestamp.toMicros();
memcpy(buffer + offset, &micros, sizeof(int64_t));
const auto timeUs = timestamp.toMicros();
::memcpy(buffer + offset, &timeUs, sizeof(int64_t));
offset += sizeof(int64_t);
}

FOLLY_ALWAYS_INLINE void
writeString(char* buffer, size_t& offset, const StringView& value) {
writeString(const StringView& value, char* buffer, size_t& offset) {
writeInt32(buffer + offset, value.size());
if (!value.empty()) {
memcpy(buffer + offset + kSizeBytes, value.data(), value.size());
::memcpy(buffer + offset + kSizeBytes, value.data(), value.size());
}
offset += kSizeBytes + value.size();
}

// Serialize a child vector of a row type within a list of rows.
// Serialize the child vector of a row type within a range of consecutive rows.
// Write the serialized data at offsets of buffer row by row.
// Update offsets with the actual serialized size.
template <TypeKind kind>
void serializeTyped(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
const DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
Expand All @@ -73,7 +73,7 @@ void serializeTyped(
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeFixedWidth(
buffer, offsets[i], rawData, decoded.index(rows[i]), valueBytes);
rawData, decoded.index(rows[i]), valueBytes, buffer, offsets[i]);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
Expand All @@ -82,7 +82,7 @@ void serializeTyped(
offsets[i] += valueBytes;
} else {
writeFixedWidth(
buffer, offsets[i], rawData, decoded.index(rows[i]), valueBytes);
rawData, decoded.index(rows[i]), valueBytes, buffer, offsets[i]);
}
}
}
Expand All @@ -92,7 +92,7 @@ template <>
void serializeTyped<TypeKind::UNKNOWN>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& /* unused */,
const DecodedVector& /* unused */,
size_t /* unused */,
const raw_vector<uint8_t*>& nulls,
char* /* unused */,
Expand All @@ -106,7 +106,7 @@ template <>
void serializeTyped<TypeKind::BOOLEAN>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
const DecodedVector& decoded,
size_t /* unused */,
const raw_vector<uint8_t*>& nulls,
char* buffer,
Expand All @@ -133,25 +133,25 @@ template <>
void serializeTyped<TypeKind::TIMESTAMP>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
const DecodedVector& decoded,
size_t /* unused */,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto* rawData = decoded.data<Timestamp>();
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
auto index = decoded.index(rows[i]);
writeTimestamp(buffer, offsets[i], rawData[index]);
const auto index = decoded.index(rows[i]);
writeTimestamp(rawData[index], buffer, offsets[i]);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
offsets[i] += sizeof(int64_t);
} else {
auto index = decoded.index(rows[i]);
writeTimestamp(buffer, offsets[i], rawData[index]);
const auto index = decoded.index(rows[i]);
writeTimestamp(rawData[index], buffer, offsets[i]);
}
}
}
Expand All @@ -161,21 +161,21 @@ template <>
void serializeTyped<TypeKind::VARCHAR>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
const DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeString(buffer, offsets[i], decoded.valueAt<StringView>(rows[i]));
writeString(decoded.valueAt<StringView>(rows[i]), buffer, offsets[i]);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
writeString(buffer, offsets[i], decoded.valueAt<StringView>(rows[i]));
writeString(decoded.valueAt<StringView>(rows[i]), buffer, offsets[i]);
}
}
}
Expand All @@ -185,7 +185,7 @@ template <>
void serializeTyped<TypeKind::VARBINARY>(
const raw_vector<vector_size_t>& rows,
uint32_t childIdx,
DecodedVector& decoded,
const DecodedVector& decoded,
size_t valueBytes,
const raw_vector<uint8_t*>& nulls,
char* buffer,
Expand Down Expand Up @@ -378,7 +378,7 @@ void CompactRow::serializeRow(

// After serializing each column, the 'offsets' are updated accordingly.
std::vector<size_t> offsets(size);
auto* base = reinterpret_cast<uint8_t*>(buffer);
auto* const base = reinterpret_cast<uint8_t*>(buffer);
for (auto i = 0; i < size; ++i) {
nulls[i] = base + bufferOffsets[i];
offsets[i] = bufferOffsets[i] + rowNullBytes_;
Expand All @@ -402,15 +402,14 @@ void CompactRow::serializeRow(
buffer,
offsets);
} else {
const auto mayHaveNulls = child.decoded_.mayHaveNulls();
const bool mayHaveNulls = child.decoded_.mayHaveNulls();
for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && child.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write non-null variable-width value.
auto bytes =
offsets[i] +=
child.serializeVariableWidth(rows[i], buffer + offsets[i]);
offsets[i] += bytes;
}
}
}
Expand Down Expand Up @@ -634,16 +633,16 @@ int32_t CompactRow::serializeMap(vector_size_t index, char* buffer) {
return keysSerializedBytes + valuesSerializedBytes;
}

int32_t CompactRow::serialize(vector_size_t index, char* buffer) {
return serializeRow(index, buffer);
}

void CompactRow::serialize(
vector_size_t offset,
vector_size_t size,
char* buffer,
const size_t* bufferOffsets) {
if (size == 1) {
(void)serializeRow(offset, buffer + *bufferOffsets);
return;
}
return serializeRow(offset, size, buffer, bufferOffsets);
serializeRow(offset, size, buffer, bufferOffsets);
}

void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) {
Expand All @@ -654,11 +653,11 @@ void CompactRow::serializeFixedWidth(vector_size_t index, char* buffer) {
break;
case TypeKind::TIMESTAMP: {
auto micros = decoded_.valueAt<Timestamp>(index).toMicros();
memcpy(buffer, &micros, sizeof(int64_t));
::memcpy(buffer, &micros, sizeof(int64_t));
break;
}
default:
memcpy(
::memcpy(
buffer,
decoded_.data<char>() + decoded_.index(index) * valueBytes_,
valueBytes_);
Expand All @@ -672,7 +671,7 @@ void CompactRow::serializeFixedWidth(
VELOX_DCHECK(supportsBulkCopy_);
// decoded_.data<char>() can be null if all values are null.
if (decoded_.data<char>()) {
memcpy(
::memcpy(
buffer,
decoded_.data<char>() + decoded_.index(offset) * valueBytes_,
valueBytes_ * size);
Expand All @@ -687,7 +686,7 @@ int32_t CompactRow::serializeVariableWidth(vector_size_t index, char* buffer) {
auto value = decoded_.valueAt<StringView>(index);
writeInt32(buffer, value.size());
if (!value.empty()) {
memcpy(buffer + kSizeBytes, value.data(), value.size());
::memcpy(buffer + kSizeBytes, value.data(), value.size());
}
return kSizeBytes + value.size();
}
Expand Down Expand Up @@ -716,11 +715,11 @@ void readFixedWidthValue(
flatVector->setNull(index, true);
} else if constexpr (std::is_same_v<T, Timestamp>) {
int64_t micros;
memcpy(&micros, buffer, sizeof(int64_t));
::memcpy(&micros, buffer, sizeof(int64_t));
flatVector->set(index, Timestamp::fromMicros(micros));
} else {
T value;
memcpy(&value, buffer, sizeof(T));
::memcpy(&value, buffer, sizeof(T));
flatVector->set(index, value);
}
}
Expand Down
4 changes: 4 additions & 0 deletions velox/row/CompactRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class CompactRow {
/// 'fixedRowSize' returned std::nullopt.
int32_t rowSize(vector_size_t index);

/// Serializes row at specified index into 'buffer'.
/// 'buffer' must have sufficient capacity and set to all zeros.
int32_t serialize(vector_size_t index, char* buffer);

/// Serializes rows in the range [offset, offset + size) into 'buffer' at
/// given 'bufferOffsets'. 'buffer' must have sufficient capacity and set to
/// all zeros for null-bits handling. 'bufferOffsets' must be pre-filled with
Expand Down
52 changes: 37 additions & 15 deletions velox/row/tests/CompactRowTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,46 @@ class CompactRowTest : public ::testing::Test, public VectorTestBase {

BufferPtr buffer = AlignedBuffer::allocate<char>(totalSize, pool(), 0);
auto* rawBuffer = buffer->asMutable<char>();
std::vector<std::string_view> serialized;

vector_size_t offset = 0;
vector_size_t rangeSize = 1;
// Serialize with different range size.
while (offset < numRows) {
auto size = std::min<vector_size_t>(rangeSize, numRows - offset);
row.serialize(offset, size, rawBuffer, offsets.data() + offset);
offset += size;
rangeSize = checkedMultiply<vector_size_t>(rangeSize, 2);
{
// Test serialize row-by-row.
size_t offset = 0;
std::vector<std::string_view> serialized;
for (auto i = 0; i < numRows; ++i) {
auto size = row.serialize(i, rawBuffer + offset);
serialized.push_back(std::string_view(rawBuffer + offset, size));
offset += size;

VELOX_CHECK_EQ(
size, row.rowSize(i), "Row {}: {}", i, data->toString(i));
}

VELOX_CHECK_EQ(offset, totalSize);

auto copy = CompactRow::deserialize(serialized, rowType, pool());
assertEqualVectors(data, copy);
}
{
// Test serialize by range.
memset(rawBuffer, 0, totalSize);

std::vector<std::string_view> serialized;
vector_size_t offset = 0;
vector_size_t rangeSize = 1;
// Serialize with different range size.
while (offset < numRows) {
auto size = std::min<vector_size_t>(rangeSize, numRows - offset);
row.serialize(offset, size, rawBuffer, offsets.data() + offset);
offset += size;
rangeSize = checkedMultiply<vector_size_t>(rangeSize, 2);
}

for (auto i = 0; i < numRows; ++i) {
serialized.push_back(
std::string_view(rawBuffer + offsets[i], rowSize[i]));
for (auto i = 0; i < numRows; ++i) {
serialized.push_back(
std::string_view(rawBuffer + offsets[i], rowSize[i]));
}
auto copy = CompactRow::deserialize(serialized, rowType, pool());
assertEqualVectors(data, copy);
}
auto copy = CompactRow::deserialize(serialized, rowType, pool());
assertEqualVectors(data, copy);
}
};

Expand Down
23 changes: 12 additions & 11 deletions velox/serializers/CompactRowSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
const folly::Range<const IndexRange*>& ranges,
Scratch& scratch) override {
size_t totalSize = 0;
auto totalRows = std::accumulate(
const auto totalRows = std::accumulate(
ranges.begin(),
ranges.end(),
0,
[](vector_size_t sum, const auto& range) { return sum + range.size; });

if (totalRows == 0) {
return;
}

row::CompactRow row(vector);
std::vector<vector_size_t> rowSize(totalRows);
if (auto fixedRowSize =
Expand All @@ -55,10 +59,9 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
} else {
vector_size_t index = 0;
for (const auto& range : ranges) {
for (auto i = 0; i < range.size; ++i) {
for (auto i = 0; i < range.size; ++i, ++index) {
rowSize[index] = row.rowSize(range.begin + i);
totalSize += rowSize[index] + sizeof(TRowSize);
index++;
}
}
}
Expand All @@ -68,7 +71,7 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
}

BufferPtr buffer = AlignedBuffer::allocate<char>(totalSize, pool_, 0);
auto rawBuffer = buffer->asMutable<char>();
auto* rawBuffer = buffer->asMutable<char>();
buffers_.push_back(std::move(buffer));

size_t offset = 0;
Expand All @@ -77,19 +80,17 @@ class CompactRowVectorSerializer : public IterativeVectorSerializer {
if (range.size == 1) {
// Fast path for single-row serialization.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]);
static const auto rowSizeOffset = sizeof(TRowSize);
row.serialize(
range.begin, range.size, rawBuffer + offset, &rowSizeOffset);
offset += rowSize[index] + sizeof(TRowSize);
index++;
auto size =
row.serialize(range.begin, rawBuffer + offset + sizeof(TRowSize));
offset += size + sizeof(TRowSize);
++index;
} else {
raw_vector<size_t> offsets(range.size);
for (auto i = 0; i < range.size; ++i) {
for (auto i = 0; i < range.size; ++i, ++index) {
// Write raw size. Needs to be in big endian order.
*(TRowSize*)(rawBuffer + offset) = folly::Endian::big(rowSize[index]);
offsets[i] = offset + sizeof(TRowSize);
offset += rowSize[index] + sizeof(TRowSize);
index++;
}
// Write row data for all rows in range.
row.serialize(range.begin, range.size, rawBuffer, offsets.data());
Expand Down

0 comments on commit 67650d2

Please sign in to comment.