-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(dataobj): data object encoding and decoding
This commit introduces the encoding package with utilities for writing and reading a data object. This initial commit includes a single section called "streams". The streams section holds a list of streams for which logs are available in the data object file. This does not hold the logs themselves, but rather just the stream labels themselves with an ID. Encoding -------- Encoding presents a hierarchical API to match the file structure: 1. Callers open an encoder 2. Callers open a streams section from the encoder 3. Callers open a column from the streams section 4. Callers append a page into the column Child elements of the hierarchy have a Commit method to flush their written data and metadata to their parent. Each element of the hierarchy exposes its current MetadataSize. Callers should use MetadataSize to control the size of an element. For example, if Encoder.MetadataSize goes past a limit, callers should stop appending new sections to the file and flush the file to disk. To support discarding data after reaching a size limit, each child element of the hierarchy also has a Discard method. Decoding -------- Decoding separates each section into a different Decoder interface to more cleanly separate the APIs. The initial Decoder is for ReadSeekers, but later implementations will include object storage and caching. The Decoder interfaces are designed for batch reading, so that callers can retrieve multiple columns or pages at once. Implementations can then use this to reduce the number of roundtrips (such as retrieving mulitple cache keys in a single cache request). encoding.StreamsDataset converts an instance of a StreamDecoder into a dataset.Dataset, allowing to use the existing dataset utility functions without downloading an entire dataset.
- Loading branch information
Showing
16 changed files
with
4,377 additions
and
105 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
package encoding | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/result" | ||
) | ||
|
||
// StreamsDataset implements returns a [dataset.Dataset] from a | ||
// [StreamsDecoder] for the given section. | ||
func StreamsDataset(dec StreamsDecoder, sec *filemd.SectionInfo) dataset.Dataset { | ||
return &streamsDataset{dec: dec, sec: sec} | ||
} | ||
|
||
type streamsDataset struct { | ||
dec StreamsDecoder | ||
sec *filemd.SectionInfo | ||
} | ||
|
||
func (ds *streamsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] { | ||
return result.Iter(func(yield func(dataset.Column) bool) error { | ||
columns, err := ds.dec.Columns(ctx, ds.sec) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, column := range columns { | ||
if !yield(&streamsDatasetColumn{dec: ds.dec, desc: column}) { | ||
return nil | ||
} | ||
} | ||
|
||
return err | ||
}) | ||
|
||
} | ||
|
||
func (ds *streamsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] { | ||
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column. | ||
return result.Iter(func(yield func(dataset.Pages) bool) error { | ||
for _, column := range columns { | ||
pages, err := result.Collect(column.ListPages(ctx)) | ||
if err != nil { | ||
return err | ||
} else if !yield(pages) { | ||
return nil | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
func (ds *streamsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] { | ||
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page. | ||
return result.Iter(func(yield func(dataset.PageData) bool) error { | ||
for _, page := range pages { | ||
data, err := page.ReadPage(ctx) | ||
if err != nil { | ||
return err | ||
} else if !yield(data) { | ||
return nil | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
type streamsDatasetColumn struct { | ||
dec StreamsDecoder | ||
desc *streamsmd.ColumnDesc | ||
|
||
info *dataset.ColumnInfo | ||
} | ||
|
||
func (col *streamsDatasetColumn) ColumnInfo() *dataset.ColumnInfo { | ||
if col.info != nil { | ||
return col.info | ||
} | ||
|
||
col.info = &dataset.ColumnInfo{ | ||
Name: col.desc.Info.Name, | ||
Type: col.desc.Info.ValueType, | ||
Compression: col.desc.Info.Compression, | ||
|
||
RowsCount: int(col.desc.Info.RowsCount), | ||
CompressedSize: int(col.desc.Info.CompressedSize), | ||
UncompressedSize: int(col.desc.Info.UncompressedSize), | ||
|
||
Statistics: col.desc.Info.Statistics, | ||
} | ||
return col.info | ||
} | ||
|
||
func (col *streamsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] { | ||
return result.Iter(func(yield func(dataset.Page) bool) error { | ||
pageSets, err := result.Collect(col.dec.Pages(ctx, []*streamsmd.ColumnDesc{col.desc})) | ||
if err != nil { | ||
return err | ||
} else if len(pageSets) != 1 { | ||
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets)) | ||
} | ||
|
||
for _, page := range pageSets[0] { | ||
if !yield(&streamsDatasetPage{dec: col.dec, desc: page}) { | ||
return nil | ||
} | ||
} | ||
|
||
return nil | ||
}) | ||
} | ||
|
||
type streamsDatasetPage struct { | ||
dec StreamsDecoder | ||
desc *streamsmd.PageDesc | ||
|
||
info *dataset.PageInfo | ||
} | ||
|
||
func (p *streamsDatasetPage) PageInfo() *dataset.PageInfo { | ||
if p.info != nil { | ||
return p.info | ||
} | ||
|
||
p.info = &dataset.PageInfo{ | ||
UncompressedSize: int(p.desc.Info.UncompressedSize), | ||
CompressedSize: int(p.desc.Info.CompressedSize), | ||
CRC32: p.desc.Info.Crc32, | ||
RowCount: int(p.desc.Info.RowsCount), | ||
|
||
Encoding: p.desc.Info.Encoding, | ||
Stats: p.desc.Info.Statistics, | ||
} | ||
return p.info | ||
} | ||
|
||
func (p *streamsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) { | ||
pages, err := result.Collect(p.dec.ReadPages(ctx, []*streamsmd.PageDesc{p.desc})) | ||
if err != nil { | ||
return nil, err | ||
} else if len(pages) != 1 { | ||
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages)) | ||
} | ||
|
||
return pages[0], nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package encoding | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/result" | ||
) | ||
|
||
// Decoders. To cleanly separate the APIs per section, section-specific | ||
// Decoders should be created and returned by the top-level [Decoder] | ||
// interface. | ||
type ( | ||
// A Decoder decodes a data object. | ||
Decoder interface { | ||
// Sections returns the list of sections within a data object. | ||
Sections(ctx context.Context) ([]*filemd.SectionInfo, error) | ||
|
||
// StreamsDecoder returns a decoder for streams sections. | ||
StreamsDecoder() StreamsDecoder | ||
} | ||
|
||
// StreamsDecoder supports decoding data within a streams section. | ||
StreamsDecoder interface { | ||
// Columns describes the set of columns in the provided section. | ||
Columns(ctx context.Context, section *filemd.SectionInfo) ([]*streamsmd.ColumnDesc, error) | ||
|
||
// Pages retrieves the set of pages for the provided columns. The order of | ||
// page lists emitted by the sequence matches the order of columns | ||
// provided: the first page list corresponds to the first column, and so | ||
// on. | ||
Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] | ||
|
||
// ReadPages reads the provided set of pages, iterating over their data | ||
// matching the argument order. If an error is encountered while retrieving | ||
// pages, an error is emitted and iteration stops. | ||
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] | ||
} | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package encoding | ||
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"fmt" | ||
"io" | ||
|
||
"github.com/gogo/protobuf/proto" | ||
|
||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" | ||
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" | ||
) | ||
|
||
// decode* methods for metadata shared by Decoder implementations. | ||
|
||
// decodeFileMetadata decodes file metadata from r. | ||
func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) { | ||
gotVersion, err := streamio.ReadUvarint(r) | ||
if err != nil { | ||
return nil, fmt.Errorf("read file format version: %w", err) | ||
} else if gotVersion != fileFormatVersion { | ||
return nil, fmt.Errorf("unexpected file format version: got=%d want=%d", gotVersion, fileFormatVersion) | ||
} | ||
|
||
var md filemd.Metadata | ||
if err := decodeProto(r, &md); err != nil { | ||
return nil, fmt.Errorf("file metadata: %w", err) | ||
} | ||
return &md, nil | ||
} | ||
|
||
// decodeStreamsMetadata decodes stream section metadta from r. | ||
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) { | ||
gotVersion, err := streamio.ReadUvarint(r) | ||
if err != nil { | ||
return nil, fmt.Errorf("read streams section format version: %w", err) | ||
} else if gotVersion != streamsFormatVersion { | ||
return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion) | ||
} | ||
|
||
var md streamsmd.Metadata | ||
if err := decodeProto(r, &md); err != nil { | ||
return nil, fmt.Errorf("streams section metadata: %w", err) | ||
} | ||
return &md, nil | ||
} | ||
|
||
// decodeStreamsColumnMetadata decodes stream column metadata from r. | ||
func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata, error) { | ||
var metadata streamsmd.ColumnMetadata | ||
if err := decodeProto(r, &metadata); err != nil { | ||
return nil, fmt.Errorf("streams column metadata: %w", err) | ||
} | ||
return &metadata, nil | ||
} | ||
|
||
// decodeProto decodes a proto message from r and stores it in pb. Proto | ||
// messages are expected to be encoded with their size, followed by the proto | ||
// bytes. | ||
func decodeProto(r streamio.Reader, pb proto.Message) error { | ||
size, err := binary.ReadUvarint(r) | ||
if err != nil { | ||
return fmt.Errorf("read proto message size: %w", err) | ||
} | ||
|
||
buf := bytesBufferPool.Get().(*bytes.Buffer) | ||
buf.Reset() | ||
defer bytesBufferPool.Put(buf) | ||
|
||
n, err := io.Copy(buf, io.LimitReader(r, int64(size))) | ||
if err != nil { | ||
return fmt.Errorf("read proto message: %w", err) | ||
} else if uint64(n) != size { | ||
return fmt.Errorf("read proto message: got=%d want=%d", n, size) | ||
} | ||
|
||
if err := proto.Unmarshal(buf.Bytes(), pb); err != nil { | ||
return fmt.Errorf("unmarshal proto message: %w", err) | ||
} | ||
return nil | ||
} |
Oops, something went wrong.