Merge pull request #435 from IntersectMBO/jdral/batch-merge-work
Do merge work in batches
jorisdral authored Nov 8, 2024
2 parents 14950a7 + 9c4dd1f commit d45f32c
Showing 8 changed files with 572 additions and 147 deletions.
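
The core change is that merge work (and the credits that drive it) is now performed in batches rather than one step at a time. A rough sketch of the batched driver pattern, assuming the steps/complete/StepResult API shown in the Database.LSMTree.Internal.Merge diff below (the helper name, its exact signature, and the import list are illustrative, not part of this change):

import Control.Monad (void)
import Database.LSMTree.Internal.Merge (Merge, StepResult (..), complete, steps)

-- Perform at most 'batchSize' merging steps at a time, run some other
-- action between batches, and finalise the merge only once 'steps'
-- reports that merging is done.
driveMergeInBatches :: Merge IO h -> Int -> IO () -> IO ()
driveMergeInBatches m batchSize betweenBatches = loop
  where
    loop = do
      (_stepsDone, result) <- steps m batchSize
      case result of
        MergeInProgress -> betweenBatches >> loop
        MergeDone       -> void (complete m)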
45 changes: 45 additions & 0 deletions bench/micro/Bench/Database/LSMTree/Normal.hs
@@ -31,6 +31,7 @@ benchmarks :: Benchmark
benchmarks = bgroup "Bench.Database.LSMTree.Normal" [
benchLargeValueVsSmallValueBlob
, benchCursorScanVsRangeLookupScan
, benchInsertBatches
]

{-------------------------------------------------------------------------------
@@ -215,6 +216,50 @@ benchCursorScanVsRangeLookupScan =
Normal.closeSession s
cleanupFiles (tmpDir, hfs, hbio)


{-------------------------------------------------------------------------------
Benchmark batches of inserts
-------------------------------------------------------------------------------}

benchInsertBatches :: Benchmark
benchInsertBatches =
env genInserts $ \iss ->
withEnv $ \ ~(_, _, _, _, t :: Normal.Table IO Word64 Word64 Void) -> do
bench "benchInsertBatches" $ whnfIO $
V.mapM_ (flip Normal.inserts t) iss
where
!initialSize = 100_000
!batchSize = 256

_benchConfig :: Common.TableConfig
_benchConfig = Common.defaultTableConfig {
Common.confWriteBufferAlloc = Common.AllocNumEntries (Common.NumEntries 1000)
}

randomInserts :: Int -> V.Vector (Word64, Word64, Maybe Void)
randomInserts n = V.unfoldrExactN n f (mkStdGen 17)
where f !g = let (!k, !g') = uniform g
in ((k, v, Nothing), g')
-- The exact value does not matter much, so we pick an arbitrary
-- hardcoded one.
!v = 17

genInserts :: IO (V.Vector (V.Vector (Word64, Word64, Maybe Void)))
genInserts = pure $ vgroupsOfN batchSize $ randomInserts initialSize

withEnv = envWithCleanup initialise cleanup

initialise = do
(tmpDir, hfs, hbio) <- mkFiles
s <- Normal.openSession nullTracer hfs hbio (FS.mkFsPath [])
t <- Normal.new s _benchConfig
pure (tmpDir, hfs, hbio, s, t)

cleanup (tmpDir, hfs, hbio, s, t) = do
Normal.close t
Normal.closeSession s
cleanupFiles (tmpDir, hfs, hbio)
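
The batches fed to Normal.inserts above come from vgroupsOfN, which splits the 100_000 generated inserts into chunks of 256. As a point of reference only (this is not the library's actual definition of vgroupsOfN), such a chunking helper can be written with Data.Vector's unfoldr:

import qualified Data.Vector as V

-- Split a vector into consecutive chunks of at most n elements
-- (assumes n > 0; the final chunk may be shorter).
chunksOfN :: Int -> V.Vector a -> V.Vector (V.Vector a)
chunksOfN n = V.unfoldr $ \xs ->
    if V.null xs then Nothing else Just (V.splitAt n xs)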

{-------------------------------------------------------------------------------
Setup
-------------------------------------------------------------------------------}
12 changes: 12 additions & 0 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
@@ -284,6 +284,18 @@ deriving anyclass instance NoThunks MergePolicyForLevel
deriving stock instance Generic NumRuns
deriving anyclass instance NoThunks NumRuns

deriving stock instance Generic (UnspentCreditsVar s)
deriving anyclass instance Typeable s => NoThunks (UnspentCreditsVar s)

deriving stock instance Generic (TotalStepsVar s)
deriving anyclass instance Typeable s => NoThunks (TotalStepsVar s)

deriving stock instance Generic (SpentCreditsVar s)
deriving anyclass instance Typeable s => NoThunks (SpentCreditsVar s)

deriving stock instance Generic MergeKnownCompleted
deriving anyclass instance NoThunks MergeKnownCompleted

{-------------------------------------------------------------------------------
Entry
-------------------------------------------------------------------------------}
2 changes: 1 addition & 1 deletion src/Database/LSMTree/Internal.hs
@@ -1120,7 +1120,7 @@ snapshot resolve snap label tableType t = do
-- credits as if the buffer was full, and then flush the (possibly)
-- underfull buffer. However, note that this bit of code
-- here is probably going to change anyway because of #392
supplyCredits (unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
supplyCredits conf (Credit $ unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
content' <- flushWriteBuffer
(TraceMerge `contramap` tableTracer t)
conf
16 changes: 8 additions & 8 deletions src/Database/LSMTree/Internal/Merge.hs
@@ -58,7 +58,7 @@ data Merge m h = Merge {
, mergeMappend :: !Mappend
, mergeReaders :: {-# UNPACK #-} !(Readers m h)
, mergeBuilder :: !(RunBuilder m h)
-- | The caching policy to use for the Run in the 'MergeComplete'.
-- | The caching policy to use for the output Run.
, mergeCaching :: !RunDataCaching
-- | The result of the latest call to 'steps'. This is used to determine
-- whether a merge can be 'complete'd.
@@ -227,7 +227,7 @@ stepsToCompletion m stepBatchSize = go
go = do
steps m stepBatchSize >>= \case
(_, MergeInProgress) -> go
(_, MergeComplete) -> complete m
(_, MergeDone) -> complete m

{-# SPECIALISE stepsToCompletionCounted ::
Merge IO h
@@ -246,10 +246,10 @@ stepsToCompletionCounted m stepBatchSize = go 0
go !stepsSum = do
steps m stepBatchSize >>= \case
(n, MergeInProgress) -> go (stepsSum + n)
(n, MergeComplete) -> let !stepsSum' = stepsSum + n
(n, MergeDone) -> let !stepsSum' = stepsSum + n
in (stepsSum',) <$> complete m

data StepResult = MergeInProgress | MergeComplete
data StepResult = MergeInProgress | MergeDone
deriving stock Eq

stepsInvariant :: Int -> (Int, StepResult) -> Bool
@@ -285,7 +285,7 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
-- check.
readMutVar mergeState >>= \case
Merging -> go 0
MergingDone -> pure (0, MergeComplete)
MergingDone -> pure (0, MergeDone)
Completed -> error "steps: Merge is completed"
Closed -> error "steps: Merge is closed"
where
@@ -304,7 +304,7 @@
-- no future entries, no previous entry to resolve, just write!
writeReaderEntry mergeLevel mergeBuilder key entry
writeMutVar mergeState $! MergingDone
pure (n + 1, MergeComplete)
pure (n + 1, MergeDone)

handleEntry !n !key (Reader.Entry (Mupdate v)) =
-- resolve small mupsert vals with the following entries of the same key
@@ -343,15 +343,15 @@ steps Merge {..} requestedSteps = assertStepsInvariant <$> do
Readers.Drained -> do
writeSerialisedEntry mergeLevel mergeBuilder key resolved
writeMutVar mergeState $! MergingDone
pure (n + 1, MergeComplete)
pure (n + 1, MergeDone)

dropRemaining !n !key = do
(dropped, hasMore) <- Readers.dropWhileKey mergeReaders key
case hasMore of
Readers.HasMore -> go (n + dropped)
Readers.Drained -> do
writeMutVar mergeState $! MergingDone
pure (n + dropped, MergeComplete)
pure (n + dropped, MergeDone)

{-# SPECIALISE writeReaderEntry ::
Level