Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compaction without tombstone #33

Merged
merged 12 commits into from
Nov 7, 2024
14 changes: 9 additions & 5 deletions simpledb/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package simpledb

import (
"errors"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"

rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
)

func backgroundCompaction(db *DB) {
Expand Down Expand Up @@ -115,7 +116,10 @@ func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, er
}
}()

// TODO(thomas): this includes tombstones, do we really need to keep them?
// we need to keep it if the sstable is not the first with this key.
// SS1(KEY1=toto) SS2(KEY2=deleted) in this case the KEY2 can be removed in SS2
// SS1(KEY2=toto) SS2(KEY2=deleted) in this case the KEY2 can't be removed in SS2

err = sstables.NewSSTableMerger(db.cmp).MergeCompact(iterators, writer, sstables.ScanReduceLatestWins)
if err != nil {
return nil, err
Expand Down
68 changes: 67 additions & 1 deletion simpledb/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package simpledb

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/thomasjungblut/go-sstables/memstore"
"github.com/thomasjungblut/go-sstables/sstables"
"github.com/thomasjungblut/go-sstables/sstables/proto"
"testing"
)

func TestExecCompactionLessFilesThanExpected(t *testing.T) {
Expand Down Expand Up @@ -40,10 +45,71 @@ func TestExecCompactionSameContent(t *testing.T) {
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)

// Where is the corresponding Put ???
sebheitzmann marked this conversation as resolved.
Show resolved Hide resolved
v, err := db.Get("hello")
assert.Nil(t, err)
assert.Equal(t, "world", v)

// for cleanups
assert.Nil(t, db.sstableManager.currentReader.Close())
}

func writeSSTableWithDataInDatabaseFolder(t *testing.T, db *DB, p string) {
fakeTablePath := filepath.Join(db.basePath, p)
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
mStore := memstore.NewMemStore()
for i := 0; i < 1000; i++ {
assert.Nil(t, mStore.Add([]byte(fmt.Sprintf("%d", i)), []byte(fmt.Sprintf("%d", i))))
}
assert.Nil(t, mStore.Flush(
sstables.WriteBasePath(fakeTablePath),
sstables.WithKeyComparator(db.cmp),
))
}

func writeSSTableWithTombstoneInDatabaseFolder(t *testing.T, db *DB, p string) {
fakeTablePath := filepath.Join(db.basePath, p)
assert.Nil(t, os.MkdirAll(fakeTablePath, 0700))
mStore := memstore.NewMemStore()

// delete all key between 500 and 800
for i := 500; i < 800; i++ {
assert.Nil(t, mStore.Tombstone([]byte(fmt.Sprintf("%d", i))))
}
assert.Nil(t, mStore.FlushWithTombstones(
sstables.WriteBasePath(fakeTablePath),
sstables.WithKeyComparator(db.cmp),
))
}

func TestExecCompactionWithTombstone(t *testing.T) {
db := newOpenedSimpleDB(t, "simpledb_compactionSameContent")
defer cleanDatabaseFolder(t, db)
// we'll close the database to mock some internals directly, yes it's very hacky
closeDatabase(t, db)
db.closed = false
db.compactionThreshold = 0

writeSSTableWithDataInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 42))
// only one SStable with holes should shrink
writeSSTableWithTombstoneInDatabaseFolder(t, db, fmt.Sprintf(SSTablePattern, 43))
assert.Nil(t, db.reconstructSSTables())
// 1000 initial + 300 Tombstone on second table
assert.Equal(t, 1300, int(db.sstableManager.currentSSTable().MetaData().GetNumRecords()))

compactionMeta, err := executeCompaction(db)
assert.Nil(t, err)
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)
fmt.Print(compactionMeta)
err = db.sstableManager.reflectCompactionResult(compactionMeta)
assert.NoError(t, err)
v, err := db.Get("512")
assert.ErrorIs(t, err, ErrNotFound)
assert.Equal(t, "", v)
// for cleanups
assert.Nil(t, db.sstableManager.currentReader.Close())

// check size of compacted sstable
sebheitzmann marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, 700, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
}
5 changes: 3 additions & 2 deletions simpledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type DatabaseI interface {
}

type compactionAction struct {
pathsToCompact []string
totalRecords uint64
pathsToCompact []string
totalRecords uint64
canRemoveTombstone bool // if the compaction don't start from the first sstable we cannot remove tombstone
}

type memStoreFlushAction struct {
Expand Down
18 changes: 12 additions & 6 deletions simpledb/sstable_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package simpledb

import (
"fmt"
"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"golang.org/x/exp/slices"
"os"
"path/filepath"
"sort"
"sync"

"github.com/thomasjungblut/go-sstables/simpledb/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables"
"golang.org/x/exp/slices"
)

type SSTableManager struct {
Expand Down Expand Up @@ -117,21 +118,26 @@ func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uin
defer s.managerLock.RUnlock()

numRecords := uint64(0)
canRemoveTombstone := false
var paths []string
for i := 0; i < len(s.allSSTableReaders); i++ {
reader := s.allSSTableReaders[i]
// avoid the EmptySStableReader (or empty files) and only include small enough SSTables
if reader.MetaData().NumRecords > 0 && reader.MetaData().TotalBytes < compactionMaxSizeBytes {
paths = append(paths, reader.BasePath())
numRecords += reader.MetaData().NumRecords
if i == 0 {
canRemoveTombstone = true
}
}
}

sort.Strings(paths)

return compactionAction{
pathsToCompact: paths,
totalRecords: numRecords,
pathsToCompact: paths,
totalRecords: numRecords,
canRemoveTombstone: canRemoveTombstone,
}
}

Expand Down
1 change: 1 addition & 0 deletions sstables/proto/sstable.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ message MetaData {
uint64 totalBytes = 6;
uint32 version = 7; // currently version 1, the default is version 0 with protos as values
uint64 skippedRecords = 8;
uint64 tombstonedRecords = 9;
}
10 changes: 7 additions & 3 deletions sstables/super_sstable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package sstables

import (
"errors"
"strings"

"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables/proto"
"strings"
)

// SuperSSTableReader unifies several sstables under one single reader with the same interface.
Expand Down Expand Up @@ -108,8 +109,11 @@ func ScanReduceLatestWins(key []byte, values [][]byte, context []int) ([]byte, [
maxCtxIndex = i
}
}

return key, values[maxCtxIndex]
val := values[maxCtxIndex]
if len(val) == 0 {
sebheitzmann marked this conversation as resolved.
Show resolved Hide resolved
return nil, nil
}
return key, val
}

func (s SuperSSTableReader) Close() (err error) {
Expand Down