Skip to content

Commit

Permalink
Fix up and reenable the test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
madeline-os committed Jan 26, 2022
1 parent c5a920a commit 151649c
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 74 deletions.
55 changes: 28 additions & 27 deletions beam/task/backend/rhyolite-beam-task-worker-backend.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,31 @@ library
default-language: Haskell2010
ghc-options: -Wall

-- test-suite test
-- other-modules:
-- Types
-- , Utils
--
-- type: exitcode-stdio-1.0
-- main-is: Test.hs
-- hs-source-dirs: test
-- default-language: Haskell2010
-- build-depends:
-- base
-- , beam-automigrate
-- , beam-core
-- , beam-postgres
-- , containers
-- , filepath
-- , gargoyle-postgresql-connect
-- , hspec
-- , lens
-- , monad-logger
-- , postgresql-simple
-- , psql-serializable
-- , resource-pool
-- , rhyolite-beam-task-worker
-- , text
-- , unix
-- ghc-options: -threaded
test-suite test
other-modules:
Types
, Utils

type: exitcode-stdio-1.0
main-is: Test.hs
hs-source-dirs: test
default-language: Haskell2010
build-depends:
rhyolite-beam-task-worker-backend
, rhyolite-beam-task-worker-types
, base
, beam-automigrate
, beam-core
, beam-postgres
, containers
, filepath
, gargoyle-postgresql-connect
, hspec
, lens
, monad-logger
, postgresql-simple
, psql-serializable
, resource-pool
, text
, unix
ghc-options: -threaded
45 changes: 30 additions & 15 deletions beam/task/backend/test/Test.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad (forM_, void, zipWithM_)
import Data.Int (Int32)
import Data.Int (Int64)
import Data.IORef
import Data.List (sort)
import qualified Data.Map as M
Expand Down Expand Up @@ -36,7 +37,7 @@ setupTable pool = do

deleteTable :: Pool Connection -> IO ()
deleteTable pool = void $ withResource pool $ \dbConn ->
runBeamPostgres dbConn $ runDelete $ delete (_tasks tasksDb) (const $ val_ True)
runBeamPostgres dbConn $ runDelete $ delete (_testTasksDb_tasks tasksDb) (const $ val_ True)

withTables :: Pool Connection -> (Connection -> IO ()) -> IO ()
withTables pool = bracket (setupTable pool) deleteTable . const . withResource pool
Expand Down Expand Up @@ -77,7 +78,9 @@ main = do
Nothing -> do
fail "There should have been 1 row in the tasks table"
pure ()
Just (TestTask (Task _ result checkedOutBy) _) -> do
Just r -> do
let result = _testTaskT_result r
checkedOutBy = _testTaskT_checkedOutBy r
result `shouldBe` Just True
checkedOutBy `shouldBe` Nothing
ret `shouldBe` True
Expand Down Expand Up @@ -109,7 +112,7 @@ main = do
Nothing -> do
fail "There should have been 1 row in the tasks table"
pure ()
Just (TestTask (Task _ _ checkedOutBy) _) -> checkedOutBy `shouldBe` Just workerName
Just r -> _testTaskT_checkedOutBy r `shouldBe` Just workerName
readIORef boolRef `shouldReturn` initBool

-- TEST 3
Expand Down Expand Up @@ -139,7 +142,7 @@ main = do
-- Block on awakenMVar
() <- readMVar awakenMVar
atomicModifyIORef countRef (\i -> (i+1, ()))
pure $ pure True
pure $ pure (WrappedColumnar (Just True))
zipWithM_ (spawnTaskWorker work) [c, c2] workerNames

-- ASSERT
Expand All @@ -148,7 +151,7 @@ main = do
-- Both tasks should have been checkedout
threadDelay $ 3 * timeForOneTask
tasks <- allTestTasks c
sort (map (_taskCheckedOutBy . _taskDetails) tasks) `shouldBe` map Just workerNames
sort (map _testTaskT_checkedOutBy tasks) `shouldBe` map Just workerNames

-- ACT
-- Unblock both workers
Expand All @@ -162,8 +165,8 @@ main = do
-- Both tasks' result should be Just True
-- The increment task should also have been completed
tasks <- allTestTasks c
map (_taskCheckedOutBy . _taskDetails) tasks `shouldBe` replicate 2 Nothing
map (_taskResult . _taskDetails) tasks `shouldBe` replicate 2 (Just True)
map _testTaskT_checkedOutBy tasks `shouldBe` replicate 2 Nothing
map _testTaskT_result tasks `shouldBe` replicate 2 (Just True)
readIORef countRef `shouldReturn` (initCount + 2)

