Skip to content

Commit

Permalink
chore(dataobj): reduce total allocations while encoding (#15846)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto authored Jan 20, 2025
1 parent bb5373d commit 2a60011
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/dataobj/internal/encoding/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (enc *Encoder) append(data, metadata []byte) error {
enc.curSection.MetadataSize = uint32(len(metadata))

// bytes.Buffer.Write never fails.
enc.data.Grow(len(data) + len(metadata))
_, _ = enc.data.Write(data)
_, _ = enc.data.Write(metadata)

Expand Down
27 changes: 20 additions & 7 deletions pkg/dataobj/internal/encoding/encoder_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (enc *LogsEncoder) append(data, metadata []byte) error {
enc.curColumn.Info.MetadataSize = uint32(len(metadata))

// bytes.Buffer.Write never fails.
enc.data.Grow(len(data) + len(metadata))
_, _ = enc.data.Write(data)
_, _ = enc.data.Write(metadata)

Expand All @@ -169,8 +170,11 @@ type LogsColumnEncoder struct {
startOffset int // Byte offset in the file where the column starts.
closed bool // true if LogsColumnEncoder has been closed.

data *bytes.Buffer // All page data.
pages []*logsmd.PageDesc
data *bytes.Buffer // All page data.
pageHeaders []*logsmd.PageDesc

memPages []*dataset.MemPage // Pages to write.
totalPageSize int // Size of bytes across all pages.
}

func newLogsColumnEncoder(parent *LogsEncoder, offset int) *LogsColumnEncoder {
Expand All @@ -195,7 +199,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
// It's possible the caller can pass an incorrect value for UncompressedSize
// and CompressedSize, but those fields are purely for stats so we don't
// check it.
enc.pages = append(enc.pages, &logsmd.PageDesc{
enc.pageHeaders = append(enc.pageHeaders, &logsmd.PageDesc{
Info: &datasetmd.PageInfo{
UncompressedSize: uint32(page.Info.UncompressedSize),
CompressedSize: uint32(page.Info.CompressedSize),
Expand All @@ -204,14 +208,15 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
ValuesCount: uint32(page.Info.ValuesCount),
Encoding: page.Info.Encoding,

DataOffset: uint32(enc.startOffset + enc.data.Len()),
DataOffset: uint32(enc.startOffset + enc.totalPageSize),
DataSize: uint32(len(page.Data)),

Statistics: page.Info.Stats,
},
})

_, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails.
enc.memPages = append(enc.memPages, page)
enc.totalPageSize += len(page.Data)
return nil
}

Expand All @@ -220,7 +225,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error {
func (enc *LogsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) }

func (enc *LogsColumnEncoder) metadata() proto.Message {
return &logsmd.ColumnMetadata{Pages: enc.pages}
return &logsmd.ColumnMetadata{Pages: enc.pageHeaders}
}

// Commit closes the column, flushing all data to the parent element. After
Expand All @@ -233,18 +238,26 @@ func (enc *LogsColumnEncoder) Commit() error {

defer bytesBufferPool.Put(enc.data)

if len(enc.pages) == 0 {
if len(enc.pageHeaders) == 0 {
// No data was written; discard.
return enc.parent.append(nil, nil)
}

// Write all pages. To avoid costly reallocations, we grow our buffer to fit
// all data first.
enc.data.Grow(enc.totalPageSize)
for _, p := range enc.memPages {
_, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails.
}

metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
metadataBuffer.Reset()
defer bytesBufferPool.Put(metadataBuffer)

if err := elementMetadataWrite(enc, metadataBuffer); err != nil {
return err
}

return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes())
}

Expand Down
26 changes: 19 additions & 7 deletions pkg/dataobj/internal/encoding/encoder_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (enc *StreamsEncoder) append(data, metadata []byte) error {
enc.curColumn.Info.MetadataSize = uint32(len(metadata))

// bytes.Buffer.Write never fails.
enc.data.Grow(len(data) + len(metadata))
_, _ = enc.data.Write(data)
_, _ = enc.data.Write(metadata)

Expand All @@ -169,8 +170,11 @@ type StreamsColumnEncoder struct {
startOffset int // Byte offset in the file where the column starts.
closed bool // true if StreamsColumnEncoder has been closed.

data *bytes.Buffer // All page data.
pages []*streamsmd.PageDesc
data *bytes.Buffer // All page data.
pageHeaders []*streamsmd.PageDesc

memPages []*dataset.MemPage // Pages to write.
totalPageSize int // Total size of all pages.
}

func newStreamsColumnEncoder(parent *StreamsEncoder, offset int) *StreamsColumnEncoder {
Expand All @@ -195,7 +199,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
// It's possible the caller can pass an incorrect value for UncompressedSize
// and CompressedSize, but those fields are purely for stats so we don't
// check it.
enc.pages = append(enc.pages, &streamsmd.PageDesc{
enc.pageHeaders = append(enc.pageHeaders, &streamsmd.PageDesc{
Info: &datasetmd.PageInfo{
UncompressedSize: uint32(page.Info.UncompressedSize),
CompressedSize: uint32(page.Info.CompressedSize),
Expand All @@ -204,14 +208,15 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
ValuesCount: uint32(page.Info.ValuesCount),
Encoding: page.Info.Encoding,

DataOffset: uint32(enc.startOffset + enc.data.Len()),
DataOffset: uint32(enc.startOffset + enc.totalPageSize),
DataSize: uint32(len(page.Data)),

Statistics: page.Info.Stats,
},
})

_, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails.
enc.memPages = append(enc.memPages, page)
enc.totalPageSize += len(page.Data)
return nil
}

Expand All @@ -220,7 +225,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error {
func (enc *StreamsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) }

func (enc *StreamsColumnEncoder) metadata() proto.Message {
return &streamsmd.ColumnMetadata{Pages: enc.pages}
return &streamsmd.ColumnMetadata{Pages: enc.pageHeaders}
}

// Commit closes the column, flushing all data to the parent element. After
Expand All @@ -233,11 +238,18 @@ func (enc *StreamsColumnEncoder) Commit() error {

defer bytesBufferPool.Put(enc.data)

if len(enc.pages) == 0 {
if len(enc.pageHeaders) == 0 {
// No data was written; discard.
return enc.parent.append(nil, nil)
}

// Write all pages. To avoid costly reallocations, we grow our buffer to fit
// all data first.
enc.data.Grow(enc.totalPageSize)
for _, p := range enc.memPages {
_, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails.
}

metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer)
metadataBuffer.Reset()
defer bytesBufferPool.Put(metadataBuffer)
Expand Down

0 comments on commit 2a60011

Please sign in to comment.