diff --git a/pkg/dataobj/internal/encoding/encoder.go b/pkg/dataobj/internal/encoding/encoder.go index d939b236fddac..52b99944f90b5 100644 --- a/pkg/dataobj/internal/encoding/encoder.go +++ b/pkg/dataobj/internal/encoding/encoder.go @@ -162,6 +162,7 @@ func (enc *Encoder) append(data, metadata []byte) error { enc.curSection.MetadataSize = uint32(len(metadata)) // bytes.Buffer.Write never fails. + enc.data.Grow(len(data) + len(metadata)) _, _ = enc.data.Write(data) _, _ = enc.data.Write(metadata) diff --git a/pkg/dataobj/internal/encoding/encoder_logs.go b/pkg/dataobj/internal/encoding/encoder_logs.go index 6e7cc9d026aac..13f558ba01705 100644 --- a/pkg/dataobj/internal/encoding/encoder_logs.go +++ b/pkg/dataobj/internal/encoding/encoder_logs.go @@ -153,6 +153,7 @@ func (enc *LogsEncoder) append(data, metadata []byte) error { enc.curColumn.Info.MetadataSize = uint32(len(metadata)) // bytes.Buffer.Write never fails. + enc.data.Grow(len(data) + len(metadata)) _, _ = enc.data.Write(data) _, _ = enc.data.Write(metadata) @@ -169,8 +170,11 @@ type LogsColumnEncoder struct { startOffset int // Byte offset in the file where the column starts. closed bool // true if LogsColumnEncoder has been closed. - data *bytes.Buffer // All page data. - pages []*logsmd.PageDesc + data *bytes.Buffer // All page data. + pageHeaders []*logsmd.PageDesc + + memPages []*dataset.MemPage // Pages to write. + totalPageSize int // Size of bytes across all pages. } func newLogsColumnEncoder(parent *LogsEncoder, offset int) *LogsColumnEncoder { @@ -195,7 +199,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error { // It's possible the caller can pass an incorrect value for UncompressedSize // and CompressedSize, but those fields are purely for stats so we don't // check it. - enc.pages = append(enc.pages, &logsmd.PageDesc{ + enc.pageHeaders = append(enc.pageHeaders, &logsmd.PageDesc{ Info: &datasetmd.PageInfo{ UncompressedSize: uint32(page.Info.UncompressedSize), CompressedSize: uint32(page.Info.CompressedSize), @@ -204,14 +208,15 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error { ValuesCount: uint32(page.Info.ValuesCount), Encoding: page.Info.Encoding, - DataOffset: uint32(enc.startOffset + enc.data.Len()), + DataOffset: uint32(enc.startOffset + enc.totalPageSize), DataSize: uint32(len(page.Data)), Statistics: page.Info.Stats, }, }) - _, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails. + enc.memPages = append(enc.memPages, page) + enc.totalPageSize += len(page.Data) return nil } @@ -220,7 +225,7 @@ func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error { func (enc *LogsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) } func (enc *LogsColumnEncoder) metadata() proto.Message { - return &logsmd.ColumnMetadata{Pages: enc.pages} + return &logsmd.ColumnMetadata{Pages: enc.pageHeaders} } // Commit closes the column, flushing all data to the parent element. After @@ -233,11 +238,18 @@ func (enc *LogsColumnEncoder) Commit() error { defer bytesBufferPool.Put(enc.data) - if len(enc.pages) == 0 { + if len(enc.pageHeaders) == 0 { // No data was written; discard. return enc.parent.append(nil, nil) } + // Write all pages. To avoid costly reallocations, we grow our buffer to fit + // all data first. + enc.data.Grow(enc.totalPageSize) + for _, p := range enc.memPages { + _, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails. + } + metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer) metadataBuffer.Reset() defer bytesBufferPool.Put(metadataBuffer) @@ -245,6 +257,7 @@ func (enc *LogsColumnEncoder) Commit() error { if err := elementMetadataWrite(enc, metadataBuffer); err != nil { return err } + return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes()) } diff --git a/pkg/dataobj/internal/encoding/encoder_streams.go b/pkg/dataobj/internal/encoding/encoder_streams.go index 2f58984b5abf2..4592a14e793ab 100644 --- a/pkg/dataobj/internal/encoding/encoder_streams.go +++ b/pkg/dataobj/internal/encoding/encoder_streams.go @@ -153,6 +153,7 @@ func (enc *StreamsEncoder) append(data, metadata []byte) error { enc.curColumn.Info.MetadataSize = uint32(len(metadata)) // bytes.Buffer.Write never fails. + enc.data.Grow(len(data) + len(metadata)) _, _ = enc.data.Write(data) _, _ = enc.data.Write(metadata) @@ -169,8 +170,11 @@ type StreamsColumnEncoder struct { startOffset int // Byte offset in the file where the column starts. closed bool // true if StreamsColumnEncoder has been closed. - data *bytes.Buffer // All page data. - pages []*streamsmd.PageDesc + data *bytes.Buffer // All page data. + pageHeaders []*streamsmd.PageDesc + + memPages []*dataset.MemPage // Pages to write. + totalPageSize int // Total size of all pages. } func newStreamsColumnEncoder(parent *StreamsEncoder, offset int) *StreamsColumnEncoder { @@ -195,7 +199,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error { // It's possible the caller can pass an incorrect value for UncompressedSize // and CompressedSize, but those fields are purely for stats so we don't // check it. - enc.pages = append(enc.pages, &streamsmd.PageDesc{ + enc.pageHeaders = append(enc.pageHeaders, &streamsmd.PageDesc{ Info: &datasetmd.PageInfo{ UncompressedSize: uint32(page.Info.UncompressedSize), CompressedSize: uint32(page.Info.CompressedSize), @@ -204,14 +208,15 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error { ValuesCount: uint32(page.Info.ValuesCount), Encoding: page.Info.Encoding, - DataOffset: uint32(enc.startOffset + enc.data.Len()), + DataOffset: uint32(enc.startOffset + enc.totalPageSize), DataSize: uint32(len(page.Data)), Statistics: page.Info.Stats, }, }) - _, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails. + enc.memPages = append(enc.memPages, page) + enc.totalPageSize += len(page.Data) return nil } @@ -220,7 +225,7 @@ func (enc *StreamsColumnEncoder) AppendPage(page *dataset.MemPage) error { func (enc *StreamsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) } func (enc *StreamsColumnEncoder) metadata() proto.Message { - return &streamsmd.ColumnMetadata{Pages: enc.pages} + return &streamsmd.ColumnMetadata{Pages: enc.pageHeaders} } // Commit closes the column, flushing all data to the parent element. After @@ -233,11 +238,18 @@ func (enc *StreamsColumnEncoder) Commit() error { defer bytesBufferPool.Put(enc.data) - if len(enc.pages) == 0 { + if len(enc.pageHeaders) == 0 { // No data was written; discard. return enc.parent.append(nil, nil) } + // Write all pages. To avoid costly reallocations, we grow our buffer to fit + // all data first. + enc.data.Grow(enc.totalPageSize) + for _, p := range enc.memPages { + _, _ = enc.data.Write(p.Data) // bytes.Buffer.Write never fails. + } + metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer) metadataBuffer.Reset() defer bytesBufferPool.Put(metadataBuffer)