Skip to content

Commit

Permalink
Merge pull request #466 from IntersectMBO/jdral/proper-snapshot-impl
Browse files Browse the repository at this point in the history
Proper snapshot implementation
  • Loading branch information
jorisdral authored Nov 18, 2024
2 parents 70a4296 + ffd6e60 commit 11738bd
Show file tree
Hide file tree
Showing 17 changed files with 949 additions and 935 deletions.
3 changes: 2 additions & 1 deletion lsm-tree.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ library
Database.LSMTree.Internal.Serialise
Database.LSMTree.Internal.Serialise.Class
Database.LSMTree.Internal.Snapshot
Database.LSMTree.Internal.Snapshot.Codec
Database.LSMTree.Internal.UniqCounter
Database.LSMTree.Internal.Unsliced
Database.LSMTree.Internal.Vector
Expand Down Expand Up @@ -371,7 +372,7 @@ test-suite lsm-tree-test
Test.Database.LSMTree.Internal.RunReaders
Test.Database.LSMTree.Internal.Serialise
Test.Database.LSMTree.Internal.Serialise.Class
Test.Database.LSMTree.Internal.Snapshot
Test.Database.LSMTree.Internal.Snapshot.Codec
Test.Database.LSMTree.Internal.Vector
Test.Database.LSMTree.Internal.Vector.Growing
Test.Database.LSMTree.Model.Table
Expand Down
6 changes: 0 additions & 6 deletions src/Database/LSMTree/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,6 @@ class Labellable a where
-- Exceptions:
--
-- * Deleting a snapshot that doesn't exist is an error.
--
-- TODO: this function currently has a temporary implementation until we have
-- proper snapshots.
deleteSnapshot ::
IOLike m
=> Session m
Expand All @@ -219,9 +216,6 @@ deleteSnapshot (Internal.Session' sesh) = Internal.deleteSnapshot sesh
Session IO
-> IO [Internal.SnapshotName] #-}
-- | List snapshots by name.
--
-- TODO: this function currently has a temporary implementation until we have
-- proper snapshots.
listSnapshots ::
IOLike m
=> Session m
Expand Down
157 changes: 79 additions & 78 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ import Control.Monad.Primitive
import Control.TempRegistry
import Control.Tracer
import Data.Arena (ArenaManager, newArenaManager)
import Data.Char (isNumber)
import Data.Foldable
import Data.Functor.Compose (Compose (..))
import Data.Kind
Expand Down Expand Up @@ -111,6 +110,7 @@ import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
SerialisedKey, SerialisedValue)
import Database.LSMTree.Internal.Snapshot
import Database.LSMTree.Internal.Snapshot.Codec
import Database.LSMTree.Internal.UniqCounter
import qualified Database.LSMTree.Internal.WriteBuffer as WB
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
Expand Down Expand Up @@ -397,15 +397,15 @@ openSession tr hfs hbio dir = do
where
root = Paths.SessionRoot dir
lockFilePath = Paths.lockFile root
activeDirPath = Paths.activeDir root
activeDirPath = Paths.getActiveDir (Paths.activeDir root)
snapshotsDirPath = Paths.snapshotsDir root

acquireLock = try @m @FsError $ FS.tryLockFile hbio lockFilePath FS.ExclusiveLock

releaseLock lockFile = forM_ (Compose lockFile) $ \lockFile' -> FS.hUnlock lockFile'

