Skip to content

Commit

Permalink
Merge pull request #385 from IntersectMBO/mheinzel/fix-readctx-type-p…
Browse files Browse the repository at this point in the history
…aram

Add type parameter m to ReadCtx
  • Loading branch information
dcoutts authored Sep 13, 2024
2 parents 374035f + 6c576b3 commit 6ea0be4
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 45 deletions.
25 changes: 13 additions & 12 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -233,23 +233,23 @@ deriving anyclass instance NoThunks PageNo
-------------------------------------------------------------------------------}

deriving stock instance Generic (TableContent m h)
deriving anyclass instance (Typeable (PrimState m), Typeable h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (TableContent m h)

deriving stock instance Generic (LevelsCache m (Handle h))
deriving anyclass instance (Typeable (PrimState m), Typeable h)
=> NoThunks (LevelsCache m (Handle h))

deriving stock instance Generic (Level m (Handle h))
deriving anyclass instance (Typeable (PrimState m), Typeable h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (Level m (Handle h))

deriving stock instance Generic (MergingRun m (Handle h))
deriving anyclass instance (Typeable (PrimState m), Typeable h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (MergingRun m (Handle h))

deriving stock instance Generic (MergingRunState m (Handle h))
deriving anyclass instance (Typeable (PrimState m), Typeable h)
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (MergingRunState m (Handle h))

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -306,9 +306,9 @@ deriving anyclass instance Typeable s
Merge
-------------------------------------------------------------------------------}

deriving stock instance Generic (Merge s (Handle h))
deriving anyclass instance (Typeable s, Typeable h)
=> NoThunks (Merge s (Handle h))
deriving stock instance Generic (Merge m (Handle h))
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (Merge m (Handle h))

deriving stock instance Generic Merge.Level
deriving anyclass instance NoThunks Merge.Level
Expand All @@ -317,9 +317,9 @@ deriving anyclass instance NoThunks Merge.Level
Readers
-------------------------------------------------------------------------------}

deriving stock instance Generic (Readers s (Handle h))
deriving anyclass instance (Typeable s, Typeable h)
=> NoThunks (Readers s (Handle h))
deriving stock instance Generic (Readers m (Handle h))
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (Readers m (Handle h))

deriving stock instance Generic (Reader m (Handle h))
instance (Typeable m, Typeable (PrimState m), Typeable h)
Expand All @@ -332,8 +332,9 @@ instance (Typeable m, Typeable (PrimState m), Typeable h)
deriving stock instance Generic ReaderNumber
deriving anyclass instance NoThunks ReaderNumber

deriving stock instance Generic (ReadCtx (Handle h))
deriving anyclass instance Typeable h => NoThunks (ReadCtx (Handle h))
deriving stock instance Generic (ReadCtx m (Handle h))
deriving anyclass instance (Typeable m, Typeable (PrimState m), Typeable h)
=> NoThunks (ReadCtx m (Handle h))

{-------------------------------------------------------------------------------
Reader
Expand Down
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ data CursorEnv m h = CursorEnv {
-- However, the reference counts to the runs only get removed when calling
-- 'closeCursor', as there might still be 'BlobRef's that need the
-- corresponding run to stay alive.
, cursorReaders :: !(Maybe (Readers.Readers (PrimState m) (Handle h)))
, cursorReaders :: !(Maybe (Readers.Readers m (Handle h)))
-- | The runs held open by the cursor. We must remove a reference when the
-- cursor gets closed.
, cursorRuns :: !(V.Vector (Run m (Handle h)))
Expand Down Expand Up @@ -936,7 +936,7 @@ readCursorEntries ::
-> HasBlockIO m h
-> (SerialisedValue -> SerialisedValue -> SerialisedValue)
-> (SerialisedKey -> SerialisedValue -> Maybe (WeakBlobRef m (Handle h)) -> res)
-> Readers.Readers RealWorld (Handle h)
-> Readers.Readers IO (Handle h)
-> Int
-> m (V.Vector res, Readers.HasMore)
readCursorEntries hfs hbio resolve fromEntry readers n =
Expand Down
14 changes: 7 additions & 7 deletions src/Database/LSMTree/Internal/Merge.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Database.LSMTree.Internal.Merge (

import Control.Exception (assert)
import Control.Monad (when)
import Control.Monad.Primitive (RealWorld)
import Control.Monad.Primitive (PrimState, RealWorld)
import Control.RefCount (RefCount (..))
import Data.Coerce (coerce)
import Data.Traversable (for)
Expand All @@ -36,11 +36,11 @@ import System.FS.BlockIO.API (HasBlockIO)
--
-- TODO: Reference counting will have to be done somewhere, either here or in
-- the layer above.
data Merge s fhandle = Merge {
data Merge m fhandle = Merge {
mergeLevel :: !Level
, mergeMappend :: !Mappend
, mergeReaders :: {-# UNPACK #-} !(Readers.Readers s fhandle)
, mergeBuilder :: !(RunBuilder s fhandle)
, mergeReaders :: {-# UNPACK #-} !(Readers.Readers m fhandle)
, mergeBuilder :: !(RunBuilder (PrimState m) fhandle)
, mergeCaching :: !RunDataCaching
-- ^ The caching policy to use for the Run in the 'MergeComplete'.
}
Expand All @@ -61,7 +61,7 @@ new ::
-> Mappend
-> Run.RunFsPaths
-> [Run IO (FS.Handle h)]
-> IO (Maybe (Merge RealWorld (FS.Handle h)))
-> IO (Maybe (Merge IO (FS.Handle h)))
new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
mreaders <- Readers.new fs hbio Nothing runs
for mreaders $ \mergeReaders -> do
Expand All @@ -78,7 +78,7 @@ new fs hbio mergeCaching alloc mergeLevel mergeMappend targetPaths runs = do
close ::
HasFS IO h
-> HasBlockIO IO h
-> Merge RealWorld (FS.Handle h)
-> Merge IO (FS.Handle h)
-> IO ()
close fs hbio Merge {..} = do
Builder.close fs mergeBuilder
Expand All @@ -105,7 +105,7 @@ stepsInvariant requestedSteps = \case
steps ::
HasFS IO h
-> HasBlockIO IO h
-> Merge RealWorld (FS.Handle h)
-> Merge IO (FS.Handle h)
-> Int -- ^ How many input entries to consume (at least)
-> IO (Int, StepResult IO (FS.Handle h))
steps fs hbio Merge {..} requestedSteps =
Expand Down
2 changes: 1 addition & 1 deletion src/Database/LSMTree/Internal/MergeSchedule.hs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ data MergingRun m h =

data MergingRunState m h =
CompletedMerge !(Run m h)
| OngoingMerge !(V.Vector (Run m h)) !(Merge (PrimState m) h)
| OngoingMerge !(V.Vector (Run m h)) !(Merge m h)

{-# SPECIALISE forRunM_ :: Levels IO h -> (Run IO h -> IO ()) -> IO () #-}
forRunM_ :: PrimMonad m => Levels m h -> (Run m h -> m ()) -> m ()
Expand Down
37 changes: 17 additions & 20 deletions src/Database/LSMTree/Internal/RunReaders.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ import System.FS.BlockIO.API (HasBlockIO)
--
-- Creating a 'RunReaders' does not increase the runs' reference count, so make
-- sure they remain open while using the 'RunReaders'.
data Readers s fhandle = Readers {
readersHeap :: !(Heap.MutableHeap s (ReadCtx fhandle))
data Readers m fhandle = Readers {
readersHeap :: !(Heap.MutableHeap (PrimState m) (ReadCtx m fhandle))
-- | Since there is always one reader outside of the heap, we need to
-- store it separately. This also contains the next k\/op to yield, unless
-- all readers are drained, i.e. both:
-- 1. the reader inside the 'ReadCtx' is empty
-- 2. the heap is empty
, readersNext :: !(MutVar s (ReadCtx fhandle))
, readersNext :: !(MutVar (PrimState m) (ReadCtx m fhandle))
}

newtype ReaderNumber = ReaderNumber Int
Expand All @@ -61,30 +61,27 @@ newtype ReaderNumber = ReaderNumber Int
--
-- TODO(optimisation): We allocate this record for each k/op. This might be
-- avoidable, see ideas below.
-- TODO: This should be parametrised over a monad m instead of using IO.
-- Alternatively, we could remove all type parameters on types in this module,
-- as they are only used with IO in any case.
data ReadCtx fhandle = ReadCtx {
data ReadCtx m fhandle = ReadCtx {
-- We could avoid this using a more specialised mutable heap with separate
-- arrays for keys and values (or even each of their components).
-- Using an 'STRef' could avoid reallocating the record for every entry,
-- but that might not be straightforward to integrate with the heap.
readCtxHeadKey :: !SerialisedKey
, readCtxHeadEntry :: !(Reader.Entry IO fhandle)
, readCtxHeadEntry :: !(Reader.Entry m fhandle)
-- We could get rid of this by making 'LoserTree' stable (for which there
-- is a prototype already).
-- Alternatively, if we decide to have an invariant that the number in
-- 'RunFsPaths' is always higher for newer runs, then we could use that
-- in the 'Ord' instance.
, readCtxNumber :: !ReaderNumber
, readCtxReader :: !(Reader IO fhandle)
, readCtxReader :: !(Reader m fhandle)
}

instance Eq (ReadCtx fhandle) where
instance Eq (ReadCtx m fhandle) where
(==) = (==) `on` (\r -> (readCtxHeadKey r, readCtxNumber r))

-- | Makes sure we resolve entries in the right order.
instance Ord (ReadCtx fhandle) where
instance Ord (ReadCtx m fhandle) where
compare = compare `on` (\r -> (readCtxHeadKey r, readCtxNumber r))

-- TODO: This is slightly inelegant. This module could work generally for
Expand All @@ -111,7 +108,7 @@ new :: forall h .
-> HasBlockIO IO h
-> Maybe WB.WriteBuffer
-> [Run IO (FS.Handle h)]
-> IO (Maybe (Readers RealWorld (FS.Handle h)))
-> IO (Maybe (Readers IO (FS.Handle h)))
new fs hbio wbs runs = do
wbCtx <- maybe (pure Nothing) fromWB wbs
runCtxs <- zipWithM (fromRun . ReaderNumber) [1..] runs
Expand All @@ -121,12 +118,12 @@ new fs hbio wbs runs = do
readersNext <- newMutVar readCtx
return Readers {..}
where
fromWB :: WB.WriteBuffer -> IO (Maybe (ReadCtx (FS.Handle h)))
fromWB :: WB.WriteBuffer -> IO (Maybe (ReadCtx IO (FS.Handle h)))
fromWB wb = do
kops <- newMutVar $ map (fmap errOnBlob) $ WB.toList wb
nextReadCtx fs hbio (ReaderNumber 0) (ReadBuffer kops)

fromRun :: ReaderNumber -> Run IO (FS.Handle h) -> IO (Maybe (ReadCtx (FS.Handle h)))
fromRun :: ReaderNumber -> Run IO (FS.Handle h) -> IO (Maybe (ReadCtx IO (FS.Handle h)))
fromRun n run = nextReadCtx fs hbio n . ReadRun =<< Reader.new fs hbio run

-- | TODO: remove once blob references are implemented
Expand All @@ -140,7 +137,7 @@ errOnBlob Delete = Delete
close ::
HasFS IO h
-> HasBlockIO IO h
-> Readers RealWorld (FS.Handle h)
-> Readers IO (FS.Handle h)
-> IO ()
close fs hbio Readers {..} = do
ReadCtx {readCtxReader} <- readMutVar readersNext
Expand All @@ -158,7 +155,7 @@ close fs hbio Readers {..} = do
closeHeap

peekKey ::
Readers RealWorld (FS.Handle h)
Readers IO (FS.Handle h)
-> IO SerialisedKey
peekKey Readers {..} = do
readCtxHeadKey <$> readMutVar readersNext
Expand All @@ -170,7 +167,7 @@ data HasMore = HasMore | Drained
pop ::
HasFS IO h
-> HasBlockIO IO h
-> Readers RealWorld (FS.Handle h)
-> Readers IO (FS.Handle h)
-> IO (SerialisedKey, Reader.Entry IO (FS.Handle h), HasMore)
pop fs hbio r@Readers {..} = do
ReadCtx {..} <- readMutVar readersNext
Expand All @@ -180,7 +177,7 @@ pop fs hbio r@Readers {..} = do
dropWhileKey ::
HasFS IO h
-> HasBlockIO IO h
-> Readers RealWorld (FS.Handle h)
-> Readers IO (FS.Handle h)
-> SerialisedKey
-> IO (Int, HasMore) -- ^ How many were dropped?
dropWhileKey fs hbio Readers {..} key = do
Expand Down Expand Up @@ -211,7 +208,7 @@ dropWhileKey fs hbio Readers {..} key = do
dropOne ::
HasFS IO h
-> HasBlockIO IO h
-> Readers RealWorld (FS.Handle h)
-> Readers IO (FS.Handle h)
-> ReaderNumber
-> Reader IO (FS.Handle h)
-> IO HasMore
Expand All @@ -231,7 +228,7 @@ nextReadCtx ::
-> HasBlockIO IO h
-> ReaderNumber
-> Reader IO (FS.Handle h)
-> IO (Maybe (ReadCtx (FS.Handle h)))
-> IO (Maybe (ReadCtx IO (FS.Handle h)))
nextReadCtx fs hbio readCtxNumber readCtxReader =
case readCtxReader of
ReadRun r -> Reader.next fs hbio r <&> \case
Expand Down
5 changes: 2 additions & 3 deletions test/Test/Database/LSMTree/Internal/RunReaders.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import Test.Tasty (TestTree, testGroup)
import Test.Tasty.QuickCheck
import Test.Util.Orphans ()

import Control.Monad.Primitive (RealWorld)
import Test.QuickCheck.StateModel
import Test.QuickCheck.StateModel.Lockstep
import qualified Test.QuickCheck.StateModel.Lockstep.Defaults as Lockstep
Expand Down Expand Up @@ -281,7 +280,7 @@ data RealState =
!(Maybe ReadersCtx)

-- | Readers, together with the runs being read, so they can be cleaned up at the end
type ReadersCtx = ([Run.Run IO Handle], Readers RealWorld Handle)
type ReadersCtx = ([Run.Run IO Handle], Readers IO Handle)

closeReadersCtx :: FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock -> ReadersCtx -> IO ()
closeReadersCtx hfs hbio (runs, readers) = do
Expand Down Expand Up @@ -337,7 +336,7 @@ runIO act lu = case act of
return (hasMore, (key, fullEntry, hasMore))

expectReaders ::
(FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock -> Readers RealWorld Handle -> IO (HasMore, a))
(FS.HasFS IO MockFS.HandleMock -> FS.HasBlockIO IO MockFS.HandleMock -> Readers IO Handle -> IO (HasMore, a))
-> RealMonad (Either () a)
expectReaders f =
ReaderT $ \(hfs, hbio) -> do
Expand Down

0 comments on commit 6ea0be4

Please sign in to comment.