Skip to content

Commit

Permalink
feat: implemented logic to convert WalSegmentWriter to a reader that …
Browse files Browse the repository at this point in the history
…implements io.ReadSeekCloser that satisfies the type required by ObjectClient.PutObject() function.

Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko committed Jun 27, 2024
1 parent 5ae5b31 commit e6cace4
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 28 deletions.
52 changes: 47 additions & 5 deletions pkg/storage/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/wal/chunks"
"github.com/grafana/loki/v3/pkg/storage/wal/index"
"github.com/grafana/loki/v3/pkg/util/encoding"
"github.com/grafana/loki/v3/pkg/util/pool"
)

// LOKW is the magic number for the Loki WAL format.
Expand All @@ -32,6 +33,8 @@ var (
}
},
}
// 512kb - 20 mb
encodedWalSegmentBufferPool = pool.NewBuffer(512*1024, 20*1024*1024, 2)
)

func init() {
Expand Down Expand Up @@ -60,6 +63,10 @@ func (s *streamSegment) Reset() {
s.entries = s.entries[:0]
}

func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
return chunks.WriteChunk(w, s.entries, chunks.EncodingSnappy)
}

// NewWalSegmentWriter creates a new WalSegmentWriter.
func NewWalSegmentWriter() (*SegmentWriter, error) {
idxWriter, err := index.NewWriter()
Expand Down Expand Up @@ -212,6 +219,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
// write index len 4b
b.buf1.PutBE32int(n)
n, err = w.Write(b.buf1.Get())
b.buf1.Reset()
if err != nil {
return total, err
}
Expand All @@ -234,10 +242,6 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
return total, nil
}

func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
return chunks.WriteChunk(w, s.entries, chunks.EncodingSnappy)
}

// Reset clears the writer.
// After calling Reset, the writer can be reused.
func (b *SegmentWriter) Reset() {
Expand All @@ -246,10 +250,48 @@ func (b *SegmentWriter) Reset() {
streamSegmentPool.Put(s)
}
b.streams = make(map[streamID]*streamSegment, 64)
b.buf1.Reset()
b.inputSize = 0
}

func (b *SegmentWriter) ToReader() (io.ReadSeekCloser, error) {
// snappy compression rate is ~5x , but we can not predict it, so we need to allocate bigger buffer to avoid allocations
buffer := encodedWalSegmentBufferPool.Get(int(b.inputSize / 3))
_, err := b.WriteTo(buffer)
if err != nil {
return nil, fmt.Errorf("failed to write segment to create a reader: %w", err)
}
return NewEncodedSegmentReader(buffer), nil
}

var (
_ io.ReadSeekCloser = &EncodedSegmentReader{}
)

type EncodedSegmentReader struct {
delegate io.ReadSeeker
encodedContent *bytes.Buffer
}

func NewEncodedSegmentReader(encodedContent *bytes.Buffer) *EncodedSegmentReader {
return &EncodedSegmentReader{
encodedContent: encodedContent,
delegate: bytes.NewReader(encodedContent.Bytes()),
}
}

func (e EncodedSegmentReader) Read(p []byte) (n int, err error) {
return e.delegate.Read(p)
}

func (e EncodedSegmentReader) Seek(offset int64, whence int) (int64, error) {
return e.delegate.Seek(offset, whence)
}

func (e EncodedSegmentReader) Close() error {
encodedWalSegmentBufferPool.Put(e.encodedContent)
return nil
}

// InputSize returns the total size of the input data written to the writer.
// It doesn't account for timestamps and labels.
func (b *SegmentWriter) InputSize() int64 {
Expand Down
53 changes: 30 additions & 23 deletions pkg/storage/wal/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -333,30 +332,38 @@ func BenchmarkWrites(b *testing.B) {

dst := bytes.NewBuffer(make([]byte, 0, inputSize))

pool := sync.Pool{
New: func() interface{} {
writer, err := NewWalSegmentWriter()
if err != nil {
panic(err)
}
return writer
},
}
writer, err := NewWalSegmentWriter()
require.NoError(b, err)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
writer := pool.Get().(*SegmentWriter)
for _, d := range data {
writer.Append(d.tenant, d.labels, d.lbls, d.entries)
}

dst.Reset()
writer.Reset()
encodedLength, err := writer.WriteTo(dst)
require.NoError(b, err)

b.Run("WriteTo", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
dst.Reset()
n, err := writer.WriteTo(dst)
require.NoError(b, err)
require.EqualValues(b, encodedLength, n)
}
})

for _, d := range data {
writer.Append(d.tenant, d.labels, d.lbls, d.entries)
bytesBuf := make([]byte, inputSize)
b.Run("Reader", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
reader, err := writer.ToReader()
require.NoError(b, err)

n, err := reader.Read(bytesBuf)

Check failure on line 364 in pkg/storage/wal/segment_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

ineffectual assignment to err (ineffassign)
require.EqualValues(b, encodedLength, n)
require.NoError(b, reader.Close())
}
n, err := writer.WriteTo(dst)
require.NoError(b, err)
require.True(b, n > 0)
pool.Put(writer)
}
})
}

0 comments on commit e6cace4

Please sign in to comment.