mkSession lockFile x = do
counterVar <- newUniqCounter x
mkSession lockFile = do
counterVar <- newUniqCounter 0
openTablesVar <- newMVar Map.empty
openCursorsVar <- newMVar Map.empty
sessionVar <- RW.new $ SessionOpen $ SessionEnv {
Expand All @@ -423,29 +423,22 @@ openSession tr hfs hbio dir = do
traceWith tr TraceNewSession
FS.createDirectory hfs activeDirPath
FS.createDirectory hfs snapshotsDirPath
mkSession sessionFileLock 0
mkSession sessionFileLock

restoreSession sessionFileLock = do
traceWith tr TraceRestoreSession
-- If the layouts are wrong, we throw an exception, and the lock file
-- is automatically released by bracketOnError.
checkTopLevelDirLayout

-- Clear the active directory by removing the directory and recreating
-- it again.
FS.removeDirectoryRecursive hfs activeDirPath
`finally` FS.createDirectoryIfMissing hfs False activeDirPath

checkActiveDirLayout
checkSnapshotsDirLayout
-- TODO: remove once we have proper snapshotting. Before that, we must
-- prevent name clashes with runs that are still present in the active
-- directory by starting the unique counter at a strictly higher number
-- than the name of any run in the active directory. When we do
-- snapshoting properly, then we'll hard link files into the active
-- directory under new names/numbers, and so session counters will
-- always be able to start at 0.
files <- FS.listDirectory hfs activeDirPath
let (x :: Int) | Set.null files = 0
-- TODO: read is not very robust, but it is only a
-- temporary solution
| otherwise = maximum [ read (takeWhile isNumber f)
| f <- Set.toList files ]
mkSession sessionFileLock (fromIntegral x)
mkSession sessionFileLock

-- Check that the active directory and snapshots directory exist. We assume
-- the lock file already exists at this point.
Expand All @@ -459,12 +452,10 @@ openSession tr hfs hbio dir = do
FS.doesDirectoryExist hfs snapshotsDirPath >>= \b ->
unless b $ throwIO (SessionDirMalformed (FS.mkFsErrorPath hfs snapshotsDirPath))

-- Nothing to check: runs are verified when loading a table, not when
-- a session is restored.
--
-- TODO: when we implement proper snapshotting, the files in the active
-- directory should be ignored and cleaned up.
checkActiveDirLayout = pure ()
-- The active directory should be empty
checkActiveDirLayout = do
contents <- FS.listDirectory hfs activeDirPath
unless (Set.null contents) $ throwIO (SessionDirMalformed (FS.mkFsErrorPath hfs activeDirPath))

-- Nothing to check: snapshots are verified when they are loaded, not when a
-- session is restored.
Expand Down Expand Up @@ -1062,7 +1053,7 @@ readCursorWhile resolve keyIsWanted n Cursor {..} fromEntry = do
-> SnapshotLabel
-> SnapshotTableType
-> Table IO h
-> IO Int #-}
-> IO () #-}
-- | See 'Database.LSMTree.Normal.createSnapshot''.
createSnapshot ::
(MonadFix m, MonadMask m, MonadMVar m, MonadST m, MonadSTM m)
Expand All @@ -1071,59 +1062,62 @@ createSnapshot ::
-> SnapshotLabel
-> SnapshotTableType
-> Table m h
-> m Int
-> m ()
createSnapshot resolve snap label tableType t = do
traceWith (tableTracer t) $ TraceSnapshot snap
let conf = tableConfig t
withOpenTable t $ \thEnv -> do
let hfs = tableHasFS thEnv

-- Guard that the snapshot does not exist already
let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap
doesSnapshotExist <-
FS.doesDirectoryExist (tableHasFS thEnv) (Paths.getNamedSnapshotDir snapDir)
if doesSnapshotExist then
throwIO (ErrSnapshotExists snap)
else
-- we assume the snapshots directory already exists, so we just have to
-- create the directory for this specific snapshot.
FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)