-- TEST 4
Expand All @@ -176,8 +179,20 @@ main = do
let
initBool = False
tasks =
[ TestTask (Task 1 (Just True) Nothing) 1 -- Completed Task
, TestTask (Task 2 Nothing (Just "Test-Worker 1")) 2 -- Task already checked out
[ TestTaskT
{ _testTaskT_id = 1
, _testTaskT_payload = 1
, _testTaskT_result = Just True
, _testTaskT_checkedOutBy = Nothing
, _testTaskT_finished = True
}
, TestTaskT
{ _testTaskT_id = 2
, _testTaskT_payload = 2
, _testTaskT_result = Nothing
, _testTaskT_checkedOutBy = Just "Test-Worker 1"
, _testTaskT_finished = False
}
]
boolRef <- newIORef initBool
insertTestTasks c tasks
Expand All @@ -201,12 +216,12 @@ main = do
it "runs parallel taskWorkers on a number of tasks" $ \c -> do
-- SETUP
-- Create a limited number of tasks
-- Create an IORef that stores a map, from Task IDs (Int32) -> Set of worker names that processed the task (Set Text)
-- Create an IORef that stores a map, from Task IDs (Int64) -> Set of worker names that processed the task (Set Text)
let
taskCount = 1000
threadCount = 8
tasks = map createTask [1..fromIntegral taskCount]
mapRef <- newIORef (M.empty :: M.Map Int32 (S.Set Text))
mapRef <- newIORef (M.empty :: M.Map Int64 (S.Set Text))

-- ACT
-- Insert all the tasks into the table
Expand All @@ -216,7 +231,7 @@ main = do
let
work workerName testTaskId = pure $ do
atomicModifyIORef mapRef (\old -> (M.insertWith S.union testTaskId (S.singleton workerName) old, ()))
pure $ pure True
pure $ pure (WrappedColumnar (Just True))
repeatWork conn threadId = do
let workerName = "Task-Worker " <> pack (show threadId)
b <- createTaskWorker conn (work workerName) workerName
Expand All @@ -239,7 +254,7 @@ main = do
threadDelay $ (taskCount * timeForOneTask) `div` threadCount
taskMap <- readIORef mapRef
tasks <- allTestTasks c
map (_taskCheckedOutBy . _taskDetails) tasks `shouldBe` replicate taskCount Nothing
map (_taskResult . _taskDetails) tasks `shouldBe` replicate taskCount (Just True)
map _testTaskT_checkedOutBy tasks `shouldBe` replicate taskCount Nothing
map _testTaskT_result tasks `shouldBe` replicate taskCount (Just True)
M.keys taskMap `shouldBe` [1..fromIntegral taskCount]
all (\set -> S.size set == 1) (M.elems taskMap) `shouldBe` True
71 changes: 47 additions & 24 deletions beam/task/backend/test/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# Language DeriveAnyClass #-}
{-# Language DeriveGeneric #-}
{-# Language FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# Language StandaloneDeriving #-}
{-# Language TemplateHaskell #-}
Expand All @@ -13,7 +14,7 @@ module Types where
import Control.Exception
import Control.Lens
import Control.Monad.Logger
import Data.Int (Int32)
import Data.Int (Int64)
import Data.Proxy
import Data.String (fromString)
import Data.Text (Text)
Expand All @@ -23,10 +24,13 @@ import Database.Beam.Postgres

import Rhyolite.Task.Beam

data TestTaskT f = TestTask
{ _taskDetails :: Task Int32 Bool Text f
, _taskId :: C f Int32
} deriving (Generic, Beamable)
data TestTaskT f = TestTaskT
{ _testTaskT_id :: Columnar f Int64
, _testTaskT_checkedOutBy :: Columnar f (Maybe Text)
, _testTaskT_payload :: Columnar f Int64
, _testTaskT_result :: Columnar f (Maybe Bool)
, _testTaskT_finished :: Columnar f Bool
} deriving (Generic)

makeLenses ''TestTaskT

Expand All @@ -35,25 +39,50 @@ type TestTaskId = PrimaryKey TestTaskT Identity

deriving instance Eq TestTask

instance Beamable TestTaskT
instance Table TestTaskT where
data PrimaryKey TestTaskT f = TestTaskId (Columnar f Int32) deriving (Generic, Beamable)
primaryKey = TestTaskId . _taskId
newtype PrimaryKey TestTaskT f = TestTaskId { unTestTaskId :: Columnar f Int64 }
deriving (Generic)
primaryKey = TestTaskId . _testTaskT_id

instance Beamable (PrimaryKey TestTaskT)

newtype WrappedColumnar a f = WrappedColumnar { unWrappedColumnar :: Columnar f a }
deriving (Generic)

instance Beamable (WrappedColumnar a)

testTask :: Task Postgres TestTaskT (WrappedColumnar Int64) Text (WrappedColumnar (Maybe Bool))
testTask = Task
{ _task_filter = \_ -> val_ True
, _task_payload = WrappedColumnar . _testTaskT_payload
, _task_checkedOutBy = testTaskT_checkedOutBy
, _task_hasRun = testTaskT_finished
, _task_result = lens
(\t -> WrappedColumnar (_testTaskT_result t))
(\t (WrappedColumnar s) -> t
{ _testTaskT_result = s
}
)
}

createTask :: Int32 -> TestTask
createTask i = TestTask (Task i Nothing Nothing) i
createTask :: Int64 -> TestTask
createTask i = TestTaskT
{ _testTaskT_id = i
, _testTaskT_checkedOutBy = Nothing
, _testTaskT_payload = i
, _testTaskT_result = Nothing
, _testTaskT_finished = False
}

newtype TestTasksDb f = TestTasksDb
{ _tasks :: f (TableEntity TestTaskT) }
deriving (Generic, Database be)
{ _testTasksDb_tasks :: f (TableEntity TestTaskT)
} deriving (Generic)

instance Database be TestTasksDb

tasksDb :: DatabaseSettings be TestTasksDb
tasksDb = defaultDbSettings `withDbModification`
dbModification {
_tasks =
modifyTableFields (TestTask taskFields "id")
}
where
taskFields = Task (fromString "payload") (fromString "result") (fromString "checked_out_by")
tasksDb = defaultDbSettings

tasksDbPostgres :: BA.AnnotatedDatabaseSettings Postgres TestTasksDb
tasksDbPostgres = BA.defaultAnnotatedDbSettings tasksDb
Expand All @@ -65,9 +94,3 @@ data TestException = TestException
deriving (Eq, Show, Typeable)

instance Exception TestException

instance MonadLogger IO where
monadLoggerLog _ _ _ msg = print $ toLogStr msg

instance MonadLoggerIO IO where
askLoggerIO = pure (\_ _ _ logStr -> print logStr)
19 changes: 12 additions & 7 deletions beam/task/backend/test/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Utils where
import Control.Concurrent (forkIO)
import Control.Exception (throw)
import Control.Monad (void, when)
import Data.Int (Int32)
import Data.Int (Int64)
import Data.IORef
import Data.Text (Text)
import Database.Beam
Expand All @@ -16,23 +16,28 @@ import Database.PostgreSQL.Serializable
import Rhyolite.Task.Beam.Worker
import Types

type Work = Int32 -> Serializable (IO (Serializable Bool))
type Work = Int64 -> Pg (IO (Pg (WrappedColumnar (Maybe Bool) Identity)))

insertTestTasks :: Connection -> [TestTask] -> IO ()
insertTestTasks c = runBeamPostgres c . runInsert . insert (_tasks tasksDb) . insertValues
insertTestTasks c = runBeamPostgres c . runInsert . insert (_testTasksDb_tasks tasksDb) . insertValues

createTaskWorker :: Connection -> Work -> Text -> IO Bool
createTaskWorker c = taskWorker c (_tasks tasksDb) (val_ True) taskDetails
createTaskWorker conn work wId = taskWorker
conn
(_testTasksDb_tasks tasksDb)
testTask
(\_ -> work . unWrappedColumnar)
wId

allTestTasks :: Connection -> IO [TestTask]
allTestTasks c =
runBeamPostgres c $ runSelectReturningList $
select $ all_ (_tasks tasksDb)
select $ all_ (_testTasksDb_tasks tasksDb)

justOneTestTask :: Connection -> IO (Maybe TestTask)
justOneTestTask c =
runBeamPostgres c $ runSelectReturningOne $
select $ all_ (_tasks tasksDb)
select $ all_ (_testTasksDb_tasks tasksDb)

spawnTaskWorker :: Work -> Connection -> Text -> IO ()
spawnTaskWorker work conn = void . forkIO . void . createTaskWorker conn work
Expand All @@ -41,4 +46,4 @@ toggleBoolIORef :: IORef Bool -> Bool -> Work
toggleBoolIORef boolRef flag = const $ pure $ do
when flag $ throw TestException
atomicModifyIORef boolRef (\b -> (not b, ()))
pure $ pure True
pure $ pure $ WrappedColumnar (Just True)
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{ obelisk ? import ./.obelisk/impl (builtins.removeAttrs args ["pkgs"])
{ obelisk ? import ./.obelisk/impl (builtins.removeAttrs args ["pkgs" "inNixShell"])
, pkgs ? obelisk.nixpkgs
, ... } @ args:

Expand Down

0 comments on commit 151649c

Please sign in to comment.