Skip to content

Commit

Permalink
remove block cache
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed May 19, 2024
1 parent 8de9190 commit e77db04
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 96 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 0 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ type Options struct {
// Not a common usage for most users.
SegmentFileExt string

// BlockCache specifies the size of the block cache in number of bytes.
// A block cache is used to store recently accessed data blocks, improving read performance.
// If BlockCache is set to 0, no block cache will be used.
BlockCache uint32

// Sync is whether to synchronize writes through os buffer cache and down onto the actual disk.
// Setting sync is required for durability of a single write operation, but also results in slower writes.
//
Expand All @@ -47,7 +42,6 @@ var DefaultOptions = Options{
DirPath: os.TempDir(),
SegmentSize: GB,
SegmentFileExt: ".SEG",
BlockCache: 32 * KB * 10,
Sync: false,
BytesPerSync: 0,
}
36 changes: 5 additions & 31 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"
"sync"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/valyala/bytebufferpool"
)

Expand Down Expand Up @@ -53,7 +52,6 @@ type segment struct {
currentBlockNumber uint32
currentBlockSize uint32
closed bool
cache *lru.Cache[uint64, []byte]
header []byte
blockPool sync.Pool
}
Expand Down Expand Up @@ -86,7 +84,7 @@ type ChunkPosition struct {
}

