Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cockroachkvs: pull in MVCC block-property collector, filter #4281

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions cockroachkvs/blockproperties.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package cockroachkvs

import (
"encoding/binary"
"math"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
)

const mvccWallTimeIntervalCollector = "MVCCTimeInterval"

// BlockPropertyCollectors is a list of constructors for block-property
// collectors used by CockroachDB.
var BlockPropertyCollectors = []func() pebble.BlockPropertyCollector{
func() pebble.BlockPropertyCollector {
return sstable.NewBlockIntervalCollector(
mvccWallTimeIntervalCollector,
pebbleIntervalMapper{},
MVCCBlockIntervalSuffixReplacer{},
)
},
}

// NewMVCCTimeIntervalFilter constructs a new block-property filter that skips
// keys that encode timestamps with wall times that do not fall within the
// interval [minWallTime,maxWallTime].
func NewMVCCTimeIntervalFilter(minWallTime, maxWallTime uint64) sstable.BlockPropertyFilter {
return sstable.NewBlockIntervalFilter(mvccWallTimeIntervalCollector,
uint64(minWallTime),
uint64(maxWallTime)+1,
MVCCBlockIntervalSuffixReplacer{},
)
}

// MVCCWallTimeIntervalRangeKeyMask implements pebble.BlockPropertyFilterMask
// for filtering blocks using the MVCCTimeInterval block property during range
// key masking.
type MVCCWallTimeIntervalRangeKeyMask struct {
sstable.BlockIntervalFilter
}

// SetSuffix implements the pebble.BlockPropertyFilterMask interface.
func (m *MVCCWallTimeIntervalRangeKeyMask) SetSuffix(suffix []byte) error {
if len(suffix) == 0 {
// This is currently impossible, because the only range key Cockroach
// writes today is the MVCC Delete Range that's always suffixed.
return nil
}
wall, _, err := DecodeMVCCTimestampSuffix(suffix)
if err != nil {
return err
}
m.BlockIntervalFilter.SetInterval(wall, math.MaxUint64)
return nil
}

var _ sstable.BlockIntervalSuffixReplacer = MVCCBlockIntervalSuffixReplacer{}

// MVCCBlockIntervalSuffixReplacer implements the
// sstable.BlockIntervalSuffixReplacer interface for MVCC timestamp intervals.
type MVCCBlockIntervalSuffixReplacer struct{}

func (MVCCBlockIntervalSuffixReplacer) ApplySuffixReplacement(
interval sstable.BlockInterval, newSuffix []byte,
) (sstable.BlockInterval, error) {
synthDecodedWalltime, _, err := DecodeMVCCTimestampSuffix(newSuffix)
if err != nil {
return sstable.BlockInterval{}, errors.AssertionFailedf("could not decode synthetic suffix")
}
// The returned bound includes the synthetic suffix, regardless of its logical
// component.
return sstable.BlockInterval{Lower: synthDecodedWalltime, Upper: synthDecodedWalltime + 1}, nil
}

type pebbleIntervalMapper struct{}

var _ sstable.IntervalMapper = pebbleIntervalMapper{}

// MapPointKey is part of the sstable.IntervalMapper interface.
func (pebbleIntervalMapper) MapPointKey(
key pebble.InternalKey, value []byte,
) (sstable.BlockInterval, error) {
return mapSuffixToInterval(key.UserKey)
}

// MapRangeKey is part of the sstable.IntervalMapper interface.
func (pebbleIntervalMapper) MapRangeKeys(span sstable.Span) (sstable.BlockInterval, error) {
var res sstable.BlockInterval
for _, k := range span.Keys {
i, err := mapSuffixToInterval(k.Suffix)
if err != nil {
return sstable.BlockInterval{}, err
}
res.UnionWith(i)
}
return res, nil
}

// mapSuffixToInterval maps the suffix of a key to a timestamp interval.
// The buffer can be an entire key or just the suffix.
func mapSuffixToInterval(b []byte) (sstable.BlockInterval, error) {
if len(b) == 0 {
return sstable.BlockInterval{}, nil
}
// Last byte is the version length + 1 when there is a version,
// else it is 0.
versionLen := int(b[len(b)-1])
if versionLen == 0 {
// This is not an MVCC key that we can collect.
return sstable.BlockInterval{}, nil
}
// prefixPartEnd points to the sentinel byte, unless this is a bare suffix, in
// which case the index is -1.
prefixPartEnd := len(b) - 1 - versionLen
// Sanity check: the index should be >= -1. Additionally, if the index is >=
// 0, it should point to the sentinel byte, as this is a full EngineKey.
if prefixPartEnd < -1 || (prefixPartEnd >= 0 && b[prefixPartEnd] != 0x00) {
return sstable.BlockInterval{}, errors.Errorf("invalid key %x", b)
}
// We don't need the last byte (the version length).
versionLen--
// Only collect if this looks like an MVCC timestamp.
if versionLen == engineKeyVersionWallTimeLen ||
versionLen == engineKeyVersionWallAndLogicalTimeLen ||
versionLen == engineKeyVersionWallLogicalAndSyntheticTimeLen {
// INVARIANT: -1 <= prefixPartEnd < len(b) - 1.
// Version consists of the bytes after the sentinel and before the length.
ts := binary.BigEndian.Uint64(b[prefixPartEnd+1:])
return sstable.BlockInterval{Lower: ts, Upper: ts + 1}, nil
}
return sstable.BlockInterval{}, nil
}
35 changes: 35 additions & 0 deletions cockroachkvs/cockroachkvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,41 @@ func EncodeTimestamp(key []byte, walltime uint64, logical uint32) []byte {
return key
}

// DecodeMVCCTimestampSuffix decodes an MVCC timestamp from its Pebble representation,
// including the length suffix.
func DecodeMVCCTimestampSuffix(encodedTS []byte) (wallTime uint64, logical uint32, err error) {
if len(encodedTS) == 0 {
return 0, 0, nil
}
encodedLen := len(encodedTS)
if suffixLen := int(encodedTS[encodedLen-1]); suffixLen != encodedLen {
return 0, 0, errors.Errorf(
"bad timestamp: found length suffix %d, actual length %d", suffixLen, encodedLen)
}
return decodeMVCCTimestamp(encodedTS[:encodedLen-1])
}

// decodeMVCCTimestamp decodes an MVCC timestamp from its Pebble representation,
// excluding the length suffix.
func decodeMVCCTimestamp(encodedTS []byte) (wallTime uint64, logical uint32, err error) {
// NB: This logic is duplicated in enginepb.DecodeKey() to avoid the
// overhead of an additional function call there (~13%).
switch len(encodedTS) {
case 0:
// No-op.
case 8:
wallTime = binary.BigEndian.Uint64(encodedTS[0:8])
case 12, 13:
wallTime = binary.BigEndian.Uint64(encodedTS[0:8])
logical = binary.BigEndian.Uint32(encodedTS[8:12])
// NOTE: byte 13 used to store the timestamp's synthetic bit, but this is no
// longer consulted and can be ignored during decoding.
default:
return 0, 0, errors.Errorf("bad timestamp %x", encodedTS)
}
return wallTime, logical, nil
}

// DecodeEngineKey decodes the given bytes as an EngineKey.
func DecodeEngineKey(b []byte) (roachKey, version []byte, ok bool) {
if len(b) == 0 {
Expand Down
Loading