-- For the temporary implementation it is okay to just flush the buffer
-- before taking the snapshot.
content <- modifyWithTempRegistry
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
$ \reg content -> do
-- TODO: When we flush the buffer here, it might be underfull, which
-- could mess up the scheduling. The conservative approach is to supply
-- credits as if the buffer was full, and then flush the (possibly)
-- underfull buffer. However, note that this bit of code
-- here is probably going to change anyway because of #392
supplyCredits conf (Credit $ unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
content' <- flushWriteBuffer
(TraceMerge `contramap` tableTracer t)
conf
resolve
hfs
(tableHasBlockIO thEnv)
(tableSessionRoot thEnv)
(tableSessionUniqCounter thEnv)
reg
content
pure (content', content')
-- At this point, we've flushed the write buffer but we haven't created the
-- snapshot file yet. If an asynchronous exception happens beyond this
-- point, we'll take that loss, as the inner state of the table is still
-- consistent.

snappedLevels <- snapLevels (tableLevels content)
let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snappedLevels
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData

pure $! numSnapRuns snappedLevels
withOpenTable t $ \thEnv ->
withTempRegistry $ \reg -> do -- TODO: use the temp registry for all side effects
let hfs = tableHasFS thEnv

-- Guard that the snapshot does not exist already
let snapDir = Paths.namedSnapshotDir (tableSessionRoot thEnv) snap
doesSnapshotExist <-
FS.doesDirectoryExist (tableHasFS thEnv) (Paths.getNamedSnapshotDir snapDir)
if doesSnapshotExist then
throwIO (ErrSnapshotExists snap)
else
-- we assume the snapshots directory already exists, so we just have to
-- create the directory for this specific snapshot.
FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir)

-- For the temporary implementation it is okay to just flush the buffer
-- before taking the snapshot.
content <- modifyWithTempRegistry
(RW.unsafeAcquireWriteAccess (tableContent thEnv))
(atomically . RW.unsafeReleaseWriteAccess (tableContent thEnv))
$ \innerReg content -> do
-- TODO: When we flush the buffer here, it might be underfull, which
-- could mess up the scheduling. The conservative approach is to supply
-- credits as if the buffer was full, and then flush the (possibly)
-- underfull buffer. However, note that this bit of code
-- here is probably going to change anyway because of #392
supplyCredits conf (Credit $ unNumEntries $ case confWriteBufferAlloc conf of AllocNumEntries x -> x) (tableLevels content)
content' <- flushWriteBuffer
(TraceMerge `contramap` tableTracer t)
conf
resolve
hfs
(tableHasBlockIO thEnv)
(tableSessionRoot thEnv)
(tableSessionUniqCounter thEnv)
innerReg
content
pure (content', content')
-- At this point, we've flushed the write buffer but we haven't created the
-- snapshot file yet. If an asynchronous exception happens beyond this
-- point, we'll take that loss, as the inner state of the table is still
-- consistent.

-- Convert to snapshot format
snapLevels <- toSnapLevels (tableLevels content)
-- Hard link runs into the named snapshot directory
snapLevels' <- snapshotRuns reg snapDir snapLevels

let snapMetaData = SnapshotMetaData label tableType (tableConfig t) snapLevels'
SnapshotMetaDataFile contentPath = Paths.snapshotMetaDataFile snapDir
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData

