From e6cace42103490d5fac06de17e99b9e8d856596c Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 27 Jun 2024 12:56:21 +0300 Subject: [PATCH 1/3] feat: implemented logic to convert WalSegmentWriter to a reader that implements io.ReadSeekCloser that satisfies the type required by ObjectClient.PutObject() function. Signed-off-by: Vladyslav Diachenko --- pkg/storage/wal/segment.go | 52 ++++++++++++++++++++++++++++---- pkg/storage/wal/segment_test.go | 53 +++++++++++++++++++-------------- 2 files changed, 77 insertions(+), 28 deletions(-) diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index 8ced5da1cef45..9f65e563d652c 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/wal/chunks" "github.com/grafana/loki/v3/pkg/storage/wal/index" "github.com/grafana/loki/v3/pkg/util/encoding" + "github.com/grafana/loki/v3/pkg/util/pool" ) // LOKW is the magic number for the Loki WAL format. @@ -32,6 +33,8 @@ var ( } }, } + // 512kb - 20 mb + encodedWalSegmentBufferPool = pool.NewBuffer(512*1024, 20*1024*1024, 2) ) func init() { @@ -60,6 +63,10 @@ func (s *streamSegment) Reset() { s.entries = s.entries[:0] } +func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) { + return chunks.WriteChunk(w, s.entries, chunks.EncodingSnappy) +} + // NewWalSegmentWriter creates a new WalSegmentWriter. func NewWalSegmentWriter() (*SegmentWriter, error) { idxWriter, err := index.NewWriter() @@ -212,6 +219,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) { // write index len 4b b.buf1.PutBE32int(n) n, err = w.Write(b.buf1.Get()) + b.buf1.Reset() if err != nil { return total, err } @@ -234,10 +242,6 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) { return total, nil } -func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) { - return chunks.WriteChunk(w, s.entries, chunks.EncodingSnappy) -} - // Reset clears the writer. // After calling Reset, the writer can be reused. func (b *SegmentWriter) Reset() { @@ -246,10 +250,48 @@ func (b *SegmentWriter) Reset() { streamSegmentPool.Put(s) } b.streams = make(map[streamID]*streamSegment, 64) - b.buf1.Reset() b.inputSize = 0 } +func (b *SegmentWriter) ToReader() (io.ReadSeekCloser, error) { + // snappy compression rate is ~5x , but we can not predict it, so we need to allocate bigger buffer to avoid allocations + buffer := encodedWalSegmentBufferPool.Get(int(b.inputSize / 3)) + _, err := b.WriteTo(buffer) + if err != nil { + return nil, fmt.Errorf("failed to write segment to create a reader: %w", err) + } + return NewEncodedSegmentReader(buffer), nil +} + +var ( + _ io.ReadSeekCloser = &EncodedSegmentReader{} +) + +type EncodedSegmentReader struct { + delegate io.ReadSeeker + encodedContent *bytes.Buffer +} + +func NewEncodedSegmentReader(encodedContent *bytes.Buffer) *EncodedSegmentReader { + return &EncodedSegmentReader{ + encodedContent: encodedContent, + delegate: bytes.NewReader(encodedContent.Bytes()), + } +} + +func (e EncodedSegmentReader) Read(p []byte) (n int, err error) { + return e.delegate.Read(p) +} + +func (e EncodedSegmentReader) Seek(offset int64, whence int) (int64, error) { + return e.delegate.Seek(offset, whence) +} + +func (e EncodedSegmentReader) Close() error { + encodedWalSegmentBufferPool.Put(e.encodedContent) + return nil +} + // InputSize returns the total size of the input data written to the writer. // It doesn't account for timestamps and labels. func (b *SegmentWriter) InputSize() int64 { diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index ddcc7afc16b3e..d17893ef2b656 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "sort" - "sync" "testing" "time" @@ -333,30 +332,38 @@ func BenchmarkWrites(b *testing.B) { dst := bytes.NewBuffer(make([]byte, 0, inputSize)) - pool := sync.Pool{ - New: func() interface{} { - writer, err := NewWalSegmentWriter() - if err != nil { - panic(err) - } - return writer - }, - } + writer, err := NewWalSegmentWriter() + require.NoError(b, err) - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - writer := pool.Get().(*SegmentWriter) + for _, d := range data { + writer.Append(d.tenant, d.labels, d.lbls, d.entries) + } - dst.Reset() - writer.Reset() + encodedLength, err := writer.WriteTo(dst) + require.NoError(b, err) + + b.Run("WriteTo", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + dst.Reset() + n, err := writer.WriteTo(dst) + require.NoError(b, err) + require.EqualValues(b, encodedLength, n) + } + }) - for _, d := range data { - writer.Append(d.tenant, d.labels, d.lbls, d.entries) + bytesBuf := make([]byte, inputSize) + b.Run("Reader", func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + reader, err := writer.ToReader() + require.NoError(b, err) + + n, err := reader.Read(bytesBuf) + require.EqualValues(b, encodedLength, n) + require.NoError(b, reader.Close()) } - n, err := writer.WriteTo(dst) - require.NoError(b, err) - require.True(b, n > 0) - pool.Put(writer) - } + }) } From 8d094f10c311245acfc1b9784885a40873b97176 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 27 Jun 2024 14:05:35 +0300 Subject: [PATCH 2/3] fixed linter Signed-off-by: Vladyslav Diachenko --- pkg/storage/wal/segment_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index d17893ef2b656..db0e9959ebebe 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -358,10 +358,12 @@ func BenchmarkWrites(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { + var err error reader, err := writer.ToReader() require.NoError(b, err) n, err := reader.Read(bytesBuf) + require.NoError(b, err) require.EqualValues(b, encodedLength, n) require.NoError(b, reader.Close()) } From 24039daa879fbb788827ef10f3cb8bafa6a53536 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 27 Jun 2024 14:09:55 +0300 Subject: [PATCH 3/3] remove values on close() and fixed function receiver's type Signed-off-by: Vladyslav Diachenko --- pkg/storage/wal/segment.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index 9f65e563d652c..3e4fb0c2fa302 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -279,16 +279,18 @@ func NewEncodedSegmentReader(encodedContent *bytes.Buffer) *EncodedSegmentReader } } -func (e EncodedSegmentReader) Read(p []byte) (n int, err error) { +func (e *EncodedSegmentReader) Read(p []byte) (n int, err error) { return e.delegate.Read(p) } -func (e EncodedSegmentReader) Seek(offset int64, whence int) (int64, error) { +func (e *EncodedSegmentReader) Seek(offset int64, whence int) (int64, error) { return e.delegate.Seek(offset, whence) } -func (e EncodedSegmentReader) Close() error { +func (e *EncodedSegmentReader) Close() error { encodedWalSegmentBufferPool.Put(e.encodedContent) + e.encodedContent = nil + e.delegate = nil return nil }