Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some exception safety fixes #519

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ test-suite lsm-tree-test
Test.Database.LSMTree.Class
Test.Database.LSMTree.Generators
Test.Database.LSMTree.Internal
Test.Database.LSMTree.Internal.BlobFile.FS
Test.Database.LSMTree.Internal.BloomFilter
Test.Database.LSMTree.Internal.Chunk
Test.Database.LSMTree.Internal.CRC32C
Expand Down
31 changes: 19 additions & 12 deletions src/Database/LSMTree/Internal/BlobFile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ module Database.LSMTree.Internal.BlobFile (
) where

import Control.DeepSeq (NFData (..))
import Control.Monad.Class.MonadThrow (MonadThrow)
import Control.Monad.Class.MonadThrow (MonadCatch (bracketOnError),
MonadThrow (..))
import Control.Monad.Primitive (PrimMonad)
import Control.RefCount
import qualified Data.Primitive.ByteArray as P
Expand Down Expand Up @@ -51,24 +52,30 @@ instance NFData BlobSpan where

-- | Open the given file to make a 'BlobFile'. The finaliser will close and
-- delete the file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked.
{-# SPECIALISE openBlobFile :: HasCallStack => HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (Ref (BlobFile IO h)) #-}
openBlobFile ::
PrimMonad m
(PrimMonad m, MonadCatch m)
=> HasCallStack
=> HasFS m h
-> FS.FsPath
-> FS.OpenMode
-> m (Ref (BlobFile m h))
openBlobFile fs path mode = do
blobFileHandle <- FS.hOpen fs path mode
let finaliser = do
FS.hClose fs blobFileHandle
FS.removeFile fs (FS.handlePath blobFileHandle)
newRef finaliser $ \blobFileRefCounter ->
BlobFile {
blobFileHandle,
blobFileRefCounter
}
openBlobFile fs path mode =
bracketOnError (FS.hOpen fs path mode) (FS.hClose fs) $ \blobFileHandle -> do
let finaliser = do
FS.hClose fs blobFileHandle
-- If we fail to close the file handle, then we won't try to remove
-- the file.
FS.removeFile fs (FS.handlePath blobFileHandle)
newRef finaliser $ \blobFileRefCounter ->
BlobFile {
blobFileHandle,
blobFileRefCounter
}

{-# INLINE readBlob #-}
readBlob ::
Expand Down
11 changes: 10 additions & 1 deletion src/Database/LSMTree/Internal/ChecksumHandle.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Database.LSMTree.Internal.ChecksumHandle
readChecksum,
dropCache,
closeHandle,
removeHandle,
writeToHandle,
-- * Specialised writers
writeRawPage,
Expand All @@ -22,7 +23,7 @@ module Database.LSMTree.Internal.ChecksumHandle
) where

import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadThrow)
import Control.Monad.Class.MonadThrow (MonadThrow (..))
import Control.Monad.Primitive
import Data.BloomFilter (Bloom)
import qualified Data.ByteString.Lazy as BSL
Expand Down Expand Up @@ -92,6 +93,14 @@ dropCache hbio (ChecksumHandle h _) = FS.hDropCacheAll hbio h
closeHandle :: HasFS m h -> ChecksumHandle (PrimState m) h -> m ()
closeHandle fs (ChecksumHandle h _checksum) = FS.hClose fs h