{-# SPECIALISE openSnapshot ::
Session IO h
Expand Down Expand Up @@ -1161,7 +1155,7 @@ openSnapshot sesh label tableType override snap resolve = do
Left e -> throwIO (ErrSnapshotDeserialiseFailure e snap)
Right x -> pure x

let SnapshotMetaData label' tableType' conf snappedLevels = snapMetaData
let SnapshotMetaData label' tableType' conf snapLevels = snapMetaData

unless (tableType == tableType') $
throwIO (ErrSnapshotWrongTableType snap tableType tableType')
Expand All @@ -1177,7 +1171,14 @@ openSnapshot sesh label tableType override snap resolve = do
<- allocateTemp reg
(WBB.new hfs blobpath)
WBB.removeReference
tableLevels <- openLevels reg hfs hbio conf (sessionUniqCounter seshEnv) (sessionRoot seshEnv) resolve snappedLevels

let actDir = Paths.activeDir (sessionRoot seshEnv)

-- Hard link runs into the active directory,
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir actDir snapLevels
-- Convert from the snapshot format, restoring merge progress in the process
tableLevels <- fromSnapLevels reg hfs hbio conf (sessionUniqCounter seshEnv) resolve actDir snapLevels'

tableCache <- mkLevelsCache reg tableLevels
newWith reg sesh seshEnv conf' am $! TableContent {
tableWriteBuffer = WB.empty
Expand Down
19 changes: 3 additions & 16 deletions src/Database/LSMTree/Internal/BlobFile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ module Database.LSMTree.Internal.BlobFile (
BlobFile (..)
, BlobSpan (..)
, removeReference
, RemoveFileOnClose (..)
, openBlobFile
, readBlob
, writeBlob
) where

import Control.DeepSeq (NFData (..))
import Control.Monad (unless)
import Control.Monad.Class.MonadThrow (MonadMask, MonadThrow)
import Control.Monad.Primitive (PrimMonad)
import Control.RefCount (RefCounter)
Expand Down Expand Up @@ -55,31 +53,20 @@ removeReference ::
removeReference BlobFile{blobFileRefCounter} =
RC.removeReference blobFileRefCounter

-- | TODO: this hack can be removed once snapshots are done properly and so
-- runs can delete their files on close.
data RemoveFileOnClose = RemoveFileOnClose | DoNotRemoveFileOnClose
deriving stock Eq

-- | Open the given file to make a 'BlobFile'. The finaliser will close and
-- delete the file.
--
-- TODO: Temporarily we have a 'RemoveFileOnClose' flag, which can be removed
-- once 'Run' no longer needs it, when snapshots are implemented.
--
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> RemoveFileOnClose -> IO (BlobFile IO h) #-}
{-# SPECIALISE openBlobFile :: HasFS IO h -> FS.FsPath -> FS.OpenMode -> IO (BlobFile IO h) #-}
openBlobFile ::
PrimMonad m
=> HasFS m h
-> FS.FsPath
-> FS.OpenMode
-> RemoveFileOnClose
-> m (BlobFile m h)
openBlobFile fs path mode remove = do
openBlobFile fs path mode = do
blobFileHandle <- FS.hOpen fs path mode
let finaliser = do
FS.hClose fs blobFileHandle
unless (remove == DoNotRemoveFileOnClose) $
FS.removeFile fs (FS.handlePath blobFileHandle)
FS.removeFile fs (FS.handlePath blobFileHandle)
blobFileRefCounter <- RC.mkRefCounter1 (Just finaliser)
return BlobFile {
blobFileHandle,
Expand Down
2 changes: 0 additions & 2 deletions src/Database/LSMTree/Internal/CRC32C.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ module Database.LSMTree.Internal.CRC32C (
readChecksumsFile,
writeChecksumsFile,
writeChecksumsFile',

hexdigitsToInt
) where

import Control.Monad
Expand Down
11 changes: 7 additions & 4 deletions src/Database/LSMTree/Internal/Paths.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Database.LSMTree.Internal.Paths (
SessionRoot (..)
, lockFile
, ActiveDir (..)
, activeDir
, runPath
, snapshotsDir
Expand Down Expand Up @@ -62,11 +63,13 @@ newtype SessionRoot = SessionRoot { getSessionRoot :: FsPath }
lockFile :: SessionRoot -> FsPath
lockFile (SessionRoot dir) = dir </> mkFsPath ["lock"]

activeDir :: SessionRoot -> FsPath
activeDir (SessionRoot dir) = dir </> mkFsPath ["active"]
newtype ActiveDir = ActiveDir { getActiveDir :: FsPath }

activeDir :: SessionRoot -> ActiveDir
activeDir (SessionRoot dir) = ActiveDir (dir </> mkFsPath ["active"])

runPath :: SessionRoot -> RunNumber -> RunFsPaths
runPath root = RunFsPaths (activeDir root)
runPath root = RunFsPaths (getActiveDir (activeDir root))

snapshotsDir :: SessionRoot -> FsPath
snapshotsDir (SessionRoot dir) = dir </> mkFsPath ["snapshots"]
Expand Down Expand Up @@ -146,7 +149,7 @@ mkSnapshotName s
-- | The file name for a table's write buffer blob file
tableBlobPath :: SessionRoot -> Unique -> FsPath
tableBlobPath session n =
activeDir session </> mkFsPath [show (uniqueToWord64 n)] <.> "wbblobs"
getActiveDir (activeDir session) </> mkFsPath [show (uniqueToWord64 n)] <.> "wbblobs"

{-------------------------------------------------------------------------------
Run paths
Expand Down
Loading

0 comments on commit 11738bd

Please sign in to comment.