Skip to content

Commit

Permalink
chore(dataobj): track non-NULL values in pages/columns (#15758)
Browse files (browse the repository at this point in the history)
  • Loading branch information
rfratto authored Jan 14, 2025
1 parent cdb0082 commit aec8e96
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 47 deletions.
1 change: 1 addition & 0 deletions pkg/dataobj/internal/dataset/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type (
Compression datasetmd.CompressionType // Compression used for the column.

RowsCount int // Total number of rows in the column.
ValuesCount int // Total number of non-NULL values in the column.
CompressedSize int // Total size of all pages in the column after compression.
UncompressedSize int // Total size of all pages in the column before compression.

Expand Down
1 change: 1 addition & 0 deletions pkg/dataobj/internal/dataset/column_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) {

for _, page := range cb.pages {
info.RowsCount += page.Info.RowCount
info.ValuesCount += page.Info.ValuesCount
info.CompressedSize += page.Info.CompressedSize
info.UncompressedSize += page.Info.UncompressedSize
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/dataset/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func TestColumnBuilder_ReadWrite(t *testing.T) {
col, err := b.Flush()
require.NoError(t, err)
require.Equal(t, datasetmd.VALUE_TYPE_STRING, col.Info.Type)
require.Equal(t, len(in), col.Info.RowsCount)
require.Equal(t, len(in)-2, col.Info.ValuesCount) // -2 for the empty strings
require.Greater(t, len(col.Pages), 1)

t.Log("Uncompressed size: ", col.Info.UncompressedSize)
Expand Down
1 change: 1 addition & 0 deletions pkg/dataobj/internal/dataset/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type (
CompressedSize int // CompressedSize is the size of a page after compression.
CRC32 uint32 // CRC32 checksum of the page after encoding and compression.
RowCount int // RowCount is the number of rows in the page, including NULLs.
ValuesCount int // ValuesCount is the number of non-NULL values in the page.

Encoding datasetmd.EncodingType // Encoding used for values in the page.
Stats *datasetmd.Statistics // Optional statistics for the page.
Expand Down
6 changes: 5 additions & 1 deletion pkg/dataobj/internal/dataset/page_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type pageBuilder struct {
presenceEnc *bitmapEncoder
valuesEnc valueEncoder

rows int // Number of rows appended to the builder.
rows int // Number of rows appended to the builder.
values int // Number of non-NULL values appended to the builder.
}

// newPageBuilder creates a new pageBuilder that stores a sequence of [Value]s.
Expand Down Expand Up @@ -104,6 +105,7 @@ func (b *pageBuilder) Append(value Value) bool {
}

b.rows++
b.values++
return true
}

Expand Down Expand Up @@ -209,6 +211,7 @@ func (b *pageBuilder) Flush() (*MemPage, error) {
CompressedSize: finalData.Len(),
CRC32: checksum,
RowCount: b.rows,
ValuesCount: b.values,

Encoding: b.opts.Encoding,

Expand Down Expand Up @@ -237,4 +240,5 @@ func (b *pageBuilder) Reset() {
b.presenceBuffer.Reset()
b.valuesEnc.Reset(b.valuesWriter)
b.rows = 0
b.values = 0
}
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/dataset/page_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ func Test_pageBuilder_WriteRead(t *testing.T) {

page, err := b.Flush()
require.NoError(t, err)
require.Equal(t, len(in), page.Info.RowCount)
require.Equal(t, len(in)-2, page.Info.ValuesCount) // -2 for the empty strings

t.Log("Uncompressed size: ", page.Info.UncompressedSize)
t.Log("Compressed size: ", page.Info.CompressedSize)
Expand Down
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
ValuesCount: int(col.desc.Info.ValuesCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Expand Down Expand Up @@ -133,6 +134,7 @@ func (p *logsDatasetPage) PageInfo() *dataset.PageInfo {
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),
ValuesCount: int(p.desc.Info.ValuesCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
Expand Down
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (col *streamsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
ValuesCount: int(col.desc.Info.ValuesCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Expand Down Expand Up @@ -133,6 +134,7 @@ func (p *streamsDatasetPage) PageInfo() *dataset.PageInfo {
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),
ValuesCount: int(p.desc.Info.ValuesCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
Expand Down
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/encoding/encoder_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (enc *LogsEncoder) OpenColumn(columnType logsmd.ColumnType, info *dataset.C
Name: info.Name,
ValueType: info.Type,
RowsCount: uint32(info.RowsCount),
ValuesCount: uint32(info.ValuesCount),
Compression: info.Compression,
UncompressedSize: uint32(info.UncompressedSize),
CompressedSize: uint32(info.CompressedSize),
Expand Down Expand Up @@ -199,6 +200,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
CompressedSize: uint32(page.Info.CompressedSize),
Crc32: page.Info.CRC32,
RowsCount: uint32(page.Info.RowCount),
ValuesCount: uint32(page.Info.ValuesCount),
Encoding: page.Info.Encoding,

DataOffset: uint32(enc.startOffset + enc.data.Len()),
Expand Down
2 changes: 2 additions & 0 deletions pkg/dataobj/internal/encoding/encoder_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (enc *StreamsEncoder) OpenColumn(columnType streamsmd.ColumnType, info *dat
Name: info.Name,
ValueType: info.Type,
RowsCount: uint32(info.RowsCount),
ValuesCount: uint32(info.ValuesCount),
Compression: info.Compression,
UncompressedSize: uint32(info.UncompressedSize),
CompressedSize: uint32(info.CompressedSize),
Expand Down Expand Up @@ -199,6 +200,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
CompressedSize: uint32(page.Info.CompressedSize),
Crc32: page.Info.CRC32,
RowsCount: uint32(page.Info.RowCount),
ValuesCount: uint32(page.Info.ValuesCount),
Encoding: page.Info.Encoding,

DataOffset: uint32(enc.startOffset + enc.data.Len()),
Expand Down
Loading

0 comments on commit aec8e96

Please sign in to comment.