diff --git a/pkg/dataobj/internal/encoding/dataset_logs.go b/pkg/dataobj/internal/encoding/dataset_logs.go new file mode 100644 index 0000000000000..37060c443e861 --- /dev/null +++ b/pkg/dataobj/internal/encoding/dataset_logs.go @@ -0,0 +1,152 @@ +package encoding + +import ( + "context" + "fmt" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +// LogsDataset implements returns a [dataset.Dataset] from a [LogsDecoder] for +// the given section. +func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset { + return &logsDataset{dec: dec, sec: sec} +} + +type logsDataset struct { + dec LogsDecoder + sec *filemd.SectionInfo +} + +func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] { + return result.Iter(func(yield func(dataset.Column) bool) error { + columns, err := ds.dec.Columns(ctx, ds.sec) + if err != nil { + return err + } + + for _, column := range columns { + if !yield(&logsDatasetColumn{dec: ds.dec, desc: column}) { + return nil + } + } + + return err + }) + +} + +func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] { + // TODO(rfratto): Switch to batch retrieval instead of iterating over each column. + return result.Iter(func(yield func(dataset.Pages) bool) error { + for _, column := range columns { + pages, err := result.Collect(column.ListPages(ctx)) + if err != nil { + return err + } else if !yield(pages) { + return nil + } + } + + return nil + }) +} + +func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] { + // TODO(rfratto): Switch to batch retrieval instead of iterating over each page. + return result.Iter(func(yield func(dataset.PageData) bool) error { + for _, page := range pages { + data, err := page.ReadPage(ctx) + if err != nil { + return err + } else if !yield(data) { + return nil + } + } + + return nil + }) +} + +type logsDatasetColumn struct { + dec LogsDecoder + desc *logsmd.ColumnDesc + + info *dataset.ColumnInfo +} + +func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo { + if col.info != nil { + return col.info + } + + col.info = &dataset.ColumnInfo{ + Name: col.desc.Info.Name, + Type: col.desc.Info.ValueType, + Compression: col.desc.Info.Compression, + + RowsCount: int(col.desc.Info.RowsCount), + CompressedSize: int(col.desc.Info.CompressedSize), + UncompressedSize: int(col.desc.Info.UncompressedSize), + + Statistics: col.desc.Info.Statistics, + } + return col.info +} + +func (col *logsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] { + return result.Iter(func(yield func(dataset.Page) bool) error { + pageSets, err := result.Collect(col.dec.Pages(ctx, []*logsmd.ColumnDesc{col.desc})) + if err != nil { + return err + } else if len(pageSets) != 1 { + return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets)) + } + + for _, page := range pageSets[0] { + if !yield(&logsDatasetPage{dec: col.dec, desc: page}) { + return nil + } + } + + return nil + }) +} + +type logsDatasetPage struct { + dec LogsDecoder + desc *logsmd.PageDesc + + info *dataset.PageInfo +} + +func (p *logsDatasetPage) PageInfo() *dataset.PageInfo { + if p.info != nil { + return p.info + } + + p.info = &dataset.PageInfo{ + UncompressedSize: int(p.desc.Info.UncompressedSize), + CompressedSize: int(p.desc.Info.CompressedSize), + CRC32: p.desc.Info.Crc32, + RowCount: int(p.desc.Info.RowsCount), + + Encoding: p.desc.Info.Encoding, + Stats: p.desc.Info.Statistics, + } + return p.info +} + +func (p *logsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) { + pages, err := result.Collect(p.dec.ReadPages(ctx, []*logsmd.PageDesc{p.desc})) + if err != nil { + return nil, err + } else if len(pages) != 1 { + return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages)) + } + + return pages[0], nil +} diff --git a/pkg/dataobj/internal/encoding/decoder.go b/pkg/dataobj/internal/encoding/decoder.go index d6ed5a87e4dfb..b0916d822578c 100644 --- a/pkg/dataobj/internal/encoding/decoder.go +++ b/pkg/dataobj/internal/encoding/decoder.go @@ -5,6 +5,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) @@ -20,6 +21,9 @@ type ( // StreamsDecoder returns a decoder for streams sections. StreamsDecoder() StreamsDecoder + + // LogsDecoder returns a decoder for logs sections. + LogsDecoder() LogsDecoder } // StreamsDecoder supports decoding data within a streams section. @@ -38,4 +42,21 @@ type ( // pages, an error is emitted and iteration stops. ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] } + + // LogsDecoder supports decoding data within a logs section. + LogsDecoder interface { + // Columns describes the set of columns in the provided section. + Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) + + // Pages retrieves the set of pages for the provided columns. The order of + // page lists emitted by the sequence matches the order of columns + // provided: the first page list corresponds to the first column, and so + // on. + Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] + + // ReadPages reads the provided set of pages, iterating over their data + // matching the argument order. If an error is encountered while retrieving + // pages, an error is emitted and iteration stops. + ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] + } ) diff --git a/pkg/dataobj/internal/encoding/decoder_metadata.go b/pkg/dataobj/internal/encoding/decoder_metadata.go index c1c3011ac6612..dfc7622357492 100644 --- a/pkg/dataobj/internal/encoding/decoder_metadata.go +++ b/pkg/dataobj/internal/encoding/decoder_metadata.go @@ -9,6 +9,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" ) @@ -31,7 +32,7 @@ func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) { return &md, nil } -// decodeStreamsMetadata decodes stream section metadta from r. +// decodeStreamsMetadata decodes stream section metadata from r. func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) { gotVersion, err := streamio.ReadUvarint(r) if err != nil { @@ -56,6 +57,31 @@ func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata, return &metadata, nil } +// decodeLogsMetadata decodes logs section metadata from r. +func decodeLogsMetadata(r streamio.Reader) (*logsmd.Metadata, error) { + gotVersion, err := streamio.ReadUvarint(r) + if err != nil { + return nil, fmt.Errorf("read streams section format version: %w", err) + } else if gotVersion != streamsFormatVersion { + return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion) + } + + var md logsmd.Metadata + if err := decodeProto(r, &md); err != nil { + return nil, fmt.Errorf("streams section metadata: %w", err) + } + return &md, nil +} + +// decodeLogsColumnMetadata decodes logs column metadata from r. +func decodeLogsColumnMetadata(r streamio.Reader) (*logsmd.ColumnMetadata, error) { + var metadata logsmd.ColumnMetadata + if err := decodeProto(r, &metadata); err != nil { + return nil, fmt.Errorf("streams column metadata: %w", err) + } + return &metadata, nil +} + // decodeProto decodes a proto message from r and stores it in pb. Proto // messages are expected to be encoded with their size, followed by the proto // bytes. diff --git a/pkg/dataobj/internal/encoding/decoder_readseeker.go b/pkg/dataobj/internal/encoding/decoder_readseeker.go index 725273603eb63..bf7ba42e7df3c 100644 --- a/pkg/dataobj/internal/encoding/decoder_readseeker.go +++ b/pkg/dataobj/internal/encoding/decoder_readseeker.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) @@ -47,6 +48,10 @@ func (dec *readSeekerDecoder) StreamsDecoder() StreamsDecoder { return &readSeekerStreamsDecoder{rs: dec.rs} } +func (dec *readSeekerDecoder) LogsDecoder() LogsDecoder { + return &readSeekerLogsDecoder{rs: dec.rs} +} + type readSeekerStreamsDecoder struct { rs io.ReadSeeker } @@ -121,3 +126,78 @@ func (dec *readSeekerStreamsDecoder) ReadPages(ctx context.Context, pages []*str return nil }) } + +type readSeekerLogsDecoder struct { + rs io.ReadSeeker +} + +func (dec *readSeekerLogsDecoder) Columns(_ context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) { + if section.Type != filemd.SECTION_TYPE_LOGS { + return nil, fmt.Errorf("unexpected section type: got=%d want=%d", section.Type, filemd.SECTION_TYPE_LOGS) + } + + if _, err := dec.rs.Seek(int64(section.MetadataOffset), io.SeekStart); err != nil { + return nil, fmt.Errorf("seek to streams metadata: %w", err) + } + r := bufio.NewReader(io.LimitReader(dec.rs, int64(section.MetadataSize))) + + md, err := decodeLogsMetadata(r) + if err != nil { + return nil, err + } + return md.Columns, nil +} + +func (dec *readSeekerLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] { + getPages := func(_ context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) { + if _, err := dec.rs.Seek(int64(column.Info.MetadataOffset), io.SeekStart); err != nil { + return nil, fmt.Errorf("seek to column metadata: %w", err) + } + r := bufio.NewReader(io.LimitReader(dec.rs, int64(column.Info.MetadataSize))) + + md, err := decodeLogsColumnMetadata(r) + if err != nil { + return nil, err + } + return md.Pages, nil + } + + return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error { + for _, column := range columns { + pages, err := getPages(ctx, column) + if err != nil { + return err + } else if !yield(pages) { + return nil + } + } + + return nil + }) +} + +func (dec *readSeekerLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] { + getPageData := func(_ context.Context, page *logsmd.PageDesc) (dataset.PageData, error) { + if _, err := dec.rs.Seek(int64(page.Info.DataOffset), io.SeekStart); err != nil { + return nil, err + } + data := make([]byte, page.Info.DataSize) + if _, err := io.ReadFull(dec.rs, data); err != nil { + return nil, fmt.Errorf("read page data: %w", err) + } + return dataset.PageData(data), nil + } + + return result.Iter(func(yield func(dataset.PageData) bool) error { + for _, page := range pages { + data, err := getPageData(ctx, page) + if err != nil { + return err + } else if !yield(data) { + return nil + } + } + + return nil + }) +} diff --git a/pkg/dataobj/internal/encoding/encoder.go b/pkg/dataobj/internal/encoding/encoder.go index 28a64524e9735..d939b236fddac 100644 --- a/pkg/dataobj/internal/encoding/encoder.go +++ b/pkg/dataobj/internal/encoding/encoder.go @@ -44,7 +44,7 @@ func NewEncoder(w streamio.Writer) *Encoder { } } -// OpenStreams opens a [StreamsEncoder]. OpenSterams fails if there is another +// OpenStreams opens a [StreamsEncoder]. OpenStreams fails if there is another // open section. func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) { if enc.curSection != nil { @@ -66,6 +66,25 @@ func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) { ), nil } +// OpenLogs opens a [LogsEncoder]. OpenLogs fails if there is another open +// section. +func (enc *Encoder) OpenLogs() (*LogsEncoder, error) { + if enc.curSection != nil { + return nil, ErrElementExist + } + + enc.curSection = &filemd.SectionInfo{ + Type: filemd.SECTION_TYPE_LOGS, + MetadataOffset: math.MaxUint32, + MetadataSize: math.MaxUint32, + } + + return newLogsEncoder( + enc, + enc.startOffset+enc.data.Len(), + ), nil +} + // MetadataSize returns an estimate of the current size of the metadata for the // data object. MetadataSize does not include the size of data appended. The // estimate includes the currently open element. diff --git a/pkg/dataobj/internal/encoding/encoder_logs.go b/pkg/dataobj/internal/encoding/encoder_logs.go new file mode 100644 index 0000000000000..f5186d543eddc --- /dev/null +++ b/pkg/dataobj/internal/encoding/encoder_logs.go @@ -0,0 +1,259 @@ +package encoding + +import ( + "bytes" + "math" + + "github.com/gogo/protobuf/proto" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/streamio" +) + +// LogsEncoder encodes an individual logs section in a data object. +// LogsEncoders are created by [Encoder]s. +type LogsEncoder struct { + parent *Encoder + + startOffset int // Byte offset in the file where the column starts. + closed bool // true if LogsEncoder has been closed. + + data *bytes.Buffer + columns []*logsmd.ColumnDesc // closed columns. + curColumn *logsmd.ColumnDesc // curColumn is the currently open column. +} + +func newLogsEncoder(parent *Encoder, offset int) *LogsEncoder { + buf := bytesBufferPool.Get().(*bytes.Buffer) + buf.Reset() + + return &LogsEncoder{ + parent: parent, + startOffset: offset, + + data: buf, + } +} + +// OpenColumn opens a new column in the logs section. OpenColumn fails if there +// is another open column or if the LogsEncoder has been closed. +func (enc *LogsEncoder) OpenColumn(columnType logsmd.ColumnType, info *dataset.ColumnInfo) (*LogsColumnEncoder, error) { + if enc.curColumn != nil { + return nil, ErrElementExist + } else if enc.closed { + return nil, ErrClosed + } + + // MetadataOffset and MetadataSize aren't available until the column is + // closed. We temporarily set these fields to the maximum values so they're + // accounted for in the MetadataSize estimate. + enc.curColumn = &logsmd.ColumnDesc{ + Type: columnType, + Info: &datasetmd.ColumnInfo{ + Name: info.Name, + ValueType: info.Type, + RowsCount: uint32(info.RowsCount), + Compression: info.Compression, + UncompressedSize: uint32(info.UncompressedSize), + CompressedSize: uint32(info.CompressedSize), + Statistics: info.Statistics, + + MetadataOffset: math.MaxUint32, + MetadataSize: math.MaxUint32, + }, + } + + return newLogsColumnEncoder( + enc, + enc.startOffset+enc.data.Len(), + ), nil +} + +// MetadataSize returns an estimate of the current size of the metadata for the +// section. MetadataSize includes an estimate for the currently open element. +func (enc *LogsEncoder) MetadataSize() int { return elementMetadataSize(enc) } + +func (enc *LogsEncoder) metadata() proto.Message { + columns := enc.columns[:len(enc.columns):cap(enc.columns)] + if enc.curColumn != nil { + columns = append(columns, enc.curColumn) + } + return &logsmd.Metadata{Columns: columns} +} + +// Commit closes the section, flushing all data to the parent element. After +// Commit is called, the LogsEncoder can no longer be modified. +// +// Commit fails if there is an open column. +func (enc *LogsEncoder) Commit() error { + if enc.closed { + return ErrClosed + } else if enc.curColumn != nil { + return ErrElementExist + } + + defer bytesBufferPool.Put(enc.data) + + if len(enc.columns) == 0 { + // No data was written; discard. + return enc.parent.append(nil, nil) + } + + metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer) + metadataBuffer.Reset() + defer bytesBufferPool.Put(metadataBuffer) + + // The section metadata should start with its version. + if err := streamio.WriteUvarint(metadataBuffer, logsFormatVersion); err != nil { + return err + } else if err := elementMetadataWrite(enc, metadataBuffer); err != nil { + return err + } + return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes()) +} + +// Discard discards the section, discarding any data written to it. After +// Discard is called, the LogsEncoder can no longer be modified. +// +// Discard fails if there is an open column. +func (enc *LogsEncoder) Discard() error { + if enc.closed { + return ErrClosed + } else if enc.curColumn != nil { + return ErrElementExist + } + enc.closed = true + + defer bytesBufferPool.Put(enc.data) + + return enc.parent.append(nil, nil) +} + +// append adds data and metadata to enc. append must only be called from child +// elements on Close and Discard. Discard calls must pass nil for both data and +// metadata to denote a discard. +func (enc *LogsEncoder) append(data, metadata []byte) error { + if enc.closed { + return ErrClosed + } else if enc.curColumn == nil { + return errElementNoExist + } + + if len(data) == 0 && len(metadata) == 0 { + // Column was discarded. + enc.curColumn = nil + return nil + } + + enc.curColumn.Info.MetadataOffset = uint32(enc.startOffset + enc.data.Len() + len(data)) + enc.curColumn.Info.MetadataSize = uint32(len(metadata)) + + // bytes.Buffer.Write never fails. + _, _ = enc.data.Write(data) + _, _ = enc.data.Write(metadata) + + enc.columns = append(enc.columns, enc.curColumn) + enc.curColumn = nil + return nil +} + +// LogsColumnEncoder encodes an individual column in a logs section. +// LogsColumnEncoder are created by [LogsEncoder]. +type LogsColumnEncoder struct { + parent *LogsEncoder + + startOffset int // Byte offset in the file where the column starts. + closed bool // true if LogsColumnEncoder has been closed. + + data *bytes.Buffer // All page data. + pages []*logsmd.PageDesc +} + +func newLogsColumnEncoder(parent *LogsEncoder, offset int) *LogsColumnEncoder { + buf := bytesBufferPool.Get().(*bytes.Buffer) + buf.Reset() + + return &LogsColumnEncoder{ + parent: parent, + startOffset: offset, + + data: buf, + } +} + +// AppendPage appens a new [dataset.MemPage] to the column. AppendPage fails if +// the column has been closed. +func (enc *LogsColumnEncoder) AppendPage(page *dataset.MemPage) error { + if enc.closed { + return ErrClosed + } + + // It's possible the caller can pass an incorrect value for UncompressedSize + // and CompressedSize, but those fields are purely for stats so we don't + // check it. + enc.pages = append(enc.pages, &logsmd.PageDesc{ + Info: &datasetmd.PageInfo{ + UncompressedSize: uint32(page.Info.UncompressedSize), + CompressedSize: uint32(page.Info.CompressedSize), + Crc32: page.Info.CRC32, + RowsCount: uint32(page.Info.RowCount), + Encoding: page.Info.Encoding, + + DataOffset: uint32(enc.startOffset + enc.data.Len()), + DataSize: uint32(len(page.Data)), + + Statistics: page.Info.Stats, + }, + }) + + _, _ = enc.data.Write(page.Data) // bytes.Buffer.Write never fails. + return nil +} + +// MetadataSize returns an estimate of the current size of the metadata for the +// column. MetadataSize does not include the size of data appended. +func (enc *LogsColumnEncoder) MetadataSize() int { return elementMetadataSize(enc) } + +func (enc *LogsColumnEncoder) metadata() proto.Message { + return &logsmd.ColumnMetadata{Pages: enc.pages} +} + +// Commit closes the column, flushing all data to the parent element. After +// Commit is called, the LogsColumnEncoder can no longer be modified. +func (enc *LogsColumnEncoder) Commit() error { + if enc.closed { + return ErrClosed + } + enc.closed = true + + defer bytesBufferPool.Put(enc.data) + + if len(enc.pages) == 0 { + // No data was written; discard. + return enc.parent.append(nil, nil) + } + + metadataBuffer := bytesBufferPool.Get().(*bytes.Buffer) + metadataBuffer.Reset() + defer bytesBufferPool.Put(metadataBuffer) + + if err := elementMetadataWrite(enc, metadataBuffer); err != nil { + return err + } + return enc.parent.append(enc.data.Bytes(), metadataBuffer.Bytes()) +} + +// Discard discards the column, discarding any data written to it. After +// Discard is called, the LogsColumnEncoder can no longer be modified. +func (enc *LogsColumnEncoder) Discard() error { + if enc.closed { + return ErrClosed + } + enc.closed = true + + defer bytesBufferPool.Put(enc.data) + + return enc.parent.append(nil, nil) // Notify parent of discard. +} diff --git a/pkg/dataobj/internal/encoding/encoding.go b/pkg/dataobj/internal/encoding/encoding.go index 8b229970a8aea..a071c9bbdf509 100644 --- a/pkg/dataobj/internal/encoding/encoding.go +++ b/pkg/dataobj/internal/encoding/encoding.go @@ -10,6 +10,7 @@ var ( const ( fileFormatVersion = 0x1 streamsFormatVersion = 0x1 + logsFormatVersion = 0x1 ) var ( diff --git a/pkg/dataobj/internal/encoding/encoding_test.go b/pkg/dataobj/internal/encoding/encoding_test.go index f5f7400e986b9..5c41a11df4b72 100644 --- a/pkg/dataobj/internal/encoding/encoding_test.go +++ b/pkg/dataobj/internal/encoding/encoding_test.go @@ -11,11 +11,12 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/encoding" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) -func Test(t *testing.T) { +func TestStreams(t *testing.T) { type Country struct { Name string Capital string @@ -107,3 +108,79 @@ func Test(t *testing.T) { require.Equal(t, countries, actual) }) } + +func TestLogs(t *testing.T) { + var ( + columnA = &dataset.MemColumn{ + Pages: []*dataset.MemPage{ + {Data: []byte("Hello")}, + {Data: []byte("World")}, + {Data: []byte("!")}, + }, + } + columnB = &dataset.MemColumn{ + Pages: []*dataset.MemPage{ + {Data: []byte("metadata")}, + {Data: []byte("column")}, + }, + } + ) + + var buf bytes.Buffer + + t.Run("Encode", func(t *testing.T) { + enc := encoding.NewEncoder(&buf) + logsEnc, err := enc.OpenLogs() + require.NoError(t, err) + + colEnc, err := logsEnc.OpenColumn(logsmd.COLUMN_TYPE_MESSAGE, &columnA.Info) + require.NoError(t, err) + for _, page := range columnA.Pages { + require.NoError(t, colEnc.AppendPage(page)) + } + require.NoError(t, colEnc.Commit()) + + colEnc, err = logsEnc.OpenColumn(logsmd.COLUMN_TYPE_METADATA, &columnB.Info) + require.NoError(t, err) + for _, page := range columnB.Pages { + require.NoError(t, colEnc.AppendPage(page)) + } + require.NoError(t, colEnc.Commit()) + + require.NoError(t, logsEnc.Commit()) + require.NoError(t, enc.Flush()) + }) + + t.Run("Decode", func(t *testing.T) { + dec := encoding.ReadSeekerDecoder(bytes.NewReader(buf.Bytes())) + sections, err := dec.Sections(context.TODO()) + require.NoError(t, err) + require.Len(t, sections, 1) + require.Equal(t, filemd.SECTION_TYPE_LOGS, sections[0].Type) + + logsDec := dec.LogsDecoder() + columns, err := logsDec.Columns(context.TODO(), sections[0]) + require.NoError(t, err) + require.Len(t, columns, 2) + + pageSets, err := result.Collect(logsDec.Pages(context.TODO(), columns)) + require.NoError(t, err) + require.Len(t, pageSets, 2) + + columnAPages, err := result.Collect(logsDec.ReadPages(context.TODO(), pageSets[0])) + require.NoError(t, err) + require.Len(t, columnAPages, len(columnA.Pages)) + + for i := range columnA.Pages { + require.Equal(t, columnA.Pages[i].Data, columnAPages[i]) + } + + columnBPages, err := result.Collect(logsDec.ReadPages(context.TODO(), pageSets[1])) + require.NoError(t, err) + require.Len(t, columnBPages, len(columnB.Pages)) + + for i := range columnB.Pages { + require.Equal(t, columnB.Pages[i].Data, columnBPages[i]) + } + }) +} diff --git a/pkg/dataobj/internal/metadata/filemd/filemd.pb.go b/pkg/dataobj/internal/metadata/filemd/filemd.pb.go index b3b65c6563e8e..41f3cc455fa37 100644 --- a/pkg/dataobj/internal/metadata/filemd/filemd.pb.go +++ b/pkg/dataobj/internal/metadata/filemd/filemd.pb.go @@ -34,16 +34,22 @@ const ( // exist within the data object. SECTION_TYPE_STREAMS does not contain any // actual log data. SECTION_TYPE_STREAMS SectionType = 1 + // SECTION_TYPE_LOGS is a section containing log records across multiple + // streams. Each log record contains a stream ID which refers to a stream + // from SECTION_TYPE_STREAMS. + SECTION_TYPE_LOGS SectionType = 2 ) var SectionType_name = map[int32]string{ 0: "SECTION_TYPE_UNSPECIFIED", 1: "SECTION_TYPE_STREAMS", + 2: "SECTION_TYPE_LOGS", } var SectionType_value = map[string]int32{ "SECTION_TYPE_UNSPECIFIED": 0, "SECTION_TYPE_STREAMS": 1, + "SECTION_TYPE_LOGS": 2, } func (SectionType) EnumDescriptor() ([]byte, []int) { @@ -169,7 +175,7 @@ func init() { } var fileDescriptor_be80f52d1e05bad9 = []byte{ - // 344 bytes of a gzipped FileDescriptorProto + // 356 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x2c, 0xc8, 0x4e, 0xd7, 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, 0xcf, 0x4d, 0x2d, 0x49, 0x04, 0x09, 0xea, 0xa7, 0x65, 0xe6, 0xa4, 0xe6, 0xa6, 0x40, 0x29, @@ -181,17 +187,18 @@ var fileDescriptor_be80f52d1e05bad9 = []byte{ 0x7c, 0x44, 0x18, 0x17, 0x52, 0x59, 0x90, 0x1a, 0x04, 0xd6, 0x22, 0xa4, 0xce, 0xc5, 0x0f, 0x53, 0x15, 0x9f, 0x9f, 0x96, 0x56, 0x9c, 0x5a, 0x22, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x1b, 0xc4, 0x07, 0x13, 0xf6, 0x07, 0x8b, 0x0a, 0x29, 0x73, 0xf1, 0xc2, 0x15, 0x16, 0x67, 0x56, 0xa5, 0x4a, 0x30, - 0x83, 0x95, 0xf1, 0xc0, 0x04, 0x83, 0x33, 0xab, 0x52, 0xb5, 0x5c, 0xe1, 0xee, 0x02, 0x59, 0x21, + 0x83, 0x95, 0xf1, 0xc0, 0x04, 0x83, 0x33, 0xab, 0x52, 0xb5, 0x62, 0xe0, 0xee, 0x02, 0x59, 0x21, 0x24, 0xc3, 0x25, 0x11, 0xec, 0xea, 0x1c, 0xe2, 0xe9, 0xef, 0x17, 0x1f, 0x12, 0x19, 0xe0, 0x1a, 0x1f, 0xea, 0x17, 0x1c, 0xe0, 0xea, 0xec, 0xe9, 0xe6, 0xe9, 0xea, 0x22, 0xc0, 0x20, 0x24, 0xc1, - 0x25, 0x82, 0x22, 0x1b, 0x1c, 0x12, 0xe4, 0xea, 0xe8, 0x1b, 0x2c, 0xc0, 0xe8, 0x54, 0x7a, 0xe1, - 0xa1, 0x1c, 0xc3, 0x8d, 0x87, 0x72, 0x0c, 0x1f, 0x1e, 0xca, 0x31, 0x36, 0x3c, 0x92, 0x63, 0x5c, - 0xf1, 0x48, 0x8e, 0xf1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, - 0x7c, 0xf1, 0x48, 0x8e, 0xe1, 0xc3, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, - 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0xca, 0x3e, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, - 0x3f, 0x57, 0x3f, 0xbd, 0x28, 0x31, 0x2d, 0x31, 0x2f, 0x51, 0x3f, 0x27, 0x3f, 0x3b, 0x53, 0xbf, - 0xcc, 0x58, 0x9f, 0x98, 0xc8, 0x4c, 0x62, 0x03, 0x47, 0xa3, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, - 0xc3, 0x39, 0x5a, 0x9d, 0xfb, 0x01, 0x00, 0x00, + 0x25, 0x82, 0x22, 0x1b, 0x1c, 0x12, 0xe4, 0xea, 0xe8, 0x1b, 0x2c, 0xc0, 0x28, 0x24, 0xca, 0x25, + 0x88, 0x22, 0xe3, 0xe3, 0xef, 0x1e, 0x2c, 0xc0, 0xe4, 0x54, 0x7a, 0xe1, 0xa1, 0x1c, 0xc3, 0x8d, + 0x87, 0x72, 0x0c, 0x1f, 0x1e, 0xca, 0x31, 0x36, 0x3c, 0x92, 0x63, 0x5c, 0xf1, 0x48, 0x8e, 0xf1, + 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x7c, 0xf1, 0x48, 0x8e, + 0xe1, 0xc3, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, + 0x8e, 0x21, 0xca, 0x3e, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0x3f, 0xbd, + 0x28, 0x31, 0x2d, 0x31, 0x2f, 0x51, 0x3f, 0x27, 0x3f, 0x3b, 0x53, 0xbf, 0xcc, 0x58, 0x9f, 0x98, + 0x38, 0x4e, 0x62, 0x03, 0xc7, 0xae, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xad, 0xf4, 0xfc, 0x08, + 0x12, 0x02, 0x00, 0x00, } func (x SectionType) String() string { diff --git a/pkg/dataobj/internal/metadata/filemd/filemd.proto b/pkg/dataobj/internal/metadata/filemd/filemd.proto index e32159e30cfb9..95cb607680f0c 100644 --- a/pkg/dataobj/internal/metadata/filemd/filemd.proto +++ b/pkg/dataobj/internal/metadata/filemd/filemd.proto @@ -36,4 +36,9 @@ enum SectionType { // exist within the data object. SECTION_TYPE_STREAMS does not contain any // actual log data. SECTION_TYPE_STREAMS = 1; + + // SECTION_TYPE_LOGS is a section containing log records across multiple + // streams. Each log record contains a stream ID which refers to a stream + // from SECTION_TYPE_STREAMS. + SECTION_TYPE_LOGS = 2; } diff --git a/pkg/dataobj/internal/metadata/logsmd/logsmd.pb.go b/pkg/dataobj/internal/metadata/logsmd/logsmd.pb.go new file mode 100644 index 0000000000000..2a15f5acf2f8b --- /dev/null +++ b/pkg/dataobj/internal/metadata/logsmd/logsmd.pb.go @@ -0,0 +1,1237 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/dataobj/internal/metadata/logsmd/logsmd.proto + +package logsmd + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + datasetmd "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strconv "strconv" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// ColumnType represents the valid types that a logs column can have. +type ColumnType int32 + +const ( + // Invalid column type. + COLUMN_TYPE_UNSPECIFIED ColumnType = 0 + // COLUMN_TYPE_STREAM_ID is a column containing the stream the log record + // belongs to. + COLUMN_TYPE_STREAM_ID ColumnType = 1 + // COLUMN_TYPE_TIMESTAMP is a column containing the timestamp of the log + // record. + COLUMN_TYPE_TIMESTAMP ColumnType = 2 + // COLUMN_TYPE_METADATA is a column containing structured metadata values for + // a specific key. + COLUMN_TYPE_METADATA ColumnType = 3 + // COLUMN_TYPE_MESSAGE is a column containing the message of the log record. + COLUMN_TYPE_MESSAGE ColumnType = 4 +) + +var ColumnType_name = map[int32]string{ + 0: "COLUMN_TYPE_UNSPECIFIED", + 1: "COLUMN_TYPE_STREAM_ID", + 2: "COLUMN_TYPE_TIMESTAMP", + 3: "COLUMN_TYPE_METADATA", + 4: "COLUMN_TYPE_MESSAGE", +} + +var ColumnType_value = map[string]int32{ + "COLUMN_TYPE_UNSPECIFIED": 0, + "COLUMN_TYPE_STREAM_ID": 1, + "COLUMN_TYPE_TIMESTAMP": 2, + "COLUMN_TYPE_METADATA": 3, + "COLUMN_TYPE_MESSAGE": 4, +} + +func (ColumnType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_50d9821968c7172c, []int{0} +} + +// Metadata describes the metadata for the logs section. +type Metadata struct { + // Columns within the logs. + Columns []*ColumnDesc `protobuf:"bytes,1,rep,name=columns,proto3" json:"columns,omitempty"` +} + +func (m *Metadata) Reset() { *m = Metadata{} } +func (*Metadata) ProtoMessage() {} +func (*Metadata) Descriptor() ([]byte, []int) { + return fileDescriptor_50d9821968c7172c, []int{0} +} +func (m *Metadata) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Metadata.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Metadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_Metadata.Merge(m, src) +} +func (m *Metadata) XXX_Size() int { + return m.Size() +} +func (m *Metadata) XXX_DiscardUnknown() { + xxx_messageInfo_Metadata.DiscardUnknown(m) +} + +var xxx_messageInfo_Metadata proto.InternalMessageInfo + +func (m *Metadata) GetColumns() []*ColumnDesc { + if m != nil { + return m.Columns + } + return nil +} + +// ColumnDesc describes an individual column within the logs table. +type ColumnDesc struct { + // Information about the column. + Info *datasetmd.ColumnInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` + // Column type. + Type ColumnType `protobuf:"varint,2,opt,name=type,proto3,enum=dataobj.metadata.logs.v1.ColumnType" json:"type,omitempty"` +} + +func (m *ColumnDesc) Reset() { *m = ColumnDesc{} } +func (*ColumnDesc) ProtoMessage() {} +func (*ColumnDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_50d9821968c7172c, []int{1} +} +func (m *ColumnDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ColumnDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ColumnDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ColumnDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_ColumnDesc.Merge(m, src) +} +func (m *ColumnDesc) XXX_Size() int { + return m.Size() +} +func (m *ColumnDesc) XXX_DiscardUnknown() { + xxx_messageInfo_ColumnDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_ColumnDesc proto.InternalMessageInfo + +func (m *ColumnDesc) GetInfo() *datasetmd.ColumnInfo { + if m != nil { + return m.Info + } + return nil +} + +func (m *ColumnDesc) GetType() ColumnType { + if m != nil { + return m.Type + } + return COLUMN_TYPE_UNSPECIFIED +} + +// ColumnMetadata describes the metadata for a column. +type ColumnMetadata struct { + // Pages within the column. + Pages []*PageDesc `protobuf:"bytes,1,rep,name=pages,proto3" json:"pages,omitempty"` +} + +func (m *ColumnMetadata) Reset() { *m = ColumnMetadata{} } +func (*ColumnMetadata) ProtoMessage() {} +func (*ColumnMetadata) Descriptor() ([]byte, []int) { + return fileDescriptor_50d9821968c7172c, []int{2} +} +func (m *ColumnMetadata) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ColumnMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ColumnMetadata.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ColumnMetadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_ColumnMetadata.Merge(m, src) +} +func (m *ColumnMetadata) XXX_Size() int { + return m.Size() +} +func (m *ColumnMetadata) XXX_DiscardUnknown() { + xxx_messageInfo_ColumnMetadata.DiscardUnknown(m) +} + +var xxx_messageInfo_ColumnMetadata proto.InternalMessageInfo + +func (m *ColumnMetadata) GetPages() []*PageDesc { + if m != nil { + return m.Pages + } + return nil +} + +// PageDesc describes an individual page within a column. +type PageDesc struct { + // Information about the page. + Info *datasetmd.PageInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` +} + +func (m *PageDesc) Reset() { *m = PageDesc{} } +func (*PageDesc) ProtoMessage() {} +func (*PageDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_50d9821968c7172c, []int{3} +} +func (m *PageDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PageDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PageDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PageDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_PageDesc.Merge(m, src) +} +func (m *PageDesc) XXX_Size() int { + return m.Size() +} +func (m *PageDesc) XXX_DiscardUnknown() { + xxx_messageInfo_PageDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_PageDesc proto.InternalMessageInfo + +func (m *PageDesc) GetInfo() *datasetmd.PageInfo { + if m != nil { + return m.Info + } + return nil +} + +func init() { + proto.RegisterEnum("dataobj.metadata.logs.v1.ColumnType", ColumnType_name, ColumnType_value) + proto.RegisterType((*Metadata)(nil), "dataobj.metadata.logs.v1.Metadata") + proto.RegisterType((*ColumnDesc)(nil), "dataobj.metadata.logs.v1.ColumnDesc") + proto.RegisterType((*ColumnMetadata)(nil), "dataobj.metadata.logs.v1.ColumnMetadata") + proto.RegisterType((*PageDesc)(nil), "dataobj.metadata.logs.v1.PageDesc") +} + +func init() { + proto.RegisterFile("pkg/dataobj/internal/metadata/logsmd/logsmd.proto", fileDescriptor_50d9821968c7172c) +} + +var fileDescriptor_50d9821968c7172c = []byte{ + // 430 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0x2c, 0xc8, 0x4e, 0xd7, + 0x4f, 0x49, 0x2c, 0x49, 0xcc, 0x4f, 0xca, 0xd2, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, + 0xd1, 0xcf, 0x4d, 0x2d, 0x49, 0x04, 0x09, 0xea, 0xe7, 0xe4, 0xa7, 0x17, 0xe7, 0xa6, 0x40, 0x29, + 0xbd, 0x82, 0xa2, 0xfc, 0x92, 0x7c, 0x21, 0x09, 0xa8, 0x72, 0x3d, 0x98, 0x2a, 0x3d, 0x90, 0xb4, + 0x5e, 0x99, 0xa1, 0x94, 0x39, 0x7e, 0xc3, 0x40, 0x44, 0x71, 0x6a, 0x49, 0x6e, 0x0a, 0x82, 0x05, + 0x31, 0x52, 0xc9, 0x8b, 0x8b, 0xc3, 0x17, 0xaa, 0x4a, 0xc8, 0x8e, 0x8b, 0x3d, 0x39, 0x3f, 0xa7, + 0x34, 0x37, 0xaf, 0x58, 0x82, 0x51, 0x81, 0x59, 0x83, 0xdb, 0x48, 0x45, 0x0f, 0x97, 0x85, 0x7a, + 0xce, 0x60, 0x85, 0x2e, 0xa9, 0xc5, 0xc9, 0x41, 0x30, 0x4d, 0x4a, 0xcd, 0x8c, 0x5c, 0x5c, 0x08, + 0x71, 0x21, 0x6b, 0x2e, 0x96, 0xcc, 0xbc, 0xb4, 0x7c, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x6e, 0x23, + 0x75, 0x4c, 0xb3, 0xa0, 0x6e, 0x41, 0x18, 0xe7, 0x99, 0x97, 0x96, 0x1f, 0x04, 0xd6, 0x24, 0x64, + 0xc1, 0xc5, 0x52, 0x52, 0x59, 0x90, 0x2a, 0xc1, 0xa4, 0xc0, 0xa8, 0xc1, 0x47, 0xd8, 0x21, 0x21, + 0x95, 0x05, 0xa9, 0x41, 0x60, 0x1d, 0x4a, 0x5e, 0x5c, 0x7c, 0x10, 0x31, 0xb8, 0xbf, 0x2c, 0xb8, + 0x58, 0x0b, 0x12, 0xd3, 0x53, 0x61, 0xbe, 0x52, 0xc2, 0x6d, 0x58, 0x40, 0x62, 0x7a, 0x2a, 0xd8, + 0x4f, 0x10, 0x0d, 0x4a, 0xae, 0x5c, 0x1c, 0x30, 0x21, 0x21, 0x4b, 0x14, 0xef, 0xa8, 0xe2, 0xf5, + 0x0e, 0x48, 0x13, 0xc2, 0x33, 0x5a, 0x93, 0xe0, 0x01, 0x03, 0x72, 0xa7, 0x90, 0x34, 0x97, 0xb8, + 0xb3, 0xbf, 0x4f, 0xa8, 0xaf, 0x5f, 0x7c, 0x48, 0x64, 0x80, 0x6b, 0x7c, 0xa8, 0x5f, 0x70, 0x80, + 0xab, 0xb3, 0xa7, 0x9b, 0xa7, 0xab, 0x8b, 0x00, 0x83, 0x90, 0x24, 0x97, 0x28, 0xb2, 0x64, 0x70, + 0x48, 0x90, 0xab, 0xa3, 0x6f, 0xbc, 0xa7, 0x8b, 0x00, 0x23, 0xba, 0x54, 0x88, 0xa7, 0xaf, 0x6b, + 0x70, 0x88, 0xa3, 0x6f, 0x80, 0x00, 0x93, 0x90, 0x04, 0x97, 0x08, 0xb2, 0x94, 0xaf, 0x6b, 0x88, + 0xa3, 0x8b, 0x63, 0x88, 0xa3, 0x00, 0xb3, 0x90, 0x38, 0x97, 0x30, 0xaa, 0x4c, 0x70, 0xb0, 0xa3, + 0xbb, 0xab, 0x00, 0x8b, 0x53, 0xe9, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, + 0x28, 0xc7, 0xd8, 0xf0, 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, + 0x3c, 0x92, 0x63, 0x7c, 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, 0xe4, 0x18, + 0x27, 0x3c, 0x96, 0x63, 0xb8, 0xf0, 0x58, 0x8e, 0xe1, 0xc6, 0x63, 0x39, 0x86, 0x28, 0xfb, 0xf4, + 0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0xfd, 0xf4, 0xa2, 0xc4, 0xb4, 0xc4, 0x3c, + 0x50, 0x52, 0xcd, 0xce, 0xd4, 0x2f, 0x33, 0xd6, 0x27, 0x26, 0x41, 0x27, 0xb1, 0x81, 0xd3, 0x9d, + 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x86, 0xa4, 0x27, 0x0d, 0xff, 0x02, 0x00, 0x00, +} + +func (x ColumnType) String() string { + s, ok := ColumnType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (this *Metadata) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Metadata) + if !ok { + that2, ok := that.(Metadata) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Columns) != len(that1.Columns) { + return false + } + for i := range this.Columns { + if !this.Columns[i].Equal(that1.Columns[i]) { + return false + } + } + return true +} +func (this *ColumnDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ColumnDesc) + if !ok { + that2, ok := that.(ColumnDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Info.Equal(that1.Info) { + return false + } + if this.Type != that1.Type { + return false + } + return true +} +func (this *ColumnMetadata) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ColumnMetadata) + if !ok { + that2, ok := that.(ColumnMetadata) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if len(this.Pages) != len(that1.Pages) { + return false + } + for i := range this.Pages { + if !this.Pages[i].Equal(that1.Pages[i]) { + return false + } + } + return true +} +func (this *PageDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*PageDesc) + if !ok { + that2, ok := that.(PageDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.Info.Equal(that1.Info) { + return false + } + return true +} +func (this *Metadata) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logsmd.Metadata{") + if this.Columns != nil { + s = append(s, "Columns: "+fmt.Sprintf("%#v", this.Columns)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ColumnDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&logsmd.ColumnDesc{") + if this.Info != nil { + s = append(s, "Info: "+fmt.Sprintf("%#v", this.Info)+",\n") + } + s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *ColumnMetadata) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logsmd.ColumnMetadata{") + if this.Pages != nil { + s = append(s, "Pages: "+fmt.Sprintf("%#v", this.Pages)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *PageDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&logsmd.PageDesc{") + if this.Info != nil { + s = append(s, "Info: "+fmt.Sprintf("%#v", this.Info)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringLogsmd(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *Metadata) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Metadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Columns) > 0 { + for iNdEx := len(m.Columns) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Columns[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintLogsmd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *ColumnDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ColumnDesc) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ColumnDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Type != 0 { + i = encodeVarintLogsmd(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x10 + } + if m.Info != nil { + { + size, err := m.Info.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintLogsmd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *ColumnMetadata) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ColumnMetadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ColumnMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Pages) > 0 { + for iNdEx := len(m.Pages) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Pages[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintLogsmd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *PageDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PageDesc) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PageDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Info != nil { + { + size, err := m.Info.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintLogsmd(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintLogsmd(dAtA []byte, offset int, v uint64) int { + offset -= sovLogsmd(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Metadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Columns) > 0 { + for _, e := range m.Columns { + l = e.Size() + n += 1 + l + sovLogsmd(uint64(l)) + } + } + return n +} + +func (m *ColumnDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Info != nil { + l = m.Info.Size() + n += 1 + l + sovLogsmd(uint64(l)) + } + if m.Type != 0 { + n += 1 + sovLogsmd(uint64(m.Type)) + } + return n +} + +func (m *ColumnMetadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Pages) > 0 { + for _, e := range m.Pages { + l = e.Size() + n += 1 + l + sovLogsmd(uint64(l)) + } + } + return n +} + +func (m *PageDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Info != nil { + l = m.Info.Size() + n += 1 + l + sovLogsmd(uint64(l)) + } + return n +} + +func sovLogsmd(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozLogsmd(x uint64) (n int) { + return sovLogsmd(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Metadata) String() string { + if this == nil { + return "nil" + } + repeatedStringForColumns := "[]*ColumnDesc{" + for _, f := range this.Columns { + repeatedStringForColumns += strings.Replace(f.String(), "ColumnDesc", "ColumnDesc", 1) + "," + } + repeatedStringForColumns += "}" + s := strings.Join([]string{`&Metadata{`, + `Columns:` + repeatedStringForColumns + `,`, + `}`, + }, "") + return s +} +func (this *ColumnDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ColumnDesc{`, + `Info:` + strings.Replace(fmt.Sprintf("%v", this.Info), "ColumnInfo", "datasetmd.ColumnInfo", 1) + `,`, + `Type:` + fmt.Sprintf("%v", this.Type) + `,`, + `}`, + }, "") + return s +} +func (this *ColumnMetadata) String() string { + if this == nil { + return "nil" + } + repeatedStringForPages := "[]*PageDesc{" + for _, f := range this.Pages { + repeatedStringForPages += strings.Replace(f.String(), "PageDesc", "PageDesc", 1) + "," + } + repeatedStringForPages += "}" + s := strings.Join([]string{`&ColumnMetadata{`, + `Pages:` + repeatedStringForPages + `,`, + `}`, + }, "") + return s +} +func (this *PageDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PageDesc{`, + `Info:` + strings.Replace(fmt.Sprintf("%v", this.Info), "PageInfo", "datasetmd.PageInfo", 1) + `,`, + `}`, + }, "") + return s +} +func valueToStringLogsmd(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Metadata) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Metadata: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Metadata: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Columns", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogsmd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogsmd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Columns = append(m.Columns, &ColumnDesc{}) + if err := m.Columns[len(m.Columns)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogsmd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ColumnDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ColumnDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ColumnDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Info", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogsmd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogsmd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Info == nil { + m.Info = &datasetmd.ColumnInfo{} + } + if err := m.Info.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= ColumnType(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipLogsmd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ColumnMetadata) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ColumnMetadata: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ColumnMetadata: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Pages", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogsmd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogsmd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Pages = append(m.Pages, &PageDesc{}) + if err := m.Pages[len(m.Pages)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogsmd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PageDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PageDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PageDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Info", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogsmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthLogsmd + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthLogsmd + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Info == nil { + m.Info = &datasetmd.PageInfo{} + } + if err := m.Info.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipLogsmd(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthLogsmd + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipLogsmd(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLogsmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLogsmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLogsmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthLogsmd + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthLogsmd + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowLogsmd + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipLogsmd(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthLogsmd + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthLogsmd = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowLogsmd = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/dataobj/internal/metadata/logsmd/logsmd.proto b/pkg/dataobj/internal/metadata/logsmd/logsmd.proto new file mode 100644 index 0000000000000..467546d8ffd02 --- /dev/null +++ b/pkg/dataobj/internal/metadata/logsmd/logsmd.proto @@ -0,0 +1,57 @@ +// logsmd.proto holds metadata for the logs section of a data object. The logs +// section contains a series of logs records across multiple streams. +syntax = "proto3"; + +package dataobj.metadata.logs.v1; + +import "pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto"; + +option go_package = "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"; + +// Metadata describes the metadata for the logs section. +message Metadata { + // Columns within the logs. + repeated ColumnDesc columns = 1; +} + +// ColumnDesc describes an individual column within the logs table. +message ColumnDesc { + // Information about the column. + dataobj.metadata.dataset.v1.ColumnInfo info = 1; + + // Column type. + ColumnType type = 2; +} + +// ColumnType represents the valid types that a logs column can have. +enum ColumnType { + // Invalid column type. + COLUMN_TYPE_UNSPECIFIED = 0; + + // COLUMN_TYPE_STREAM_ID is a column containing the stream the log record + // belongs to. + COLUMN_TYPE_STREAM_ID = 1; + + // COLUMN_TYPE_TIMESTAMP is a column containing the timestamp of the log + // record. + COLUMN_TYPE_TIMESTAMP = 2; + + // COLUMN_TYPE_METADATA is a column containing structured metadata values for + // a specific key. + COLUMN_TYPE_METADATA = 3; + + // COLUMN_TYPE_MESSAGE is a column containing the message of the log record. + COLUMN_TYPE_MESSAGE = 4; +} + +// ColumnMetadata describes the metadata for a column. +message ColumnMetadata { + // Pages within the column. + repeated PageDesc pages = 1; +} + +// PageDesc describes an individual page within a column. +message PageDesc { + // Information about the page. + dataobj.metadata.dataset.v1.PageInfo info = 1; +}