Make Part an interface (#607)
thorfour authored Dec 4, 2023
1 parent c44f78e commit d3d19cf
Showing 7 changed files with 67 additions and 52 deletions.
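In short, every signature in this commit swaps the concrete *parts.Part pointer for the new parts.Part interface. A minimal caller-side sketch of what that enables (the totalSize helper is hypothetical; only the parts import path and the Size() method come from the diff below):

```go
package main

import (
	"fmt"

	"github.com/polarsignals/frostdb/parts"
)

// totalSize is a hypothetical helper: callers now program against the
// parts.Part interface rather than the concrete *parts.Part struct, so any
// implementation (Parquet-buffer backed, Arrow-record backed, or a test fake)
// can be passed in.
func totalSize(ps []parts.Part) int64 {
	var total int64
	for _, p := range ps {
		total += p.Size() // Size() is one of the interface methods introduced here.
	}
	return total
}

func main() {
	fmt.Println(totalSize(nil)) // prints 0 for an empty slice
}
```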
12 changes: 6 additions & 6 deletions index/lsm.go
@@ -56,7 +56,7 @@ type LSMMetrics struct {
type LevelConfig struct {
Level SentinelType
MaxSize int64
-Compact func([]*parts.Part, ...parts.Option) ([]*parts.Part, int64, int64, error)
+Compact func([]parts.Part, ...parts.Option) ([]parts.Part, int64, int64, error)
}

type LSMOption func(*LSM)
@@ -187,7 +187,7 @@ func (l *LSM) WaitForPendingCompactions() {

// InsertPart inserts a part into the LSM tree. It will be inserted into the correct level. It does not check if the insert should cause a compaction.
// This should only be used during snapshot recovery.
-func (l *LSM) InsertPart(level SentinelType, part *parts.Part) {
+func (l *LSM) InsertPart(level SentinelType, part parts.Part) {
l.findLevel(level).Prepend(part)
size := l.sizes[level].Add(int64(part.Size()))
l.metrics.LevelSize.WithLabelValues(level.String()).Set(float64(size))
@@ -304,7 +304,7 @@ func (l *LSM) EnsureCompaction() error {
return l.compact(true /* ignoreSizes */)
}

-func (l *LSM) Rotate(level SentinelType, externalWriter func([]*parts.Part) (*parts.Part, int64, int64, error)) error {
+func (l *LSM) Rotate(level SentinelType, externalWriter func([]parts.Part) (parts.Part, int64, int64, error)) error {
for !l.compacting.CompareAndSwap(false, true) { // TODO: should backoff retry this probably
// Satisfy linter with a statement.
continue
@@ -325,7 +325,7 @@ func (l *LSM) Rotate(level SentinelType, externalWriter func([]*parts.Part) (*pa

// Merge will merge the given level into an arrow record for the next level using the configured Compact function for the given level.
// If this is the max level of the LSM an external writer must be provided to write the merged part elsewhere.
-func (l *LSM) merge(level SentinelType, externalWriter func([]*parts.Part) (*parts.Part, int64, int64, error)) error {
+func (l *LSM) merge(level SentinelType, externalWriter func([]parts.Part) (parts.Part, int64, int64, error)) error {
if int(level) > len(l.configs) {
return fmt.Errorf("level %d does not exist", level)
}
@@ -364,9 +364,9 @@ func (l *LSM) merge(level SentinelType, externalWriter func([]*parts.Part) (*par

var size int64
var compactedSize int64
-var compacted []*parts.Part
+var compacted []parts.Part
var err error
-mergeList := make([]*parts.Part, 0, len(nodeList))
+mergeList := make([]parts.Part, 0, len(nodeList))
for _, node := range nodeList {
mergeList = append(mergeList, node.part)
}
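For illustration, a LevelConfig can now be wired with a Compact function over the interface type. This is only a sketch: the pass-through compactor below merges nothing, the zero SentinelType is a placeholder, and the index import path is assumed from the file path above.

```go
package main

import (
	"github.com/polarsignals/frostdb/index"
	"github.com/polarsignals/frostdb/parts"
)

func main() {
	// Sketch only: Compact now accepts and returns []parts.Part instead of
	// []*parts.Part. This pass-through implementation merges nothing; it just
	// reports the combined input size as both pre- and post-compaction size.
	cfg := index.LevelConfig{
		Level:   index.SentinelType(0), // placeholder level value
		MaxSize: 512 * 1024 * 1024,
		Compact: func(toCompact []parts.Part, _ ...parts.Option) ([]parts.Part, int64, int64, error) {
			var size int64
			for _, p := range toCompact {
				size += p.Size()
			}
			return toCompact, size, size, nil
		},
	}
	_ = cfg
}
```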
6 changes: 3 additions & 3 deletions index/lsm_list.go
@@ -22,12 +22,12 @@ func (s SentinelType) String() string {
// Node is a Part that is a part of a linked-list.
type Node struct {
next atomic.Pointer[Node]
-part *parts.Part
+part parts.Part

sentinel SentinelType // sentinel nodes contain no parts, and are to indicate the start of a new sub list
}

-func (n *Node) Part() *parts.Part {
+func (n *Node) Part() parts.Part {
return n.part
}

@@ -69,7 +69,7 @@ func (n *Node) Sentinel(s SentinelType) *Node {
}

// Prepend a node onto the front of the list.
-func (n *Node) Prepend(part *parts.Part) *Node {
+func (n *Node) Prepend(part parts.Part) *Node {
return n.prepend(&Node{
part: part,
})
6 changes: 3 additions & 3 deletions index/lsm_test.go
@@ -16,7 +16,7 @@ import (
"github.com/polarsignals/frostdb/parts"
)

-func parquetCompaction(compact []*parts.Part, _ ...parts.Option) ([]*parts.Part, int64, int64, error) {
+func parquetCompaction(compact []parts.Part, _ ...parts.Option) ([]parts.Part, int64, int64, error) {
b := &bytes.Buffer{}
size, err := compactParts(b, compact)
if err != nil {
@@ -27,10 +27,10 @@ func parquetCompaction(compact []*parts.Part, _ ...parts.Option) ([]*parts.Part,
if err != nil {
return nil, 0, 0, err
}
-return []*parts.Part{parts.NewPart(0, buf)}, size, int64(b.Len()), nil
+return []parts.Part{parts.NewPart(0, buf)}, size, int64(b.Len()), nil
}

-func compactParts(w io.Writer, compact []*parts.Part) (int64, error) {
+func compactParts(w io.Writer, compact []parts.Part) (int64, error) {
schema := dynparquet.NewSampleSchema()
bufs := []dynparquet.DynamicRowGroup{}
var size int64
73 changes: 44 additions & 29 deletions parts/part.go
@@ -13,7 +13,22 @@ import (
"github.com/polarsignals/frostdb/pqarrow"
)

-type Part struct {
+type Part interface {
+// Record returns the Arrow record for the part. If the part is not an Arrow
+// record part, nil is returned.
+Record() arrow.Record
+SerializeBuffer(schema *dynparquet.Schema, w dynparquet.ParquetWriter) error
+AsSerializedBuffer(schema *dynparquet.Schema) (*dynparquet.SerializedBuffer, error)
+NumRows() int64
+Size() int64
+CompactionLevel() int
+TX() uint64
+Least() (*dynparquet.DynamicRow, error)
+Most() (*dynparquet.DynamicRow, error)
+OverlapsWith(schema *dynparquet.Schema, otherPart Part) (bool, error)
+}
+
+type part struct {
buf *dynparquet.SerializedBuffer
record arrow.Record
// record relative size is how many bytes are roughly attributed to this record.
@@ -30,19 +45,19 @@ type Part struct {
maxRow *dynparquet.DynamicRow
}

-func (p *Part) Record() arrow.Record {
+func (p *part) Record() arrow.Record {
return p.record
}

-func (p *Part) SerializeBuffer(schema *dynparquet.Schema, w dynparquet.ParquetWriter) error {
+func (p *part) SerializeBuffer(schema *dynparquet.Schema, w dynparquet.ParquetWriter) error {
if p.record == nil {
return fmt.Errorf("not a record part")
}

return pqarrow.RecordToFile(schema, w, p.record)
}

-func (p *Part) AsSerializedBuffer(schema *dynparquet.Schema) (*dynparquet.SerializedBuffer, error) {
+func (p *part) AsSerializedBuffer(schema *dynparquet.Schema) (*dynparquet.SerializedBuffer, error) {
if p.buf != nil {
return p.buf, nil
}
@@ -72,17 +87,17 @@ func (p *Part) AsSerializedBuffer(schema *dynparquet.Schema) (*dynparquet.Serial
return buf, nil
}

-type Option func(*Part)
+type Option func(*part)

func WithCompactionLevel(level int) Option {
-return func(p *Part) {
+return func(p *part) {
p.compactionLevel = level
}
}

// NewArrowPart returns a new Arrow part.
-func NewArrowPart(tx uint64, record arrow.Record, size int, schema *dynparquet.Schema, options ...Option) *Part {
-p := &Part{
+func NewArrowPart(tx uint64, record arrow.Record, size int, schema *dynparquet.Schema, options ...Option) Part {
+p := &part{
tx: tx,
record: record,
recordRelativeSize: size,
@@ -96,8 +111,8 @@ func NewArrowPart(tx uint64, record arrow.Record, size int, schema *dynparquet.S
return p
}

-func NewPart(tx uint64, buf *dynparquet.SerializedBuffer, options ...Option) *Part {
-p := &Part{
+func NewPart(tx uint64, buf *dynparquet.SerializedBuffer, options ...Option) Part {
+p := &part{
tx: tx,
buf: buf,
}
@@ -109,31 +124,31 @@ func NewPart(tx uint64, buf *dynparquet.SerializedBuffer, options ...Option) *Pa
return p
}

-func (p *Part) NumRows() int64 {
+func (p *part) NumRows() int64 {
if p.buf != nil {
return p.buf.NumRows()
}

return p.record.NumRows()
}

-func (p *Part) Size() int64 {
+func (p *part) Size() int64 {
if p.buf != nil {
return p.buf.ParquetFile().Size()
}

return int64(p.recordRelativeSize)
}

-func (p *Part) CompactionLevel() int {
+func (p *part) CompactionLevel() int {
return p.compactionLevel
}

// TX returns the transaction id for the part.
-func (p *Part) TX() uint64 { return p.tx }
+func (p *part) TX() uint64 { return p.tx }

// Least returns the least row in the part.
-func (p *Part) Least() (*dynparquet.DynamicRow, error) {
+func (p *part) Least() (*dynparquet.DynamicRow, error) {
if p.minRow != nil {
return p.minRow, nil
}
@@ -169,7 +184,7 @@ func (p *Part) Least() (*dynparquet.DynamicRow, error) {
return p.minRow, nil
}

-func (p *Part) most() (*dynparquet.DynamicRow, error) {
+func (p *part) Most() (*dynparquet.DynamicRow, error) {
if p.maxRow != nil {
return p.maxRow, nil
}
@@ -210,20 +225,20 @@ func (p *Part) most() (*dynparquet.DynamicRow, error) {
return p.maxRow, nil
}

-func (p *Part) OverlapsWith(schema *dynparquet.Schema, otherPart *Part) (bool, error) {
+func (p *part) OverlapsWith(schema *dynparquet.Schema, otherPart Part) (bool, error) {
a, err := p.Least()
if err != nil {
return false, err
}
-b, err := p.most()
+b, err := p.Most()
if err != nil {
return false, err
}
c, err := otherPart.Least()
if err != nil {
return false, err
}
-d, err := otherPart.most()
+d, err := otherPart.Most()
if err != nil {
return false, err
}
@@ -234,23 +249,23 @@ func (p *Part) OverlapsWith(schema *dynparquet.Schema, otherPart *Part) (bool, e
// Tombstone marks all the parts with the max tx id to ensure they aren't
// included in reads. Tombstoned parts will be eventually be dropped from the
// database during compaction.
-func Tombstone(parts []*Part) {
+func Tombstone(parts []*part) {
for _, part := range parts {
part.tx = math.MaxUint64
}
}

-func (p *Part) HasTombstone() bool {
+func (p *part) HasTombstone() bool {
return p.tx == math.MaxUint64
}

type PartSorter struct {
schema *dynparquet.Schema
-parts []*Part
+parts []Part
err error
}

-func NewPartSorter(schema *dynparquet.Schema, parts []*Part) *PartSorter {
+func NewPartSorter(schema *dynparquet.Schema, parts []Part) *PartSorter {
return &PartSorter{
schema: schema,
parts: parts,
@@ -288,7 +303,7 @@ func (p *PartSorter) Err() error {
// The function returns the non-overlapping parts first and any overlapping
// parts second. The parts returned are in sorted order according to their Least
// row.
-func FindMaximumNonOverlappingSet(schema *dynparquet.Schema, parts []*Part) ([]*Part, []*Part, error) {
+func FindMaximumNonOverlappingSet(schema *dynparquet.Schema, parts []Part) ([]Part, []Part, error) {
if len(parts) < 2 {
return parts, nil, nil
}
@@ -300,19 +315,19 @@ func FindMaximumNonOverlappingSet(schema *dynparquet.Schema, parts []*Part) ([]*

// Parts are now sorted according to their Least row.
prev := 0
-prevEnd, err := parts[0].most()
+prevEnd, err := parts[0].Most()
if err != nil {
return nil, nil, err
}
-nonOverlapping := make([]*Part, 0, len(parts))
-overlapping := make([]*Part, 0, len(parts))
-var missing *Part
+nonOverlapping := make([]Part, 0, len(parts))
+overlapping := make([]Part, 0, len(parts))
+var missing Part
for i := 1; i < len(parts); i++ {
start, err := parts[i].Least()
if err != nil {
return nil, nil, err
}
-curEnd, err := parts[i].most()
+curEnd, err := parts[i].Most()
if err != nil {
return nil, nil, err
}
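One practical upshot of the change in this file: code can now satisfy parts.Part without building a real Parquet buffer or Arrow record. A hypothetical stub implementation matching the method set of the interface above (the arrow module version and the dynparquet import path are assumptions, not taken from this diff):

```go
package example

import (
	"github.com/apache/arrow/go/v14/arrow" // Arrow module version assumed
	"github.com/polarsignals/frostdb/dynparquet"
	"github.com/polarsignals/frostdb/parts"
)

// fakePart is a hypothetical stub that satisfies the new Part interface with
// canned values, something that only becomes possible once Part is an interface.
type fakePart struct {
	rows, size int64
	tx         uint64
}

func (f *fakePart) Record() arrow.Record { return nil }
func (f *fakePart) SerializeBuffer(*dynparquet.Schema, dynparquet.ParquetWriter) error {
	return nil
}
func (f *fakePart) AsSerializedBuffer(*dynparquet.Schema) (*dynparquet.SerializedBuffer, error) {
	return nil, nil
}
func (f *fakePart) NumRows() int64                         { return f.rows }
func (f *fakePart) Size() int64                            { return f.size }
func (f *fakePart) CompactionLevel() int                   { return 0 }
func (f *fakePart) TX() uint64                             { return f.tx }
func (f *fakePart) Least() (*dynparquet.DynamicRow, error) { return nil, nil }
func (f *fakePart) Most() (*dynparquet.DynamicRow, error)  { return nil, nil }
func (f *fakePart) OverlapsWith(*dynparquet.Schema, parts.Part) (bool, error) {
	return false, nil
}

// Compile-time assertion that fakePart implements parts.Part.
var _ parts.Part = (*fakePart)(nil)
```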
6 changes: 3 additions & 3 deletions parts/part_test.go
@@ -82,7 +82,7 @@ func TestFindMaximumNonOverlappingSet(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
-parts := make([]*Part, len(tc.ranges))
+parts := make([]Part, len(tc.ranges))
for i := range parts {
start := dataModel{Ints: tc.ranges[i].start}
end := dataModel{Ints: tc.ranges[i].end}
@@ -97,13 +97,13 @@ func TestFindMaximumNonOverlappingSet(t *testing.T) {
nonOverlapping, overlapping, err := FindMaximumNonOverlappingSet(testSchema, parts)
require.NoError(t, err)

-verify := func(t *testing.T, expected []rng, actual []*Part) {
+verify := func(t *testing.T, expected []rng, actual []Part) {
t.Helper()
require.Len(t, actual, len(expected))
for i := range actual {
start, err := actual[i].Least()
require.NoError(t, err)
-end, err := actual[i].most()
+end, err := actual[i].Most()
require.NoError(t, err)
require.Equal(t, expected[i].start, start.Row[0].Int64())
require.Equal(t, expected[i].end, end.Row[0].Int64())
2 changes: 1 addition & 1 deletion snapshot.go
@@ -614,7 +614,7 @@ func loadSnapshot(ctx context.Context, db *DB, r io.ReaderAt, size int64) ([]byt
table.mtx.Unlock()

for _, granuleMeta := range tableMeta.GranuleMetadata {
-resultParts := make([]*parts.Part, 0, len(granuleMeta.PartMetadata))
+resultParts := make([]parts.Part, 0, len(granuleMeta.PartMetadata))
for _, partMeta := range granuleMeta.PartMetadata {
if err := ctx.Err(); err != nil {
return err
14 changes: 7 additions & 7 deletions table.go
@@ -1114,7 +1114,7 @@ func (t *Table) configureLSMLevels(levels []*index.LevelConfig) []*index.LevelCo
return config
}

-func (t *Table) parquetCompaction(compact []*parts.Part, options ...parts.Option) ([]*parts.Part, int64, int64, error) {
+func (t *Table) parquetCompaction(compact []parts.Part, options ...parts.Option) ([]parts.Part, int64, int64, error) {
var (
buf *dynparquet.SerializedBuffer
preCompactionSize, postCompactionSize int64
@@ -1144,11 +1144,11 @@ func (t *Table) parquetCompaction(compact []*parts.Part, options ...parts.Option
postCompactionSize = buf.ParquetFile().Size()
}

-return []*parts.Part{parts.NewPart(0, buf, options...)}, preCompactionSize, postCompactionSize, nil
+return []parts.Part{parts.NewPart(0, buf, options...)}, preCompactionSize, postCompactionSize, nil
}

-func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []*parts.Part) (*parts.Part, int64, int64, error) {
-return func(compact []*parts.Part) (*parts.Part, int64, int64, error) {
+func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []parts.Part) (parts.Part, int64, int64, error) {
+return func(compact []parts.Part) (parts.Part, int64, int64, error) {
size, err := t.compactParts(writer, compact)
if err != nil {
return nil, 0, 0, err
@@ -1160,7 +1160,7 @@ func (t *Table) externalParquetCompaction(writer io.Writer) func(compact []*part

// compactParts will compact the given parts into a Parquet file written to w.
// It returns the size in bytes of the compacted parts.
-func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error) {
+func (t *Table) compactParts(w io.Writer, compact []parts.Part) (int64, error) {
preCompactionSize := int64(0)
for _, p := range compact {
preCompactionSize += p.Size()
@@ -1237,7 +1237,7 @@ func (t *Table) compactParts(w io.Writer, compact []*parts.Part) (int64, error)
// the minimum slice of dynamic row groups to be merged together for compaction.
// If nil, nil is returned, the resulting serialized buffer is written directly
// to w as an optimization.
-func (t *Table) buffersForCompaction(w io.Writer, inputParts []*parts.Part) ([]dynparquet.DynamicRowGroup, error) {
+func (t *Table) buffersForCompaction(w io.Writer, inputParts []parts.Part) ([]dynparquet.DynamicRowGroup, error) {
nonOverlappingParts, overlappingParts, err := parts.FindMaximumNonOverlappingSet(t.schema, inputParts)
if err != nil {
return nil, err
@@ -1335,7 +1335,7 @@ func (t *Table) writeRecordsToParquet(w io.Writer, records []arrow.Record, sortI
// least one non-arrow part is found, nil, nil is returned in which case, the
// caller should fall back to normal compaction. On success, the caller is
// responsible for releasing the returned records.
-func (t *Table) distinctRecordsForCompaction(compact []*parts.Part) ([]arrow.Record, error) {
+func (t *Table) distinctRecordsForCompaction(compact []parts.Part) ([]arrow.Record, error) {
sortingCols := t.schema.SortingColumns()
columnExprs := make([]logicalplan.Expr, 0, len(sortingCols))
for _, col := range sortingCols {
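Rotate's external writer callback in index/lsm.go follows the same shape as the compaction functions above. A hedged sketch of supplying one (the helper and its discard behavior are hypothetical; only the Rotate signature comes from this commit, and the index/parts import paths are assumed):

```go
package example

import (
	"github.com/polarsignals/frostdb/index"
	"github.com/polarsignals/frostdb/parts"
)

// rotateWithDiscardWriter is a hypothetical helper showing the new Rotate
// callback shape: the external writer now receives []parts.Part and returns a
// single parts.Part. The callback below discards the data; in frostdb this
// role is played by Table.externalParquetCompaction.
func rotateWithDiscardWriter(lsm *index.LSM, level index.SentinelType) error {
	return lsm.Rotate(level, func(compact []parts.Part) (parts.Part, int64, int64, error) {
		var preSize int64
		for _, p := range compact {
			preSize += p.Size()
		}
		// A real writer would serialize the parts (e.g. to Parquet) and
		// return a part referencing the written output.
		return nil, preSize, 0, nil
	})
}
```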
