From c581218c4f9d0e35038cc1b1af57d554a88b5a12 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Thu, 9 Jan 2025 11:09:26 -0500 Subject: [PATCH] chore(dataobj): add Dataset with iteration and sorting support (#15652) --- pkg/dataobj/internal/dataset/column.go | 36 +++- pkg/dataobj/internal/dataset/dataset.go | 79 ++++++++ pkg/dataobj/internal/dataset/dataset_iter.go | 112 ++++++++++++ pkg/dataobj/internal/dataset/dataset_sort.go | 173 ++++++++++++++++++ .../internal/dataset/dataset_sort_test.go | 143 +++++++++++++++ pkg/dataobj/internal/dataset/page.go | 26 +++ pkg/dataobj/internal/dataset/value.go | 34 ++++ 7 files changed, 602 insertions(+), 1 deletion(-) create mode 100644 pkg/dataobj/internal/dataset/dataset_iter.go create mode 100644 pkg/dataobj/internal/dataset/dataset_sort.go create mode 100644 pkg/dataobj/internal/dataset/dataset_sort_test.go diff --git a/pkg/dataobj/internal/dataset/column.go b/pkg/dataobj/internal/dataset/column.go index 73c63c3512984..0243ad3cd49b3 100644 --- a/pkg/dataobj/internal/dataset/column.go +++ b/pkg/dataobj/internal/dataset/column.go @@ -1,6 +1,11 @@ package dataset -import "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" +import ( + "context" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) // Helper types. type ( @@ -18,8 +23,37 @@ type ( } ) +// A Column represents a sequence of values within a dataset. Columns are split +// up across one or more [Page]s to limit the amount of memory needed to read a +// portion of the column at a time. +type Column interface { + // ColumnInfo returns the metadata for the Column. + ColumnInfo() *ColumnInfo + + // ListPages returns the set of ordered pages in the column. + ListPages(ctx context.Context) result.Seq[Page] +} + // MemColumn holds a set of pages of a common type. type MemColumn struct { Info ColumnInfo // Information about the column. Pages []*MemPage // The set of pages in the column. } + +var _ Column = (*MemColumn)(nil) + +// ColumnInfo implements [Column] and returns c.Info. +func (c *MemColumn) ColumnInfo() *ColumnInfo { return &c.Info } + +// ListPages implements [Column] and iterates through c.Pages. +func (c *MemColumn) ListPages(_ context.Context) result.Seq[Page] { + return result.Iter(func(yield func(Page) bool) error { + for _, p := range c.Pages { + if !yield(p) { + return nil + } + } + + return nil + }) +} diff --git a/pkg/dataobj/internal/dataset/dataset.go b/pkg/dataobj/internal/dataset/dataset.go index 6c1cbd2402bb8..9adbcbb7676c0 100644 --- a/pkg/dataobj/internal/dataset/dataset.go +++ b/pkg/dataobj/internal/dataset/dataset.go @@ -1,3 +1,82 @@ // Package dataset contains utilities for working with datasets. Datasets hold // columnar data across multiple pages. package dataset + +import ( + "context" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +// A Dataset holds a collection of [Columns], each of which is split into a set +// of [Pages] and further split into a sequence of [Values]. +// +// Dataset is read-only; callers must not modify any of the values returned by +// methods in Dataset. +type Dataset interface { + // ListColumns returns the set of [Column]s in the Dataset. The order of + // Columns in the returned sequence must be consistent across calls. + ListColumns(ctx context.Context) result.Seq[Column] + + // ListPages retrieves a set of [Pages] given a list of [Column]s. + // Implementations of Dataset may use ListPages to optimize for batch reads. 
+ // The order of [Pages] in the returned sequence must match the order of the + // columns argument. + ListPages(ctx context.Context, columns []Column) result.Seq[Pages] + + // ReadPages returns the set of [PageData] for the specified slice of pages. + // Implementations of Dataset may use ReadPages to optimize for batch reads. + // The order of [PageData] in the returned sequence must match the order of + // the pages argument. + ReadPages(ctx context.Context, pages []Page) result.Seq[PageData] +} + +// FromMemory returns an in-memory [Dataset] from the given list of +// [MemColumn]s. +func FromMemory(columns []*MemColumn) Dataset { + return memDataset(columns) +} + +type memDataset []*MemColumn + +func (d memDataset) ListColumns(_ context.Context) result.Seq[Column] { + return result.Iter(func(yield func(Column) bool) error { + for _, c := range d { + if !yield(c) { + return nil + } + } + + return nil + }) +} + +func (d memDataset) ListPages(ctx context.Context, columns []Column) result.Seq[Pages] { + return result.Iter(func(yield func(Pages) bool) error { + for _, c := range columns { + pages, err := result.Collect(c.ListPages(ctx)) + if err != nil { + return err + } else if !yield(Pages(pages)) { + return nil + } + } + + return nil + }) +} + +func (d memDataset) ReadPages(ctx context.Context, pages []Page) result.Seq[PageData] { + return result.Iter(func(yield func(PageData) bool) error { + for _, p := range pages { + data, err := p.ReadPage(ctx) + if err != nil { + return err + } else if !yield(data) { + return nil + } + } + + return nil + }) +} diff --git a/pkg/dataobj/internal/dataset/dataset_iter.go b/pkg/dataobj/internal/dataset/dataset_iter.go new file mode 100644 index 0000000000000..d223b87d1abef --- /dev/null +++ b/pkg/dataobj/internal/dataset/dataset_iter.go @@ -0,0 +1,112 @@ +package dataset + +import ( + "context" + "math" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +// Iter iterates over the rows for the given list of columns. Each [Row] in the +// returned sequence will only contain values for the columns matching the +// columns argument. Values in each row match the order of the columns argument +// slice. +// +// Iter lazily fetches pages as needed. +func Iter(ctx context.Context, columns []Column) result.Seq[Row] { + // TODO(rfratto): Iter is insufficient for reading at scale: + // + // * Pages are lazily fetched one at a time, which would cause many round + // trips to the underlying storage. + // + // * There's no support for filtering at the page or row level, meaning we + // may overfetch data. + // + // The current implementation is acceptable only for in-memory Datasets. A + // more efficient implementation is needed for reading Datasets backed by + // object storage. + + totalRows := math.MinInt64 + for _, col := range columns { + totalRows = max(totalRows, col.ColumnInfo().RowsCount) + } + + type pullColumnIter struct { + Next func() (result.Result[Value], bool) + Stop func() + } + + return result.Iter(func(yield func(Row) bool) error { + // Create pull-based iters for all of our columns; this will allow us to + // get one value at a time per column. 
+ pullColumns := make([]pullColumnIter, 0, len(columns)) + for _, col := range columns { + pages, err := result.Collect(col.ListPages(ctx)) + if err != nil { + return err + } + + next, stop := result.Pull(lazyColumnIter(ctx, col.ColumnInfo(), pages)) + pullColumns = append(pullColumns, pullColumnIter{Next: next, Stop: stop}) + } + + // Start emitting rows; each row is composed of the next value from all of + // our columns. If a column ends early, it's left as the zero [Value], + // corresponding to a NULL. + for rowIndex := 0; rowIndex < totalRows; rowIndex++ { + rowValues := make([]Value, len(pullColumns)) + + for i, column := range pullColumns { + res, ok := column.Next() + value, err := res.Value() + if !ok { + continue + } else if err != nil { + return err + } + + rowValues[i] = value + } + + row := Row{Index: rowIndex, Values: rowValues} + if !yield(row) { + return nil + } + } + + return nil + }) +} + +func lazyColumnIter(ctx context.Context, column *ColumnInfo, pages []Page) result.Seq[Value] { + return result.Iter(func(yield func(Value) bool) error { + for _, page := range pages { + pageData, err := page.ReadPage(ctx) + if err != nil { + return err + } + memPage := &MemPage{ + Info: *page.PageInfo(), + Data: pageData, + } + + for result := range iterMemPage(memPage, column.Type, column.Compression) { + val, err := result.Value() + if err != nil { + return err + } else if !yield(val) { + return nil + } + } + } + + return nil + }) +} + +// A Row in a Dataset is a set of values across multiple columns with the same +// row number. +type Row struct { + Index int // Index of the row in the dataset. + Values []Value // Values for the row, one per [Column]. +} diff --git a/pkg/dataobj/internal/dataset/dataset_sort.go b/pkg/dataobj/internal/dataset/dataset_sort.go new file mode 100644 index 0000000000000..0cda0097145a6 --- /dev/null +++ b/pkg/dataobj/internal/dataset/dataset_sort.go @@ -0,0 +1,173 @@ +package dataset + +import ( + "context" + "fmt" + "slices" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +// Sort returns a new Dataset with rows sorted by the given sortBy columns in +// ascending order. The order of columns in the new Dataset will match the +// order in set. pageSizeHint specifies the page size to target for newly +// created pages. +// +// If sortBy is empty or if the columns in sortBy contain no rows, Sort returns +// set. +func Sort(ctx context.Context, set Dataset, sortBy []Column, pageSizeHint int) (Dataset, error) { + if len(sortBy) == 0 { + return set, nil + } + + // First, we need to know the final sort order of entries in sortBy. Then, + // we sort the rows by sortBy. + var sortedRows []Row + for ent := range Iter(ctx, sortBy) { + row, err := ent.Value() + if err != nil { + return nil, fmt.Errorf("iterating over existing dataset: %w", err) + } else if len(row.Values) != len(sortBy) { + return nil, fmt.Errorf("row column mismatch: expected %d, got %d", len(sortBy), len(row.Values)) + } + sortedRows = append(sortedRows, row) + } + + if len(sortedRows) == 0 { + // Empty dataset; nothing to sort. + return set, nil + } + + // Put sortedRows in the proper order. After this, sortedRows[0] corresponds + // to what should be the new first row, while sortedRows[0].Index tells us + // what its original index (in the unsorted Dataset) was. + slices.SortStableFunc(sortedRows, func(a, b Row) int { + for i, aValue := range a.Values { + if i >= len(b.Values) { + // a.Values and b.Values should always be the same length, but we check + // here anyway to avoid a panic.
break + } + bValue := b.Values[i] + + // We return the first non-zero comparison result. Otherwise, the rows + // are equal (in terms of sortBy). + switch CompareValues(aValue, bValue) { + case -1: + return -1 + case 1: + return 1 + } + } + + return 0 + }) + + origColumns, err := result.Collect(set.ListColumns(ctx)) + if err != nil { + return nil, fmt.Errorf("listing columns: %w", err) + } + + newColumns := make([]*MemColumn, 0, len(origColumns)) + for _, origColumn := range origColumns { + origInfo := origColumn.ColumnInfo() + pages, err := result.Collect(origColumn.ListPages(ctx)) + if err != nil { + return nil, fmt.Errorf("getting pages: %w", err) + } else if len(pages) == 0 { + return nil, fmt.Errorf("unexpected column with no pages") + } + + newColumn, err := NewColumnBuilder(origInfo.Name, BuilderOptions{ + PageSizeHint: pageSizeHint, + Value: origInfo.Type, + Compression: origInfo.Compression, + + // TODO(rfratto): This only works for now as all pages have the same + // encoding. If we add support for mixed encoding we'll need to do + // something different here. + Encoding: pages[0].PageInfo().Encoding, + }) + if err != nil { + return nil, fmt.Errorf("creating new column: %w", err) + } + + // newColumn becomes populated in sorted order by populating row 0 from + // sortedRows[0].Index, row 1 from sortedRows[1].Index, and so on. + // + // While sortedRows contains all rows across the columns to sort by, the + // columns to sort by typically have a smaller memory footprint than other + // columns (for example, loading the entire set of timestamps is far + // smaller than loading the entire set of log lines). + // + // To minimize the memory overhead of sorting, we only collect values from + // a single page at a time. This is more memory-efficient, but comes at the + // cost of more page reads for very unsorted data. + var ( + curPage Page // Current page for iteration. + curPageValues []Value // Values in the current page. + ) + for i, sortRow := range sortedRows { + // To avoid re-reading pages for every row to sort, we cache the most + // recently read page and its values. This way we only need to read a + // page when sortRow crosses over into a new page. + // + // If origColumn is shorter than other columns in set, pageForRow will + // return (nil, -1), indicating that the row isn't available in the + // column. In that case, we want to append the zero [Value] to denote a + // NULL. + nextPage, rowInPage := pageForRow(pages, sortRow.Index) + if nextPage != nil && nextPage != curPage { + curPage = nextPage + + data, err := curPage.ReadPage(ctx) + if err != nil { + return nil, fmt.Errorf("getting page data: %w", err) + } + + memPage := &MemPage{ + Info: *curPage.PageInfo(), + Data: data, + } + curPageValues, err = result.Collect(iterMemPage(memPage, origInfo.Type, origInfo.Compression)) + if err != nil { + return nil, fmt.Errorf("reading page: %w", err) + } + } + + var value Value + if rowInPage != -1 { + value = curPageValues[rowInPage] + } + + if err := newColumn.Append(i, value); err != nil { + return nil, fmt.Errorf("appending value: %w", err) + } + } + + memColumn, err := newColumn.Flush() + if err != nil { + return nil, fmt.Errorf("flushing column: %w", err) + } + newColumns = append(newColumns, memColumn) + } + + return FromMemory(newColumns), nil +} + +// pageForRow returns the page that contains the provided column row number, +// along with the relative row number for that page.
+func pageForRow(pages []Page, row int) (Page, int) { + startRow := 0 + + for _, page := range pages { + info := page.PageInfo() + + if row >= startRow && row < startRow+info.RowCount { + return page, row - startRow + } + + startRow += info.RowCount + } + + return nil, -1 +} diff --git a/pkg/dataobj/internal/dataset/dataset_sort_test.go b/pkg/dataobj/internal/dataset/dataset_sort_test.go new file mode 100644 index 0000000000000..5510931a93dbd --- /dev/null +++ b/pkg/dataobj/internal/dataset/dataset_sort_test.go @@ -0,0 +1,143 @@ +package dataset_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" + "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" + "github.com/grafana/loki/v3/pkg/dataobj/internal/result" +) + +func TestSort(t *testing.T) { + var ( + in = []int64{1, 5, 3, 2, 9, 6, 8, 4, 7} + expect = []int64{1, 2, 3, 4, 5, 6, 7, 8, 9} + ) + + colBuilder, err := dataset.NewColumnBuilder("", dataset.BuilderOptions{ + PageSizeHint: 1, // Generate a ton of pages. + + Value: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + require.NoError(t, err) + + for i, v := range in { + require.NoError(t, colBuilder.Append(i, dataset.Int64Value(v))) + } + col, err := colBuilder.Flush() + require.NoError(t, err) + + dset := dataset.FromMemory([]*dataset.MemColumn{col}) + dset, err = dataset.Sort(context.Background(), dset, []dataset.Column{col}, 1024) + require.NoError(t, err) + + newColumns, err := result.Collect(dset.ListColumns(context.Background())) + require.NoError(t, err) + + var actual []int64 + for entry := range dataset.Iter(context.Background(), newColumns) { + row, err := entry.Value() + require.NoError(t, err) + require.Equal(t, len(actual), row.Index) + require.Len(t, row.Values, 1) + require.Equal(t, datasetmd.VALUE_TYPE_INT64, row.Values[0].Type()) + + actual = append(actual, row.Values[0].Int64()) + } + require.Equal(t, expect, actual) +} + +func TestSort_MultipleFields(t *testing.T) { + type Person struct { + Name string + Age int + Location string + } + people := []Person{ + {"Bob", 25, "New York"}, + {"David", 35, "Chicago"}, + {"Eve", 30, "Los Angeles"}, + {"Alice", 30, "San Francisco"}, + {"Charlie", 40, ""}, // Ensure NULLs are handled properly in sorting. 
+ } + + nameBuilder, err := dataset.NewColumnBuilder("name", dataset.BuilderOptions{ + Value: datasetmd.VALUE_TYPE_STRING, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + require.NoError(t, err) + + ageBuilder, err := dataset.NewColumnBuilder("age", dataset.BuilderOptions{ + Value: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + require.NoError(t, err) + + locationBuilder, err := dataset.NewColumnBuilder("location", dataset.BuilderOptions{ + Value: datasetmd.VALUE_TYPE_STRING, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + }) + require.NoError(t, err) + + for i, p := range people { + require.NoError(t, nameBuilder.Append(i, dataset.StringValue(p.Name))) + require.NoError(t, ageBuilder.Append(i, dataset.Int64Value(int64(p.Age)))) + require.NoError(t, locationBuilder.Append(i, dataset.StringValue(p.Location))) + } + + nameCol, err := nameBuilder.Flush() + require.NoError(t, err) + ageCol, err := ageBuilder.Flush() + require.NoError(t, err) + locationCol, err := locationBuilder.Flush() + require.NoError(t, err) + + dset := dataset.FromMemory([]*dataset.MemColumn{nameCol, ageCol, locationCol}) + dset, err = dataset.Sort(context.Background(), dset, []dataset.Column{ageCol, nameCol}, 1024) + require.NoError(t, err) + + newColumns, err := result.Collect(dset.ListColumns(context.Background())) + require.NoError(t, err) + + expect := []Person{ + {"Bob", 25, "New York"}, + {"Alice", 30, "San Francisco"}, + {"Eve", 30, "Los Angeles"}, + {"David", 35, "Chicago"}, + {"Charlie", 40, ""}, + } + var actual []Person + + for result := range dataset.Iter(context.Background(), newColumns) { + row, err := result.Value() + require.NoError(t, err) + require.Equal(t, len(actual), row.Index) + require.Len(t, row.Values, 3) + + var p Person + + if !row.Values[0].IsNil() { + require.Equal(t, datasetmd.VALUE_TYPE_STRING, row.Values[0].Type()) + p.Name = row.Values[0].String() + } + if !row.Values[1].IsNil() { + require.Equal(t, datasetmd.VALUE_TYPE_INT64, row.Values[1].Type()) + p.Age = int(row.Values[1].Int64()) + } + if !row.Values[2].IsNil() { + require.Equal(t, datasetmd.VALUE_TYPE_STRING, row.Values[2].Type()) + p.Location = row.Values[2].String() + } + + actual = append(actual, p) + } + require.Equal(t, expect, actual) +} diff --git a/pkg/dataobj/internal/dataset/page.go b/pkg/dataobj/internal/dataset/page.go index 31cae969a9c76..6ae52e35d8814 100644 --- a/pkg/dataobj/internal/dataset/page.go +++ b/pkg/dataobj/internal/dataset/page.go @@ -2,6 +2,7 @@ package dataset import ( "bytes" + "context" "encoding/binary" "fmt" "hash/crc32" @@ -37,8 +38,21 @@ type ( Encoding datasetmd.EncodingType // Encoding used for values in the page. Stats *datasetmd.Statistics // Optional statistics for the page. } + + // Pages is a set of [Page]s. + Pages []Page ) +// A Page holds an encoded and optionally compressed sequence of [Value]s +// within a [Column]. +type Page interface { + // PageInfo returns the metadata for the Page. + PageInfo() *PageInfo + + // ReadPage returns the [PageData] for the Page. + ReadPage(ctx context.Context) (PageData, error) +} + // MemPage holds an encoded (and optionally compressed) sequence of [Value] // entries of a common type. Use [ColumnBuilder] to construct sets of pages. type MemPage struct { @@ -46,6 +60,18 @@ type MemPage struct { Data PageData // Data for the page. 
} + +var _ Page = (*MemPage)(nil) + +// PageInfo implements [Page] and returns p.Info. +func (p *MemPage) PageInfo() *PageInfo { + return &p.Info +} + +// ReadPage implements [Page] and returns p.Data. +func (p *MemPage) ReadPage(_ context.Context) (PageData, error) { + return p.Data, nil +} + var checksumTable = crc32.MakeTable(crc32.Castagnoli) // reader returns a reader for decompressed page data. Reader returns an error diff --git a/pkg/dataobj/internal/dataset/value.go b/pkg/dataobj/internal/dataset/value.go index fec2da8f70c24..02b66b9504bf6 100644 --- a/pkg/dataobj/internal/dataset/value.go +++ b/pkg/dataobj/internal/dataset/value.go @@ -1,6 +1,7 @@ package dataset import ( + "cmp" "fmt" "unsafe" @@ -123,3 +124,36 @@ func (v Value) String() string { } return v.Type().String() } + +// CompareValues returns -1 if a<b, 0 if a==b, or 1 if a>b. CompareValues +// panics if a and b are not the same type. +// +// As a special case, either a or b may be nil. Two nil values are equal, and a +// nil value is always less than a non-nil value. +func CompareValues(a, b Value) int { + // nil handling. This must be done before the typecheck since nil has a + // special type. + switch { + case a.IsNil() && !b.IsNil(): + return -1 + case !a.IsNil() && b.IsNil(): + return 1 + case a.IsNil() && b.IsNil(): + return 0 + } + + if a.Type() != b.Type() { + panic(fmt.Sprintf("page.CompareValues: cannot compare values of type %s and %s", a.Type(), b.Type())) + } + + switch a.Type() { + case datasetmd.VALUE_TYPE_INT64: + return cmp.Compare(a.Int64(), b.Int64()) + case datasetmd.VALUE_TYPE_UINT64: + return cmp.Compare(a.Uint64(), b.Uint64()) + case datasetmd.VALUE_TYPE_STRING: + return cmp.Compare(a.String(), b.String()) + default: + panic(fmt.Sprintf("page.CompareValues: unsupported type %s", a.Type())) + } +}
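
The tests above double as usage documentation, but a condensed end-to-end sketch may help: build a column, wrap it in an in-memory Dataset, sort by that column, and iterate the sorted rows. This is a hypothetical snippet rather than part of the change; because these packages live under internal/, it only compiles from within the Loki module, and the column name ("ts") and values are arbitrary.

package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

func main() {
	ctx := context.Background()

	// Build a single INT64 column holding unsorted values.
	builder, err := dataset.NewColumnBuilder("ts", dataset.BuilderOptions{
		PageSizeHint: 1024,
		Value:        datasetmd.VALUE_TYPE_INT64,
		Encoding:     datasetmd.ENCODING_TYPE_DELTA,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
	})
	if err != nil {
		panic(err)
	}
	for i, v := range []int64{3, 1, 2} {
		if err := builder.Append(i, dataset.Int64Value(v)); err != nil {
			panic(err)
		}
	}
	col, err := builder.Flush()
	if err != nil {
		panic(err)
	}

	// Sort the dataset by the column, then stream rows in sorted order.
	dset, err := dataset.Sort(ctx, dataset.FromMemory([]*dataset.MemColumn{col}), []dataset.Column{col}, 1024)
	if err != nil {
		panic(err)
	}
	cols, err := result.Collect(dset.ListColumns(ctx))
	if err != nil {
		panic(err)
	}
	for res := range dataset.Iter(ctx, cols) {
		row, err := res.Value()
		if err != nil {
			panic(err)
		}
		fmt.Println(row.Index, row.Values[0].Int64()) // Prints 0 1, 1 2, 2 3.
	}
}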
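
Sort's inner loop hinges on pageForRow mapping an absolute row index to a page and a row offset within that page. The same logic, restated as a self-contained sketch with pages reduced to plain row counts (a simplification for illustration, not the package's actual types):

package main

import "fmt"

// pageForRow maps an absolute row index to (page index, row within page),
// mirroring the lookup in dataset_sort.go. It returns (-1, -1) when the row
// is past the end of the column, which Sort treats as a NULL.
func pageForRow(rowCounts []int, row int) (page, rel int) {
	startRow := 0
	for i, count := range rowCounts {
		if row >= startRow && row < startRow+count {
			return i, row - startRow
		}
		startRow += count
	}
	return -1, -1
}

func main() {
	counts := []int{3, 3, 2} // A column with 8 rows split across 3 pages.
	fmt.Println(pageForRow(counts, 0)) // 0 0
	fmt.Println(pageForRow(counts, 4)) // 1 1
	fmt.Println(pageForRow(counts, 7)) // 2 1
	fmt.Println(pageForRow(counts, 8)) // -1 -1
}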
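
CompareValues supplies the ordering used by the sort comparator, including the rule that NULLs sort first. A small hypothetical demonstration of that contract (again only compilable inside the Loki module, since the package is internal):

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
)

func main() {
	a := dataset.Int64Value(1)
	b := dataset.Int64Value(2)
	var null dataset.Value // The zero Value reports IsNil() == true.

	fmt.Println(dataset.CompareValues(a, b))       // -1: 1 < 2
	fmt.Println(dataset.CompareValues(b, a))       // 1
	fmt.Println(dataset.CompareValues(null, a))    // -1: nil sorts before non-nil
	fmt.Println(dataset.CompareValues(null, null)) // 0: two nils are equal

	// Comparing mixed non-nil types, e.g. CompareValues(a,
	// dataset.StringValue("x")), panics by design.
}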