// openSegmentFile a new segment file.
func openSegmentFile(dirPath, extName string, id uint32, cache *lru.Cache[uint64, []byte]) (*segment, error) {
func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) {
fd, err := os.OpenFile(
SegmentFileName(dirPath, extName, id),
os.O_CREATE|os.O_RDWR|os.O_APPEND,
Expand All @@ -106,7 +104,6 @@ func openSegmentFile(dirPath, extName string, id uint32, cache *lru.Cache[uint64
return &segment{
id: id,
fd: fd,
cache: cache,
header: make([]byte, chunkHeaderSize),
blockPool: sync.Pool{New: newBlockAndHeader},
currentBlockNumber: uint32(offset / blockSize),
Expand Down Expand Up @@ -394,29 +391,10 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
return nil, nil, io.EOF
}

var ok bool
var cachedBlock []byte
// try to read from the cache if it is enabled
if seg.cache != nil {
cachedBlock, ok = seg.cache.Get(seg.getCacheKey(blockNumber))
}
// cache hit, get block from the cache
if ok {
copy(bh.block, cachedBlock)
} else {
// cache miss, read block from the segment file
_, err := seg.fd.ReadAt(bh.block[0:size], offset)
if err != nil {
return nil, nil, err
}
// cache the block, so that the next time it can be read from the cache.
// if the block size is smaller than blockSize, it means that the block is not full,
// so we will not cache it.
if seg.cache != nil && size == blockSize && len(cachedBlock) == 0 {
cacheBlock := make([]byte, blockSize)
copy(cacheBlock, bh.block)
seg.cache.Add(seg.getCacheKey(blockNumber), cacheBlock)
}
// cache miss, read block from the segment file
_, err := seg.fd.ReadAt(bh.block[0:size], offset)
if err != nil {
return nil, nil, err
}

// header
Expand Down Expand Up @@ -457,10 +435,6 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
return result, nextChunk, nil
}

func (seg *segment) getCacheKey(blockNumber uint32) uint64 {
return uint64(seg.id)<<32 | uint64(blockNumber)
}

// Next returns the Next chunk data.
// You can call it repeatedly until io.EOF is returned.
func (segReader *segmentReader) Next() ([]byte, *ChunkPosition, error) {
Expand Down
28 changes: 12 additions & 16 deletions segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"strings"
"testing"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
)

func TestSegment_Write_FULL1(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-full1")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand All @@ -40,15 +39,15 @@ func TestSegment_Write_FULL1(t *testing.T) {
for i := 0; i < 100000; i++ {
pos, err := seg.Write(val)
assert.Nil(t, err)
val, err := seg.Read(pos.BlockNumber, pos.ChunkOffset)
res, err := seg.Read(pos.BlockNumber, pos.ChunkOffset)
assert.Nil(t, err)
assert.Equal(t, val, val)
assert.Equal(t, val, res)
}
}

func TestSegment_Write_FULL2(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-full2")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestSegment_Write_FULL2(t *testing.T) {

func TestSegment_Write_Padding(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-padding")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand All @@ -99,7 +98,7 @@ func TestSegment_Write_Padding(t *testing.T) {

func TestSegment_Write_NOT_FULL(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-not-full")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -137,7 +136,7 @@ func TestSegment_Write_NOT_FULL(t *testing.T) {

func TestSegment_Reader_FULL(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-reader-full")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -169,7 +168,7 @@ func TestSegment_Reader_FULL(t *testing.T) {

func TestSegment_Reader_Padding(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-reader-padding")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -203,7 +202,7 @@ func TestSegment_Reader_Padding(t *testing.T) {

func TestSegment_Reader_NOT_FULL(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-reader-not-full")
seg, err := openSegmentFile(dir, ".SEG", 1, nil)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -249,8 +248,7 @@ func TestSegment_Reader_NOT_FULL(t *testing.T) {

func TestSegment_Reader_ManyChunks_FULL(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-reader-ManyChunks_FULL")
cache, _ := lru.New[uint64, []byte](5)
seg, err := openSegmentFile(dir, ".SEG", 1, cache)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -287,8 +285,7 @@ func TestSegment_Reader_ManyChunks_FULL(t *testing.T) {

func TestSegment_Reader_ManyChunks_NOT_FULL(t *testing.T) {
dir, _ := os.MkdirTemp("", "seg-test-reader-ManyChunks_NOT_FULL")
cache, _ := lru.New[uint64, []byte](5)
seg, err := openSegmentFile(dir, ".SEG", 1, cache)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down Expand Up @@ -337,8 +334,7 @@ func TestSegment_Write_LargeSize(t *testing.T) {

func testSegmentReaderLargeSize(t *testing.T, size int, count int) {
dir, _ := os.MkdirTemp("", "seg-test-reader-ManyChunks_large_size")
cache, _ := lru.New[uint64, []byte](5)
seg, err := openSegmentFile(dir, ".SEG", 1, cache)
seg, err := openSegmentFile(dir, ".SEG", 1)
assert.Nil(t, err)
defer func() {
_ = seg.Remove()
Expand Down
37 changes: 4 additions & 33 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"sort"
"strings"
"sync"

lru "github.com/hashicorp/golang-lru/v2"
)

const (
Expand Down Expand Up @@ -41,7 +39,6 @@ type WAL struct {
olderSegments map[SegmentID]*segment // older segment files, only used for read.
options Options
mu sync.RWMutex
blockCache *lru.Cache[uint64, []byte]
bytesWrite uint32
renameIds []SegmentID
pendingWrites [][]byte
Expand All @@ -67,9 +64,6 @@ func Open(options Options) (*WAL, error) {
if !strings.HasPrefix(options.SegmentFileExt, ".") {
return nil, fmt.Errorf("segment file extension must start with '.'")
}
if options.BlockCache > uint32(options.SegmentSize) {
return nil, fmt.Errorf("BlockCache must be smaller than SegmentSize")
}
wal := &WAL{
options: options,
olderSegments: make(map[SegmentID]*segment),
Expand All @@ -81,19 +75,6 @@ func Open(options Options) (*WAL, error) {
return nil, err
}

// create the block cache if needed.
if options.BlockCache > 0 {
var lruSize = options.BlockCache / blockSize
if options.BlockCache%blockSize != 0 {
lruSize += 1
}
cache, err := lru.New[uint64, []byte](int(lruSize))
if err != nil {
return nil, err
}
wal.blockCache = cache
}

// iterate the dir and open all segment files.
entries, err := os.ReadDir(options.DirPath)
if err != nil {
Expand All @@ -117,7 +98,7 @@ func Open(options Options) (*WAL, error) {
// empty directory, just initialize a new segment file.
if len(segmentIDs) == 0 {
segment, err := openSegmentFile(options.DirPath, options.SegmentFileExt,
initialSegmentFileID, wal.blockCache)
initialSegmentFileID)
if err != nil {
return nil, err
}
Expand All @@ -128,7 +109,7 @@ func Open(options Options) (*WAL, error) {

for i, segId := range segmentIDs {
segment, err := openSegmentFile(options.DirPath, options.SegmentFileExt,
uint32(segId), wal.blockCache)
uint32(segId))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,7 +144,7 @@ func (wal *WAL) OpenNewActiveSegment() error {
}
// create a new segment file and set it as the active one.
segment, err := openSegmentFile(wal.options.DirPath, wal.options.SegmentFileExt,
wal.activeSegment.id+1, wal.blockCache)
wal.activeSegment.id+1)
if err != nil {
return err
}
Expand Down Expand Up @@ -331,7 +312,7 @@ func (wal *WAL) rotateActiveSegment() error {
}
wal.bytesWrite = 0
segment, err := openSegmentFile(wal.options.DirPath, wal.options.SegmentFileExt,
wal.activeSegment.id+1, wal.blockCache)
wal.activeSegment.id+1)
if err != nil {
return err
}
Expand Down Expand Up @@ -440,11 +421,6 @@ func (wal *WAL) Close() error {
wal.mu.Lock()
defer wal.mu.Unlock()

// purge the block cache.
if wal.blockCache != nil {
wal.blockCache.Purge()
}

// close all segment files.
for _, segment := range wal.olderSegments {
if err := segment.Close(); err != nil {
Expand All @@ -464,11 +440,6 @@ func (wal *WAL) Delete() error {
wal.mu.Lock()
defer wal.mu.Unlock()

// purge the block cache.
if wal.blockCache != nil {
wal.blockCache.Purge()
}

// delete all segment files.
for _, segment := range wal.olderSegments {
if err := segment.Remove(); err != nil {
Expand Down
10 changes: 1 addition & 9 deletions wal_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package wal

import (
"github.com/stretchr/testify/assert"
"io"
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func destroyWAL(wal *WAL) {
Expand All @@ -22,7 +21,6 @@ func TestWAL_WriteALL(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".SEG",
SegmentSize: 32 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand All @@ -41,7 +39,6 @@ func TestWAL_Write(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".SEG",
SegmentSize: 32 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand Down Expand Up @@ -75,7 +72,6 @@ func TestWAL_Write_large(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".SEG",
SegmentSize: 32 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand Down Expand Up @@ -143,7 +139,6 @@ func TestWAL_Reader(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".SEG",
SegmentSize: 32 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand Down Expand Up @@ -248,7 +243,6 @@ func TestWAL_Delete(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".SEG",
SegmentSize: 32 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand All @@ -270,7 +264,6 @@ func TestWAL_ReaderWithStart(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".SEG",
SegmentSize: 8 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand Down Expand Up @@ -306,7 +299,6 @@ func TestWAL_RenameFileExt(t *testing.T) {
DirPath: dir,
SegmentFileExt: ".VLOG.1.temp",
SegmentSize: 8 * 1024 * 1024,
BlockCache: 32 * KB * 10,
}
wal, err := Open(opts)
assert.Nil(t, err)
Expand Down

0 comments on commit e77db04

Please sign in to comment.