Skip to content

Commit

Permalink
chore(dataobj): logs section metadata and encoding/decoding
Browse files Browse the repository at this point in the history
This PR introduces metadata for a "logs section," which is intended to
hold a sequence of log records across one or more streams.

The code is a near-identical copy of grafana#15676. Future work is needed to
identify if dataset sections can have their encoding, decoding, and
dataset implementations deduplicated.
  • Loading branch information
rfratto committed Jan 13, 2025
1 parent e838ee3 commit 316dab8
Show file tree
Hide file tree
Showing 12 changed files with 1,954 additions and 13 deletions.
152 changes: 152 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package encoding

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// LogsDataset returns a [dataset.Dataset] from a [LogsDecoder] for the given
// section.
func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
	return &logsDataset{dec: dec, sec: sec}
}

// logsDataset implements [dataset.Dataset] on top of a logs section,
// delegating all metadata and data retrieval to its LogsDecoder.
type logsDataset struct {
	dec LogsDecoder         // Decoder used to read section metadata and page data.
	sec *filemd.SectionInfo // Logs section this dataset reads from.
}

// ListColumns returns a sequence of the columns described in the section
// metadata. Iteration stops with an error if the metadata cannot be read.
func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
	return result.Iter(func(yield func(dataset.Column) bool) error {
		columns, err := ds.dec.Columns(ctx, ds.sec)
		if err != nil {
			return err
		}

		for _, column := range columns {
			if !yield(&logsDatasetColumn{dec: ds.dec, desc: column}) {
				return nil
			}
		}

		// err is guaranteed nil here (checked above); return nil explicitly
		// rather than the stale variable.
		return nil
	})
}

// ListPages emits the full page list of each requested column, in the same
// order the columns were provided.
func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
	// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
	return result.Iter(func(yield func(dataset.Pages) bool) error {
		for _, col := range columns {
			colPages, err := result.Collect(col.ListPages(ctx))
			if err != nil {
				return err
			}
			if !yield(colPages) {
				return nil
			}
		}
		return nil
	})
}

// ReadPages emits the data of each requested page, in the same order the
// pages were provided.
func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
	// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
	return result.Iter(func(yield func(dataset.PageData) bool) error {
		for _, pg := range pages {
			pageData, err := pg.ReadPage(ctx)
			if err != nil {
				return err
			}
			if !yield(pageData) {
				return nil
			}
		}
		return nil
	})
}

// logsDatasetColumn implements [dataset.Column] for a column described by
// logs section metadata.
type logsDatasetColumn struct {
	dec  LogsDecoder        // Decoder used to list and read pages.
	desc *logsmd.ColumnDesc // Protobuf description of the column.

	info *dataset.ColumnInfo // Lazily built by ColumnInfo; nil until first call.
}

// ColumnInfo returns metadata about the column, converting the protobuf
// description on first use and caching the result for later calls.
func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
	if col.info == nil {
		src := col.desc.Info
		col.info = &dataset.ColumnInfo{
			Name:        src.Name,
			Type:        src.ValueType,
			Compression: src.Compression,

			RowsCount:        int(src.RowsCount),
			CompressedSize:   int(src.CompressedSize),
			UncompressedSize: int(src.UncompressedSize),

			Statistics: src.Statistics,
		}
	}
	return col.info
}

// ListPages emits the pages of the column in the order they appear in the
// column metadata.
func (col *logsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] {
	return result.Iter(func(yield func(dataset.Page) bool) error {
		sets, err := result.Collect(col.dec.Pages(ctx, []*logsmd.ColumnDesc{col.desc}))
		if err != nil {
			return err
		}
		if len(sets) != 1 {
			return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(sets))
		}

		for _, desc := range sets[0] {
			if !yield(&logsDatasetPage{dec: col.dec, desc: desc}) {
				return nil
			}
		}
		return nil
	})
}

// logsDatasetPage implements [dataset.Page] for a page described by logs
// section metadata.
type logsDatasetPage struct {
	dec  LogsDecoder      // Decoder used to read the page data.
	desc *logsmd.PageDesc // Protobuf description of the page.

	info *dataset.PageInfo // Lazily built by PageInfo; nil until first call.
}

// PageInfo returns metadata about the page, converting the protobuf
// description on first use and caching the result for later calls.
func (p *logsDatasetPage) PageInfo() *dataset.PageInfo {
	if p.info == nil {
		src := p.desc.Info
		p.info = &dataset.PageInfo{
			UncompressedSize: int(src.UncompressedSize),
			CompressedSize:   int(src.CompressedSize),
			CRC32:            src.Crc32,
			RowCount:         int(src.RowsCount),

			Encoding: src.Encoding,
			Stats:    src.Statistics,
		}
	}
	return p.info
}