{-# SPECIALISE removeHandle ::
HasFS IO h
-> ChecksumHandle RealWorld h
-> IO () #-}
removeHandle :: MonadThrow m => HasFS m h -> ChecksumHandle (PrimState m) h -> m ()
removeHandle fs (ChecksumHandle h _checksum) =
FS.hClose fs h `finally` FS.removeFile fs (handlePath h)

{-# SPECIALISE writeToHandle ::
HasFS IO h
-> ChecksumHandle RealWorld h
Expand Down
4 changes: 4 additions & 0 deletions src/Database/LSMTree/Internal/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ setRunDataCaching hbio runKOpsFile NoCacheRunData = do
RunDataCaching
-> RunBuilder IO h
-> IO (Ref (Run IO h)) #-}
-- TODO: make exception safe
fromMutable ::
(MonadST m, MonadSTM m, MonadMask m)
=> RunDataCaching
Expand All @@ -192,6 +193,7 @@ fromMutable runRunDataCaching builder = do
(runHasFS, runHasBlockIO, runRunFsPaths, runFilter, runIndex, runNumEntries) <-
Builder.unsafeFinalise (runRunDataCaching == NoCacheRunData) builder
runKOpsFile <- FS.hOpen runHasFS (runKOpsPath runRunFsPaths) FS.ReadMode
-- TODO: openBlobFile should be called with exceptions masked
runBlobFile <- openBlobFile runHasFS (runBlobPath runRunFsPaths) FS.ReadMode
setRunDataCaching runHasBlockIO runKOpsFile runRunDataCaching
newRef (finaliser runHasFS runKOpsFile runBlobFile runRunFsPaths)
Expand Down Expand Up @@ -263,6 +265,7 @@ openFromDisk ::
-> RunDataCaching
-> RunFsPaths
-> m (Ref (Run m h))
-- TODO: make exception safe
openFromDisk fs hbio runRunDataCaching runRunFsPaths = do
expectedChecksums <-
expectValidFile (runChecksumsPath runRunFsPaths) . fromChecksumsFile
Expand All @@ -282,6 +285,7 @@ openFromDisk fs hbio runRunDataCaching runRunFsPaths = do
=<< readCRC (forRunIndexRaw expectedChecksums) (forRunIndexRaw paths)

runKOpsFile <- FS.hOpen fs (runKOpsPath runRunFsPaths) FS.ReadMode
-- TODO: openBlobFile should be called with exceptions masked
runBlobFile <- openBlobFile fs (runBlobPath runRunFsPaths) FS.ReadMode
setRunDataCaching hbio runKOpsFile runRunDataCaching
newRef (finaliser fs runKOpsFile runBlobFile runRunFsPaths) $ \runRefCounter ->
Expand Down
69 changes: 61 additions & 8 deletions src/Database/LSMTree/Internal/WriteBufferBlobs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,23 @@ import System.FS.API (HasFS)
--
data WriteBufferBlobs m h =
WriteBufferBlobs {
blobFile :: !(Ref (BlobFile m h))

-- | The manually tracked file pointer.
, blobFilePointer :: !(FilePointer m)
-- | The blob file
--
-- INVARIANT: the file may contain garbage bytes, but no blob reference
-- ('RawBlobRef', 'WeakBlobRef', or 'StrongBlobRef) will reference these
-- bytes.
blobFile :: !(Ref (BlobFile m h))

-- The 'WriteBufferBlobs' is a shared reference-counted object type
, writeBufRefCounter :: !(RefCounter m)
}
-- | The manually tracked file pointer.
--
-- INVARIANT: the file pointer points to a file offset at or beyond the
-- file size.
, blobFilePointer :: !(FilePointer m)

-- The 'WriteBufferBlobs' is a shared reference-counted object type
, writeBufRefCounter :: !(RefCounter m)
}

instance NFData h => NFData (WriteBufferBlobs m h) where
rnf (WriteBufferBlobs a b c) = rnf a `seq` rnf b `seq` rnf c
Expand All @@ -119,6 +128,11 @@ instance RefCounted m (WriteBufferBlobs m h) where
getRefCounter = writeBufRefCounter

{-# SPECIALISE new :: HasFS IO h -> FS.FsPath -> IO (Ref (WriteBufferBlobs IO h)) #-}
-- | Create a new 'WriteBufferBlobs' with a new file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked.
new ::
(PrimMonad m, MonadMask m)
=> HasFS m h
Expand All @@ -128,6 +142,10 @@ new fs blobFileName = open fs blobFileName FS.MustBeNew

{-# SPECIALISE open :: HasFS IO h -> FS.FsPath -> FS.AllowExisting -> IO (Ref (WriteBufferBlobs IO h)) #-}
-- | Open a `WriteBufferBlobs` file and sets the file pointer to the end of the file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked.
open ::
(PrimMonad m, MonadMask m)
=> HasFS m h
Expand All @@ -137,10 +155,18 @@ open ::
open fs blobFileName blobFileAllowExisting = do
-- Must use read/write mode because we write blobs when adding, but
-- we can also be asked to retrieve blobs at any time.
fromBlobFile fs =<< openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting)
bracketOnError
(openBlobFile fs blobFileName (FS.ReadWriteMode blobFileAllowExisting))
releaseRef
(fromBlobFile fs)

{-# SPECIALISE fromBlobFile :: HasFS IO h -> Ref (BlobFile IO h) -> IO (Ref (WriteBufferBlobs IO h)) #-}
-- | Make a `WriteBufferBlobs` from a `BlobFile` and set the file pointer to the end of the file.
-- | Make a `WriteBufferBlobs` from a `BlobFile` and set the file pointer to the
-- end of the file.
--
-- REF: the resulting reference must be released once it is no longer used.
--
-- ASYNC: this should be called with asynchronous exceptions masked.
fromBlobFile ::
(PrimMonad m, MonadMask m)
=> HasFS m h
Expand All @@ -159,14 +185,27 @@ fromBlobFile fs blobFile = do
}

{-# SPECIALISE addBlob :: HasFS IO h -> Ref (WriteBufferBlobs IO h) -> SerialisedBlob -> IO BlobSpan #-}
-- | Append a blob.
--
-- If no exception is returned, then the file pointer will be set to exactly the
-- file size.
--
-- If an exception is returned, the file pointer points to a file
-- offset at or beyond the file size. The bytes between the old and new offset
-- might be garbage or missing.
addBlob :: (PrimMonad m, MonadThrow m)
=> HasFS m h
-> Ref (WriteBufferBlobs m h)
-> SerialisedBlob
-> m BlobSpan
addBlob fs (DeRef WriteBufferBlobs {blobFile, blobFilePointer}) blob = do
let blobsize = sizeofBlob blob
-- If an exception happens after updating the file pointer, then no write
-- takes place. The next 'addBlob' will start writing at the new file
-- offset, so there are going to be some uninitialised bytes in the file.
bloboffset <- updateFilePointer blobFilePointer blobsize
-- If an exception happens while writing the blob, the bytes in the file
-- might be corrupted.
BlobFile.writeBlob fs blobFile blob bloboffset
return BlobSpan {
blobSpanOffset = bloboffset,
Expand All @@ -175,6 +214,13 @@ addBlob fs (DeRef WriteBufferBlobs {blobFile, blobFilePointer}) blob = do

-- | Helper function to make a 'RawBlobRef' that points into a
-- 'WriteBufferBlobs'.
--
-- This function should only be used on the result of 'addBlob' on the same
-- 'WriteBufferBlobs'. For example:
--
-- @
-- 'addBlob' hfs wbb blob >>= \\span -> pure ('mkRawBlobRef' wbb span)
-- @
mkRawBlobRef :: Ref (WriteBufferBlobs m h)
-> BlobSpan
-> RawBlobRef m h
Expand All @@ -186,6 +232,13 @@ mkRawBlobRef (DeRef WriteBufferBlobs {blobFile = DeRef blobfile}) blobspan =

-- | Helper function to make a 'WeakBlobRef' that points into a
-- 'WriteBufferBlobs'.
--
-- This function should only be used on the result of 'addBlob' on the same
-- 'WriteBufferBlobs'. For example:
--
-- @
-- 'addBlob' hfs wbb blob >>= \\span -> pure ('mkWeakBlobRef' wbb span)
-- @
mkWeakBlobRef :: Ref (WriteBufferBlobs m h)
-> BlobSpan
-> WeakBlobRef m h
Expand Down
39 changes: 24 additions & 15 deletions src/Database/LSMTree/Internal/WriteBufferReader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ module Database.LSMTree.Internal.WriteBufferReader (
import Control.Concurrent.Class.MonadMVar.Strict
import Control.Monad.Class.MonadST (MonadST (..))
import Control.Monad.Class.MonadSTM (MonadSTM (..))
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..))
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow (..),
bracketOnError)
import Control.Monad.Primitive (PrimMonad (..))
import Control.RefCount (Ref, dupRef, releaseRef)
import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
Expand Down Expand Up @@ -42,9 +43,6 @@ import System.FS.BlockIO.API (HasBlockIO)
-> IO WriteBuffer
#-}
-- | Read a serialised `WriteBuffer` back into memory.
--
-- NOTE: The `BlobFile` argument /must be/ the blob file associated with the
-- write buffer; @`readWriteBuffer`@ does not check this.
readWriteBuffer ::
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
=> ResolveSerialisedValue
Expand Down Expand Up @@ -88,23 +86,31 @@ data WriteBufferReader m h = WriteBufferReader {
-> IO (WriteBufferReader IO h)
#-}
-- | See 'Database.LSMTree.Internal.RunReader.new'.
--
-- REF: the resulting 'WriteBufferReader' must be closed once it is no longer
-- used.
--
-- ASYNC: this should be called with asynchronous exceptions masked.
new :: forall m h.
(MonadMVar m, MonadST m, MonadMask m)
=> HasFS m h
-> HasBlockIO m h
-> ForKOps FS.FsPath
-> Ref (BlobFile m h)
-> m (WriteBufferReader m h)
new readerHasFS readerHasBlockIO kOpsPath blobFile = do
readerKOpsHandle <- FS.hOpen readerHasFS (unForKOps kOpsPath) FS.ReadMode
-- Double the file readahead window (only applies to this file descriptor)
FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential
readerBlobFile <- dupRef blobFile
-- Load first page from disk, if it exists.
readerCurrentEntryNo <- newPrimVar (0 :: Word16)
firstPage <- readDiskPage readerHasFS readerKOpsHandle
readerCurrentPage <- newMutVar firstPage
pure $ WriteBufferReader{..}
new readerHasFS readerHasBlockIO kOpsPath blobFile =
bracketOnError openKOps (FS.hClose readerHasFS) $ \readerKOpsHandle -> do
-- Double the file readahead window (only applies to this file descriptor)
FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential
bracketOnError (dupRef blobFile) releaseRef $ \readerBlobFile -> do
-- Load first page from disk, if it exists.
readerCurrentEntryNo <- newPrimVar (0 :: Word16)
firstPage <- readDiskPage readerHasFS readerKOpsHandle
readerCurrentPage <- newMutVar firstPage
pure $ WriteBufferReader{..}
where
openKOps = FS.hOpen readerHasFS (unForKOps kOpsPath) FS.ReadMode


{-# SPECIALISE
next ::
Expand Down Expand Up @@ -154,10 +160,13 @@ next WriteBufferReader {..} = do
return (ReadEntry key rawEntry)

{-# SPECIALISE close :: WriteBufferReader IO h -> IO () #-}
-- | Close the 'WriteBufferReader'.
--
-- ASYNC: this should be called with asynchronous exceptions masked.
close ::
(MonadMask m, PrimMonad m)
=> WriteBufferReader m h
-> m ()
close WriteBufferReader{..} = do
FS.hClose readerHasFS readerKOpsHandle
releaseRef readerBlobFile
`finally` releaseRef readerBlobFile
Loading
Loading