From 835860d8afe914f0c0db0f911f55012d9d5a4ea8 Mon Sep 17 00:00:00 2001 From: Wolfgang Jeltsch Date: Thu, 16 Jan 2025 20:27:55 +0200 Subject: [PATCH] Fixed choice: Switch to `IndexType` on the value level --- .../LSMTree/Internal/ChecksumHandle.hs | 4 +- src/Database/LSMTree/Internal/Config.hs | 17 +--- src/Database/LSMTree/Internal/Merge.hs | 9 +- .../LSMTree/Internal/MergeSchedule.hs | 98 +++++++++---------- src/Database/LSMTree/Internal/MergingRun.hs | 4 +- src/Database/LSMTree/Internal/Run.hs | 8 +- src/Database/LSMTree/Internal/RunAcc.hs | 4 +- src/Database/LSMTree/Internal/RunBuilder.hs | 8 +- src/Database/LSMTree/Internal/Snapshot.hs | 49 +++++----- 9 files changed, 93 insertions(+), 108 deletions(-) diff --git a/src/Database/LSMTree/Internal/ChecksumHandle.hs b/src/Database/LSMTree/Internal/ChecksumHandle.hs index 3956934a0..e5c033a4a 100644 --- a/src/Database/LSMTree/Internal/ChecksumHandle.hs +++ b/src/Database/LSMTree/Internal/ChecksumHandle.hs @@ -217,9 +217,9 @@ writeIndexHeader :: -> ForIndex (ChecksumHandle (PrimState m) h) -> IndexType -> m () -writeIndexHeader hfs indexHandle indexTypeProxy = +writeIndexHeader hfs indexHandle indexType = writeToHandle hfs (unForIndex indexHandle) $ - Index.headerLBS indexTypeProxy + Index.headerLBS indexType {-# SPECIALISE writeIndexChunk :: HasFS IO h diff --git a/src/Database/LSMTree/Internal/Config.hs b/src/Database/LSMTree/Internal/Config.hs index 6eb4b8d0c..9f84e5b00 100644 --- a/src/Database/LSMTree/Internal/Config.hs +++ b/src/Database/LSMTree/Internal/Config.hs @@ -23,8 +23,7 @@ module Database.LSMTree.Internal.Config ( , bloomFilterAllocForLevel -- * Fence pointer index , FencePointerIndex (..) - , withIndexTypeProxyForRun - , withIndexAccTypeProxyForRun + , indexTypeForRun -- * Disk cache policy , DiskCachePolicy (..) , diskCachePolicyForLevel @@ -318,17 +317,9 @@ instance NFData FencePointerIndex where rnf CompactIndex = () rnf OrdinaryIndex = () -withIndexTypeProxyForRun :: FencePointerIndex - -> (forall i . Index i => IndexType -> r) - -> r -withIndexTypeProxyForRun CompactIndex cont = cont (proxy# @IndexCompact) -withIndexTypeProxyForRun OrdinaryIndex cont = cont (proxy# @IndexOrdinary) - -withIndexAccTypeProxyForRun :: FencePointerIndex - -> (forall j . IndexAcc j => IndexType -> r) - -> r -withIndexAccTypeProxyForRun CompactIndex cont = cont (proxy# @IndexCompactAcc) -withIndexAccTypeProxyForRun OrdinaryIndex cont = cont (proxy# @IndexOrdinaryAcc) +indexTypeForRun :: FencePointerIndex -> IndexType +indexTypeForRun CompactIndex = Index.Compact +indexTypeForRun OrdinaryIndex = Index.Ordinary {------------------------------------------------------------------------------- Disk cache policy diff --git a/src/Database/LSMTree/Internal/Merge.hs b/src/Database/LSMTree/Internal/Merge.hs index 3707ee2e5..ed7a54228 100644 --- a/src/Database/LSMTree/Internal/Merge.hs +++ b/src/Database/LSMTree/Internal/Merge.hs @@ -110,18 +110,13 @@ new :: -> Run.RunFsPaths -> V.Vector (Ref (Run m h)) -> m (Maybe (Merge m h)) -new fs hbio mergeCaching alloc indexAccTypeProxy mergeLevel mergeMappend targetPaths runs = do +new fs hbio mergeCaching alloc indexType mergeLevel mergeMappend targetPaths runs = do -- no offset, no write buffer mreaders <- Readers.new Readers.NoOffsetKey Nothing runs for mreaders $ \mergeReaders -> do -- calculate upper bounds based on input runs let numEntries = V.foldMap' Run.size runs - mergeBuilder <- Builder.new fs - hbio - targetPaths - numEntries - alloc - indexAccTypeProxy + mergeBuilder <- Builder.new fs hbio targetPaths numEntries alloc indexType mergeState <- newMutVar $! Merging return Merge { mergeHasFS = fs diff --git a/src/Database/LSMTree/Internal/MergeSchedule.hs b/src/Database/LSMTree/Internal/MergeSchedule.hs index f08847bd9..a9aba30a7 100644 --- a/src/Database/LSMTree/Internal/MergeSchedule.hs +++ b/src/Database/LSMTree/Internal/MergeSchedule.hs @@ -500,33 +500,33 @@ flushWriteBuffer tr conf@TableConfig{confFencePointerIndex, confDiskCachePolicy} | WB.null (tableWriteBuffer tc) = pure tc | otherwise = do !n <- incrUniqCounter uc - let !size = WB.numEntries (tableWriteBuffer tc) - !l = LevelNo 1 - !cache = diskCachePolicyForLevel confDiskCachePolicy l - !alloc = bloomFilterAllocForLevel conf l - !path = Paths.runPath root (uniqueToRunNumber n) - withIndexAccTypeProxyForRun confFencePointerIndex $ \ indexAccTypeProxy -> do - traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc - r <- allocateTemp reg - (Run.fromWriteBuffer hfs hbio - cache - alloc - indexAccTypeProxy - path - (tableWriteBuffer tc) - (tableWriteBufferBlobs tc)) - releaseRef - freeTemp reg (releaseRef (tableWriteBufferBlobs tc)) - wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) - releaseRef - levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc) - tableCache' <- rebuildCache reg (tableCache tc) levels' - pure $! TableContent { - tableWriteBuffer = WB.empty - , tableWriteBufferBlobs = wbblobs' - , tableLevels = levels' - , tableCache = tableCache' - } + let !size = WB.numEntries (tableWriteBuffer tc) + !l = LevelNo 1 + !cache = diskCachePolicyForLevel confDiskCachePolicy l + !alloc = bloomFilterAllocForLevel conf l + !indexType = indexTypeForRun confFencePointerIndex + !path = Paths.runPath root (uniqueToRunNumber n) + traceWith tr $ AtLevel l $ TraceFlushWriteBuffer size (runNumber path) cache alloc + r <- allocateTemp reg + (Run.fromWriteBuffer hfs hbio + cache + alloc + indexType + path + (tableWriteBuffer tc) + (tableWriteBufferBlobs tc)) + releaseRef + freeTemp reg (releaseRef (tableWriteBufferBlobs tc)) + wbblobs' <- allocateTemp reg (WBB.new hfs (Paths.tableBlobPath root n)) + releaseRef + levels' <- addRunToLevels tr conf resolve hfs hbio root uc r reg (tableLevels tc) + tableCache' <- rebuildCache reg (tableCache tc) levels' + pure $! TableContent { + tableWriteBuffer = WB.empty + , tableWriteBufferBlobs = wbblobs' + , tableLevels = levels' + , tableCache = tableCache' + } {-# SPECIALISE addRunToLevels :: Tracer IO (AtLevel MergeTrace) @@ -654,29 +654,29 @@ addRunToLevels tr conf@TableConfig{..} resolve hfs hbio root uc r0 reg levels = !n <- incrUniqCounter uc let !caching = diskCachePolicyForLevel confDiskCachePolicy ln !alloc = bloomFilterAllocForLevel conf ln + !indexType = indexTypeForRun confFencePointerIndex !runPaths = Paths.runPath root (uniqueToRunNumber n) - withIndexAccTypeProxyForRun confFencePointerIndex $ \ indexAccTypeProxy -> do - traceWith tr $ AtLevel ln $ - TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel - -- The runs will end up inside the merging run, with fresh references. - -- The original references can be released (but only on the happy path). - mr <- allocateTemp reg - (MR.new hfs hbio resolve caching alloc indexAccTypeProxy mergeLevel runPaths rs) - releaseRef - V.forM_ rs $ \r -> freeTemp reg (releaseRef r) - case confMergeSchedule of - Incremental -> pure () - OneShot -> do - let !required = MR.Credits (unNumEntries (V.foldMap' Run.size rs)) - let !thresh = creditThresholdForLevel conf ln - MR.supplyCredits required thresh mr - -- This ensures the merge is really completed. However, we don't - -- release the merge yet and only briefly inspect the resulting run. - bracket (MR.expectCompleted mr) releaseRef $ \r -> - traceWith tr $ AtLevel ln $ - TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r) - - return (Merging mergePolicy mr) + traceWith tr $ AtLevel ln $ + TraceNewMerge (V.map Run.size rs) (runNumber runPaths) caching alloc mergePolicy mergeLevel + -- The runs will end up inside the merging run, with fresh references. + -- The original references can be released (but only on the happy path). + mr <- allocateTemp reg + (MR.new hfs hbio resolve caching alloc indexType mergeLevel runPaths rs) + releaseRef + V.forM_ rs $ \r -> freeTemp reg (releaseRef r) + case confMergeSchedule of + Incremental -> pure () + OneShot -> do + let !required = MR.Credits (unNumEntries (V.foldMap' Run.size rs)) + let !thresh = creditThresholdForLevel conf ln + MR.supplyCredits required thresh mr + -- This ensures the merge is really completed. However, we don't + -- release the merge yet and only briefly inspect the resulting run. + bracket (MR.expectCompleted mr) releaseRef $ \r -> + traceWith tr $ AtLevel ln $ + TraceCompletedMerge (Run.size r) (Run.runFsPathsNumber r) + + return (Merging mergePolicy mr) -- $setup -- >>> import Database.LSMTree.Internal.Entry diff --git a/src/Database/LSMTree/Internal/MergingRun.hs b/src/Database/LSMTree/Internal/MergingRun.hs index dfefbeb4f..f76012ab3 100644 --- a/src/Database/LSMTree/Internal/MergingRun.hs +++ b/src/Database/LSMTree/Internal/MergingRun.hs @@ -142,7 +142,7 @@ new :: -> RunFsPaths -> V.Vector (Ref (Run m h)) -> m (Ref (MergingRun m h)) -new hfs hbio resolve caching alloc indexAccTypeProxy mergeLevel runPaths inputRuns = +new hfs hbio resolve caching alloc indexType mergeLevel runPaths inputRuns = -- If creating the Merge fails, we must release the references again. withTempRegistry $ \reg -> do runs <- V.mapM (\r -> allocateTemp reg (dupRef r) releaseRef) inputRuns @@ -151,7 +151,7 @@ new hfs hbio resolve caching alloc indexAccTypeProxy mergeLevel runPaths inputRu hbio caching alloc - indexAccTypeProxy + indexType mergeLevel resolve runPaths diff --git a/src/Database/LSMTree/Internal/Run.hs b/src/Database/LSMTree/Internal/Run.hs index 47a51a7e9..df103c09a 100644 --- a/src/Database/LSMTree/Internal/Run.hs +++ b/src/Database/LSMTree/Internal/Run.hs @@ -228,13 +228,13 @@ fromWriteBuffer :: -> WriteBuffer -> Ref (WriteBufferBlobs m h) -> m (Ref (Run m h)) -fromWriteBuffer fs hbio caching alloc indexAccTypeProxy fsPaths buffer blobs = do +fromWriteBuffer fs hbio caching alloc indexType fsPaths buffer blobs = do builder <- Builder.new fs hbio fsPaths (WB.numEntries buffer) alloc - indexAccTypeProxy + indexType for_ (WB.toList buffer) $ \(k, e) -> Builder.addKeyOp builder k (fmap (WBB.mkRawBlobRef blobs) e) --TODO: the fmap entry here reallocates even when there are no blobs @@ -275,7 +275,7 @@ openFromDisk :: -> IndexType -> RunFsPaths -> m (Ref (Run m h)) -openFromDisk fs hbio runRunDataCaching indexTypeProxy runRunFsPaths = do +openFromDisk fs hbio runRunDataCaching indexType runRunFsPaths = do expectedChecksums <- expectValidFile (runChecksumsPath runRunFsPaths) . fromChecksumsFile =<< CRC.readChecksumsFile fs (runChecksumsPath runRunFsPaths) @@ -290,7 +290,7 @@ openFromDisk fs hbio runRunDataCaching indexTypeProxy runRunFsPaths = do expectValidFile (forRunFilterRaw paths) . bloomFilterFromSBS =<< readCRC (forRunFilterRaw expectedChecksums) (forRunFilterRaw paths) (runNumEntries, runIndex) <- - expectValidFile (forRunIndexRaw paths) . Index.fromSBS indexTypeProxy + expectValidFile (forRunIndexRaw paths) . Index.fromSBS indexType =<< readCRC (forRunIndexRaw expectedChecksums) (forRunIndexRaw paths) runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode diff --git a/src/Database/LSMTree/Internal/RunAcc.hs b/src/Database/LSMTree/Internal/RunAcc.hs index 21f94fb51..68a5363d7 100644 --- a/src/Database/LSMTree/Internal/RunAcc.hs +++ b/src/Database/LSMTree/Internal/RunAcc.hs @@ -95,7 +95,7 @@ new :: NumEntries -> RunBloomFilterAlloc -> IndexType -> ST s (RunAcc s) -new (NumEntries nentries) alloc indexAccTypeProxy = do +new (NumEntries nentries) alloc indexType = do mbloom <- case alloc of RunAllocFixed !bitsPerEntry -> let !nbits = fromIntegral bitsPerEntry * fromIntegral nentries @@ -108,7 +108,7 @@ new (NumEntries nentries) alloc indexAccTypeProxy = do MBloom.new (fromIntegralChecked $ Monkey.numHashFunctions (fromIntegral nbits) (fromIntegral nentries)) nbits - mindex <- Index.newWithDefaults indexAccTypeProxy + mindex <- Index.newWithDefaults indexType mpageacc <- PageAcc.newPageAcc entryCount <- newPrimVar 0 pure RunAcc{..} diff --git a/src/Database/LSMTree/Internal/RunBuilder.hs b/src/Database/LSMTree/Internal/RunBuilder.hs index 500ecfd8e..d3926ec7a 100644 --- a/src/Database/LSMTree/Internal/RunBuilder.hs +++ b/src/Database/LSMTree/Internal/RunBuilder.hs @@ -90,16 +90,14 @@ new :: -> RunBloomFilterAlloc -> IndexType -> m (RunBuilder m h) -new hfs hbio runBuilderFsPaths numEntries alloc indexAccTypeProxy = do - runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc indexAccTypeProxy +new hfs hbio runBuilderFsPaths numEntries alloc indexType = do + runBuilderAcc <- ST.stToIO $ RunAcc.new numEntries alloc indexType runBuilderBlobOffset <- newPrimVar 0 runBuilderHandles <- traverse (makeHandle hfs) (pathsForRunFiles runBuilderFsPaths) let builder = RunBuilder { runBuilderHasFS = hfs, runBuilderHasBlockIO = hbio, .. } - writeIndexHeader hfs - (forRunIndex runBuilderHandles) - (resultingIndexTypeProxy indexAccTypeProxy) + writeIndexHeader hfs (forRunIndex runBuilderHandles) indexType return builder {-# SPECIALISE addKeyOp :: diff --git a/src/Database/LSMTree/Internal/Snapshot.hs b/src/Database/LSMTree/Internal/Snapshot.hs index b5fb29809..3d2c0cc3f 100644 --- a/src/Database/LSMTree/Internal/Snapshot.hs +++ b/src/Database/LSMTree/Internal/Snapshot.hs @@ -381,17 +381,19 @@ openRuns levels' <- V.iforM levels $ \i level -> let ln = LevelNo (i+1) in - let caching = diskCachePolicyForLevel confDiskCachePolicy ln in - withIndexTypeProxyForRun confFencePointerIndex $ \ indexTypeProxy -> do - for level $ \runNum -> do - let sourcePaths = RunFsPaths sourceDir runNum - runNum' <- uniqueToRunNumber <$> incrUniqCounter uc - let targetPaths = RunFsPaths targetDir runNum' - hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths - - allocateTemp reg - (Run.openFromDisk hfs hbio caching indexTypeProxy targetPaths) - releaseRef + let + caching = diskCachePolicyForLevel confDiskCachePolicy ln + indexType = indexTypeForRun confFencePointerIndex + in + for level $ \runNum -> do + let sourcePaths = RunFsPaths sourceDir runNum + runNum' <- uniqueToRunNumber <$> incrUniqCounter uc + let targetPaths = RunFsPaths targetDir runNum' + hardLinkRunFiles reg hfs hbio NoHardLinkDurable sourcePaths targetPaths + + allocateTemp reg + (Run.openFromDisk hfs hbio caching indexType targetPaths) + releaseRef pure (SnapLevels levels') {-# SPECIALISE releaseRuns :: @@ -442,6 +444,7 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve where caching = diskCachePolicyForLevel confDiskCachePolicy ln alloc = bloomFilterAllocForLevel conf ln + indexType = indexTypeForRun confFencePointerIndex fromSnapIncomingRun :: SnapIncomingRun (Ref (Run m h)) @@ -455,19 +458,17 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve SnapOngoingMerge runs spentCredits lvl -> do rn <- uniqueToRunNumber <$> incrUniqCounter uc - mr <- withIndexAccTypeProxyForRun confFencePointerIndex $ - \ indexAccTypeProxy -> - allocateTemp reg - (MR.new hfs - hbio - resolve - caching - alloc - indexAccTypeProxy - lvl - (mkPath rn) - runs) - releaseRef + mr <- allocateTemp reg + (MR.new hfs + hbio + resolve + caching + alloc + indexType + lvl + (mkPath rn) + runs) + releaseRef -- When a snapshot is created, merge progress is lost, so we -- have to redo merging work here. UnspentCredits and -- SpentCredits track how many credits were supplied before the