// ReadPage reads and returns the raw data for the page.
func (p *logsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) {
	results, err := result.Collect(p.dec.ReadPages(ctx, []*logsmd.PageDesc{p.desc}))
	if err != nil {
		return nil, err
	}
	if len(results) != 1 {
		return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(results))
	}
	return results[0], nil
}
21 changes: 21 additions & 0 deletions pkg/dataobj/internal/encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
Expand All @@ -20,6 +21,9 @@ type (

// StreamsDecoder returns a decoder for streams sections.
StreamsDecoder() StreamsDecoder

// LogsDecoder returns a decoder for logs sections.
LogsDecoder() LogsDecoder
}

// StreamsDecoder supports decoding data within a streams section.
Expand All @@ -38,4 +42,21 @@ type (
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData]
}

// LogsDecoder supports decoding data within a logs section.
LogsDecoder interface {
	// Columns describes the set of columns in the provided section.
	Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error)

	// Pages retrieves the set of pages for the provided columns. The order of
	// page lists emitted by the sequence matches the order of columns
	// provided: the first page list corresponds to the first column, and so
	// on. If an error is encountered while retrieving a page list, an error
	// is emitted and iteration stops.
	Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc]

	// ReadPages reads the provided set of pages, iterating over their data
	// matching the argument order. If an error is encountered while retrieving
	// pages, an error is emitted and iteration stops.
	ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData]
}
)
28 changes: 27 additions & 1 deletion pkg/dataobj/internal/encoding/decoder_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)
Expand All @@ -31,7 +32,7 @@ func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) {
return &md, nil
}

// decodeStreamsMetadata decodes stream section metadta from r.
// decodeStreamsMetadata decodes stream section metadata from r.
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
Expand All @@ -56,6 +57,31 @@ func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata,
return &metadata, nil
}

// decodeLogsMetadata decodes logs section metadata from r.
func decodeLogsMetadata(r streamio.Reader) (*logsmd.Metadata, error) {
	gotVersion, err := streamio.ReadUvarint(r)
	if err != nil {
		return nil, fmt.Errorf("read logs section format version: %w", err)
	} else if gotVersion != streamsFormatVersion {
		// NOTE(review): logs sections reuse streamsFormatVersion; confirm a
		// dedicated logs format version constant isn't intended.
		return nil, fmt.Errorf("unexpected logs section format version: got=%d want=%d", gotVersion, streamsFormatVersion)
	}

	var md logsmd.Metadata
	if err := decodeProto(r, &md); err != nil {
		return nil, fmt.Errorf("logs section metadata: %w", err)
	}
	return &md, nil
}

// decodeLogsColumnMetadata decodes logs column metadata from r.
func decodeLogsColumnMetadata(r streamio.Reader) (*logsmd.ColumnMetadata, error) {
	var metadata logsmd.ColumnMetadata
	if err := decodeProto(r, &metadata); err != nil {
		// Error text says "logs", not "streams": this was copy-pasted from
		// the streams decoder and previously mislabeled the failing section.
		return nil, fmt.Errorf("logs column metadata: %w", err)
	}
	return &metadata, nil
}

// decodeProto decodes a proto message from r and stores it in pb. Proto
// messages are expected to be encoded with their size, followed by the proto
// bytes.
Expand Down
80 changes: 80 additions & 0 deletions pkg/dataobj/internal/encoding/decoder_readseeker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
Expand Down Expand Up @@ -47,6 +48,10 @@ func (dec *readSeekerDecoder) StreamsDecoder() StreamsDecoder {
return &readSeekerStreamsDecoder{rs: dec.rs}
}

// LogsDecoder returns a [LogsDecoder] that reads logs sections from the
// decoder's underlying io.ReadSeeker.
func (dec *readSeekerDecoder) LogsDecoder() LogsDecoder {
	return &readSeekerLogsDecoder{rs: dec.rs}
}

type readSeekerStreamsDecoder struct {
rs io.ReadSeeker
}
Expand Down Expand Up @@ -121,3 +126,78 @@ func (dec *readSeekerStreamsDecoder) ReadPages(ctx context.Context, pages []*str
return nil
})
}

