Skip to content

Commit

Permalink
chore(dataobj): data object encoding and decoding (#15676)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto authored Jan 13, 2025
1 parent a96a695 commit 896e138
Show file tree
Hide file tree
Showing 16 changed files with 4,377 additions and 105 deletions.
152 changes: 152 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package encoding

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// StreamsDataset implements returns a [dataset.Dataset] from a
// [StreamsDecoder] for the given section.
func StreamsDataset(dec StreamsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
return &streamsDataset{dec: dec, sec: sec}
}

type streamsDataset struct {
dec StreamsDecoder
sec *filemd.SectionInfo
}

func (ds *streamsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
return result.Iter(func(yield func(dataset.Column) bool) error {
columns, err := ds.dec.Columns(ctx, ds.sec)
if err != nil {
return err
}

for _, column := range columns {
if !yield(&streamsDatasetColumn{dec: ds.dec, desc: column}) {
return nil
}
}

return err
})

}

func (ds *streamsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
return result.Iter(func(yield func(dataset.Pages) bool) error {
for _, column := range columns {
pages, err := result.Collect(column.ListPages(ctx))
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

func (ds *streamsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := page.ReadPage(ctx)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}

type streamsDatasetColumn struct {
dec StreamsDecoder
desc *streamsmd.ColumnDesc

info *dataset.ColumnInfo
}

func (col *streamsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
if col.info != nil {
return col.info
}

col.info = &dataset.ColumnInfo{
Name: col.desc.Info.Name,
Type: col.desc.Info.ValueType,
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Statistics: col.desc.Info.Statistics,
}
return col.info
}

func (col *streamsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] {
return result.Iter(func(yield func(dataset.Page) bool) error {
pageSets, err := result.Collect(col.dec.Pages(ctx, []*streamsmd.ColumnDesc{col.desc}))
if err != nil {
return err
} else if len(pageSets) != 1 {
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets))
}

for _, page := range pageSets[0] {
if !yield(&streamsDatasetPage{dec: col.dec, desc: page}) {
return nil
}
}

return nil
})
}

type streamsDatasetPage struct {
dec StreamsDecoder
desc *streamsmd.PageDesc

info *dataset.PageInfo
}

func (p *streamsDatasetPage) PageInfo() *dataset.PageInfo {
if p.info != nil {
return p.info
}

p.info = &dataset.PageInfo{
UncompressedSize: int(p.desc.Info.UncompressedSize),
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
}
return p.info
}

func (p *streamsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) {
pages, err := result.Collect(p.dec.ReadPages(ctx, []*streamsmd.PageDesc{p.desc}))
if err != nil {
return nil, err
} else if len(pages) != 1 {
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages))
}

return pages[0], nil
}
41 changes: 41 additions & 0 deletions pkg/dataobj/internal/encoding/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package encoding

import (
"context"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// Decoders. To cleanly separate the APIs per section, section-specific
// Decoders should be created and returned by the top-level [Decoder]
// interface.
type (
// A Decoder decodes a data object.
Decoder interface {
// Sections returns the list of sections within a data object.
Sections(ctx context.Context) ([]*filemd.SectionInfo, error)

// StreamsDecoder returns a decoder for streams sections.
StreamsDecoder() StreamsDecoder
}

// StreamsDecoder supports decoding data within a streams section.
StreamsDecoder interface {
// Columns describes the set of columns in the provided section.
Columns(ctx context.Context, section *filemd.SectionInfo) ([]*streamsmd.ColumnDesc, error)

// Pages retrieves the set of pages for the provided columns. The order of
// page lists emitted by the sequence matches the order of columns
// provided: the first page list corresponds to the first column, and so
// on.
Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc]

// ReadPages reads the provided set of pages, iterating over their data
// matching the argument order. If an error is encountered while retrieving
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData]
}
)
83 changes: 83 additions & 0 deletions pkg/dataobj/internal/encoding/decoder_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package encoding

import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/gogo/protobuf/proto"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

// decode* methods for metadata shared by Decoder implementations.

// decodeFileMetadata decodes file metadata from r.
func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read file format version: %w", err)
} else if gotVersion != fileFormatVersion {
return nil, fmt.Errorf("unexpected file format version: got=%d want=%d", gotVersion, fileFormatVersion)
}

var md filemd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("file metadata: %w", err)
}
return &md, nil
}

// decodeStreamsMetadata decodes stream section metadta from r.
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read streams section format version: %w", err)
} else if gotVersion != streamsFormatVersion {
return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion)
}

var md streamsmd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("streams section metadata: %w", err)
}
return &md, nil
}

// decodeStreamsColumnMetadata decodes stream column metadata from r.
func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata, error) {
var metadata streamsmd.ColumnMetadata
if err := decodeProto(r, &metadata); err != nil {
return nil, fmt.Errorf("streams column metadata: %w", err)
}
return &metadata, nil
}

// decodeProto decodes a proto message from r and stores it in pb. Proto
// messages are expected to be encoded with their size, followed by the proto
// bytes.
func decodeProto(r streamio.Reader, pb proto.Message) error {
size, err := binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("read proto message size: %w", err)
}

buf := bytesBufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer bytesBufferPool.Put(buf)

n, err := io.Copy(buf, io.LimitReader(r, int64(size)))
if err != nil {
return fmt.Errorf("read proto message: %w", err)
} else if uint64(n) != size {
return fmt.Errorf("read proto message: got=%d want=%d", n, size)
}

if err := proto.Unmarshal(buf.Bytes(), pb); err != nil {
return fmt.Errorf("unmarshal proto message: %w", err)
}
return nil
}
Loading

0 comments on commit 896e138

Please sign in to comment.