Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Aug 29, 2024
1 parent 54375e1 commit 2e4eacd
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 50 deletions.
117 changes: 75 additions & 42 deletions velox/row/CompactRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ int32_t readInt32(const char* buffer) {
return n;
}

FOLLY_ALWAYS_INLINE void writeFixeWidth(
char* buffer,
size_t& offset,
const char* rawData,
vector_size_t index,
size_t valueBytes) {
memcpy(buffer + offset, rawData + index * valueBytes, valueBytes);
offset += valueBytes;
}

FOLLY_ALWAYS_INLINE void
writeTimeStamp(char* buffer, size_t& offset, int64_t micros) {
// Write micros(int64_t) for timestamp value.
memcpy(buffer + offset, &micros, sizeof(int64_t));
offset += sizeof(int64_t);
}

FOLLY_ALWAYS_INLINE void
writeString(char* buffer, size_t& offset, const StringView& value) {
writeInt32(buffer + offset, value.size());
if (!value.empty()) {
memcpy(buffer + offset + kSizeBytes, value.data(), value.size());
}
offset += kSizeBytes + value.size();
}

// Serialize a child vector of a row type within a list of rows.
// Write the serialized data at offsets of buffer row by row.
// Update offsets with the actual serialized size.
Expand All @@ -42,20 +68,22 @@ void serializeTyped(
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
const auto* rawData = decoded.data<char>();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write fixed-width value.
memcpy(
buffer + offsets[i],
rawData + decoded.index(rows[i]) * valueBytes,
valueBytes);
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeFixeWidth(
buffer, offsets[i], rawData, decoded.index(rows[i]), valueBytes);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
offsets[i] += valueBytes;
} else {
writeFixeWidth(
buffer, offsets[i], rawData, decoded.index(rows[i]), valueBytes);
}
}
offsets[i] += valueBytes;
}
}

Expand All @@ -82,17 +110,21 @@ void serializeTyped<TypeKind::BOOLEAN>(
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
auto* byte = reinterpret_cast<bool*>(buffer);

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write 1 byte for bool type.
byte[offsets[i]] = decoded.valueAt<bool>(rows[i]);
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
byte[offsets[i]++] = decoded.valueAt<bool>(rows[i]);
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
offsets[i] += 1;
} else {
// Write 1 byte for bool type.
byte[offsets[i]++] = decoded.valueAt<bool>(rows[i]);
}
}
offsets[i] += 1;
}
}

Expand All @@ -105,18 +137,20 @@ void serializeTyped<TypeKind::TIMESTAMP>(
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();
const auto* rawData = decoded.data<Timestamp>();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
// Write micros(int64_t) for timestamp value.
auto micros = rawData[rows[i]].toMicros();
memcpy(buffer + offsets[i], &micros, sizeof(int64_t));
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeTimeStamp(buffer, offsets[i], rawData[rows[i]].toMicros());
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
offsets[i] += sizeof(int64_t);
} else {
writeTimeStamp(buffer, offsets[i], rawData[rows[i]].toMicros());
}
}
offsets[i] += sizeof(int64_t);
}
}

Expand All @@ -129,18 +163,17 @@ void serializeTyped<TypeKind::VARCHAR>(
const raw_vector<uint8_t*>& nulls,
char* buffer,
std::vector<size_t>& offsets) {
const auto mayHaveNulls = decoded.mayHaveNulls();

for (auto i = 0; i < rows.size(); ++i) {
if (mayHaveNulls && decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
auto value = decoded.valueAt<StringView>(rows[i]);
writeInt32(buffer + offsets[i], value.size());
if (!value.empty()) {
memcpy(buffer + offsets[i] + kSizeBytes, value.data(), value.size());
if (!decoded.mayHaveNulls()) {
for (auto i = 0; i < rows.size(); ++i) {
writeString(buffer, offsets[i], decoded.valueAt<StringView>(rows[i]));
}
} else {
for (auto i = 0; i < rows.size(); ++i) {
if (decoded.isNullAt(rows[i])) {
bits::setBit(nulls[i], childIdx, true);
} else {
writeString(buffer, offsets[i], decoded.valueAt<StringView>(rows[i]));
}
offsets[i] += kSizeBytes + value.size();
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions velox/row/CompactRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ class CompactRow {

/// Serializes rows in range [offset, offset + size) into 'buffer' at given
/// 'bufferOffsets'. 'buffer' must have sufficient capacity and set to all
/// zeros. 'bufferOffsets' must be pre-filled with the write offsets for each
/// row and must have the same number of elements as the 'size' parameter.
/// The caller must ensure that the space between each offset in
/// 'bufferOffsets' is no less than the 'fixedRowSize' or 'rowSize'.
/// zeros for null-bits handling. 'bufferOffsets' must be pre-filled with
/// the write offsets for each row and must have the same number of elements
/// as the 'size' parameter. The caller must ensure that the space between
/// each offset in 'bufferOffsets' is no less than the 'fixedRowSize' or
/// 'rowSize'.
void serialize(
vector_size_t offset,
vector_size_t size,
Expand Down
7 changes: 3 additions & 4 deletions velox/row/benchmarks/UnsafeRowSerializeBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,16 @@ class SerializeBenchmark {
auto data = makeData(rowType);
suspender.dismiss();

auto numRows = data->size();
const auto numRows = data->size();
std::vector<size_t> rowSize(numRows);
std::vector<size_t> offsets(numRows);

CompactRow compact(data);
auto totalSize =
computeTotalSize(compact, rowType, numRows, rowSize, offsets);
auto buffer = AlignedBuffer::allocate<char>(totalSize, pool(), 0);
auto serialized =
serialize(compact, data->size(), buffer, rowSize, offsets);
VELOX_CHECK_EQ(serialized.size(), data->size());
auto serialized = serialize(compact, numRows, buffer, rowSize, offsets);
VELOX_CHECK_EQ(serialized.size(), numRows);
}

void deserializeCompact(const RowTypePtr& rowType) {
Expand Down

0 comments on commit 2e4eacd

Please sign in to comment.