Skip to content

Commit

Permalink
[WIP] PO DEBUG
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Oct 10, 2024
1 parent 9cf4ee0 commit fc955b6
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
3 changes: 3 additions & 0 deletions velox/common/memory/ByteStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ void ByteOutputStream::extend(int32_t bytes) {
ranges_.emplace_back();
current_ = &ranges_.back();
lastRangeEnd_ = 0;
if (bytes == 0) {
return;
}
arena_->newRange(
newRangeSize(bytes),
ranges_.size() == 1 ? nullptr : &ranges_[ranges_.size() - 2],
Expand Down
43 changes: 33 additions & 10 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1386,8 +1386,9 @@ class VectorStream {
}

// The first element in the offsets in the wire format is always 0 for
// nested types.
lengths_.startWrite(sizeof(vector_size_t));
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.startWrite(0);
lengths_.appendOne<int32_t>(0);
}
return;
Expand Down Expand Up @@ -1504,6 +1505,10 @@ class VectorStream {
nonNullCount_ += count;
}

bool lengthEmpty() {
return totalLength_ == 0 && nonNullCount_ == 0 && nullCount_ == 0;
}

void appendLength(int32_t length) {
totalLength_ += length;
lengths_.appendOne<int32_t>(totalLength_);
Expand Down Expand Up @@ -1720,7 +1725,9 @@ class VectorStream {
lengths_.startWrite(lengths_.size());
if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY ||
type_->kind() == TypeKind::MAP) {
// A complex type has a 0 as first length.
// The first element in the offsets in the wire format is always 0 for
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.appendOne<int32_t>(0);
}
}
Expand All @@ -1736,7 +1743,7 @@ class VectorStream {
std::optional<VectorPtr> vector,
vector_size_t initialNumRows) {
initializeHeader(typeToEncodingName(type_), *streamArena_);
nulls_.startWrite(1 + (initialNumRows / 8));
nulls_.startWrite(0);

switch (type_->kind()) {
case TypeKind::ROW:
Expand All @@ -1745,7 +1752,7 @@ class VectorStream {
[[fallthrough]];
case TypeKind::MAP:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
lengths_.startWrite(0);
children_.resize(type_->size());
for (int32_t i = 0; i < type_->size(); ++i) {
children_[i] = std::make_unique<VectorStream>(
Expand All @@ -1757,21 +1764,22 @@ class VectorStream {
opts_);
}
// The first element in the offsets in the wire format is always 0 for
// nested types.
// nested types. Set upon construction/reset in case empty (no append
// calls will be made).
lengths_.appendOne<int32_t>(0);
break;
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY:
hasLengths_ = true;
lengths_.startWrite(initialNumRows * sizeof(vector_size_t));
lengths_.startWrite(0);
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 10);
values_.startWrite(0);
}
break;
default:
if (values_.ranges().empty()) {
values_.startWrite(initialNumRows * 4);
values_.startWrite(0);
}
break;
}
Expand Down Expand Up @@ -1972,13 +1980,17 @@ void serializeWrapped(
}
}

// Serialized layout for RowVector
// null bits (1 bit per row) | serialized child_0 | serialized child_1 | ...
void serializeRowVector(
const VectorPtr& vector,
const folly::Range<const IndexRange*>& ranges,
VectorStream* stream,
Scratch& scratch) {
// if (stream->lengthEmpty()) { // jtan6
// stream->appendLength(0);
// }
auto rowVector = vector->as<RowVector>();

std::vector<IndexRange> childRanges;
for (int32_t i = 0; i < ranges.size(); ++i) {
auto begin = ranges[i].begin;
Expand Down Expand Up @@ -2009,6 +2021,9 @@ void serializeArrayVector(
auto rawOffsets = arrayVector->rawOffsets();
std::vector<IndexRange> childRanges;
childRanges.reserve(ranges.size());
// if (stream->lengthEmpty()) { // jtan6
// stream->appendLength(0);
// }
for (int32_t i = 0; i < ranges.size(); ++i) {
int32_t begin = ranges[i].begin;
int32_t end = begin + ranges[i].size;
Expand Down Expand Up @@ -2743,6 +2758,9 @@ void serializeRowVector(
ScratchPtr<vector_size_t, 64> innerRowsHolder(scratch);
auto innerRows = rows.data();
auto numInnerRows = rows.size();
// if (stream->lengthEmpty()) { // jtan6
// stream->appendLength(0);
// }
if (auto rawNulls = vector->rawNulls()) {
auto nulls = nullsHolder.get(bits::nwords(rows.size()));
simd::gatherBits(rawNulls, rows, nulls);
Expand Down Expand Up @@ -3952,6 +3970,11 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
}
}

static int64_t& maxLength() {
static int64_t maxLength{0};
return maxLength;
}

void append(
const RowVectorPtr& vector,
const folly::Range<const vector_size_t*>& rows,
Expand Down
27 changes: 27 additions & 0 deletions velox/serializers/tests/PrestoSerializerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,33 @@ TEST_P(PrestoSerializerTest, emptyPage) {
assertEqualVectors(deserialized, rowVector);
}

TEST_P(PrestoSerializerTest, initMemory) {
const auto numRows = 100;
auto testFunc = [&](TypePtr type, int64_t expectedBytes) {
const auto poolMemUsage = pool_->usedBytes();
auto arena = std::make_unique<StreamArena>(pool_.get());
const auto paramOptions = getParamSerdeOptions(nullptr);
const auto rowType = ROW({type});
const auto serializer = serde_->createIterativeSerializer(
rowType, numRows, arena.get(), &paramOptions);
ASSERT_EQ(pool_->usedBytes() - poolMemUsage, expectedBytes);
};

testFunc(BOOLEAN(), 0);
testFunc(TINYINT(), 0);
testFunc(SMALLINT(), 0);
testFunc(INTEGER(), 0);
testFunc(BIGINT(), 0);
testFunc(REAL(), 0);
testFunc(DOUBLE(), 0);
testFunc(VARCHAR(), 0);
testFunc(TIMESTAMP(), 0);
// For nested types, 2 pages allocation quantum for first offset (0).
testFunc(ROW({VARCHAR()}), 8192);
testFunc(ARRAY(INTEGER()), 8192);
testFunc(MAP(VARCHAR(), INTEGER()), 8192);
}

TEST_P(PrestoSerializerTest, serializeNoRowsSelected) {
std::ostringstream out;
facebook::velox::serializer::presto::PrestoOutputStreamListener listener;
Expand Down

0 comments on commit fc955b6

Please sign in to comment.