Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

running betteralign on the project code #30

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions memstore/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type MemStoreI interface {
// Size Returns how many elements are in this memstore. This also includes tombstoned keys.
Size() int
}

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't appear to be CRLF-proof — please verify the comparison still works when the input uses Windows-style line endings.

type ValueStruct struct {
// when deleting, we're simply tomb-stoning the key by setting value = nil, which also saves memory
value *[]byte
Expand All @@ -60,15 +59,14 @@ func (v ValueStruct) GetValue() []byte {
}

type MemStore struct {
comparator skiplist.BytesComparator
skipListMap skiplist.MapI[[]byte, ValueStruct]
estimatedSize uint64
comparator skiplist.BytesComparator
}

// Add inserts the given key/value pair into the memstore.
// It delegates to upsertInternal with errorIfKeyExist=true, so an
// error is returned when the key is already present (as well as for
// nil keys or nil values).
func (m *MemStore) Add(key []byte, value []byte) error {
	const failOnExistingKey = true
	return upsertInternal(m, key, value, failOnExistingKey)
}

func (m *MemStore) Contains(key []byte) bool {
element, err := m.skipListMap.Get(key)
// we can return false if we didn't find it by error, or when the key is tomb-stoned
Expand All @@ -80,7 +78,6 @@ func (m *MemStore) Contains(key []byte) bool {
}
return true
}

func (m *MemStore) Get(key []byte) ([]byte, error) {
element, err := m.skipListMap.Get(key)
// we can return false if we didn't find it by error, or when the key is tomb-stoned
Expand All @@ -93,20 +90,16 @@ func (m *MemStore) Get(key []byte) ([]byte, error) {
}
return val, nil
}

// Upsert inserts the given key/value pair, overwriting any value that
// is already stored under the key. It delegates to upsertInternal with
// errorIfKeyExist=false; errors are still returned for nil keys/values.
func (m *MemStore) Upsert(key []byte, value []byte) error {
	const failOnExistingKey = false
	return upsertInternal(m, key, value, failOnExistingKey)
}

func upsertInternal(m *MemStore, key []byte, value []byte, errorIfKeyExist bool) error {
if key == nil {
return KeyNil
}

if value == nil {
return ValueNil
}

element, err := m.skipListMap.Get(key)
if !errors.Is(err, skiplist.NotFound) {
if *element.value != nil && errorIfKeyExist {
Expand All @@ -121,15 +114,12 @@ func upsertInternal(m *MemStore, key []byte, value []byte, errorIfKeyExist bool)
}
return nil
}

// Delete tomb-stones the given key and returns an error when the key
// is not present (delegates to deleteInternal with errorIfKeyNotFound=true).
func (m *MemStore) Delete(key []byte) error {
	const failOnMissingKey = true
	return deleteInternal(m, key, failOnMissingKey)
}

// DeleteIfExists tomb-stones the given key when present and is a no-op
// otherwise (delegates to deleteInternal with errorIfKeyNotFound=false).
func (m *MemStore) DeleteIfExists(key []byte) error {
	const failOnMissingKey = false
	return deleteInternal(m, key, failOnMissingKey)
}

func deleteInternal(m *MemStore, key []byte, errorIfKeyNotFound bool) error {
element, err := m.skipListMap.Get(key)
if errors.Is(err, skiplist.NotFound) {
Expand All @@ -140,10 +130,8 @@ func deleteInternal(m *MemStore, key []byte, errorIfKeyNotFound bool) error {
m.estimatedSize -= uint64(len(*element.value))
*element.value = nil
}

return nil
}

func (m *MemStore) Tombstone(key []byte) error {
element, err := m.skipListMap.Get(key)
if !errors.Is(err, skiplist.NotFound) {
Expand All @@ -158,40 +146,32 @@ func (m *MemStore) Tombstone(key []byte) error {
}
return nil
}

// EstimatedSizeInBytes returns the estimated size of the stored values in
// bytes, inflated by roughly 15% to account for internal overhead. The
// result is an estimate, not an exact accounting.
func (m *MemStore) EstimatedSizeInBytes() uint64 {
	// we account for ~15% overhead
	// NOTE: float64 instead of float32 — a float32 mantissa holds only 24
	// bits, so converting estimatedSize through float32 silently loses
	// precision once the store grows past ~16MiB.
	return uint64(1.15 * float64(m.estimatedSize))
}

// Size returns the number of elements in this memstore, including
// tomb-stoned keys (it simply reflects the underlying skip list size).
func (m *MemStore) Size() int {
	listSize := m.skipListMap.Size()
	return listSize
}

// Flush writes this memstore to an sstable using the supplied writer
// options, excluding tomb-stoned entries (includeTombstones=false).
func (m *MemStore) Flush(writerOptions ...sstables.WriterOption) error {
	const withTombstones = false
	return flushMemstore(m, withTombstones, writerOptions...)
}

// FlushWithTombstones writes this memstore to an sstable using the
// supplied writer options, keeping tomb-stoned entries in the output
// (includeTombstones=true).
func (m *MemStore) FlushWithTombstones(writerOptions ...sstables.WriterOption) error {
	const withTombstones = true
	return flushMemstore(m, withTombstones, writerOptions...)
}

func flushMemstore(m *MemStore, includeTombstones bool, writerOptions ...sstables.WriterOption) (err error) {
writerOptions = append(writerOptions, sstables.WithKeyComparator(m.comparator))
writer, err := sstables.NewSSTableStreamWriter(writerOptions...)
if err != nil {
return err
}

err = writer.Open()
if err != nil {
return err
}

defer func() {
err = errors.Join(err, writer.Close())
}()

it, _ := m.skipListMap.Iterator()
for {
k, v, err := it.Next()
Expand All @@ -201,7 +181,6 @@ func flushMemstore(m *MemStore, includeTombstones bool, writerOptions ...sstable
if err != nil {
return err
}

if includeTombstones {
if err := writer.WriteNext(k, *v.value); err != nil {
return err
Expand All @@ -215,15 +194,12 @@ func flushMemstore(m *MemStore, includeTombstones bool, writerOptions ...sstable
}
}
}

return nil
}

// SStableIterator returns an sstable-style iterator over the skip list
// backing this memstore.
// NOTE(review): the error from skipListMap.Iterator() is intentionally
// discarded here, matching the other call sites in this file — confirm
// the iterator constructor cannot fail in practice.
func (m *MemStore) SStableIterator() sstables.SSTableIteratorI {
	listIterator, _ := m.skipListMap.Iterator()
	wrapped := &SkipListSStableIterator{iterator: listIterator}
	return wrapped
}

func NewMemStore() MemStoreI {
cmp := skiplist.BytesComparator{}
return &MemStore{skipListMap: skiplist.NewSkipListMap[[]byte, ValueStruct](cmp), comparator: cmp}
Expand Down
22 changes: 3 additions & 19 deletions pq/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,31 @@ type IteratorWithContext[K any, V any, CTX any] interface {
// Context returns the context to identify the given iterator.
Context() CTX
}

type PriorityQueueI[K any, V any, CTX any] interface {
// Next returns the next key, value and context in sequence.
// Returns Done as the error when the iterator is exhausted.
Next() (K, V, CTX, error)
}

type Element[K any, V any, CTX any] struct {
heapIndex int
key K
value V
iterator IteratorWithContext[K, V, CTX]
heapIndex int
}

type PriorityQueue[K any, V any, CTX any] struct {
size int
heap []*Element[K, V, CTX]
comp skiplist.Comparator[K]
heap []*Element[K, V, CTX]
size int
}

// lessThan reports whether element a orders strictly before element b
// under the queue's key comparator.
func (pq *PriorityQueue[K, V, CTX]) lessThan(a, b *Element[K, V, CTX]) bool {
	cmp := pq.comp.Compare(a.key, b.key)
	return cmp < 0
}

// swap exchanges the heap slots i and j and keeps each element's
// heapIndex in sync with its new position.
func (pq *PriorityQueue[K, V, CTX]) swap(i, j int) {
	left, right := pq.heap[i], pq.heap[j]
	pq.heap[i], pq.heap[j] = right, left
	right.heapIndex = i
	left.heapIndex = j
}

func (pq *PriorityQueue[K, V, CTX]) init(iterators []IteratorWithContext[K, V, CTX]) error {
// reserve the 0th element for nil, makes it easier to implement the rest of the logic
pq.heap = []*Element[K, V, CTX]{nil}
Expand All @@ -61,13 +56,10 @@ func (pq *PriorityQueue[K, V, CTX]) init(iterators []IteratorWithContext[K, V, C
return fmt.Errorf("INIT couldn't fill next heap entry: %w", err)
}
}

return nil
}

func (pq *PriorityQueue[K, V, CTX]) Next() (_ K, _ V, _ CTX, err error) {
err = Done

if pq.size == 0 {
return
}
Expand All @@ -82,7 +74,6 @@ func (pq *PriorityQueue[K, V, CTX]) Next() (_ K, _ V, _ CTX, err error) {
err = fmt.Errorf("NEXT couldn't fill next heap entry: %w", err)
return
}

// remove the element from the heap completely if its iterator is exhausted
if errors.Is(err, Done) {
// move the root away to the bottom leaf
Expand All @@ -91,13 +82,10 @@ func (pq *PriorityQueue[K, V, CTX]) Next() (_ K, _ V, _ CTX, err error) {
pq.heap = pq.heap[0:pq.size]
pq.size--
}

// always down the heap at the end
pq.downHeap()

return k, v, c, nil
}

func (pq *PriorityQueue[K, V, CTX]) upHeap(i int) {
element := pq.heap[i]
j := i >> 1
Expand All @@ -108,12 +96,10 @@ func (pq *PriorityQueue[K, V, CTX]) upHeap(i int) {
}
pq.heap[i] = element
}

func (pq *PriorityQueue[K, V, CTX]) downHeap() {
if pq.size == 0 {
return
}

i := 1
element := pq.heap[i]
j := i << 1
Expand All @@ -132,14 +118,12 @@ func (pq *PriorityQueue[K, V, CTX]) downHeap() {
}
pq.heap[i] = element
}

// fillNext advances the element's iterator and stores the produced
// key/value on the element, returning the iterator's error (e.g. Done
// when the iterator is exhausted).
func (pq *PriorityQueue[K, V, CTX]) fillNext(item *Element[K, V, CTX]) error {
	var err error
	item.key, item.value, err = item.iterator.Next()
	return err
}

func NewPriorityQueue[K any, V any, CTX any](comp skiplist.Comparator[K], iterators []IteratorWithContext[K, V, CTX]) (PriorityQueueI[K, V, CTX], error) {
q := &PriorityQueue[K, V, CTX]{comp: comp}
err := q.init(iterators)
Expand Down
15 changes: 3 additions & 12 deletions recordio/bufio_vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type WriteCloserFlusher interface {
// Additionally, several methods that were not needed are removed to reduce the test surface of the original.
type Writer struct {
err error
wr io.WriteCloser
buf []byte
n int
wr io.WriteCloser
alignFlush bool
}

Expand All @@ -35,14 +35,12 @@ func (b *Writer) Close() error {
}
return b.wr.Close()
}

// NewWriterBuf returns a buffered writer over w that uses the supplied
// buffer as its internal storage (no aligned flushing).
func NewWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher {
	writer := &Writer{
		wr:  w,
		buf: buf,
	}
	return writer
}

func NewAlignedWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher {
return &Writer{
buf: buf,
Expand All @@ -62,7 +60,6 @@ func (b *Writer) Flush() error {
if b.n == 0 {
return nil
}

toFlush := b.buf[0:b.n]
// zero the remainder of the buffer for safety before an aligned flush
if b.alignFlush {
Expand Down Expand Up @@ -122,15 +119,14 @@ func (b *Writer) Write(p []byte) (nn int, err error) {
}

// Buffered input.

// Reader implements buffering for an io.Reader object.
// This is the same writer as bufio.Reader, but it allows us to supply the buffer from the outside.
// Namely, it only has a new constructor in NewReaderBuf and implements Close()
type Reader struct {
buf []byte
rd io.Reader // reader provided by the client
r, w int // buf read and write positions
err error
buf []byte
r, w int // buf read and write positions
lastByte int // last byte read for UnreadByte; -1 means invalid
lastRuneSize int // size of last rune read for UnreadRune; -1 means invalid
}
Expand All @@ -154,7 +150,6 @@ func (b *Reader) Buffered() int { return b.w - b.r }
// Reset discards any buffered data and error state and switches the
// reader to read from r, reusing the existing buffer.
func (b *Reader) Reset(r io.Reader) {
	buf := b.buf
	b.reset(buf, r)
}

func (b *Reader) reset(buf []byte, r io.Reader) {
*b = Reader{
buf: buf,
Expand All @@ -174,11 +169,9 @@ func (b *Reader) fill() {
b.w -= b.r
b.r = 0
}

if b.w >= len(b.buf) {
panic("bufio: tried to fill full buffer")
}

// Read new data: try a limited number of times.
for i := maxConsecutiveEmptyReads; i > 0; i-- {
n, err := b.rd.Read(b.buf[b.w:])
Expand All @@ -196,7 +189,6 @@ func (b *Reader) fill() {
}
b.err = io.ErrNoProgress
}

func (b *Reader) readErr() error {
err := b.err
b.err = nil
Expand Down Expand Up @@ -247,7 +239,6 @@ func (b *Reader) Read(p []byte) (n int, err error) {
}
b.w += n
}

// copy as much as we can
n = copy(p, b.buf[b.r:b.w])
b.r += n
Expand Down
Loading
Loading