chore(dataobj): add Dataset with iteration and sorting support (#15652)
rfratto authored Jan 9, 2025
1 parent e4543ed · commit c581218
Showing 7 changed files with 602 additions and 1 deletion.
36 changes: 35 additions & 1 deletion pkg/dataobj/internal/dataset/column.go
@@ -1,6 +1,11 @@
package dataset

import "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
import (
    "context"

    "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
    "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// Helper types.
type (
@@ -18,8 +23,37 @@ type (
    }
)

// A Column represents a sequence of values within a dataset. Columns are split
// up across one or more [Page]s to limit the amount of memory needed to read a
// portion of the column at a time.
type Column interface {
    // ColumnInfo returns the metadata for the Column.
    ColumnInfo() *ColumnInfo

    // ListPages returns the set of ordered pages in the column.
    ListPages(ctx context.Context) result.Seq[Page]
}

// MemColumn holds a set of pages of a common type.
type MemColumn struct {
    Info  ColumnInfo // Information about the column.
    Pages []*MemPage // The set of pages in the column.
}

var _ Column = (*MemColumn)(nil)

// ColumnInfo implements [Column] and returns c.Info.
func (c *MemColumn) ColumnInfo() *ColumnInfo { return &c.Info }

// ListPages implements [Column] and iterates through c.Pages.
func (c *MemColumn) ListPages(_ context.Context) result.Seq[Page] {
    return result.Iter(func(yield func(Page) bool) error {
        for _, p := range c.Pages {
            if !yield(p) {
                return nil
            }
        }

        return nil
    })
}
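
For orientation, here is a minimal usage sketch (not part of this commit) showing how a caller might consume the Column interface above: it drains ListPages with result.Collect and reports how many pages the column holds. The helper name pageCount is hypothetical; ListPages, result.Seq, and result.Collect are taken from the code above.

// pageCount is a hypothetical helper that drains a Column's page sequence.
// result.Collect materializes the result.Seq[Page] into a slice, returning
// the first error encountered during iteration.
func pageCount(ctx context.Context, col Column) (int, error) {
    pages, err := result.Collect(col.ListPages(ctx))
    if err != nil {
        return 0, err
    }
    return len(pages), nil
}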
79 changes: 79 additions & 0 deletions pkg/dataobj/internal/dataset/dataset.go
@@ -1,3 +1,82 @@
// Package dataset contains utilities for working with datasets. Datasets hold
// columnar data across multiple pages.
package dataset

import (
    "context"

    "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// A Dataset holds a collection of [Columns], each of which is split into a set
// of [Pages] and further split into a sequence of [Values].
//
// Dataset is read-only; callers must not modify any of the values returned by
// methods in Dataset.
type Dataset interface {
    // ListColumns returns the set of [Column]s in the Dataset. The order of
    // Columns in the returned sequence must be consistent across calls.
    ListColumns(ctx context.Context) result.Seq[Column]

    // ListPages retrieves a set of [Pages] given a list of [Column]s.
    // Implementations of Dataset may use ListPages to optimize for batch reads.
    // The order of [Pages] in the returned sequence must match the order of the
    // columns argument.
    ListPages(ctx context.Context, columns []Column) result.Seq[Pages]

    // ReadPages returns the set of [PageData] for the specified slice of pages.
    // Implementations of Dataset may use ReadPages to optimize for batch reads.
    // The order of [PageData] in the returned sequence must match the order of
    // the pages argument.
    ReadPages(ctx context.Context, pages []Page) result.Seq[PageData]
}

// FromMemory returns an in-memory [Dataset] from the given list of
// [MemColumn]s.
func FromMemory(columns []*MemColumn) Dataset {
    return memDataset(columns)
}

type memDataset []*MemColumn

func (d memDataset) ListColumns(_ context.Context) result.Seq[Column] {
    return result.Iter(func(yield func(Column) bool) error {
        for _, c := range d {
            if !yield(c) {
                return nil
            }
        }

        return nil
    })
}

func (d memDataset) ListPages(ctx context.Context, columns []Column) result.Seq[Pages] {
    return result.Iter(func(yield func(Pages) bool) error {
        for _, c := range columns {
            pages, err := result.Collect(c.ListPages(ctx))
            if err != nil {
                return err
            } else if !yield(Pages(pages)) {
                return nil
            }
        }

        return nil
    })
}

func (d memDataset) ReadPages(ctx context.Context, pages []Page) result.Seq[PageData] {
    return result.Iter(func(yield func(PageData) bool) error {
        for _, p := range pages {
            data, err := p.ReadPage(ctx)
            if err != nil {
                return err
            } else if !yield(data) {
                return nil
            }
        }

        return nil
    })
}
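
As a hedged illustration of how the Dataset interface and FromMemory compose (again, not part of the diff), the sketch below wraps a slice of MemColumns in an in-memory Dataset, lists its columns, and batch-reads the raw PageData of the first column through ReadPages. The helper name readFirstColumnPages is hypothetical; everything it calls appears in the code above.

// readFirstColumnPages is a hypothetical helper: it wraps MemColumns in an
// in-memory Dataset, lists the columns, and batch-reads the PageData of the
// first column through Dataset.ReadPages.
func readFirstColumnPages(ctx context.Context, cols []*MemColumn) ([]PageData, error) {
    ds := FromMemory(cols)

    columns, err := result.Collect(ds.ListColumns(ctx))
    if err != nil {
        return nil, err
    } else if len(columns) == 0 {
        return nil, nil
    }

    pages, err := result.Collect(columns[0].ListPages(ctx))
    if err != nil {
        return nil, err
    }

    return result.Collect(ds.ReadPages(ctx, pages))
}

Passing a []Page to ReadPages (rather than one page at a time) is what gives implementations room to coalesce reads, which the interface comments call out as the intended batch-read optimization.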
112 changes: 112 additions & 0 deletions pkg/dataobj/internal/dataset/dataset_iter.go
@@ -0,0 +1,112 @@
package dataset

import (
    "context"
    "math"

    "github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// Iter iterates over the rows for the given list of columns. Each [Row] in the
// returned sequence will only contain values for the columns matching the
// columns argument. Values in each row match the order of the columns argument
// slice.
//
// Iter lazily fetches pages as needed.
func Iter(ctx context.Context, columns []Column) result.Seq[Row] {
    // TODO(rfratto): Iter is insufficient for reading at scale:
    //
    // * Pages are lazily fetched one at a time, which would cause many round
    //   trips to the underlying storage.
    //
    // * There's no support for filtering at the page or row level, meaning we
    //   may overfetch data.
    //
    // The current implementation is acceptable only for in-memory Datasets. A
    // more efficient implementation is needed for reading Datasets backed by
    // object storage.

    totalRows := math.MinInt64
    for _, col := range columns {
        totalRows = max(totalRows, col.ColumnInfo().RowsCount)
    }

    type pullColumnIter struct {
        Next func() (result.Result[Value], bool)
        Stop func()
    }

    return result.Iter(func(yield func(Row) bool) error {
        // Create pull-based iters for all of our columns; this will allow us to
        // get one value at a time per column.
        pullColumns := make([]pullColumnIter, 0, len(columns))
        for _, col := range columns {
            pages, err := result.Collect(col.ListPages(ctx))
            if err != nil {
                return err
            }

            next, stop := result.Pull(lazyColumnIter(ctx, col.ColumnInfo(), pages))
            pullColumns = append(pullColumns, pullColumnIter{Next: next, Stop: stop})
        }

        // Start emitting rows; each row is composed of the next value from all of
        // our columns. If a column ends early, it's left as the zero [Value],
        // corresponding to a NULL.
        for rowIndex := 0; rowIndex < totalRows; rowIndex++ {
            rowValues := make([]Value, len(pullColumns))

            for i, column := range pullColumns {
                res, ok := column.Next()
                value, err := res.Value()
                if !ok {
                    continue
                } else if err != nil {
                    return err
                }

                rowValues[i] = value
            }

            row := Row{Index: rowIndex, Values: rowValues}
            if !yield(row) {
                return nil
            }
        }

        return nil
    })
}

func lazyColumnIter(ctx context.Context, column *ColumnInfo, pages []Page) result.Seq[Value] {
    return result.Iter(func(yield func(Value) bool) error {
        for _, page := range pages {
            pageData, err := page.ReadPage(ctx)
            if err != nil {
                return err
            }
            memPage := &MemPage{
                Info: *page.PageInfo(),
                Data: pageData,
            }

            for result := range iterMemPage(memPage, column.Type, column.Compression) {
                val, err := result.Value()
                if err != nil {
                    return err
                } else if !yield(val) {
                    return nil
                }
            }
        }

        return nil
    })
}

// A Row in a Dataset is a set of values across multiple columns with the same
// row number.
type Row struct {
    Index  int     // Index of the row in the dataset.
    Values []Value // Values for the row, one per [Column].
}
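
To make the row-iteration contract concrete, here is a short hedged sketch (not part of the commit) that drains Iter into a slice of rows using the same range-over-result.Seq pattern seen in lazyColumnIter above; the helper name collectRows is hypothetical.

// collectRows is a hypothetical helper that streams rows across the given
// columns and materializes them into a slice, stopping at the first error.
// Each Row's Values follow the order of the columns argument, with the zero
// Value standing in for NULL where a column ends early.
func collectRows(ctx context.Context, columns []Column) ([]Row, error) {
    var rows []Row
    for res := range Iter(ctx, columns) {
        row, err := res.Value()
        if err != nil {
            return nil, err
        }
        rows = append(rows, row)
    }
    return rows, nil
}

result.Collect(Iter(ctx, columns)) would do the same in one call; the explicit loop is shown only to mirror how lazyColumnIter consumes a result.Seq.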