Skip to content

Commit

Permalink
Fixed choice: Switch to IndexType on the value level
Browse files Browse the repository at this point in the history
  • Loading branch information
jeltsch committed Jan 16, 2025
1 parent 37c6cf4 commit 835860d
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 108 deletions.
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal/ChecksumHandle.hs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ writeIndexHeader ::
-> ForIndex (ChecksumHandle (PrimState m) h)
-> IndexType
-> m ()
writeIndexHeader hfs indexHandle indexTypeProxy =
writeIndexHeader hfs indexHandle indexType =
writeToHandle hfs (unForIndex indexHandle) $
Index.headerLBS indexTypeProxy
Index.headerLBS indexType

{-# SPECIALISE writeIndexChunk ::
HasFS IO h
Expand Down
17 changes: 4 additions & 13 deletions src/Database/LSMTree/Internal/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ module Database.LSMTree.Internal.Config (
, bloomFilterAllocForLevel
-- * Fence pointer index
, FencePointerIndex (..)
, withIndexTypeProxyForRun
, withIndexAccTypeProxyForRun
, indexTypeForRun
-- * Disk cache policy
, DiskCachePolicy (..)
, diskCachePolicyForLevel
Expand Down Expand Up @@ -318,17 +317,9 @@ instance NFData FencePointerIndex where
rnf CompactIndex = ()
rnf OrdinaryIndex = ()

withIndexTypeProxyForRun :: FencePointerIndex
-> (forall i . Index i => IndexType -> r)
-> r
withIndexTypeProxyForRun CompactIndex cont = cont (proxy# @IndexCompact)
withIndexTypeProxyForRun OrdinaryIndex cont = cont (proxy# @IndexOrdinary)

withIndexAccTypeProxyForRun :: FencePointerIndex
-> (forall j . IndexAcc j => IndexType -> r)
-> r
withIndexAccTypeProxyForRun CompactIndex cont = cont (proxy# @IndexCompactAcc)
withIndexAccTypeProxyForRun OrdinaryIndex cont = cont (proxy# @IndexOrdinaryAcc)
-- | Translate the fence pointer index choice of a table configuration into
-- the value-level 'IndexType' that is passed on when runs are built or opened.
--
-- 'CompactIndex' yields @Index.Compact@ and 'OrdinaryIndex' yields
-- @Index.Ordinary@.
indexTypeForRun :: FencePointerIndex -> IndexType
indexTypeForRun fencePointerIndex = case fencePointerIndex of
    CompactIndex  -> Index.Compact
    OrdinaryIndex -> Index.Ordinary

{-------------------------------------------------------------------------------
Disk cache policy
Expand Down
9 changes: 2 additions & 7 deletions src/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,13 @@ new ::
-> Run.RunFsPaths
-> V.Vector (Ref (Run m h))
-> m (Maybe (Merge m h))
new fs hbio mergeCaching alloc indexAccTypeProxy mergeLevel mergeMappend targetPaths runs = do
new fs hbio mergeCaching alloc indexType mergeLevel mergeMappend targetPaths runs = do
-- no offset, no write buffer
mreaders <- Readers.new Readers.NoOffsetKey Nothing runs
for mreaders $ \mergeReaders -> do
-- calculate upper bounds based on input runs
let numEntries = V.foldMap' Run.size runs
mergeBuilder <- Builder.new fs
hbio
targetPaths
numEntries
alloc
indexAccTypeProxy
mergeBuilder <- Builder.new fs hbio targetPaths numEntries alloc indexType
mergeState <- newMutVar $! Merging
return Merge {
mergeHasFS = fs
Expand Down
98 changes: 49 additions & 49 deletions src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -500,33 +500,33 @@ flushWriteBuffer tr conf@TableConfig{confFencePointerIndex, confDiskCachePolicy}
| WB.null (tableWriteBuffer tc) = pure tc
| otherwise = do
!n <- incrUniqCounter uc
let !size = WB.numEntries (tableWriteBuffer tc)
!l = LevelNo 1
!cache = diskCachePolicyForLevel confDiskCachePolicy l
!alloc = bloomFilterAllocForLevel conf l
!path = Paths.runPath root (uniqueToRunNumber n)
withIndexAccTypeProxyForRun confFencePointerIndex $ \ indexAccTypeProxy -> do
traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc
r <- allocateTemp reg
(Run.fromWriteBuffer hfs hbio
cache
alloc
indexAccTypeProxy
path
(tableWriteBuffer tc)
(tableWriteBufferBlobs tc))
releaseRef
freeTemp reg (releaseRef (tableWriteBufferBlobs tc))
wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n))
releaseRef
levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc)
tableCache' <- rebuildCache reg (tableCache tc) levels'
pure $! TableContent {
tableWriteBuffer = WB.empty
, tableWriteBufferBlobs = wbblobs'
, tableLevels = levels'
, tableCache = tableCache'
}
let !size = WB.numEntries (tableWriteBuffer tc)
!l = LevelNo 1
!cache = diskCachePolicyForLevel confDiskCachePolicy l
!alloc = bloomFilterAllocForLevel conf l
!indexType = indexTypeForRun confFencePointerIndex
!path = Paths.runPath root (uniqueToRunNumber n)
traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc
r <- allocateTemp reg
(Run.fromWriteBuffer hfs hbio
cache
alloc
indexType
path
(tableWriteBuffer tc)
(tableWriteBufferBlobs tc))
releaseRef
freeTemp reg (releaseRef (tableWriteBufferBlobs tc))
wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n))
releaseRef
levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc)
tableCache' <- rebuildCache reg (tableCache tc) levels'
pure $! TableContent {
tableWriteBuffer = WB.empty
, tableWriteBufferBlobs = wbblobs'
, tableLevels = levels'
, tableCache = tableCache'
}

{-# SPECIALISE addRunToLevels ::
Tracer IO (AtLevel MergeTrace)
Expand Down Expand Up @@ -654,29 +654,29 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels =
!n <- incrUniqCounter uc
let !caching = diskCachePolicyForLevel confDiskCachePolicy ln
!alloc = bloomFilterAllocForLevel conf ln
!indexType = indexTypeForRun confFencePointerIndex
!runPaths = Paths.runPath root (uniqueToRunNumber n)
withIndexAccTypeProxyForRun confFencePointerIndex $ \ indexAccTypeProxy -> do
traceWith tr $ AtLevel ln $
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
-- The runs will end up inside the merging run, with fresh references.
-- The original references can be released (but only on the happy path).
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc indexAccTypeProxy mergeLevel runPaths rs)
releaseRef
V.forM_ rs $ \r -> freeTemp reg (releaseRef r)
case confMergeSchedule of
Incremental -> pure ()
OneShot -> do
let !required = MR.Credits (unNumEntries (V.foldMap' Run.size rs))
let !thresh = creditThresholdForLevel conf ln
MR.supplyCredits required thresh mr
-- This ensures the merge is really completed. However, we don't
-- release the merge yet and only briefly inspect the resulting run.
bracket (MR.expectCompleted mr) releaseRef $ \r ->
traceWith tr $ AtLevel ln $
TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r)

return (Merging mergePolicy mr)
traceWith tr $ AtLevel ln $
TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel
-- The runs will end up inside the merging run, with fresh references.
-- The original references can be released (but only on the happy path).
mr <- allocateTemp reg
(MR.new hfs hbio resolve caching alloc indexType mergeLevel runPaths rs)
releaseRef
V.forM_ rs $ \r -> freeTemp reg (releaseRef r)
case confMergeSchedule of
Incremental -> pure ()
OneShot -> do
let !required = MR.Credits (unNumEntries (V.foldMap' Run.size rs))
let !thresh = creditThresholdForLevel conf ln
MR.supplyCredits required thresh mr
-- This ensures the merge is really completed. However, we don't
-- release the merge yet and only briefly inspect the resulting run.
bracket (MR.expectCompleted mr) releaseRef $ \r ->
traceWith tr $ AtLevel ln $
TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r)

return (Merging mergePolicy mr)

-- $setup
-- >>> import Database.LSMTree.Internal.Entry
Expand Down
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal/MergingRun.hs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ new ::
-> RunFsPaths
-> V.Vector (Ref (Run m h))
-> m (Ref (MergingRun m h))
new hfs hbio resolve caching alloc indexAccTypeProxy mergeLevel runPaths inputRuns =
new hfs hbio resolve caching alloc indexType mergeLevel runPaths inputRuns =
-- If creating the Merge fails, we must release the references again.
withTempRegistry $ \reg -> do
runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns
Expand All @@ -151,7 +151,7 @@ new hfs hbio resolve caching alloc indexAccTypeProxy mergeLevel runPaths inputRu
hbio
caching
alloc
indexAccTypeProxy
indexType
mergeLevel
resolve
runPaths
Expand Down
8 changes: 4 additions & 4 deletions src/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ fromWriteBuffer ::
-> WriteBuffer
-> Ref (WriteBufferBlobs m h)
-> m (Ref (Run m h))
fromWriteBuffer fs hbio caching alloc indexAccTypeProxy fsPaths buffer blobs = do
fromWriteBuffer fs hbio caching alloc indexType fsPaths buffer blobs = do
builder <- Builder.new fs
hbio
fsPaths
(WB.numEntries buffer)
alloc
indexAccTypeProxy
indexType
for_ (WB.toList buffer) $ \(k, e) ->
Builder.addKeyOp builder k (fmap (WBB.mkRawBlobRef blobs) e)
--TODO: the fmap entry here reallocates even when there are no blobs
Expand Down Expand Up @@ -275,7 +275,7 @@ openFromDisk ::
-> IndexType
-> RunFsPaths
-> m (Ref (Run m h))
openFromDisk fs hbio runRunDataCaching indexTypeProxy runRunFsPaths = do
openFromDisk fs hbio runRunDataCaching indexType runRunFsPaths = do
expectedChecksums <-
expectValidFile (runChecksumsPath runRunFsPaths) . fromChecksumsFile
=<< CRC.readChecksumsFile fs (runChecksumsPath runRunFsPaths)
Expand All @@ -290,7 +290,7 @@ openFromDisk fs hbio runRunDataCaching indexTypeProxy runRunFsPaths = do
expectValidFile (forRunFilterRaw paths) . bloomFilterFromSBS
=<< readCRC (forRunFilterRaw expectedChecksums) (forRunFilterRaw paths)
(runNumEntries, runIndex) <-
expectValidFile (forRunIndexRaw paths) . Index.fromSBS indexTypeProxy
expectValidFile (forRunIndexRaw paths) . Index.fromSBS indexType
=<< readCRC (forRunIndexRaw expectedChecksums) (forRunIndexRaw paths)

runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode
Expand Down
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal/RunAcc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ new :: NumEntries
-> RunBloomFilterAlloc
-> IndexType
-> ST s (RunAcc s)
new (NumEntries nentries) alloc indexAccTypeProxy = do
new (NumEntries nentries) alloc indexType = do
mbloom <- case alloc of
RunAllocFixed !bitsPerEntry ->
let !nbits = fromIntegral bitsPerEntry * fromIntegral nentries
Expand All @@ -108,7 +108,7 @@ new (NumEntries nentries) alloc indexAccTypeProxy = do
MBloom.new
(fromIntegralChecked $ Monkey.numHashFunctions (fromIntegral nbits) (fromIntegral nentries))
nbits
mindex <- Index.newWithDefaults indexAccTypeProxy
mindex <- Index.newWithDefaults indexType
mpageacc <- PageAcc.newPageAcc
entryCount <- newPrimVar 0
pure RunAcc{..}
Expand Down
8 changes: 3 additions & 5 deletions src/Database/LSMTree/Internal/RunBuilder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,14 @@ new ::
-> RunBloomFilterAlloc
-> IndexType
-> m (RunBuilder m h)
new hfs hbio runBuilderFsPaths numEntries alloc indexAccTypeProxy = do
runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc indexAccTypeProxy
new hfs hbio runBuilderFsPaths numEntries alloc indexType = do
runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc indexType
runBuilderBlobOffset <- newPrimVar 0

runBuilderHandles <- traverse (makeHandle hfs) (pathsForRunFiles runBuilderFsPaths)

let builder = RunBuilder { runBuilderHasFS = hfs, runBuilderHasBlockIO = hbio, .. }
writeIndexHeader hfs
(forRunIndex runBuilderHandles)
(resultingIndexTypeProxy indexAccTypeProxy)
writeIndexHeader hfs (forRunIndex runBuilderHandles) indexType
return builder

{-# SPECIALISE addKeyOp ::
Expand Down
49 changes: 25 additions & 24 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -381,17 +381,19 @@ openRuns
levels' <-
V.iforM levels $ \i level ->
let ln = LevelNo (i+1) in
let caching = diskCachePolicyForLevel confDiskCachePolicy ln in
withIndexTypeProxyForRun confFencePointerIndex $ \ indexTypeProxy -> do
for level $ \runNum -> do
let sourcePaths = RunFsPaths sourceDir runNum
runNum' <- uniqueToRunNumber <$> incrUniqCounter uc
let targetPaths = RunFsPaths targetDir runNum'
hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths

allocateTemp reg
(Run.openFromDisk hfs hbio caching indexTypeProxy targetPaths)
releaseRef
let
caching = diskCachePolicyForLevel confDiskCachePolicy ln
indexType = indexTypeForRun confFencePointerIndex
in
for level $ \runNum -> do
let sourcePaths = RunFsPaths sourceDir runNum
runNum' <- uniqueToRunNumber <$> incrUniqCounter uc
let targetPaths = RunFsPaths targetDir runNum'
hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths

allocateTemp reg
(Run.openFromDisk hfs hbio caching indexType targetPaths)
releaseRef
pure (SnapLevels levels')

{-# SPECIALISE releaseRuns ::
Expand Down Expand Up @@ -442,6 +444,7 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
where
caching = diskCachePolicyForLevel confDiskCachePolicy ln
alloc = bloomFilterAllocForLevel conf ln
indexType = indexTypeForRun confFencePointerIndex

fromSnapIncomingRun ::
SnapIncomingRun (Ref (Run m h))
Expand All @@ -455,19 +458,17 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve

SnapOngoingMerge runs spentCredits lvl -> do
rn <- uniqueToRunNumber <$> incrUniqCounter uc
mr <- withIndexAccTypeProxyForRun confFencePointerIndex $
\ indexAccTypeProxy ->
allocateTemp reg
(MR.new hfs
hbio
resolve
caching
alloc
indexAccTypeProxy
lvl
(mkPath rn)
runs)
releaseRef
mr <- allocateTemp reg
(MR.new hfs
hbio
resolve
caching
alloc
indexType
lvl
(mkPath rn)
runs)
releaseRef
-- When a snapshot is created, merge progress is lost, so we
-- have to redo merging work here. UnspentCredits and
-- SpentCredits track how many credits were supplied before the
Expand Down

0 comments on commit 835860d

Please sign in to comment.