// readSeekerLogsDecoder implements [LogsDecoder] by seeking within rs to the
// offsets recorded in section, column, and page metadata.
type readSeekerLogsDecoder struct {
	rs io.ReadSeeker // Underlying data object; not safe for concurrent use.
}

// Columns describes the set of columns in the provided logs section. It
// fails if section is not a logs section or its metadata cannot be read.
func (dec *readSeekerLogsDecoder) Columns(_ context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) {
	if section.Type != filemd.SECTION_TYPE_LOGS {
		return nil, fmt.Errorf("unexpected section type: got=%d want=%d", section.Type, filemd.SECTION_TYPE_LOGS)
	}

	if _, err := dec.rs.Seek(int64(section.MetadataOffset), io.SeekStart); err != nil {
		// Previously mislabeled "streams metadata" (copy-paste from the
		// streams decoder); this is the logs section metadata.
		return nil, fmt.Errorf("seek to logs metadata: %w", err)
	}
	// LimitReader prevents metadata decoding from reading past this section.
	r := bufio.NewReader(io.LimitReader(dec.rs, int64(section.MetadataSize)))

	md, err := decodeLogsMetadata(r)
	if err != nil {
		return nil, err
	}
	return md.Columns, nil
}

// Pages retrieves the set of pages for the provided columns. Page lists are
// emitted in the same order the columns were given.
func (dec *readSeekerLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
	// readColumn seeks to one column's metadata and decodes its page list.
	readColumn := func(_ context.Context, desc *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) {
		if _, err := dec.rs.Seek(int64(desc.Info.MetadataOffset), io.SeekStart); err != nil {
			return nil, fmt.Errorf("seek to column metadata: %w", err)
		}
		r := bufio.NewReader(io.LimitReader(dec.rs, int64(desc.Info.MetadataSize)))

		md, err := decodeLogsColumnMetadata(r)
		if err != nil {
			return nil, err
		}
		return md.Pages, nil
	}

	return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
		for _, desc := range columns {
			pageList, err := readColumn(ctx, desc)
			if err != nil {
				return err
			}
			if !yield(pageList) {
				return nil
			}
		}
		return nil
	})
}

// ReadPages reads the data for the provided pages, emitting results in the
// same order the pages were given.
func (dec *readSeekerLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] {
	// readOne seeks to a single page's data and reads it in full.
	readOne := func(_ context.Context, desc *logsmd.PageDesc) (dataset.PageData, error) {
		if _, err := dec.rs.Seek(int64(desc.Info.DataOffset), io.SeekStart); err != nil {
			return nil, err
		}
		buf := make([]byte, desc.Info.DataSize)
		if _, err := io.ReadFull(dec.rs, buf); err != nil {
			return nil, fmt.Errorf("read page data: %w", err)
		}
		return dataset.PageData(buf), nil
	}

	return result.Iter(func(yield func(dataset.PageData) bool) error {
		for _, desc := range pages {
			pageData, err := readOne(ctx, desc)
			if err != nil {
				return err
			}
			if !yield(pageData) {
				return nil
			}
		}
		return nil
	})
}
21 changes: 20 additions & 1 deletion pkg/dataobj/internal/encoding/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewEncoder(w streamio.Writer) *Encoder {
}
}

// OpenStreams opens a [StreamsEncoder]. OpenSterams fails if there is another
// OpenStreams opens a [StreamsEncoder]. OpenStreams fails if there is another
// open section.
func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) {
if enc.curSection != nil {
Expand All @@ -66,6 +66,25 @@ func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) {
), nil
}

// OpenLogs opens a [LogsEncoder]. OpenLogs fails if there is another open
// section.
func (enc *Encoder) OpenLogs() (*LogsEncoder, error) {
	if enc.curSection != nil {
		return nil, ErrElementExist
	}

	// MetadataOffset and MetadataSize are set to math.MaxUint32 as
	// placeholders; presumably they are replaced with real values when the
	// section is closed — TODO confirm against the section-close path.
	enc.curSection = &filemd.SectionInfo{
		Type:           filemd.SECTION_TYPE_LOGS,
		MetadataOffset: math.MaxUint32,
		MetadataSize:   math.MaxUint32,
	}

	// The encoder starts at the current end of buffered data, offset by the
	// encoder's absolute start position.
	return newLogsEncoder(
		enc,
		enc.startOffset+enc.data.Len(),
	), nil
}

// MetadataSize returns an estimate of the current size of the metadata for the
// data object. MetadataSize does not include the size of data appended. The
// estimate includes the currently open element.
Expand Down
Loading

0 comments on commit 316dab8

Please sign in to comment.