From 240cea6c76fe4872949dc0fc0d85fbe2df5f9811 Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Fri, 16 Oct 2020 23:34:59 +0530 Subject: [PATCH 01/14] getting the test suites to work again --- odd-jobs.cabal | 16 ++-- package.yaml | 5 +- test/Test.hs | 203 +++++++++++++++++++++++++------------------------ test/Try.hs | 28 ------- test/Try2.hs | 62 --------------- 5 files changed, 117 insertions(+), 197 deletions(-) delete mode 100644 test/Try.hs delete mode 100644 test/Try2.hs diff --git a/odd-jobs.cabal b/odd-jobs.cabal index 1adcc3f..64b9cd6 100644 --- a/odd-jobs.cabal +++ b/odd-jobs.cabal @@ -1,10 +1,10 @@ cabal-version: 1.12 --- This file has been generated from package.yaml by hpack version 0.31.2. +-- This file has been generated from package.yaml by hpack version 0.33.0. -- -- see: https://github.com/sol/hpack -- --- hash: e6616995768a7a1fd8654b632013cf28cfa6e31926533c4470e8e970d1d2d3bd +-- hash: 559bd60bc7a5df63861a213412268e713c83719c49399149be8dcbcc620aa0e5 name: odd-jobs version: 0.2.2 @@ -207,11 +207,18 @@ test-suite jobrunner main-is: Test.hs other-modules: CliParser - Try - Try2 + OddJobs.Cli + OddJobs.ConfigBuilder + OddJobs.Endpoints + OddJobs.Job + OddJobs.Migrations + OddJobs.Types + OddJobs.Web + UI Paths_odd_jobs hs-source-dirs: test + src default-extensions: NamedFieldPuns LambdaCase TemplateHaskell ScopedTypeVariables GeneralizedNewtypeDeriving QuasiQuotes OverloadedStrings ghc-options: -Wall -fno-warn-orphans -fno-warn-unused-imports -fno-warn-dodgy-exports -Werror=missing-fields -threaded -with-rtsopts=-N -main-is Test build-depends: @@ -235,7 +242,6 @@ test-suite jobrunner , monad-control , monad-logger , mtl - , odd-jobs , optparse-applicative , postgresql-simple , random diff --git a/package.yaml b/package.yaml index f971e29..5a31f90 100644 --- a/package.yaml +++ b/package.yaml @@ -133,13 +133,14 @@ tests: - -threaded - -with-rtsopts=-N main: Test - source-dirs: test + source-dirs: + - test + - src dependencies: - tasty - tasty-discover - hedgehog - tasty-hedgehog - - odd-jobs - tasty-hunit - random - monad-control diff --git a/test/Test.hs b/test/Test.hs index 6fe6aa5..0a25f47 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -17,7 +17,9 @@ import Data.Aeson.TH as Aeson import Control.Concurrent.Lifted import Control.Concurrent.Async.Lifted import OddJobs.Job (Job(..), JobId, delaySeconds, Seconds(..)) -import System.Log.FastLogger (fromLogStr, withFastLogger, LogType(..), defaultBufSize, FastLogger, FileLogSpec(..), newTimedFastLogger) +import System.Log.FastLogger ( fromLogStr, withFastLogger, LogType'(..) + , defaultBufSize, FastLogger, FileLogSpec(..), newTimedFastLogger + , withTimedFastLogger) import System.Log.FastLogger.Date (newTimeCache, simpleTimeFormat') import Data.String.Conv (toS) import Data.Time @@ -37,6 +39,7 @@ import qualified Data.Time.Convenience as Time import qualified Data.Text as T import Data.Ord (comparing, Down(..)) import Data.Maybe (fromMaybe) +import qualified OddJobs.ConfigBuilder as Job $(Aeson.deriveJSON Aeson.defaultOptions ''Seconds) @@ -75,9 +78,9 @@ tests appPool jobPool = testGroup "All tests" , testEnsureShutdown appPool jobPool , testGracefuleShutdown appPool jobPool ] - , testGroup "property tests" [ testEverything appPool jobPool - -- , propFilterJobs appPool jobPool - ] + -- , testGroup "property tests" [ testEverything appPool jobPool + -- -- , propFilterJobs appPool jobPool + -- ] ] myTestCase @@ -159,14 +162,11 @@ withRandomTable jobPool action = do withNewJobMonitor jobPool actualTest = withRandomTable jobPool $ \tname -> withNamedJobMonitor tname jobPool (actualTest tname) withNamedJobMonitor tname jobPool actualTest = do - (defaults, cleanup) <- Test.defaultJobMonitor tname jobPool - let jobMonitorSettings = defaults{ Job.monitorJobRunner = jobRunner - , Job.monitorDefaultMaxAttempts = 3 - } - finally - (withAsync (Job.runJobMonitor jobMonitorSettings) (const actualTest)) - (cleanup) - + tcache <- newTimeCache simpleTimeFormat' + withTimedFastLogger tcache LogNone $ \tlogger -> do + let flogger logLevel logEvent = tlogger $ \t -> toLogStr t <> " | " <> (Job.defaultLogStr Job.defaultJobType logLevel logEvent) + cfg = Job.mkConfig flogger tname jobPool Job.UnlimitedConcurrentJobs jobRunner (\cfg -> cfg{Job.cfgDefaultMaxAttempts=3}) + withAsync (Job.startJobRunner cfg) (const actualTest) payloadGen :: MonadGen m => m JobPayload payloadGen = Gen.recursive Gen.choice nonRecursive recursive @@ -232,96 +232,97 @@ data JobEvent = JobStart | JobFailed deriving (Eq, Show) -testEverything appPool jobPool = testProperty "test everything" $ property $ do +-- testEverything appPool jobPool = testProperty "test everything" $ property $ do - jobPayloads <- forAll $ Gen.list (Range.linear 300 1000) payloadGen - jobsMVar <- liftIO $ newMVar (Map.empty :: Map.IntMap [(JobEvent, Job.Job)]) +-- jobPayloads <- forAll $ Gen.list (Range.linear 300 1000) payloadGen +-- jobsMVar <- liftIO $ newMVar (Map.empty :: Map.IntMap [(JobEvent, Job.Job)]) - let maxDelay = sum $ map (payloadDelay jobPollingInterval) jobPayloads - completeRun = ((unSeconds maxDelay) `div` concurrencyFactor) + (2 * (unSeconds testPollingInterval)) +-- let maxDelay = sum $ map (payloadDelay jobPollingInterval) jobPayloads +-- completeRun = ((unSeconds maxDelay) `div` concurrencyFactor) + (2 * (unSeconds testPollingInterval)) - shutdownAfter <- forAll $ Gen.choice [ pure $ Seconds completeRun -- Either we allow all jobs to be completed properly - , (Seconds <$> (Gen.int $ Range.linear 1 completeRun)) -- Or, we shutdown early after a random number of seconds - ] +-- shutdownAfter <- forAll $ Gen.choice +-- [ pure $ Seconds completeRun -- Either we allow all jobs to be completed properly +-- , (Seconds <$> (Gen.int $ Range.linear 1 completeRun)) -- Or, we shutdown early after a random number of seconds +-- ] - test $ withRandomTable jobPool $ \tname -> do - (defaults, cleanup) <- liftIO $ Test.defaultJobMonitor tname jobPool - let jobMonitorSettings = defaults { Job.monitorJobRunner = jobRunner - , Job.monitorTableName = tname - , Job.monitorOnJobStart = onJobEvent JobStart jobsMVar - , Job.monitorOnJobFailed = onJobEvent JobRetry jobsMVar - , Job.monitorOnJobPermanentlyFailed = onJobEvent JobFailed jobsMVar - , Job.monitorOnJobSuccess = onJobEvent JobSuccess jobsMVar - , Job.monitorDefaultMaxAttempts = 3 - , Job.monitorPollingInterval = jobPollingInterval - } +-- test $ withRandomTable jobPool $ \tname -> do +-- (defaults, cleanup) <- liftIO $ Test.defaultJobMonitor tname jobPool +-- let jobMonitorSettings = defaults { Job.monitorJobRunner = jobRunner +-- , Job.monitorTableName = tname +-- , Job.monitorOnJobStart = onJobEvent JobStart jobsMVar +-- , Job.monitorOnJobFailed = onJobEvent JobRetry jobsMVar +-- , Job.monitorOnJobPermanentlyFailed = onJobEvent JobFailed jobsMVar +-- , Job.monitorOnJobSuccess = onJobEvent JobSuccess jobsMVar +-- , Job.monitorDefaultMaxAttempts = 3 +-- , Job.monitorPollingInterval = jobPollingInterval +-- } - (jobs :: [Job]) <- withAsync - (liftIO $ Job.runJobMonitor jobMonitorSettings) - (const $ finally - (liftIO $ actualTest shutdownAfter jobPayloads tname jobsMVar) - (liftIO cleanup)) +-- (jobs :: [Job]) <- withAsync +-- (liftIO $ Job.runJobMonitor jobMonitorSettings) +-- (const $ finally +-- (liftIO $ actualTest shutdownAfter jobPayloads tname jobsMVar) +-- (liftIO cleanup)) - jobAudit <- takeMVar jobsMVar +-- jobAudit <- takeMVar jobsMVar - [(Only (lockedJobCount :: Int))] <- liftIO $ Pool.withResource appPool $ \conn -> - PGS.query conn ("SELECT coalesce(count(id), 0) FROM " <> tname <> " where status=?") (Only Job.Locked) +-- [(Only (lockedJobCount :: Int))] <- liftIO $ Pool.withResource appPool $ \conn -> +-- PGS.query conn ("SELECT coalesce(count(id), 0) FROM " <> tname <> " where status=?") (Only Job.Locked) - -- ALL jobs should show up in the audit, which means they should have - -- been attempted /at least/ once - (DL.sort $ map jobId jobs) === (DL.sort $ Map.keys jobAudit) +-- -- ALL jobs should show up in the audit, which means they should have +-- -- been attempted /at least/ once +-- (DL.sort $ map jobId jobs) === (DL.sort $ Map.keys jobAudit) - -- No job should be in a locked state - 0 === lockedJobCount +-- -- No job should be in a locked state +-- 0 === lockedJobCount - -- No job should've been simultaneously picked-up by more than one - -- worker - True === (Map.foldl (\m js -> m && noRaceCondition js) True jobAudit) +-- -- No job should've been simultaneously picked-up by more than one +-- -- worker +-- True === (Map.foldl (\m js -> m && noRaceCondition js) True jobAudit) - liftIO $ print $ "Test passed with job-count = " <> show (length jobPayloads) +-- liftIO $ print $ "Test passed with job-count = " <> show (length jobPayloads) - where +-- where - testPollingInterval = 5 - concurrencyFactor = 5 - - noRaceCondition js = DL.foldl (&&) True $ DL.zipWith (\(x,_) (y,_) -> not $ x==JobStart && y==JobStart) js (tail js) - - jobPollingInterval = Seconds 2 - - onJobEvent evt jobsMVar job@Job{jobId} = void $ modifyMVar_ jobsMVar $ \jobMap -> do - pure $ Map.insertWith (++) jobId [(evt, job)] jobMap - - actualTest :: Seconds -> [JobPayload] -> Job.TableName -> MVar (Map.IntMap [(JobEvent, Job.Job)]) -> IO [Job] - actualTest shutdownAfter jobPayloads tname jobsMVar = do - jobs <- forConcurrently jobPayloads $ \pload -> - Pool.withResource appPool $ \conn -> - liftIO $ Job.createJob conn tname pload - let poller nextAction = case nextAction of - Left s -> pure $ Left s - Right remaining -> - if remaining == Seconds 0 - then pure (Right $ Seconds 0) - else do delaySeconds testPollingInterval - print $ "------- Polling (remaining = " <> show (unSeconds remaining) <> " sec)------" - x <- withMVar jobsMVar $ \jobMap -> - if (Map.foldl (\m js -> m && (isJobTerminalState js)) True jobMap) - then pure (Right $ Seconds 0) - else if remaining < testPollingInterval - then pure (Left $ "Timeout. Job count=" <> show (length jobPayloads) <> " shutdownAfter=" <> show shutdownAfter) - else pure $ Right (remaining - testPollingInterval) - poller x - - poller (Right shutdownAfter) >>= \case - Left s -> pure jobs -- Prelude.error s - Right _ -> pure jobs - - isJobTerminalState js = case js of - [] -> False - (_, j):_ -> case (Job.jobStatus j) of - Job.Failed -> True - Job.Success -> True - _ -> False +-- testPollingInterval = 5 +-- concurrencyFactor = 5 + +-- noRaceCondition js = DL.foldl (&&) True $ DL.zipWith (\(x,_) (y,_) -> not $ x==JobStart && y==JobStart) js (tail js) + +-- jobPollingInterval = Seconds 2 + +-- onJobEvent evt jobsMVar job@Job{jobId} = void $ modifyMVar_ jobsMVar $ \jobMap -> do +-- pure $ Map.insertWith (++) jobId [(evt, job)] jobMap + +-- actualTest :: Seconds -> [JobPayload] -> Job.TableName -> MVar (Map.IntMap [(JobEvent, Job.Job)]) -> IO [Job] +-- actualTest shutdownAfter jobPayloads tname jobsMVar = do +-- jobs <- forConcurrently jobPayloads $ \pload -> +-- Pool.withResource appPool $ \conn -> +-- liftIO $ Job.createJob conn tname pload +-- let poller nextAction = case nextAction of +-- Left s -> pure $ Left s +-- Right remaining -> +-- if remaining == Seconds 0 +-- then pure (Right $ Seconds 0) +-- else do delaySeconds testPollingInterval +-- print $ "------- Polling (remaining = " <> show (unSeconds remaining) <> " sec)------" +-- x <- withMVar jobsMVar $ \jobMap -> +-- if (Map.foldl (\m js -> m && (isJobTerminalState js)) True jobMap) +-- then pure (Right $ Seconds 0) +-- else if remaining < testPollingInterval +-- then pure (Left $ "Timeout. Job count=" <> show (length jobPayloads) <> " shutdownAfter=" <> show shutdownAfter) +-- else pure $ Right (remaining - testPollingInterval) +-- poller x + +-- poller (Right shutdownAfter) >>= \case +-- Left s -> pure jobs -- Prelude.error s +-- Right _ -> pure jobs + +-- isJobTerminalState js = case js of +-- [] -> False +-- (_, j):_ -> case (Job.jobStatus j) of +-- Job.Failed -> True +-- Job.Success -> True +-- _ -> False @@ -434,7 +435,7 @@ filterJobs Web.Filter{filterStatuses, filterCreatedAfter, filterCreatedBefore, f Web.OrdUpdatedAt -> (comparing jobUpdatedAt) Web.OrdLockedAt -> (comparing jobLockedAt) Web.OrdStatus -> (comparing jobStatus) - Web.OrdJobType -> comparing Job.jobType + Web.OrdJobType -> comparing Job.defaultJobType resultOrder fn = \x y -> case fn x y of EQ -> compare (Down $ jobId x) (Down $ jobId y) LT -> case dir of @@ -454,13 +455,15 @@ filterJobs Web.Filter{filterStatuses, filterCreatedAfter, filterCreatedBefore, f filterByUpdatedBefore Job.Job{jobUpdatedAt} = maybe True (> jobUpdatedAt) filterUpdatedBefore filterByRunAfter Job.Job{jobRunAt} = maybe True (< jobRunAt) filterRunAfter -defaultJobMonitor :: Job.TableName - -> Pool Connection - -> IO (Job.JobMonitor, IO ()) -defaultJobMonitor tname pool = do - tcache <- newTimeCache simpleTimeFormat' - (tlogger, cleanup) <- newTimedFastLogger tcache LogNone - let flogger loc lsource llevel lstr = tlogger $ \t -> toLogStr t <> " | " <> defaultLogStr loc lsource llevel lstr - pure ( Job.defaultJobMonitor flogger tname pool - , cleanup - ) +-- defaultJobMonitor :: Job.TableName +-- -> Pool Connection +-- -> (Job -> IO ()) +-- -> IO (Job.JobMonitor, IO ()) +-- defaultJobMonitor tname pool = do +-- tcache <- newTimeCache simpleTimeFormat' +-- (tlogger, cleanup) <- newTimedFastLogger tcache LogNone +-- let flogger loc lsource llevel lstr = tlogger $ \t -> toLogStr t <> " | " <> defaultLogStr loc lsource llevel lstr +-- Job.mkConfig _loggingFn tname pool Job.UnlimitedConcurrentJobs (pure ()) +-- pure ( Job.defaultJobMonitor flogger tname pool +-- , cleanup +-- ) diff --git a/test/Try.hs b/test/Try.hs deleted file mode 100644 index cedd0fe..0000000 --- a/test/Try.hs +++ /dev/null @@ -1,28 +0,0 @@ -{-# LANGUAGE FlexibleContexts #-} -module Try where - -import Hedgehog -import qualified Hedgehog.Gen as Gen -import qualified Hedgehog.Range as Range -import Control.Exception.Lifted -import Control.Concurrent.Async.Lifted -import Control.Monad -import Data.Pool as Pool -import Debug.Trace -import Control.Monad.IO.Unlift (liftIO) -import qualified System.Random as R -import Data.String (fromString) - -withRandomTable pool action = do - tname <- liftIO ((("jobs_" <>) . fromString) <$> (replicateM 10 (R.randomRIO ('a', 'z')))) - finally - (Pool.withResource pool $ \conn -> (liftIO $ traceM "I will create the random table here") >> (action tname)) - (Pool.withResource pool $ \conn -> liftIO $ traceM "I will drop the random table here") - -myTest pool = property $ do - randomData <- forAll $ Gen.list (Range.linear 1 100) (Gen.element [1, 2, 3]) - test $ withRandomTable pool $ \tname -> do - withAsync - (traceM "I will be a long-running background thread") - (const $ traceM $ "hooray... I got the random table name " <> tname) - True === True diff --git a/test/Try2.hs b/test/Try2.hs deleted file mode 100644 index a24e16a..0000000 --- a/test/Try2.hs +++ /dev/null @@ -1,62 +0,0 @@ -module Try2 where - --- import Control.Concurrent.Lifted --- import Control.Concurrent.Async.Lifted --- import Control.Exception.Lifted -import UnliftIO.Concurrent -import UnliftIO.Async -import UnliftIO.Exception -import Control.Concurrent.Async (AsyncCancelled(..)) -import Debug.Trace -import Control.Monad -import Data.Functor -import Control.Monad.Reader -import Data.Maybe - --- oneSec :: Int --- oneSec = 1000000 - - --- main :: IO () --- main = finally --- (withAsync --- (catch runMonitoringThread (\AsyncCancelled -> print "---- delivered in main")) --- (const $ threadDelay $ oneSec * 5)) --- (print "finally executed") - - --- myPrint = liftIO . print - --- runMonitoringThread :: IO () --- runMonitoringThread = do --- print "something random" --- runReaderT runMonitoringThread2 10 - --- runMonitoringThread2 :: ReaderT Int IO () --- runMonitoringThread2 = do --- x <- ask --- myPrint $ "==== got the number " <> show x --- runMonitoringThread_ Nothing Nothing --- where --- runMonitoringThread_ mThread1 mThread2= do --- thread1 <- maybe (async task1) pure mThread1 --- thread2 <- maybe (async task2) pure mThread2 --- -- catch (myWait thread1 thread2) (\AsyncCancelled -> myPrint "signal delivered in myWait" >> cancel thread1 >> cancel thread2) --- finally --- (myWait thread1 thread2) --- (myPrint "FINALLY" >> cancel thread1 >> cancel thread2) - --- task1 = forever $ do --- myPrint "thread a is running" --- threadDelay oneSec - --- task2 = forever $ do --- myPrint "thread 2 is running" --- threadDelay (oneSec * 2) - --- myWait :: Async () -> Async () -> ReaderT Int IO () --- myWait thread1 thread2 = do --- waitEitherCatch thread1 thread2 >>= \case --- Left _ -> runMonitoringThread_ Nothing (Just thread2) --- Right _ -> runMonitoringThread_ (Just thread1) Nothing --- myPrint "this should never get executed" From 1ff4676e95e71e62b98f679b18e29af21152022b Mon Sep 17 00:00:00 2001 From: Saurabh Nanda Date: Sat, 17 Oct 2020 00:51:23 +0530 Subject: [PATCH 02/14] got more tests to pass LogJobStart was not being logged - FIXED --- src/OddJobs/Job.hs | 1 + test/Test.hs | 182 +++++++++++++++++++++++++++++++-------------- 2 files changed, 126 insertions(+), 57 deletions(-) diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index b231001..913a701 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -382,6 +382,7 @@ runJob jid = do Just job -> do startTime <- liftIO getCurrentTime lockTimeout <- getDefaultJobTimeout + log LevelInfo $ LogJobStart job (flip catches) [Handler $ timeoutHandler job startTime, Handler $ exceptionHandler job startTime] $ do runJobWithTimeout lockTimeout job endTime <- liftIO getCurrentTime diff --git a/test/Test.hs b/test/Test.hs index 0a25f47..f423346 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -9,13 +9,13 @@ import Data.Functor (void) import Data.Pool as Pool import Test.Tasty.HUnit import Debug.Trace -import Control.Exception.Lifted (finally, catch, bracket) +-- import Control.Exception.Lifted (finally, catch, bracket) import Control.Monad.Logger import Control.Monad.Reader import Data.Aeson as Aeson import Data.Aeson.TH as Aeson -import Control.Concurrent.Lifted -import Control.Concurrent.Async.Lifted +-- import Control.Concurrent.Lifted +-- import Control.Concurrent.Async.Lifted import OddJobs.Job (Job(..), JobId, delaySeconds, Seconds(..)) import System.Log.FastLogger ( fromLogStr, withFastLogger, LogType'(..) , defaultBufSize, FastLogger, FileLogSpec(..), newTimedFastLogger @@ -40,6 +40,7 @@ import qualified Data.Text as T import Data.Ord (comparing, Down(..)) import Data.Maybe (fromMaybe) import qualified OddJobs.ConfigBuilder as Job +import UnliftIO $(Aeson.deriveJSON Aeson.defaultOptions ''Seconds) @@ -76,7 +77,7 @@ tests appPool jobPool = testGroup "All tests" , testJobScheduling appPool jobPool , testJobFailure appPool jobPool , testEnsureShutdown appPool jobPool - , testGracefuleShutdown appPool jobPool + , testGracefulShutdown appPool jobPool ] -- , testGroup "property tests" [ testEverything appPool jobPool -- -- , propFilterJobs appPool jobPool @@ -141,10 +142,64 @@ instance FromJSON JobPayload where parseJSON = genericParseJSON Aeson.defaultOptions -assertJobIdStatus :: (HasCallStack) => Connection -> Job.TableName -> String -> Job.Status -> JobId -> Assertion -assertJobIdStatus conn tname msg st jid = Job.findJobByIdIO conn tname jid >>= \case - Nothing -> assertFailure $ "Not expecting job to be deleted. JobId=" <> show jid - Just (Job{jobStatus}) -> assertEqual msg st jobStatus +logEventToJob :: Job.LogEvent -> Maybe Job.Job +logEventToJob le = case le of + Job.LogJobStart j -> Just j + Job.LogJobSuccess j _ -> Just j + Job.LogJobFailed j _ _ _ -> Just j + Job.LogJobTimeout j -> Just j + Job.LogPoll -> Nothing + Job.LogWebUIRequest -> Nothing + Job.LogText _ -> Nothing + +assertJobIdStatus :: (HasCallStack) + => Connection + -> Job.TableName + -> IORef [Job.LogEvent] + -> String + -> Job.Status + -> JobId + -> Assertion +assertJobIdStatus conn tname logRef msg st jid = do + logs <- readIORef logRef + let mjid = Just jid + case st of + Job.Success -> + assertBool (msg <> ": Success event not found in job-logs for JobId=" <> show jid) $ + (flip DL.any) logs $ \logEvent -> case logEvent of + Job.LogJobSuccess j _ -> jid == Job.jobId j + _ -> False + Job.Queued -> + assertBool (msg <> ": Not expecting to find a queued job in the the job-logs JobId=" <> show jid) $ + not $ (flip DL.any) logs $ \logEvent -> case logEvent of + Job.LogJobStart j -> jid == Job.jobId j + Job.LogJobSuccess j _ -> jid == Job.jobId j + Job.LogJobFailed j _ _ _ -> jid == Job.jobId j + Job.LogJobTimeout j -> jid == Job.jobId j + Job.LogWebUIRequest -> False + Job.LogText _ -> False + Job.Failed -> + assertBool (msg <> ": Failed event not found in job-logs for JobId=" <> show jid) $ + (flip DL.any) logs $ \logEvent -> case logEvent of + Job.LogJobFailed j _ _ _ -> jid == Job.jobId j + _ -> False + + Job.Retry -> + assertBool (msg <> ": Failed event not found in job-logs for JobId=" <> show jid) $ + (flip DL.any) logs $ \logEvent -> case logEvent of + Job.LogJobFailed j _ _ _ -> jid == Job.jobId j + _ -> False + + Job.Locked -> + assertBool (msg <> ": Start event should be present in the log for a locked job JobId=" <> show jid) $ + (flip DL.any) logs $ \logEvent -> case logEvent of + Job.LogJobStart j -> jid == Job.jobId j + _ -> False + + when (st /= Job.Success) $ do + Job.findJobByIdIO conn tname jid >>= \case + Nothing -> assertFailure $ "Not expecting job to be deleted. JobId=" <> show jid + Just (Job{jobStatus}) -> assertEqual msg st jobStatus ensureJobId :: (HasCallStack) => Connection -> Job.TableName -> JobId -> IO Job ensureJobId conn tname jid = Job.findJobByIdIO conn tname jid >>= \case @@ -159,14 +214,19 @@ withRandomTable jobPool action = do (Pool.withResource jobPool $ \conn -> liftIO $ void $ PGS.execute_ conn ("drop table if exists " <> tname <> ";")) -- withNewJobMonitor :: (Pool Connection) -> (TableName -> Assertion) -> Assertion -withNewJobMonitor jobPool actualTest = withRandomTable jobPool $ \tname -> withNamedJobMonitor tname jobPool (actualTest tname) +withNewJobMonitor jobPool actualTest = do + withRandomTable jobPool $ \tname -> do + withNamedJobMonitor tname jobPool (actualTest tname) withNamedJobMonitor tname jobPool actualTest = do + logRef :: IORef [Job.LogEvent] <- newIORef [] tcache <- newTimeCache simpleTimeFormat' withTimedFastLogger tcache LogNone $ \tlogger -> do - let flogger logLevel logEvent = tlogger $ \t -> toLogStr t <> " | " <> (Job.defaultLogStr Job.defaultJobType logLevel logEvent) + let flogger logLevel logEvent = do + tlogger $ \t -> toLogStr t <> " | " <> (Job.defaultLogStr Job.defaultJobType logLevel logEvent) + atomicModifyIORef' logRef (\logs -> (logEvent:logs, ())) cfg = Job.mkConfig flogger tname jobPool Job.UnlimitedConcurrentJobs jobRunner (\cfg -> cfg{Job.cfgDefaultMaxAttempts=3}) - withAsync (Job.startJobRunner cfg) (const actualTest) + withAsync (Job.startJobRunner cfg) (const $ actualTest logRef) payloadGen :: MonadGen m => m JobPayload payloadGen = Gen.recursive Gen.choice nonRecursive recursive @@ -175,56 +235,64 @@ payloadGen = Gen.recursive Gen.choice nonRecursive recursive , PayloadSucceed <$> Gen.element [1, 2, 3]] recursive = [ PayloadFail <$> (Gen.element [1, 2, 3]) <*> payloadGen ] -testJobCreation appPool jobPool = testCase "job creation" $ withNewJobMonitor jobPool $ \tname -> Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0) - delaySeconds $ Seconds 6 - assertJobIdStatus conn tname "Expecting job to tbe successful by now" Job.Success jobId - -testEnsureShutdown appPool jobPool = testCase "ensure shutdown" $ withRandomTable jobPool $ \tname -> do - jid <- scheduleJob tname - delaySeconds (2 * Job.defaultPollingInterval) - Pool.withResource appPool $ \conn -> - assertJobIdStatus conn tname "Job should still be in queued state if job-monitor is no longer running" Job.Queued jid +testJobCreation appPool jobPool = testCase "job creation" $ do + withNewJobMonitor jobPool $ \tname logRef -> do + Pool.withResource appPool $ \conn -> do + Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0) + delaySeconds $ Seconds 6 + assertJobIdStatus conn tname logRef "Expecting job to be successful by now" Job.Success jobId + +testEnsureShutdown appPool jobPool = testCase "ensure shutdown" $ do + withRandomTable jobPool $ \tname -> do + (jid, logRef) <- scheduleJob tname + delaySeconds (2 * Job.defaultPollingInterval) + Pool.withResource appPool $ \conn -> do + assertJobIdStatus conn tname logRef "Job should still be in queued state if job-monitor is no longer running" Job.Queued jid where - scheduleJob tname = withNamedJobMonitor tname jobPool $ do + scheduleJob tname = withNamedJobMonitor tname jobPool $ \logRef -> do t <- getCurrentTime Pool.withResource appPool $ \conn -> do Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral (2 * (unSeconds Job.defaultPollingInterval))) t) - assertJobIdStatus conn tname "Job is scheduled in future, should still be queueud" Job.Queued jobId - pure jobId - -testGracefuleShutdown appPool jobPool = testCase "ensure graceful shutdown" $ withRandomTable jobPool $ \tname -> do - (j1, j2) <- withNamedJobMonitor tname jobPool $ Pool.withResource appPool $ \conn -> do - t <- getCurrentTime - j1 <- Job.createJob conn tname (PayloadSucceed $ 2 * Job.defaultPollingInterval) - j2 <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral $ unSeconds $ Job.defaultPollingInterval) t) - liftIO $ putStrLn "created" - pure (j1, j2) - Pool.withResource appPool $ \conn -> do - delaySeconds 1 - assertJobIdStatus conn tname "Expecting the first job to be in locked state because it should be running" Job.Locked (jobId j1) - assertJobIdStatus conn tname "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) - delaySeconds $ 3 * Job.defaultPollingInterval - assertJobIdStatus conn tname "Expecting the first job to be completed successfully if graceful shutdown is implemented correctly" Job.Success (jobId j1) - assertJobIdStatus conn tname "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) - - pure () - -testJobScheduling appPool jobPool = testCase "job scheduling" $ withNewJobMonitor jobPool $ \tname -> Pool.withResource appPool $ \conn -> do - t <- getCurrentTime - job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral 3600) t) - delaySeconds $ Seconds 2 - assertJobIdStatus conn tname "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId - j <- Job.saveJobIO conn tname job{jobRunAt = (addUTCTime (fromIntegral (-1)) t)} - delaySeconds (Job.defaultPollingInterval + (Seconds 2)) - assertJobIdStatus conn tname "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId - -testJobFailure appPool jobPool = testCase "job retry" $ withNewJobMonitor jobPool $ \tname -> Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.createJob conn tname (PayloadAlwaysFail 0) - delaySeconds $ Seconds 15 - Job{jobAttempts, jobStatus} <- ensureJobId conn tname jobId - assertEqual "Exepcting job to be in Failed status" Job.Failed jobStatus - assertEqual ("Expecting job attempts to be 3. Found " <> show jobAttempts) 3 jobAttempts + assertJobIdStatus conn tname logRef "Job is scheduled in future, should still be queueud" Job.Queued jobId + pure (jobId, logRef) + +testGracefulShutdown appPool jobPool = testCase "ensure graceful shutdown" $ do + withRandomTable jobPool $ \tname -> do + (j1, j2, logRef) <- withNamedJobMonitor tname jobPool $ \logRef -> do + Pool.withResource appPool $ \conn -> do + t <- getCurrentTime + j1 <- Job.createJob conn tname (PayloadSucceed $ 2 * Job.defaultPollingInterval) + j2 <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral $ unSeconds $ Job.defaultPollingInterval) t) + pure (j1, j2, logRef) + Pool.withResource appPool $ \conn -> do + delaySeconds 1 + assertJobIdStatus conn tname logRef "Expecting the first job to be in locked state because it should be running" Job.Locked (jobId j1) + assertJobIdStatus conn tname logRef "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) + delaySeconds $ 3 * Job.defaultPollingInterval + assertJobIdStatus conn tname logRef "Expecting the first job to be completed successfully if graceful shutdown is implemented correctly" Job.Success (jobId j1) + assertJobIdStatus conn tname logRef "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) + + pure () + +testJobScheduling appPool jobPool = testCase "job scheduling" $ do + withNewJobMonitor jobPool $ \tname logRef -> do + Pool.withResource appPool $ \conn -> do + t <- getCurrentTime + job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral 3600) t) + delaySeconds $ Seconds 2 + assertJobIdStatus conn tname logRef "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId + j <- Job.saveJobIO conn tname job{jobRunAt = (addUTCTime (fromIntegral (-1)) t)} + delaySeconds (Job.defaultPollingInterval + (Seconds 2)) + assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId + +testJobFailure appPool jobPool = testCase "job retry" $ do + withNewJobMonitor jobPool $ \tname logRef -> do + Pool.withResource appPool $ \conn -> do + Job{jobId} <- Job.createJob conn tname (PayloadAlwaysFail 0) + delaySeconds $ Seconds 15 + Job{jobAttempts, jobStatus} <- ensureJobId conn tname jobId + assertEqual "Exepcting job to be in Failed status" Job.Failed jobStatus + assertEqual ("Expecting job attempts to be 3. Found " <> show jobAttempts) 3 jobAttempts data JobEvent = JobStart | JobRetry From 3e2186ecd09bdeec4b9726fef40e708e538aca2b Mon Sep 17 00:00:00 2001 From: David Ellis Date: Wed, 14 Oct 2020 21:41:24 -0600 Subject: [PATCH 03/14] Updated `.gitignore` to ignore some Cabal artifacts --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index b47b7c1..a02a0bc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ stack.yaml.lock test.log TAGS .stack-work/ +dist-* +cabal.project.local* \ No newline at end of file From baeff3287f2dce448376bd0efda9889f94ee67fb Mon Sep 17 00:00:00 2001 From: David Ellis Date: Wed, 14 Oct 2020 21:48:24 -0600 Subject: [PATCH 04/14] Draft implementation for core functionality for resource limited jobs (#38) Many small changes were required, but the majority were mechanical changes. With the exception of the `createJob`/`scheduleJob` family of function, this should be mostly backwards compatible with existing user code that does not use "internal" functions. Further discussion about how to handle backwards compatibility is needed. Note also that this commit by itself is NOT compatible with existing databases. A future commit will address migration from existing to new database definitions. The intention is for this feature to have a negligable impact on performance when the resource limiting features are not being used, and only a minor impact when they are used. However, performance testing has not yet been done. Material changes inclue: - New resource table - New `resource_id` column in to job table - New `cfgDefaultResourceLimit` configuration value - New `createResourceJob` and `scheduleResourceJob` functions - New `TableNames` type to carry the now multiple values needed - Updated existing functions and queries to account for these changes - Updated dequeueing queries to implement resource concurrency limits - New `DevelDb` module to the `devel` executable to help ad hoc testing TODO: - Resource record management functions - Migration functionality (need to discuss database versioning) - Unit tests - Performance tests - Documentation updates - Resource management UI --- dev/DevelDb.hs | 49 ++++++++ odd-jobs.cabal | 1 + src/OddJobs/ConfigBuilder.hs | 14 ++- src/OddJobs/Endpoints.hs | 8 +- src/OddJobs/Job.hs | 217 ++++++++++++++++++++++++----------- src/OddJobs/Migrations.hs | 36 ++++-- src/OddJobs/Types.hs | 43 +++++-- src/OddJobs/Web.hs | 4 +- 8 files changed, 274 insertions(+), 98 deletions(-) create mode 100644 dev/DevelDb.hs diff --git a/dev/DevelDb.hs b/dev/DevelDb.hs new file mode 100644 index 0000000..7ccb0d5 --- /dev/null +++ b/dev/DevelDb.hs @@ -0,0 +1,49 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} + +-- | Provides tools to facilitate database interactions during interactive +-- development. +module DevelDb where + +import Control.Exception (bracket) +import Data.Aeson(FromJSON, ToJSON) +import Data.ByteString.Char8 (ByteString, pack) +import Data.Text (Text) +import qualified Database.PostgreSQL.Simple as PGS +import System.Environment (lookupEnv) + +import GHC.Generics + +import OddJobs.Migrations +import OddJobs.Job +import OddJobs.Types + +data TestJob = TestJob Text + deriving (Eq, Show, Generic, FromJSON, ToJSON) + +devTableNames :: TableNames +devTableNames = TableNames + { tnJob = "job", tnResource = "resource" } + +devConnectionString :: IO ByteString +devConnectionString = do + maybe (error "devConnectionString: Expected environment variable \"ODD_JOBS_DEV_DB_CONNECT\" to provide a connection string") + pack + <$> lookupEnv "ODD_JOBS_DEV_DB_CONNECT" + +createDevDatabase :: IO () +createDevDatabase = do + connStr <- devConnectionString + conn <- PGS.connectPostgreSQL connStr + PGS.withTransaction conn $ createJobTables conn devTableNames + +openDevConnection :: IO PGS.Connection +openDevConnection = devConnectionString >>= PGS.connectPostgreSQL + +withDevConnection :: (PGS.Connection -> IO a) -> IO a +withDevConnection = bracket openDevConnection PGS.close + +withDevTransaction :: (PGS.Connection -> IO a) -> IO a +withDevTransaction act = + withDevConnection $ \conn -> PGS.withTransaction conn (act conn) diff --git a/odd-jobs.cabal b/odd-jobs.cabal index 1adcc3f..b30ec59 100644 --- a/odd-jobs.cabal +++ b/odd-jobs.cabal @@ -103,6 +103,7 @@ library executable devel main-is: DevelMain.hs other-modules: + DevelDb OddJobs.Cli OddJobs.ConfigBuilder OddJobs.Endpoints diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 464dd81..ebf51f9 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -34,7 +34,7 @@ import qualified System.Log.FastLogger as FLogger mkConfig :: (LogLevel -> LogEvent -> IO ()) -- ^ "Structured logging" function. Ref: 'cfgLogger' -> TableName - -- ^ DB table which holds your jobs. Ref: 'cfgTableName' + -- ^ DB table which holds your jobs (resource table name will be generated). Ref: 'cfgTableNames' -> Pool Connection -- ^ DB connection-pool to be used by job-runner. Ref: 'cfgDbPool' -> ConcurrencyControl @@ -63,14 +63,15 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn = , cfgDbPool = dbpool , cfgOnJobStart = (const $ pure ()) , cfgDefaultMaxAttempts = 10 - , cfgTableName = tname + , cfgTableNames = simpleTableNames tname , cfgOnJobTimeout = (const $ pure ()) , cfgConcurrencyControl = ccControl + , cfgDefaultResourceLimit = 1 , cfgPidFile = Nothing , cfgJobType = defaultJobType , cfgDefaultJobTimeout = Seconds 600 , cfgJobToHtml = defaultJobToHtml (cfgJobType cfg) - , cfgAllJobTypes = (defaultDynamicJobTypes (cfgTableName cfg) (cfgJobTypeSql cfg)) + , cfgAllJobTypes = (defaultDynamicJobTypes (cfgTableNames cfg) (cfgJobTypeSql cfg)) , cfgJobTypeSql = defaultJobTypeSql } in cfg @@ -184,11 +185,12 @@ defaultConstantJobTypes :: forall a . (Generic a, ConNames (Rep a)) defaultConstantJobTypes _ = AJTFixed $ DL.map toS $ conNames (undefined :: a) -defaultDynamicJobTypes :: TableName +defaultDynamicJobTypes :: TableNames -> PGS.Query -> AllJobTypes -defaultDynamicJobTypes tname jobTypeSql = AJTSql $ \conn -> do - fmap (DL.map ((fromMaybe "(unknown)") . fromOnly)) $ PGS.query_ conn $ "select distinct(" <> jobTypeSql <> ") from " <> tname <> " order by 1 nulls last" +defaultDynamicJobTypes tnames jobTypeSql = AJTSql $ \conn -> do + fmap (DL.map ((fromMaybe "(unknown)") . fromOnly)) $ PGS.query_ conn $ + "select distinct(" <> jobTypeSql <> ") from " <> tnJob tnames <> " order by 1 nulls last" -- | This makes __two important assumptions__. First, this /assumes/ that jobs -- in your app are represented by a sum-type. For example: diff --git a/src/OddJobs/Endpoints.hs b/src/OddJobs/Endpoints.hs index 703e0ce..b4000cc 100644 --- a/src/OddJobs/Endpoints.hs +++ b/src/OddJobs/Endpoints.hs @@ -152,7 +152,7 @@ cancelJob :: Config -> JobId -> Handler NoContent cancelJob Config{..} env jid = do - liftIO $ withResource cfgDbPool $ \conn -> void $ cancelJobIO conn cfgTableName jid + liftIO $ withResource cfgDbPool $ \conn -> void $ cancelJobIO conn cfgTableNames jid redirectToHome env runJobNow :: Config @@ -160,7 +160,7 @@ runJobNow :: Config -> JobId -> Handler NoContent runJobNow Config{..} env jid = do - liftIO $ withResource cfgDbPool $ \conn -> void $ runJobNowIO conn cfgTableName jid + liftIO $ withResource cfgDbPool $ \conn -> void $ runJobNowIO conn cfgTableNames jid redirectToHome env enqueueJob :: Config @@ -169,8 +169,8 @@ enqueueJob :: Config -> Handler NoContent enqueueJob Config{..} env jid = do liftIO $ withResource cfgDbPool $ \conn -> do - void $ unlockJobIO conn cfgTableName jid - void $ runJobNowIO conn cfgTableName jid + void $ unlockJobIO conn cfgTableNames jid + void $ runJobNowIO conn cfgTableNames jid redirectToHome env redirectToHome :: Env -> Handler NoContent diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index b231001..9dcbbd1 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -19,7 +19,9 @@ module OddJobs.Job -- -- $createJobs , createJob + , createResourceJob , scheduleJob + , scheduleResourceJob -- * @Job@ and associated data-types -- @@ -29,10 +31,14 @@ module OddJobs.Job , Status(..) , JobRunnerName(..) , TableName + , TableNames(..) + , simpleTableNames , delaySeconds , Seconds(..) , JobErrHandler(..) , AllJobTypes(..) + , ResourceId(..) + -- ** Structured logging -- @@ -133,11 +139,12 @@ class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where onJobFailed :: forall a . m [JobErrHandler a] getJobRunner :: m (Job -> IO ()) getDbPool :: m (Pool Connection) - getTableName :: m TableName + getTableNames :: m TableNames onJobStart :: Job -> m () getDefaultMaxAttempts :: m Int getRunnerEnv :: m RunnerEnv getConcurrencyControl :: m ConcurrencyControl + getDefaultResourceLimit :: m Int getPidFile :: m (Maybe FilePath) log :: LogLevel -> LogEvent -> m () getDefaultJobTimeout :: m Seconds @@ -174,7 +181,7 @@ instance HasJobRunner RunnerM where logCallbackErrors (jobId job) "onJobSuccess" $ liftIO $ fn job getJobRunner = cfgJobRunner . envConfig <$> ask getDbPool = cfgDbPool . envConfig <$> ask - getTableName = cfgTableName . envConfig <$> ask + getTableNames = cfgTableNames . envConfig <$> ask onJobStart job = do fn <- cfgOnJobStart . envConfig <$> ask logCallbackErrors (jobId job) "onJobStart" $ liftIO $ fn job @@ -185,6 +192,8 @@ instance HasJobRunner RunnerM where getConcurrencyControl = (cfgConcurrencyControl . envConfig <$> ask) + getDefaultResourceLimit = cfgDefaultResourceLimit . envConfig <$> ask + getPidFile = cfgPidFile . envConfig <$> ask log logLevel logEvent = do @@ -216,8 +225,8 @@ jobWorkerName = do -- the jobs table it is __recommended__ that you do not issue a @SELECT *@ or -- @RETURNIG *@. List out specific DB columns using 'jobDbColumns' and -- 'concatJobDbColumns' instead. This will insulate you from runtime errors --- caused by addition of new columns to 'cfgTableName' in future versions of --- OddJobs. +-- caused by addition of new columns to job table of 'cfgTableNames' in future +-- versions of OddJobs. jobDbColumns :: (IsString s, Semigroup s) => [s] jobDbColumns = [ "id" @@ -230,6 +239,7 @@ jobDbColumns = , "attempts" , "locked_at" , "locked_by" + , "resource_id" ] -- | All 'jobDbColumns' joined together with commas. Useful for constructing SQL @@ -245,8 +255,8 @@ concatJobDbColumns = concatJobDbColumns_ jobDbColumns "" concatJobDbColumns_ (col:cols) x = concatJobDbColumns_ cols (x <> col <> ", ") -findJobByIdQuery :: TableName -> PGS.Query -findJobByIdQuery tname = "SELECT " <> concatJobDbColumns <> " FROM " <> tname <> " WHERE id = ?" +findJobByIdQuery :: TableNames -> PGS.Query +findJobByIdQuery tnames = "SELECT " <> concatJobDbColumns <> " FROM " <> tnJob tnames <> " WHERE id = ?" withDbConnection :: (HasJobRunner m) => (Connection -> m a) @@ -258,9 +268,10 @@ withDbConnection action = do -- -- $dbHelpers -- --- A bunch of functions that help you query 'cfgTableName' and change the status --- of individual jobs. Most of these functions are in @IO@ and you /might/ want --- to write wrappers that lift them into you application's custom monad. +-- A bunch of functions that help you query tables in 'cfgTableNames' and change +-- the status of individual jobs. Most of these functions are in @IO@ and you +-- /might/ want to write wrappers that lift them into you application's custom +-- monad. -- -- __Note:__ When passing a 'Connection' to these function, it is -- __recommended__ __to not__ take a connection from 'cfgDbPool'. Use your @@ -271,29 +282,29 @@ findJobById :: (HasJobRunner m) => JobId -> m (Maybe Job) findJobById jid = do - tname <- getTableName - withDbConnection $ \conn -> liftIO $ findJobByIdIO conn tname jid + tnames <- getTableNames + withDbConnection $ \conn -> liftIO $ findJobByIdIO conn tnames jid -findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -findJobByIdIO conn tname jid = PGS.query conn (findJobByIdQuery tname) (Only jid) >>= \case +findJobByIdIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +findJobByIdIO conn tnames jid = PGS.query conn (findJobByIdQuery tnames) (Only jid) >>= \case [] -> pure Nothing [j] -> pure (Just j) js -> Prelude.error $ "Not expecting to find multiple jobs by id=" <> (show jid) -saveJobQuery :: TableName -> PGS.Query -saveJobQuery tname = "UPDATE " <> tname <> " set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING " <> concatJobDbColumns +saveJobQuery :: TableNames -> PGS.Query +saveJobQuery tnames = "UPDATE " <> tnJob tnames <> " set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?, resource_id = ? WHERE id = ? RETURNING " <> concatJobDbColumns -deleteJobQuery :: TableName -> PGS.Query -deleteJobQuery tname = "DELETE FROM " <> tname <> " WHERE id = ?" +deleteJobQuery :: TableNames -> PGS.Query +deleteJobQuery tnames = "DELETE FROM " <> tnJob tnames <> " WHERE id = ?" saveJob :: (HasJobRunner m) => Job -> m Job saveJob j = do - tname <- getTableName - withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j + tnames <- getTableNames + withDbConnection $ \conn -> liftIO $ saveJobIO conn tnames j -saveJobIO :: Connection -> TableName -> Job -> IO Job -saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do +saveJobIO :: Connection -> TableNames -> Job -> IO Job +saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResourceId, jobId} = do rs <- PGS.query conn (saveJobQuery tname) ( jobRunAt , jobStatus @@ -302,6 +313,7 @@ saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttem , jobAttempts , jobLockedAt , jobLockedBy + , jobResourceId , jobId ) case rs of @@ -311,38 +323,38 @@ saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttem deleteJob :: (HasJobRunner m) => JobId -> m () deleteJob jid = do - tname <- getTableName - withDbConnection $ \conn -> liftIO $ deleteJobIO conn tname jid + tnames <- getTableNames + withDbConnection $ \conn -> liftIO $ deleteJobIO conn tnames jid -deleteJobIO :: Connection -> TableName -> JobId -> IO () -deleteJobIO conn tname jid = do - void $ PGS.execute conn (deleteJobQuery tname) (Only jid) +deleteJobIO :: Connection -> TableNames -> JobId -> IO () +deleteJobIO conn tnames jid = + void $ PGS.execute conn (deleteJobQuery tnames) (Only jid) -runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -runJobNowIO conn tname jid = do +runJobNowIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +runJobNowIO conn tnames jid = do t <- getCurrentTime - updateJobHelper tname conn (Queued, [Queued, Retry, Failed], Just t, jid) + updateJobHelper tnames conn (Queued, [Queued, Retry, Failed], Just t, jid) -- | TODO: First check in all job-runners if this job is still running, or not, -- and somehow send an uninterruptibleCancel to that thread. -unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -unlockJobIO conn tname jid = do +unlockJobIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +unlockJobIO conn tnames jid = do fmap listToMaybe $ PGS.query conn q (Retry, jid, In [Locked]) where - q = "update " <> tname <> " set status=?, run_at=now(), locked_at=null, locked_by=null where id=? and status in ? returning " <> concatJobDbColumns + q = "update " <> tnJob tnames <> " set status=?, run_at=now(), locked_at=null, locked_by=null where id=? and status in ? returning " <> concatJobDbColumns -cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -cancelJobIO conn tname jid = - updateJobHelper tname conn (Failed, [Queued, Retry], Nothing, jid) +cancelJobIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +cancelJobIO conn tnames jid = + updateJobHelper tnames conn (Failed, [Queued, Retry], Nothing, jid) -updateJobHelper :: TableName +updateJobHelper :: TableNames -> Connection -> (Status, [Status], Maybe UTCTime, JobId) -> IO (Maybe Job) -updateJobHelper tname conn (newStatus, existingStates, mRunAt, jid) = +updateJobHelper tnames conn (newStatus, existingStates, mRunAt, jid) = fmap listToMaybe $ PGS.query conn q (newStatus, runAt, jid, PGS.In existingStates) where - q = "update " <> tname <> " set attempts=0, status=?, run_at=? where id=? and status in ? returning " <> concatJobDbColumns + q = "update " <> tnJob tnames <> " set attempts=0, status=?, run_at=? where id=? and status in ? returning " <> concatJobDbColumns runAt = case mRunAt of Nothing -> PGS.toField $ PGS.Identifier "run_at" Just t -> PGS.toField t @@ -453,8 +465,21 @@ jobMonitor = do liftIO $ Dir.removePathForcibly f -- | Ref: 'jobPoller' -jobPollingSql :: TableName -> Query -jobPollingSql tname = "update " <> tname <> " set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1 WHERE id in (select id from " <> tname <> " where (run_at<=? AND ((status in ?) OR (status = ? and locked_at Query +jobPollingSql tnames = + "update " <> tnJob tnames <> + "\n set status = ?, locked_at = now(), locked_by = ?, attempts=attempts+1" <> + "\nwhere id in ( select id from " <> tnJob tnames <> " j_out" <> + "\n where (run_at<=now() AND (status in ? OR (status = ? and locked_at < now() - ? * interval '1 second')))" <> + "\n and case when resource_id is null then true" <> + "\n else ( select count(id) from " <> tnJob tnames <> " j_in" <> + "\n where j_in.resource_id = j_out.resource_id and j_in.status = ?" <> + "\n and j_in.locked_at >= now() - ? * interval '1 second' )" <> + "\n < coalesce(( select resource_limit from " <> tnResource tnames <> " r" <> + "\n where r.resource_id = j_out.resource_id ), ?)" <> + "\n end" <> + "\n order by run_at asc limit 1 for update )" <> + "\nreturning id" waitForJobs :: (HasJobRunner m) => m () @@ -497,19 +522,19 @@ jobPoller :: (HasJobRunner m) => m () jobPoller = do processName <- liftIO jobWorkerName pool <- getDbPool - tname <- getTableName + tnames <- getTableNames lockTimeout <- getDefaultJobTimeout log LevelInfo $ LogText $ toS $ "Starting the job monitor via DB polling with processName=" <> processName concurrencyControlFn <- getConcurrencyControlFn + defaultResourceLimit <- getDefaultResourceLimit withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn >>= \case False -> log LevelWarn $ LogText $ "NOT polling the job queue due to concurrency control" True -> do nextAction <- mask_ $ do log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.." - t <- liftIO getCurrentTime - r <- liftIO $ - PGS.query pollerDbConn (jobPollingSql tname) - (Locked, t, processName, t, (In [Queued, Retry]), Locked, (addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t)) + r <- liftIO $ PGS.query pollerDbConn + (jobPollingSql tnames) + (Locked, processName, In [Queued, Retry], Locked, unSeconds lockTimeout, Locked, unSeconds lockTimeout, defaultResourceLimit) case r of -- When we don't have any jobs to run, we can relax a bit... [] -> pure delayAction @@ -532,21 +557,33 @@ jobEventListener :: (HasJobRunner m) jobEventListener = do log LevelInfo $ LogText "Starting the job monitor via LISTEN/NOTIFY..." pool <- getDbPool - tname <- getTableName - jwName <- liftIO jobWorkerName + tnames <- getTableNames + lockTimeout <- getDefaultJobTimeout concurrencyControlFn <- getConcurrencyControlFn + defaultResourceLimit <- getDefaultResourceLimit + jwName <- liftIO jobWorkerName + let tryLockingJob jid = do - let q = "UPDATE " <> tname <> " SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 WHERE id=? AND status in ? RETURNING id" - (withDbConnection $ \conn -> (liftIO $ PGS.query conn q (Locked, jwName, jid, In [Queued, Retry]))) >>= \case + let q = "update " <> tnJob tnames <> " j_out" <> + "\n set status=?, locked_at=now(), locked_by=?, attempts=attempts+1" <> + "\nwhere id=? and status in ? " <> + "\n and case when resource_id is null then true" <> + "\n else ( select count(id) from " <> tnJob tnames <> " j_in" <> + "\n where j_in.resource_id = j_out.resource_id and j_in.status = ?" <> + "\n and j_in.locked_at >= now() - ? * interval '1 second' )" <> + "\n < coalesce((select resource_limit from " <> tnResource tnames <> " r where r.resource_id = j_out.resource_id), ?)" <> + "\n end" <> + "\nRETURNING id" + (withDbConnection $ \conn -> (liftIO $ PGS.query conn q (Locked, jwName, jid, In [Queued, Retry], Locked, unSeconds lockTimeout, defaultResourceLimit))) >>= \case [] -> do - log LevelDebug $ LogText $ toS $ "Job was locked by someone else before I could start. Skipping it. JobId=" <> show jid + log LevelDebug $ LogText $ toS $ "Job was locked by someone else or resource limit was reached before I could start. Skipping it. JobId=" <> show jid pure Nothing [Only (_ :: JobId)] -> pure $ Just jid x -> error $ "WTF just happned? Was expecting a single row to be returned, received " ++ (show x) withResource pool $ \monitorDbConn -> do - void $ liftIO $ PGS.execute monitorDbConn ("LISTEN " <> pgEventName tname) () + void $ liftIO $ PGS.execute monitorDbConn ("LISTEN " <> pgEventName tnames) () forever $ do log LevelDebug $ LogText "[LISTEN/NOFIFY] Event loop" notif <- liftIO $ getNotification monitorDbConn @@ -583,8 +620,8 @@ jobEventListener = do -createJobQuery :: TableName -> PGS.Query -createJobQuery tname = "INSERT INTO " <> tname <> "(run_at, status, payload, last_error, attempts, locked_at, locked_by) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING " <> concatJobDbColumns +createJobQuery :: TableNames -> PGS.Query +createJobQuery tnames = "INSERT INTO " <> tnJob tnames <> "(run_at, status, payload, last_error, attempts, locked_at, locked_by, resource_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING " <> concatJobDbColumns -- $createJobs -- @@ -599,12 +636,27 @@ createJobQuery tname = "INSERT INTO " <> tname <> "(run_at, status, payload, las -- 'scheduleJob' for further documentation. createJob :: ToJSON p => Connection - -> TableName + -> TableNames -> p -> IO Job -createJob conn tname payload = do +createJob conn tnames payload = do t <- getCurrentTime - scheduleJob conn tname payload t + scheduleJob conn tnames payload t + +-- | Create a job for immediate execution, with additional concurrnecy limits +-- determined by the given 'resource'. +-- +-- Internally calls 'scheduleResourceJob' passing it the current time. Read +-- 'scheduleResourceJob' for further documentation. +createResourceJob :: ToJSON p + => Connection + -> TableNames + -> p + -> Maybe ResourceId + -> IO Job +createResourceJob conn tnames payload resource = do + t <- getCurrentTime + scheduleResourceJob conn tnames payload resource t -- | Create a job for execution at the given time. -- @@ -618,14 +670,45 @@ scheduleJob :: ToJSON p => Connection -- ^ DB connection to use. __Note:__ This should -- /ideally/ come out of your application's DB pool, -- not the 'cfgDbPool' you used in the job-runner. - -> TableName -- ^ DB-table which holds your jobs + -> TableNames -- ^ DB-tables which holds your jobs and resources -> p -- ^ Job payload -> UTCTime -- ^ when should the job be executed -> IO Job -scheduleJob conn tname payload runAt = do - let args = ( runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text ) - queryFormatter = toS <$> (PGS.formatQuery conn (createJobQuery tname) args) - rs <- PGS.query conn (createJobQuery tname) args +scheduleJob conn tnames payload runAt = + scheduleResourceJob conn tnames payload Nothing runAt + +-- | Create a job for execution at the given time, with additional concurrnecy +-- limits determined by the given 'resource'. +-- +-- * If time has already past, 'jobEventListener' is going to pick this up +-- for execution immediately. +-- +-- * If time is in the future, 'jobPoller' is going to pick this up with an +-- error of +/- 'cfgPollingInterval' seconds. Please do not expect very high +-- accuracy of when the job is actually executed. +-- +-- * If 'resource' is 'Nothing', then only the machine-bound concurrency +-- control limit will be enforced (ref: 'cfgConcurrencyControl'). If a +-- 'ResourceId' is supplied, and that id is found in the resource table (ref: +-- 'cfgTableNames'), then the resource's concurrency limit will be enforced +-- for this job, in addtion to the concurrency control limit. If the resource +-- does not specify a resource limit, or if the resource id is not found, +-- then the default resource limit (ref: 'cfgDefaultResourceLimit') will be +-- used. +scheduleResourceJob :: ToJSON p + => Connection -- ^ DB connection to use. __Note:__ This should + -- /ideally/ come out of your application's DB pool, + -- not the 'cfgDbPool' you used in the job-runner. + -> TableNames -- ^ DB-tables which holds your jobs and resources + -> p -- ^ Job payload + -> Maybe ResourceId -- ^ Id of the resource, if any, whose concurrency + -- limits will apply to this job + -> UTCTime -- ^ when should the job be executed + -> IO Job +scheduleResourceJob conn tnames payload resource runAt = do + let args = ( runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text, resource ) + queryFormatter = toS <$> (PGS.formatQuery conn (createJobQuery tnames) args) + rs <- PGS.query conn (createJobQuery tnames) args case rs of [] -> (Prelude.error . (<> "Not expecting a blank result set when creating a job. Query=")) <$> queryFormatter [r] -> pure r @@ -677,12 +760,12 @@ fetchAllJobTypes Config{cfgAllJobTypes, cfgDbPool} = liftIO $ do -- | Used by web\/admin IO to fetch a \"master list\" of all known job-runners. -- There is a known issue with the way this has been implemented: -- --- * Since this looks at the 'jobLockedBy' column of 'cfgTableName', it will --- discover only those job-runners that are actively executing at least one --- job at the time this function is executed. +-- * Since this looks at the 'jobLockedBy' column of the job table of +-- 'cfgTableNames', it will discover only those job-runners that are +-- actively executing at least one job at the time this function is +-- executed. fetchAllJobRunners :: (MonadIO m) => Config -> m [JobRunnerName] -fetchAllJobRunners Config{cfgTableName, cfgDbPool} = liftIO $ withResource cfgDbPool $ \conn -> do - fmap (mapMaybe fromOnly) $ PGS.query_ conn $ "select distinct locked_by from " <> cfgTableName - +fetchAllJobRunners Config{cfgTableNames, cfgDbPool} = liftIO $ withResource cfgDbPool $ \conn -> do + fmap (mapMaybe fromOnly) $ PGS.query_ conn $ "select distinct locked_by from " <> tnJob cfgTableNames diff --git a/src/OddJobs/Migrations.hs b/src/OddJobs/Migrations.hs index 6f4b05f..59b2ff1 100644 --- a/src/OddJobs/Migrations.hs +++ b/src/OddJobs/Migrations.hs @@ -8,8 +8,8 @@ import Database.PostgreSQL.Simple as PGS import Data.Functor (void) import OddJobs.Types -createJobTableQuery :: TableName -> Query -createJobTableQuery tname = "CREATE TABLE " <> tname <> +createJobTableQuery :: TableNames -> Query +createJobTableQuery (TableNames tname _) = "CREATE TABLE " <> tname <> "( id serial primary key" <> ", created_at timestamp with time zone default now() not null" <> ", updated_at timestamp with time zone default now() not null" <> @@ -20,6 +20,7 @@ createJobTableQuery tname = "CREATE TABLE " <> tname <> ", attempts int not null default 0" <> ", locked_at timestamp with time zone null" <> ", locked_by text null" <> + ", resource_id text null" <> ", constraint incorrect_locking_info CHECK ((status <> 'locked' and locked_at is null and locked_by is null) or (status = 'locked' and locked_at is not null and locked_by is not null))" <> ");" <> "create index idx_" <> tname <> "_created_at on " <> tname <> "(created_at);" <> @@ -27,23 +28,34 @@ createJobTableQuery tname = "CREATE TABLE " <> tname <> "create index idx_" <> tname <> "_locked_at on " <> tname <> "(locked_at);" <> "create index idx_" <> tname <> "_locked_by on " <> tname <> "(locked_by);" <> "create index idx_" <> tname <> "_status on " <> tname <> "(status);" <> - "create index idx_" <> tname <> "_run_at on " <> tname <> "(run_at);" + "create index idx_" <> tname <> "_run_at on " <> tname <> "(run_at);" <> + "create index idx_" <> tname <> "_resource_id on " <> tname <>"(resource_id) where resource_id is not null;" -createNotificationTrigger :: TableName -> Query -createNotificationTrigger tname = "create or replace function " <> fnName <> "() returns trigger as $$" <> +createResourceTableQuery :: TableNames -> Query +createResourceTableQuery tnames = "create table " <> tnResource tnames <> + "( resource_id text primary key" <> + ", resource_limit int" <> + ");" + +createNotificationTrigger :: TableNames -> Query +createNotificationTrigger tnames = "create or replace function " <> fnName <> "() returns trigger as $$" <> "begin \n" <> - " perform pg_notify('" <> pgEventName tname <> "', \n" <> + " perform pg_notify('" <> pgEventName tnames <> "', \n" <> " json_build_object('id', new.id, 'run_at', new.run_at, 'locked_at', new.locked_at)::text); \n" <> " return new; \n" <> "end; \n" <> "$$ language plpgsql;" <> - "create trigger " <> trgName <> " after insert on " <> tname <> " for each row execute procedure " <> fnName <> "();" + "create trigger " <> trgName <> " after insert on " <> tnJob tnames <> " for each row execute procedure " <> fnName <> "();" where - fnName = "notify_job_monitor_for_" <> tname - trgName = "trg_notify_job_monitor_for_" <> tname + fnName = "notify_job_monitor_for_" <> tnJob tnames + trgName = "trg_notify_job_monitor_for_" <> tnJob tnames createJobTable :: Connection -> TableName -> IO () -createJobTable conn tname = void $ do - PGS.execute_ conn (createJobTableQuery tname) - PGS.execute_ conn (createNotificationTrigger tname) +createJobTable conn tname = createJobTables conn $ simpleTableNames tname + +createJobTables :: Connection -> TableNames -> IO () +createJobTables conn tnames = do + void $ PGS.execute_ conn (createJobTableQuery tnames) + void $ PGS.execute_ conn (createResourceTableQuery tnames) + void $ PGS.execute_ conn (createNotificationTrigger tnames) diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index eb65cde..44669ca 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -35,8 +35,22 @@ import Control.Monad.Logger (LogLevel) -- @ type TableName = PGS.Query -pgEventName :: TableName -> Query -pgEventName tname = "job_created_" <> tname +-- | Specifies the table names used for job management. +data TableNames = TableNames + { tnJob :: TableName + , tnResource :: TableName + } + +-- | Convenience function for building 'TableNames' when resources are not being +-- used or the name of the resource table does not need to be controlled. +simpleTableNames :: TableName -> TableNames +simpleTableNames jobTableName = TableNames + { tnJob = jobTableName + , tnResource = jobTableName <> "_resource" + } + +pgEventName :: TableNames -> Query +pgEventName tnames = "job_created_" <> tnJob tnames newtype Seconds = Seconds { unSeconds :: Int } deriving (Eq, Show, Ord, Num, Read) @@ -158,9 +172,10 @@ instance FromJSON Status where Left e -> fail e Right r -> pure r - newtype JobRunnerName = JobRunnerName { unJobRunnerName :: Text } deriving (Eq, Show, FromField, ToField, Generic, ToJSON, FromJSON) +newtype ResourceId = ResourceId { unResourceId :: Text } deriving (Eq, Show, FromField, ToField, Generic, ToJSON, FromJSON) + data Job = Job { jobId :: JobId , jobCreatedAt :: UTCTime @@ -172,6 +187,7 @@ data Job = Job , jobAttempts :: Int , jobLockedAt :: Maybe UTCTime , jobLockedBy :: Maybe JobRunnerName + , jobResourceId :: Maybe ResourceId } deriving (Eq, Show, Generic) instance ToText Status where @@ -211,6 +227,7 @@ instance FromRow Job where <*> field -- attempts <*> field -- lockedAt <*> field -- lockedBy + <*> field -- resourceId -- TODO: Add a sum-type for return status which can signal the monitor about -- whether the job needs to be retried, marked successfull, or whether it has @@ -230,7 +247,7 @@ data AllJobTypes -- that represents your job payload. = AJTFixed [Text] -- | Construct the list of job-types dynamically by looking at the actual - -- payloads in 'cfgTableName' (using an SQL query). + -- payloads in the job table of 'cfgTableNames' (using an SQL query). | AJTSql (Connection -> IO [Text]) -- | A custom 'IO' action for fetching the list of job-types. | AJTCustom (IO [Text]) @@ -243,9 +260,10 @@ data AllJobTypes -- 'OddJobs.ConfigBuilder.mkConfig' function (to get something with sensible -- defaults) and then tweaking config parameters on a case-by-case basis. data Config = Config - { -- | The DB table which holds your jobs. Please note, this should have been - -- created by the 'OddJobs.Migrations.createJobTable' function. - cfgTableName :: TableName + { -- | The DB tables which hold your jobs and optional resource. Please note, + -- this should have been created by the 'OddJobs.Migrations.createJobTables' + -- function. + cfgTableNames :: TableNames -- | The actualy "job-runner" that __you__ need to provide. If this function -- throws a runtime exception, the job will be retried @@ -269,6 +287,17 @@ data Config = Config -- in the implementtion guide. , cfgConcurrencyControl :: ConcurrencyControl + -- | The default per-resource concurrency limit to be used when a job + -- specifies a resource id and that resource does not exist or does not + -- specify a limit. This has no effect for jobs that do not specify a + -- resource id. + -- __Note:__ this is is always an additional limit beyond whatever is + -- supplied in 'cfgConcurrencyContol' on any given machine, but also note + -- that unlike the limit of `cfgConcurrencyControl`, this IS a global limit + -- across the entire job-queue. This means that this limit will be applied + -- regardless of what machine the jobs are running on. + , cfgDefaultResourceLimit :: Int + -- | The DB connection-pool to use for the job-runner. __Note:__ in case -- your jobs require a DB connection, please create a separate -- connection-pool for them. This pool will be used ONLY for monitoring jobs diff --git a/src/OddJobs/Web.hs b/src/OddJobs/Web.hs index 99882e9..c45ec13 100644 --- a/src/OddJobs/Web.hs +++ b/src/OddJobs/Web.hs @@ -127,8 +127,8 @@ data Routes = Routes filterJobsQuery :: Config -> Filter -> (PGS.Query, [Action]) -filterJobsQuery Config{cfgTableName, cfgJobTypeSql} Filter{..} = - ( "SELECT " <> Job.concatJobDbColumns <> " FROM " <> cfgTableName <> whereClause <> " " <> (orderClause $ fromMaybe (OrdUpdatedAt, Desc) filterOrder) <> " " <> limitOffsetClause +filterJobsQuery Config{cfgTableNames, cfgJobTypeSql} Filter{..} = + ( "SELECT " <> Job.concatJobDbColumns <> " FROM " <> Job.tnJob cfgTableNames <> whereClause <> " " <> (orderClause $ fromMaybe (OrdUpdatedAt, Desc) filterOrder) <> " " <> limitOffsetClause , whereActions ) where From 05bd9cce996fce024e0723acfd492def3ac63cd1 Mon Sep 17 00:00:00 2001 From: Kanagaraj M Date: Thu, 1 Oct 2020 12:22:08 +0530 Subject: [PATCH 05/14] Fix import path in the doc (#49) --- examples/OddJobsCliExample.lhs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/OddJobsCliExample.lhs b/examples/OddJobsCliExample.lhs index 9cc857e..ff90716 100644 --- a/examples/OddJobsCliExample.lhs +++ b/examples/OddJobsCliExample.lhs @@ -4,7 +4,7 @@ In this example, our jobs table will be called `jobs_test`
``` -ghci> import Datasbe.PostgreSQL.Simple (connectPostgreSQL) +ghci> import Database.PostgreSQL.Simple (connectPostgreSQL) ghci> import OddJobs.Migrations ghci> conn <- connectPostgreSQL "dbname=jobs_test user=jobs_test password=jobs_test host=localhost" ghci> createJobTable conn "jobs_test" From ac25d5bbc23833dd2f804bad9e1e4872ddecc4c8 Mon Sep 17 00:00:00 2001 From: Maciej J Date: Fri, 2 Oct 2020 08:46:40 +0100 Subject: [PATCH 06/14] Setup CI using Github Actions (#50) * setup a simple CI using GitHub actions * describe a simple dev setup process * build branches with pull requests against master, run build on multiple operating systems * trigger CI for all branches * always checkout the ref * build only on pull_request events not to duplicate builds * build only on Ubuntu --- .github/workflows/ci.yml | 38 ++++++++++++++++++++++++++++++++++++++ README.md | 15 +++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..86c96e2 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,38 @@ +name: CI + +on: + pull_request: + branches: + - '*' + +jobs: + stack: + name: ${{ matrix.os }} / stack / ghc ${{ matrix.ghc }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [ubuntu-latest] + stack: ["2.3.3"] + ghc: ["8.8.3"] + + steps: + - uses: actions/checkout@v2 + + - uses: actions/setup-haskell@v1.1 + name: Setup Haskell Stack + with: + enable-stack: true + stack-no-global: true + stack-setup-ghc: true + stack-version: ${{ matrix.stack }} + ghc-version: ${{ matrix.ghc }} + + - uses: actions/cache@v2 + name: Cache ~/.stack + with: + path: ~/.stack + key: ${{ matrix.os }}-${{ matrix.ghc }}-stack + + - name: Build + run: | + stack build \ No newline at end of file diff --git a/README.md b/README.md index ff6ddd7..c38e470 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[![GitHub CI](https://github.com/saurabhnanda/odd-jobs/workflows/CI/badge.svg)](https://github.com/saurabhnanda/odd-jobs/actions) + # Introduction - [Odd Jobs home page](https://www.haskelltutorials.com/odd-jobs) - contains a description of top-level features of this library @@ -14,3 +16,16 @@ If you are already using, or considering using, `odd-jobs` in production, please # Contributing Please read the [contribution guidelines](./CONTRIBUTING.md) + +## Development + +### Prerequisites + +- [The Haskell Tool Stack](https://docs.haskellstack.org/en/stable/README/#how-to-install) +- `libpq-dev` library (required for PostgreSQL dependency) + +### Build + +```bash +stack build +``` From cec5558147941dee75040ce4b73fafbbe888f910 Mon Sep 17 00:00:00 2001 From: mrputty <47194789+mrputty@users.noreply.github.com> Date: Tue, 13 Oct 2020 01:18:47 -0600 Subject: [PATCH 07/14] Reduced database notification payload (#43) Because PostgreSQL limits notification payloads to 8000 bytes by default, passing the full job record in the notification artificially limits the job payload size. By passing only required fields in the notification, this limitation is removed. --- src/OddJobs/Migrations.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/OddJobs/Migrations.hs b/src/OddJobs/Migrations.hs index 8eb3565..6f4b05f 100644 --- a/src/OddJobs/Migrations.hs +++ b/src/OddJobs/Migrations.hs @@ -32,7 +32,8 @@ createJobTableQuery tname = "CREATE TABLE " <> tname <> createNotificationTrigger :: TableName -> Query createNotificationTrigger tname = "create or replace function " <> fnName <> "() returns trigger as $$" <> "begin \n" <> - " perform pg_notify('" <> pgEventName tname <> "', row_to_json(new)::text); \n" <> + " perform pg_notify('" <> pgEventName tname <> "', \n" <> + " json_build_object('id', new.id, 'run_at', new.run_at, 'locked_at', new.locked_at)::text); \n" <> " return new; \n" <> "end; \n" <> "$$ language plpgsql;" <> From 265d06d50ab03fd6c9ef3f2eaf41563ab4d722fe Mon Sep 17 00:00:00 2001 From: David Ellis Date: Wed, 14 Oct 2020 21:41:24 -0600 Subject: [PATCH 08/14] Updated `.gitignore` to ignore some Cabal artifacts --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index b47b7c1..a02a0bc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ stack.yaml.lock test.log TAGS .stack-work/ +dist-* +cabal.project.local* \ No newline at end of file From ee7d216bcfd36b0f3ffbb4f17c449105fa7cc4a5 Mon Sep 17 00:00:00 2001 From: David Ellis Date: Wed, 14 Oct 2020 21:48:24 -0600 Subject: [PATCH 09/14] Draft implementation for core functionality for resource limited jobs (#38) Many small changes were required, but the majority were mechanical changes. With the exception of the `createJob`/`scheduleJob` family of function, this should be mostly backwards compatible with existing user code that does not use "internal" functions. Further discussion about how to handle backwards compatibility is needed. Note also that this commit by itself is NOT compatible with existing databases. A future commit will address migration from existing to new database definitions. The intention is for this feature to have a negligable impact on performance when the resource limiting features are not being used, and only a minor impact when they are used. However, performance testing has not yet been done. Material changes inclue: - New resource table - New `resource_id` column in to job table - New `cfgDefaultResourceLimit` configuration value - New `createResourceJob` and `scheduleResourceJob` functions - New `TableNames` type to carry the now multiple values needed - Updated existing functions and queries to account for these changes - Updated dequeueing queries to implement resource concurrency limits - New `DevelDb` module to the `devel` executable to help ad hoc testing TODO: - Resource record management functions - Migration functionality (need to discuss database versioning) - Unit tests - Performance tests - Documentation updates - Resource management UI --- dev/DevelDb.hs | 49 ++++++++ odd-jobs.cabal | 1 + src/OddJobs/ConfigBuilder.hs | 14 ++- src/OddJobs/Endpoints.hs | 8 +- src/OddJobs/Job.hs | 217 ++++++++++++++++++++++++----------- src/OddJobs/Migrations.hs | 36 ++++-- src/OddJobs/Types.hs | 43 +++++-- src/OddJobs/Web.hs | 4 +- 8 files changed, 274 insertions(+), 98 deletions(-) create mode 100644 dev/DevelDb.hs diff --git a/dev/DevelDb.hs b/dev/DevelDb.hs new file mode 100644 index 0000000..7ccb0d5 --- /dev/null +++ b/dev/DevelDb.hs @@ -0,0 +1,49 @@ +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE OverloadedStrings #-} + +-- | Provides tools to facilitate database interactions during interactive +-- development. +module DevelDb where + +import Control.Exception (bracket) +import Data.Aeson(FromJSON, ToJSON) +import Data.ByteString.Char8 (ByteString, pack) +import Data.Text (Text) +import qualified Database.PostgreSQL.Simple as PGS +import System.Environment (lookupEnv) + +import GHC.Generics + +import OddJobs.Migrations +import OddJobs.Job +import OddJobs.Types + +data TestJob = TestJob Text + deriving (Eq, Show, Generic, FromJSON, ToJSON) + +devTableNames :: TableNames +devTableNames = TableNames + { tnJob = "job", tnResource = "resource" } + +devConnectionString :: IO ByteString +devConnectionString = do + maybe (error "devConnectionString: Expected environment variable \"ODD_JOBS_DEV_DB_CONNECT\" to provide a connection string") + pack + <$> lookupEnv "ODD_JOBS_DEV_DB_CONNECT" + +createDevDatabase :: IO () +createDevDatabase = do + connStr <- devConnectionString + conn <- PGS.connectPostgreSQL connStr + PGS.withTransaction conn $ createJobTables conn devTableNames + +openDevConnection :: IO PGS.Connection +openDevConnection = devConnectionString >>= PGS.connectPostgreSQL + +withDevConnection :: (PGS.Connection -> IO a) -> IO a +withDevConnection = bracket openDevConnection PGS.close + +withDevTransaction :: (PGS.Connection -> IO a) -> IO a +withDevTransaction act = + withDevConnection $ \conn -> PGS.withTransaction conn (act conn) diff --git a/odd-jobs.cabal b/odd-jobs.cabal index 64b9cd6..7a51216 100644 --- a/odd-jobs.cabal +++ b/odd-jobs.cabal @@ -103,6 +103,7 @@ library executable devel main-is: DevelMain.hs other-modules: + DevelDb OddJobs.Cli OddJobs.ConfigBuilder OddJobs.Endpoints diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index 464dd81..ebf51f9 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -34,7 +34,7 @@ import qualified System.Log.FastLogger as FLogger mkConfig :: (LogLevel -> LogEvent -> IO ()) -- ^ "Structured logging" function. Ref: 'cfgLogger' -> TableName - -- ^ DB table which holds your jobs. Ref: 'cfgTableName' + -- ^ DB table which holds your jobs (resource table name will be generated). Ref: 'cfgTableNames' -> Pool Connection -- ^ DB connection-pool to be used by job-runner. Ref: 'cfgDbPool' -> ConcurrencyControl @@ -63,14 +63,15 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn = , cfgDbPool = dbpool , cfgOnJobStart = (const $ pure ()) , cfgDefaultMaxAttempts = 10 - , cfgTableName = tname + , cfgTableNames = simpleTableNames tname , cfgOnJobTimeout = (const $ pure ()) , cfgConcurrencyControl = ccControl + , cfgDefaultResourceLimit = 1 , cfgPidFile = Nothing , cfgJobType = defaultJobType , cfgDefaultJobTimeout = Seconds 600 , cfgJobToHtml = defaultJobToHtml (cfgJobType cfg) - , cfgAllJobTypes = (defaultDynamicJobTypes (cfgTableName cfg) (cfgJobTypeSql cfg)) + , cfgAllJobTypes = (defaultDynamicJobTypes (cfgTableNames cfg) (cfgJobTypeSql cfg)) , cfgJobTypeSql = defaultJobTypeSql } in cfg @@ -184,11 +185,12 @@ defaultConstantJobTypes :: forall a . (Generic a, ConNames (Rep a)) defaultConstantJobTypes _ = AJTFixed $ DL.map toS $ conNames (undefined :: a) -defaultDynamicJobTypes :: TableName +defaultDynamicJobTypes :: TableNames -> PGS.Query -> AllJobTypes -defaultDynamicJobTypes tname jobTypeSql = AJTSql $ \conn -> do - fmap (DL.map ((fromMaybe "(unknown)") . fromOnly)) $ PGS.query_ conn $ "select distinct(" <> jobTypeSql <> ") from " <> tname <> " order by 1 nulls last" +defaultDynamicJobTypes tnames jobTypeSql = AJTSql $ \conn -> do + fmap (DL.map ((fromMaybe "(unknown)") . fromOnly)) $ PGS.query_ conn $ + "select distinct(" <> jobTypeSql <> ") from " <> tnJob tnames <> " order by 1 nulls last" -- | This makes __two important assumptions__. First, this /assumes/ that jobs -- in your app are represented by a sum-type. For example: diff --git a/src/OddJobs/Endpoints.hs b/src/OddJobs/Endpoints.hs index 703e0ce..b4000cc 100644 --- a/src/OddJobs/Endpoints.hs +++ b/src/OddJobs/Endpoints.hs @@ -152,7 +152,7 @@ cancelJob :: Config -> JobId -> Handler NoContent cancelJob Config{..} env jid = do - liftIO $ withResource cfgDbPool $ \conn -> void $ cancelJobIO conn cfgTableName jid + liftIO $ withResource cfgDbPool $ \conn -> void $ cancelJobIO conn cfgTableNames jid redirectToHome env runJobNow :: Config @@ -160,7 +160,7 @@ runJobNow :: Config -> JobId -> Handler NoContent runJobNow Config{..} env jid = do - liftIO $ withResource cfgDbPool $ \conn -> void $ runJobNowIO conn cfgTableName jid + liftIO $ withResource cfgDbPool $ \conn -> void $ runJobNowIO conn cfgTableNames jid redirectToHome env enqueueJob :: Config @@ -169,8 +169,8 @@ enqueueJob :: Config -> Handler NoContent enqueueJob Config{..} env jid = do liftIO $ withResource cfgDbPool $ \conn -> do - void $ unlockJobIO conn cfgTableName jid - void $ runJobNowIO conn cfgTableName jid + void $ unlockJobIO conn cfgTableNames jid + void $ runJobNowIO conn cfgTableNames jid redirectToHome env redirectToHome :: Env -> Handler NoContent diff --git a/src/OddJobs/Job.hs b/src/OddJobs/Job.hs index 913a701..edf3ded 100644 --- a/src/OddJobs/Job.hs +++ b/src/OddJobs/Job.hs @@ -19,7 +19,9 @@ module OddJobs.Job -- -- $createJobs , createJob + , createResourceJob , scheduleJob + , scheduleResourceJob -- * @Job@ and associated data-types -- @@ -29,10 +31,14 @@ module OddJobs.Job , Status(..) , JobRunnerName(..) , TableName + , TableNames(..) + , simpleTableNames , delaySeconds , Seconds(..) , JobErrHandler(..) , AllJobTypes(..) + , ResourceId(..) + -- ** Structured logging -- @@ -133,11 +139,12 @@ class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where onJobFailed :: forall a . m [JobErrHandler a] getJobRunner :: m (Job -> IO ()) getDbPool :: m (Pool Connection) - getTableName :: m TableName + getTableNames :: m TableNames onJobStart :: Job -> m () getDefaultMaxAttempts :: m Int getRunnerEnv :: m RunnerEnv getConcurrencyControl :: m ConcurrencyControl + getDefaultResourceLimit :: m Int getPidFile :: m (Maybe FilePath) log :: LogLevel -> LogEvent -> m () getDefaultJobTimeout :: m Seconds @@ -174,7 +181,7 @@ instance HasJobRunner RunnerM where logCallbackErrors (jobId job) "onJobSuccess" $ liftIO $ fn job getJobRunner = cfgJobRunner . envConfig <$> ask getDbPool = cfgDbPool . envConfig <$> ask - getTableName = cfgTableName . envConfig <$> ask + getTableNames = cfgTableNames . envConfig <$> ask onJobStart job = do fn <- cfgOnJobStart . envConfig <$> ask logCallbackErrors (jobId job) "onJobStart" $ liftIO $ fn job @@ -185,6 +192,8 @@ instance HasJobRunner RunnerM where getConcurrencyControl = (cfgConcurrencyControl . envConfig <$> ask) + getDefaultResourceLimit = cfgDefaultResourceLimit . envConfig <$> ask + getPidFile = cfgPidFile . envConfig <$> ask log logLevel logEvent = do @@ -216,8 +225,8 @@ jobWorkerName = do -- the jobs table it is __recommended__ that you do not issue a @SELECT *@ or -- @RETURNIG *@. List out specific DB columns using 'jobDbColumns' and -- 'concatJobDbColumns' instead. This will insulate you from runtime errors --- caused by addition of new columns to 'cfgTableName' in future versions of --- OddJobs. +-- caused by addition of new columns to job table of 'cfgTableNames' in future +-- versions of OddJobs. jobDbColumns :: (IsString s, Semigroup s) => [s] jobDbColumns = [ "id" @@ -230,6 +239,7 @@ jobDbColumns = , "attempts" , "locked_at" , "locked_by" + , "resource_id" ] -- | All 'jobDbColumns' joined together with commas. Useful for constructing SQL @@ -245,8 +255,8 @@ concatJobDbColumns = concatJobDbColumns_ jobDbColumns "" concatJobDbColumns_ (col:cols) x = concatJobDbColumns_ cols (x <> col <> ", ") -findJobByIdQuery :: TableName -> PGS.Query -findJobByIdQuery tname = "SELECT " <> concatJobDbColumns <> " FROM " <> tname <> " WHERE id = ?" +findJobByIdQuery :: TableNames -> PGS.Query +findJobByIdQuery tnames = "SELECT " <> concatJobDbColumns <> " FROM " <> tnJob tnames <> " WHERE id = ?" withDbConnection :: (HasJobRunner m) => (Connection -> m a) @@ -258,9 +268,10 @@ withDbConnection action = do -- -- $dbHelpers -- --- A bunch of functions that help you query 'cfgTableName' and change the status --- of individual jobs. Most of these functions are in @IO@ and you /might/ want --- to write wrappers that lift them into you application's custom monad. +-- A bunch of functions that help you query tables in 'cfgTableNames' and change +-- the status of individual jobs. Most of these functions are in @IO@ and you +-- /might/ want to write wrappers that lift them into you application's custom +-- monad. -- -- __Note:__ When passing a 'Connection' to these function, it is -- __recommended__ __to not__ take a connection from 'cfgDbPool'. Use your @@ -271,29 +282,29 @@ findJobById :: (HasJobRunner m) => JobId -> m (Maybe Job) findJobById jid = do - tname <- getTableName - withDbConnection $ \conn -> liftIO $ findJobByIdIO conn tname jid + tnames <- getTableNames + withDbConnection $ \conn -> liftIO $ findJobByIdIO conn tnames jid -findJobByIdIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -findJobByIdIO conn tname jid = PGS.query conn (findJobByIdQuery tname) (Only jid) >>= \case +findJobByIdIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +findJobByIdIO conn tnames jid = PGS.query conn (findJobByIdQuery tnames) (Only jid) >>= \case [] -> pure Nothing [j] -> pure (Just j) js -> Prelude.error $ "Not expecting to find multiple jobs by id=" <> (show jid) -saveJobQuery :: TableName -> PGS.Query -saveJobQuery tname = "UPDATE " <> tname <> " set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING " <> concatJobDbColumns +saveJobQuery :: TableNames -> PGS.Query +saveJobQuery tnames = "UPDATE " <> tnJob tnames <> " set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ?, resource_id = ? WHERE id = ? RETURNING " <> concatJobDbColumns -deleteJobQuery :: TableName -> PGS.Query -deleteJobQuery tname = "DELETE FROM " <> tname <> " WHERE id = ?" +deleteJobQuery :: TableNames -> PGS.Query +deleteJobQuery tnames = "DELETE FROM " <> tnJob tnames <> " WHERE id = ?" saveJob :: (HasJobRunner m) => Job -> m Job saveJob j = do - tname <- getTableName - withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j + tnames <- getTableNames + withDbConnection $ \conn -> liftIO $ saveJobIO conn tnames j -saveJobIO :: Connection -> TableName -> Job -> IO Job -saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do +saveJobIO :: Connection -> TableNames -> Job -> IO Job +saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResourceId, jobId} = do rs <- PGS.query conn (saveJobQuery tname) ( jobRunAt , jobStatus @@ -302,6 +313,7 @@ saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttem , jobAttempts , jobLockedAt , jobLockedBy + , jobResourceId , jobId ) case rs of @@ -311,38 +323,38 @@ saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttem deleteJob :: (HasJobRunner m) => JobId -> m () deleteJob jid = do - tname <- getTableName - withDbConnection $ \conn -> liftIO $ deleteJobIO conn tname jid + tnames <- getTableNames + withDbConnection $ \conn -> liftIO $ deleteJobIO conn tnames jid -deleteJobIO :: Connection -> TableName -> JobId -> IO () -deleteJobIO conn tname jid = do - void $ PGS.execute conn (deleteJobQuery tname) (Only jid) +deleteJobIO :: Connection -> TableNames -> JobId -> IO () +deleteJobIO conn tnames jid = + void $ PGS.execute conn (deleteJobQuery tnames) (Only jid) -runJobNowIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -runJobNowIO conn tname jid = do +runJobNowIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +runJobNowIO conn tnames jid = do t <- getCurrentTime - updateJobHelper tname conn (Queued, [Queued, Retry, Failed], Just t, jid) + updateJobHelper tnames conn (Queued, [Queued, Retry, Failed], Just t, jid) -- | TODO: First check in all job-runners if this job is still running, or not, -- and somehow send an uninterruptibleCancel to that thread. -unlockJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -unlockJobIO conn tname jid = do +unlockJobIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +unlockJobIO conn tnames jid = do fmap listToMaybe $ PGS.query conn q (Retry, jid, In [Locked]) where - q = "update " <> tname <> " set status=?, run_at=now(), locked_at=null, locked_by=null where id=? and status in ? returning " <> concatJobDbColumns + q = "update " <> tnJob tnames <> " set status=?, run_at=now(), locked_at=null, locked_by=null where id=? and status in ? returning " <> concatJobDbColumns -cancelJobIO :: Connection -> TableName -> JobId -> IO (Maybe Job) -cancelJobIO conn tname jid = - updateJobHelper tname conn (Failed, [Queued, Retry], Nothing, jid) +cancelJobIO :: Connection -> TableNames -> JobId -> IO (Maybe Job) +cancelJobIO conn tnames jid = + updateJobHelper tnames conn (Failed, [Queued, Retry], Nothing, jid) -updateJobHelper :: TableName +updateJobHelper :: TableNames -> Connection -> (Status, [Status], Maybe UTCTime, JobId) -> IO (Maybe Job) -updateJobHelper tname conn (newStatus, existingStates, mRunAt, jid) = +updateJobHelper tnames conn (newStatus, existingStates, mRunAt, jid) = fmap listToMaybe $ PGS.query conn q (newStatus, runAt, jid, PGS.In existingStates) where - q = "update " <> tname <> " set attempts=0, status=?, run_at=? where id=? and status in ? returning " <> concatJobDbColumns + q = "update " <> tnJob tnames <> " set attempts=0, status=?, run_at=? where id=? and status in ? returning " <> concatJobDbColumns runAt = case mRunAt of Nothing -> PGS.toField $ PGS.Identifier "run_at" Just t -> PGS.toField t @@ -454,8 +466,21 @@ jobMonitor = do liftIO $ Dir.removePathForcibly f -- | Ref: 'jobPoller' -jobPollingSql :: TableName -> Query -jobPollingSql tname = "update " <> tname <> " set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1 WHERE id in (select id from " <> tname <> " where (run_at<=? AND ((status in ?) OR (status = ? and locked_at Query +jobPollingSql tnames = + "update " <> tnJob tnames <> + "\n set status = ?, locked_at = now(), locked_by = ?, attempts=attempts+1" <> + "\nwhere id in ( select id from " <> tnJob tnames <> " j_out" <> + "\n where (run_at<=now() AND (status in ? OR (status = ? and locked_at < now() - ? * interval '1 second')))" <> + "\n and case when resource_id is null then true" <> + "\n else ( select count(id) from " <> tnJob tnames <> " j_in" <> + "\n where j_in.resource_id = j_out.resource_id and j_in.status = ?" <> + "\n and j_in.locked_at >= now() - ? * interval '1 second' )" <> + "\n < coalesce(( select resource_limit from " <> tnResource tnames <> " r" <> + "\n where r.resource_id = j_out.resource_id ), ?)" <> + "\n end" <> + "\n order by run_at asc limit 1 for update )" <> + "\nreturning id" waitForJobs :: (HasJobRunner m) => m () @@ -498,19 +523,19 @@ jobPoller :: (HasJobRunner m) => m () jobPoller = do processName <- liftIO jobWorkerName pool <- getDbPool - tname <- getTableName + tnames <- getTableNames lockTimeout <- getDefaultJobTimeout log LevelInfo $ LogText $ toS $ "Starting the job monitor via DB polling with processName=" <> processName concurrencyControlFn <- getConcurrencyControlFn + defaultResourceLimit <- getDefaultResourceLimit withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn >>= \case False -> log LevelWarn $ LogText $ "NOT polling the job queue due to concurrency control" True -> do nextAction <- mask_ $ do log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.." - t <- liftIO getCurrentTime - r <- liftIO $ - PGS.query pollerDbConn (jobPollingSql tname) - (Locked, t, processName, t, (In [Queued, Retry]), Locked, (addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t)) + r <- liftIO $ PGS.query pollerDbConn + (jobPollingSql tnames) + (Locked, processName, In [Queued, Retry], Locked, unSeconds lockTimeout, Locked, unSeconds lockTimeout, defaultResourceLimit) case r of -- When we don't have any jobs to run, we can relax a bit... [] -> pure delayAction @@ -533,21 +558,33 @@ jobEventListener :: (HasJobRunner m) jobEventListener = do log LevelInfo $ LogText "Starting the job monitor via LISTEN/NOTIFY..." pool <- getDbPool - tname <- getTableName - jwName <- liftIO jobWorkerName + tnames <- getTableNames + lockTimeout <- getDefaultJobTimeout concurrencyControlFn <- getConcurrencyControlFn + defaultResourceLimit <- getDefaultResourceLimit + jwName <- liftIO jobWorkerName + let tryLockingJob jid = do - let q = "UPDATE " <> tname <> " SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 WHERE id=? AND status in ? RETURNING id" - (withDbConnection $ \conn -> (liftIO $ PGS.query conn q (Locked, jwName, jid, In [Queued, Retry]))) >>= \case + let q = "update " <> tnJob tnames <> " j_out" <> + "\n set status=?, locked_at=now(), locked_by=?, attempts=attempts+1" <> + "\nwhere id=? and status in ? " <> + "\n and case when resource_id is null then true" <> + "\n else ( select count(id) from " <> tnJob tnames <> " j_in" <> + "\n where j_in.resource_id = j_out.resource_id and j_in.status = ?" <> + "\n and j_in.locked_at >= now() - ? * interval '1 second' )" <> + "\n < coalesce((select resource_limit from " <> tnResource tnames <> " r where r.resource_id = j_out.resource_id), ?)" <> + "\n end" <> + "\nRETURNING id" + (withDbConnection $ \conn -> (liftIO $ PGS.query conn q (Locked, jwName, jid, In [Queued, Retry], Locked, unSeconds lockTimeout, defaultResourceLimit))) >>= \case [] -> do - log LevelDebug $ LogText $ toS $ "Job was locked by someone else before I could start. Skipping it. JobId=" <> show jid + log LevelDebug $ LogText $ toS $ "Job was locked by someone else or resource limit was reached before I could start. Skipping it. JobId=" <> show jid pure Nothing [Only (_ :: JobId)] -> pure $ Just jid x -> error $ "WTF just happned? Was expecting a single row to be returned, received " ++ (show x) withResource pool $ \monitorDbConn -> do - void $ liftIO $ PGS.execute monitorDbConn ("LISTEN " <> pgEventName tname) () + void $ liftIO $ PGS.execute monitorDbConn ("LISTEN " <> pgEventName tnames) () forever $ do log LevelDebug $ LogText "[LISTEN/NOFIFY] Event loop" notif <- liftIO $ getNotification monitorDbConn @@ -584,8 +621,8 @@ jobEventListener = do -createJobQuery :: TableName -> PGS.Query -createJobQuery tname = "INSERT INTO " <> tname <> "(run_at, status, payload, last_error, attempts, locked_at, locked_by) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING " <> concatJobDbColumns +createJobQuery :: TableNames -> PGS.Query +createJobQuery tnames = "INSERT INTO " <> tnJob tnames <> "(run_at, status, payload, last_error, attempts, locked_at, locked_by, resource_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?) RETURNING " <> concatJobDbColumns -- $createJobs -- @@ -600,12 +637,27 @@ createJobQuery tname = "INSERT INTO " <> tname <> "(run_at, status, payload, las -- 'scheduleJob' for further documentation. createJob :: ToJSON p => Connection - -> TableName + -> TableNames -> p -> IO Job -createJob conn tname payload = do +createJob conn tnames payload = do t <- getCurrentTime - scheduleJob conn tname payload t + scheduleJob conn tnames payload t + +-- | Create a job for immediate execution, with additional concurrnecy limits +-- determined by the given 'resource'. +-- +-- Internally calls 'scheduleResourceJob' passing it the current time. Read +-- 'scheduleResourceJob' for further documentation. +createResourceJob :: ToJSON p + => Connection + -> TableNames + -> p + -> Maybe ResourceId + -> IO Job +createResourceJob conn tnames payload resource = do + t <- getCurrentTime + scheduleResourceJob conn tnames payload resource t -- | Create a job for execution at the given time. -- @@ -619,14 +671,45 @@ scheduleJob :: ToJSON p => Connection -- ^ DB connection to use. __Note:__ This should -- /ideally/ come out of your application's DB pool, -- not the 'cfgDbPool' you used in the job-runner. - -> TableName -- ^ DB-table which holds your jobs + -> TableNames -- ^ DB-tables which holds your jobs and resources -> p -- ^ Job payload -> UTCTime -- ^ when should the job be executed -> IO Job -scheduleJob conn tname payload runAt = do - let args = ( runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text ) - queryFormatter = toS <$> (PGS.formatQuery conn (createJobQuery tname) args) - rs <- PGS.query conn (createJobQuery tname) args +scheduleJob conn tnames payload runAt = + scheduleResourceJob conn tnames payload Nothing runAt + +-- | Create a job for execution at the given time, with additional concurrnecy +-- limits determined by the given 'resource'. +-- +-- * If time has already past, 'jobEventListener' is going to pick this up +-- for execution immediately. +-- +-- * If time is in the future, 'jobPoller' is going to pick this up with an +-- error of +/- 'cfgPollingInterval' seconds. Please do not expect very high +-- accuracy of when the job is actually executed. +-- +-- * If 'resource' is 'Nothing', then only the machine-bound concurrency +-- control limit will be enforced (ref: 'cfgConcurrencyControl'). If a +-- 'ResourceId' is supplied, and that id is found in the resource table (ref: +-- 'cfgTableNames'), then the resource's concurrency limit will be enforced +-- for this job, in addtion to the concurrency control limit. If the resource +-- does not specify a resource limit, or if the resource id is not found, +-- then the default resource limit (ref: 'cfgDefaultResourceLimit') will be +-- used. +scheduleResourceJob :: ToJSON p + => Connection -- ^ DB connection to use. __Note:__ This should + -- /ideally/ come out of your application's DB pool, + -- not the 'cfgDbPool' you used in the job-runner. + -> TableNames -- ^ DB-tables which holds your jobs and resources + -> p -- ^ Job payload + -> Maybe ResourceId -- ^ Id of the resource, if any, whose concurrency + -- limits will apply to this job + -> UTCTime -- ^ when should the job be executed + -> IO Job +scheduleResourceJob conn tnames payload resource runAt = do + let args = ( runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text, resource ) + queryFormatter = toS <$> (PGS.formatQuery conn (createJobQuery tnames) args) + rs <- PGS.query conn (createJobQuery tnames) args case rs of [] -> (Prelude.error . (<> "Not expecting a blank result set when creating a job. Query=")) <$> queryFormatter [r] -> pure r @@ -678,12 +761,12 @@ fetchAllJobTypes Config{cfgAllJobTypes, cfgDbPool} = liftIO $ do -- | Used by web\/admin IO to fetch a \"master list\" of all known job-runners. -- There is a known issue with the way this has been implemented: -- --- * Since this looks at the 'jobLockedBy' column of 'cfgTableName', it will --- discover only those job-runners that are actively executing at least one --- job at the time this function is executed. +-- * Since this looks at the 'jobLockedBy' column of the job table of +-- 'cfgTableNames', it will discover only those job-runners that are +-- actively executing at least one job at the time this function is +-- executed. fetchAllJobRunners :: (MonadIO m) => Config -> m [JobRunnerName] -fetchAllJobRunners Config{cfgTableName, cfgDbPool} = liftIO $ withResource cfgDbPool $ \conn -> do - fmap (mapMaybe fromOnly) $ PGS.query_ conn $ "select distinct locked_by from " <> cfgTableName - +fetchAllJobRunners Config{cfgTableNames, cfgDbPool} = liftIO $ withResource cfgDbPool $ \conn -> do + fmap (mapMaybe fromOnly) $ PGS.query_ conn $ "select distinct locked_by from " <> tnJob cfgTableNames diff --git a/src/OddJobs/Migrations.hs b/src/OddJobs/Migrations.hs index 6f4b05f..59b2ff1 100644 --- a/src/OddJobs/Migrations.hs +++ b/src/OddJobs/Migrations.hs @@ -8,8 +8,8 @@ import Database.PostgreSQL.Simple as PGS import Data.Functor (void) import OddJobs.Types -createJobTableQuery :: TableName -> Query -createJobTableQuery tname = "CREATE TABLE " <> tname <> +createJobTableQuery :: TableNames -> Query +createJobTableQuery (TableNames tname _) = "CREATE TABLE " <> tname <> "( id serial primary key" <> ", created_at timestamp with time zone default now() not null" <> ", updated_at timestamp with time zone default now() not null" <> @@ -20,6 +20,7 @@ createJobTableQuery tname = "CREATE TABLE " <> tname <> ", attempts int not null default 0" <> ", locked_at timestamp with time zone null" <> ", locked_by text null" <> + ", resource_id text null" <> ", constraint incorrect_locking_info CHECK ((status <> 'locked' and locked_at is null and locked_by is null) or (status = 'locked' and locked_at is not null and locked_by is not null))" <> ");" <> "create index idx_" <> tname <> "_created_at on " <> tname <> "(created_at);" <> @@ -27,23 +28,34 @@ createJobTableQuery tname = "CREATE TABLE " <> tname <> "create index idx_" <> tname <> "_locked_at on " <> tname <> "(locked_at);" <> "create index idx_" <> tname <> "_locked_by on " <> tname <> "(locked_by);" <> "create index idx_" <> tname <> "_status on " <> tname <> "(status);" <> - "create index idx_" <> tname <> "_run_at on " <> tname <> "(run_at);" + "create index idx_" <> tname <> "_run_at on " <> tname <> "(run_at);" <> + "create index idx_" <> tname <> "_resource_id on " <> tname <>"(resource_id) where resource_id is not null;" -createNotificationTrigger :: TableName -> Query -createNotificationTrigger tname = "create or replace function " <> fnName <> "() returns trigger as $$" <> +createResourceTableQuery :: TableNames -> Query +createResourceTableQuery tnames = "create table " <> tnResource tnames <> + "( resource_id text primary key" <> + ", resource_limit int" <> + ");" + +createNotificationTrigger :: TableNames -> Query +createNotificationTrigger tnames = "create or replace function " <> fnName <> "() returns trigger as $$" <> "begin \n" <> - " perform pg_notify('" <> pgEventName tname <> "', \n" <> + " perform pg_notify('" <> pgEventName tnames <> "', \n" <> " json_build_object('id', new.id, 'run_at', new.run_at, 'locked_at', new.locked_at)::text); \n" <> " return new; \n" <> "end; \n" <> "$$ language plpgsql;" <> - "create trigger " <> trgName <> " after insert on " <> tname <> " for each row execute procedure " <> fnName <> "();" + "create trigger " <> trgName <> " after insert on " <> tnJob tnames <> " for each row execute procedure " <> fnName <> "();" where - fnName = "notify_job_monitor_for_" <> tname - trgName = "trg_notify_job_monitor_for_" <> tname + fnName = "notify_job_monitor_for_" <> tnJob tnames + trgName = "trg_notify_job_monitor_for_" <> tnJob tnames createJobTable :: Connection -> TableName -> IO () -createJobTable conn tname = void $ do - PGS.execute_ conn (createJobTableQuery tname) - PGS.execute_ conn (createNotificationTrigger tname) +createJobTable conn tname = createJobTables conn $ simpleTableNames tname + +createJobTables :: Connection -> TableNames -> IO () +createJobTables conn tnames = do + void $ PGS.execute_ conn (createJobTableQuery tnames) + void $ PGS.execute_ conn (createResourceTableQuery tnames) + void $ PGS.execute_ conn (createNotificationTrigger tnames) diff --git a/src/OddJobs/Types.hs b/src/OddJobs/Types.hs index eb65cde..44669ca 100644 --- a/src/OddJobs/Types.hs +++ b/src/OddJobs/Types.hs @@ -35,8 +35,22 @@ import Control.Monad.Logger (LogLevel) -- @ type TableName = PGS.Query -pgEventName :: TableName -> Query -pgEventName tname = "job_created_" <> tname +-- | Specifies the table names used for job management. +data TableNames = TableNames + { tnJob :: TableName + , tnResource :: TableName + } + +-- | Convenience function for building 'TableNames' when resources are not being +-- used or the name of the resource table does not need to be controlled. +simpleTableNames :: TableName -> TableNames +simpleTableNames jobTableName = TableNames + { tnJob = jobTableName + , tnResource = jobTableName <> "_resource" + } + +pgEventName :: TableNames -> Query +pgEventName tnames = "job_created_" <> tnJob tnames newtype Seconds = Seconds { unSeconds :: Int } deriving (Eq, Show, Ord, Num, Read) @@ -158,9 +172,10 @@ instance FromJSON Status where Left e -> fail e Right r -> pure r - newtype JobRunnerName = JobRunnerName { unJobRunnerName :: Text } deriving (Eq, Show, FromField, ToField, Generic, ToJSON, FromJSON) +newtype ResourceId = ResourceId { unResourceId :: Text } deriving (Eq, Show, FromField, ToField, Generic, ToJSON, FromJSON) + data Job = Job { jobId :: JobId , jobCreatedAt :: UTCTime @@ -172,6 +187,7 @@ data Job = Job , jobAttempts :: Int , jobLockedAt :: Maybe UTCTime , jobLockedBy :: Maybe JobRunnerName + , jobResourceId :: Maybe ResourceId } deriving (Eq, Show, Generic) instance ToText Status where @@ -211,6 +227,7 @@ instance FromRow Job where <*> field -- attempts <*> field -- lockedAt <*> field -- lockedBy + <*> field -- resourceId -- TODO: Add a sum-type for return status which can signal the monitor about -- whether the job needs to be retried, marked successfull, or whether it has @@ -230,7 +247,7 @@ data AllJobTypes -- that represents your job payload. = AJTFixed [Text] -- | Construct the list of job-types dynamically by looking at the actual - -- payloads in 'cfgTableName' (using an SQL query). + -- payloads in the job table of 'cfgTableNames' (using an SQL query). | AJTSql (Connection -> IO [Text]) -- | A custom 'IO' action for fetching the list of job-types. | AJTCustom (IO [Text]) @@ -243,9 +260,10 @@ data AllJobTypes -- 'OddJobs.ConfigBuilder.mkConfig' function (to get something with sensible -- defaults) and then tweaking config parameters on a case-by-case basis. data Config = Config - { -- | The DB table which holds your jobs. Please note, this should have been - -- created by the 'OddJobs.Migrations.createJobTable' function. - cfgTableName :: TableName + { -- | The DB tables which hold your jobs and optional resource. Please note, + -- this should have been created by the 'OddJobs.Migrations.createJobTables' + -- function. + cfgTableNames :: TableNames -- | The actualy "job-runner" that __you__ need to provide. If this function -- throws a runtime exception, the job will be retried @@ -269,6 +287,17 @@ data Config = Config -- in the implementtion guide. , cfgConcurrencyControl :: ConcurrencyControl + -- | The default per-resource concurrency limit to be used when a job + -- specifies a resource id and that resource does not exist or does not + -- specify a limit. This has no effect for jobs that do not specify a + -- resource id. + -- __Note:__ this is is always an additional limit beyond whatever is + -- supplied in 'cfgConcurrencyContol' on any given machine, but also note + -- that unlike the limit of `cfgConcurrencyControl`, this IS a global limit + -- across the entire job-queue. This means that this limit will be applied + -- regardless of what machine the jobs are running on. + , cfgDefaultResourceLimit :: Int + -- | The DB connection-pool to use for the job-runner. __Note:__ in case -- your jobs require a DB connection, please create a separate -- connection-pool for them. This pool will be used ONLY for monitoring jobs diff --git a/src/OddJobs/Web.hs b/src/OddJobs/Web.hs index 99882e9..c45ec13 100644 --- a/src/OddJobs/Web.hs +++ b/src/OddJobs/Web.hs @@ -127,8 +127,8 @@ data Routes = Routes filterJobsQuery :: Config -> Filter -> (PGS.Query, [Action]) -filterJobsQuery Config{cfgTableName, cfgJobTypeSql} Filter{..} = - ( "SELECT " <> Job.concatJobDbColumns <> " FROM " <> cfgTableName <> whereClause <> " " <> (orderClause $ fromMaybe (OrdUpdatedAt, Desc) filterOrder) <> " " <> limitOffsetClause +filterJobsQuery Config{cfgTableNames, cfgJobTypeSql} Filter{..} = + ( "SELECT " <> Job.concatJobDbColumns <> " FROM " <> Job.tnJob cfgTableNames <> whereClause <> " " <> (orderClause $ fromMaybe (OrdUpdatedAt, Desc) filterOrder) <> " " <> limitOffsetClause , whereActions ) where From 8595a3c56cdb33eb6bc3e0cbd7dfb04338838c8a Mon Sep 17 00:00:00 2001 From: David Ellis Date: Tue, 20 Oct 2020 09:39:44 -0600 Subject: [PATCH 10/14] Introduced `dropJobTables` and refactored `Migrations.hs` Because there are unit tests that delete tabes, and now that there are multiple table names in play, it seemed useful to explicitly implement a deletion function. Becuase this deserved a comment, I also added comments to the other externally useful functions in this module, and rearranged the file a bit to highlight these defintions. --- src/OddJobs/Migrations.hs | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/src/OddJobs/Migrations.hs b/src/OddJobs/Migrations.hs index 59b2ff1..c51c721 100644 --- a/src/OddJobs/Migrations.hs +++ b/src/OddJobs/Migrations.hs @@ -8,6 +8,29 @@ import Database.PostgreSQL.Simple as PGS import Data.Functor (void) import OddJobs.Types +-- | Create database objects based on the name given for the primary job table, +-- with the names of other objects being generated autmatically. +createJobTable :: Connection -> TableName -> IO () +createJobTable conn tname = createJobTables conn $ simpleTableNames tname + +-- | Create database objects based on the name given for the primary job table +-- and the associated resource table. The names of indexes, functions, and +-- triggers are generated automatically. +createJobTables :: Connection -> TableNames -> IO () +createJobTables conn tnames = do + void $ PGS.execute_ conn (createJobTableQuery tnames) + void $ PGS.execute_ conn (createResourceTableQuery tnames) + void $ PGS.execute_ conn (createNotificationTrigger tnames) + +-- | Remove all Odd Jobs objects from the database +dropJobTables :: Connection -> TableNames -> IO () +dropJobTables conn tnames = do + void $ PGS.execute_ conn $ dropObject "table" $ tnResource tnames + void $ PGS.execute_ conn $ dropObject "table" $ tnJob tnames + void $ PGS.execute_ conn $ dropObject "function" $ notifyFunctionName tnames + where + dropObject typ obj = "drop " <> typ <> " if exists " <> obj <> ";" + createJobTableQuery :: TableNames -> Query createJobTableQuery (TableNames tname _) = "CREATE TABLE " <> tname <> "( id serial primary key" <> @@ -38,24 +61,16 @@ createResourceTableQuery tnames = "create table " <> tnResource tnames <> ");" createNotificationTrigger :: TableNames -> Query -createNotificationTrigger tnames = "create or replace function " <> fnName <> "() returns trigger as $$" <> +createNotificationTrigger tnames = "create or replace function " <> notifyFunctionName tnames <> "() returns trigger as $$" <> "begin \n" <> " perform pg_notify('" <> pgEventName tnames <> "', \n" <> " json_build_object('id', new.id, 'run_at', new.run_at, 'locked_at', new.locked_at)::text); \n" <> " return new; \n" <> "end; \n" <> "$$ language plpgsql;" <> - "create trigger " <> trgName <> " after insert on " <> tnJob tnames <> " for each row execute procedure " <> fnName <> "();" + "create trigger " <> trgName <> " after insert on " <> tnJob tnames <> " for each row execute procedure " <> notifyFunctionName tnames <> "();" where - fnName = "notify_job_monitor_for_" <> tnJob tnames trgName = "trg_notify_job_monitor_for_" <> tnJob tnames - -createJobTable :: Connection -> TableName -> IO () -createJobTable conn tname = createJobTables conn $ simpleTableNames tname - -createJobTables :: Connection -> TableNames -> IO () -createJobTables conn tnames = do - void $ PGS.execute_ conn (createJobTableQuery tnames) - void $ PGS.execute_ conn (createResourceTableQuery tnames) - void $ PGS.execute_ conn (createNotificationTrigger tnames) +notifyFunctionName :: TableNames -> Query +notifyFunctionName tnames = "notify_job_monitor_for_" <> tnJob tnames From 1a1c56b4c4253baf4363731e993f07902b7cb1fb Mon Sep 17 00:00:00 2001 From: David Ellis Date: Tue, 20 Oct 2020 11:21:22 -0600 Subject: [PATCH 11/14] Added `mkResourceConfig` for use with resource-limited jobs --- src/OddJobs/ConfigBuilder.hs | 37 +++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/src/OddJobs/ConfigBuilder.hs b/src/OddJobs/ConfigBuilder.hs index ebf51f9..ec6c596 100644 --- a/src/OddJobs/ConfigBuilder.hs +++ b/src/OddJobs/ConfigBuilder.hs @@ -53,7 +53,39 @@ mkConfig :: (LogLevel -> LogEvent -> IO ()) -- function, unless you know what you're doing. -> Config -- ^ The final 'Config' that can be used to start various job-runners -mkConfig logger tname dbpool ccControl jrunner configOverridesFn = +mkConfig logger tname = + mkResourceConfig logger (simpleTableNames tname) + +-- | This function gives you a 'Config' with a bunch of sensible defaults +-- already applied, but it allows the specification of all database table +-- names.. It requires the bare minimum of other configuration parameters that +-- this library cannot assume on your behalf. +-- +-- It makes a few __important assumptions__ about your 'jobPayload 'JSON, which +-- are documented in 'defaultJobType'. +mkResourceConfig :: (LogLevel -> LogEvent -> IO ()) + -- ^ "Structured logging" function. Ref: 'cfgLogger' + -> TableNames + -- ^ DB tables which hold your jobs and resources + -> Pool Connection + -- ^ DB connection-pool to be used by job-runner. Ref: 'cfgDbPool' + -> ConcurrencyControl + -- ^ Concurrency configuration. Ref: 'cfgConcurrencyControl' + -> (Job -> IO ()) + -- ^ The actual "job runner" which contains your application code. Ref: 'cfgJobRunner' + -> (Config -> Config) + -- ^ A function that allows you to modify the \"interim config\". The + -- \"interim config\" will cotain a bunch of in-built default config + -- params, along with the config params that you\'ve just provided + -- (i.e. logging function, table name, DB pool, etc). You can use this + -- function to override values in the \"interim config\". If you do not + -- wish to modify the \"interim config\" just pass 'Prelude.id' as an + -- argument to this parameter. __Note:__ it is strongly recommended + -- that you __do not__ modify the generated 'Config' outside of this + -- function, unless you know what you're doing. + -> Config + -- ^ The final 'Config' that can be used to start various job-runners +mkResourceConfig logger tnames dbpool ccControl jrunner configOverridesFn = let cfg = configOverridesFn $ Config { cfgPollingInterval = defaultPollingInterval , cfgOnJobSuccess = (const $ pure ()) @@ -63,7 +95,7 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn = , cfgDbPool = dbpool , cfgOnJobStart = (const $ pure ()) , cfgDefaultMaxAttempts = 10 - , cfgTableNames = simpleTableNames tname + , cfgTableNames = tnames , cfgOnJobTimeout = (const $ pure ()) , cfgConcurrencyControl = ccControl , cfgDefaultResourceLimit = 1 @@ -77,7 +109,6 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn = in cfg - -- | If you aren't interested in structured logging, you can use this function -- to emit plain-text logs (or define your own). defaultLogStr :: (Job -> Text) From 7c727b0b5d2cc90c78cdc49ca8c3d756df0078e9 Mon Sep 17 00:00:00 2001 From: David Ellis Date: Tue, 20 Oct 2020 11:22:23 -0600 Subject: [PATCH 12/14] Updated tests for resource-limited jobs --- test/Test.hs | 85 +++++++++++++++++++++++++++------------------------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/test/Test.hs b/test/Test.hs index f423346..ecb32aa 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -41,6 +41,7 @@ import Data.Ord (comparing, Down(..)) import Data.Maybe (fromMaybe) import qualified OddJobs.ConfigBuilder as Job import UnliftIO +import OddJobs.Types $(Aeson.deriveJSON Aeson.defaultOptions ''Seconds) @@ -121,7 +122,7 @@ testPayload = toJSON (10 :: Int) jobRunner :: Job.Job -> IO () jobRunner Job{jobPayload, jobAttempts} = case (fromJSON jobPayload) of Aeson.Error e -> error e - Success (j :: JobPayload) -> + Aeson.Success (j :: JobPayload) -> let recur pload idx = case pload of PayloadAlwaysFail delay -> (delaySeconds delay) >> (error $ "Forced error after " <> show delay <> " seconds") PayloadSucceed delay -> (delaySeconds delay) >> pure () @@ -154,13 +155,13 @@ logEventToJob le = case le of assertJobIdStatus :: (HasCallStack) => Connection - -> Job.TableName + -> Job.TableNames -> IORef [Job.LogEvent] -> String -> Job.Status -> JobId -> Assertion -assertJobIdStatus conn tname logRef msg st jid = do +assertJobIdStatus conn tnames logRef msg st jid = do logs <- readIORef logRef let mjid = Just jid case st of @@ -197,35 +198,39 @@ assertJobIdStatus conn tname logRef msg st jid = do _ -> False when (st /= Job.Success) $ do - Job.findJobByIdIO conn tname jid >>= \case + Job.findJobByIdIO conn tnames jid >>= \case Nothing -> assertFailure $ "Not expecting job to be deleted. JobId=" <> show jid Just (Job{jobStatus}) -> assertEqual msg st jobStatus -ensureJobId :: (HasCallStack) => Connection -> Job.TableName -> JobId -> IO Job -ensureJobId conn tname jid = Job.findJobByIdIO conn tname jid >>= \case +ensureJobId :: (HasCallStack) => Connection -> Job.TableNames -> JobId -> IO Job +ensureJobId conn tnames jid = Job.findJobByIdIO conn tnames jid >>= \case Nothing -> error $ "Not expecting job to be deleted. JobId=" <> show jid Just j -> pure j --- withRandomTable :: (MonadIO m) => Pool Connection -> (Job.TableName -> m a) -> m a +--withRandomTable :: (MonadBaseControl IO m, MonadUnliftIO m) => Pool Connection -> (Job.TableNames -> m a) -> m a +withRandomTable :: Pool Connection -> (Job.TableNames -> IO a) -> IO a withRandomTable jobPool action = do - (tname :: Job.TableName) <- liftIO ((("jobs_" <>) . fromString) <$> (replicateM 10 (R.randomRIO ('a', 'z')))) + tnames <- simpleTableNames . ("jobs_" <>) . fromString <$> liftIO (replicateM 10 (R.randomRIO ('a', 'z'))) finally - ((Pool.withResource jobPool $ \conn -> (liftIO $ Migrations.createJobTable conn tname)) >> (action tname)) - (Pool.withResource jobPool $ \conn -> liftIO $ void $ PGS.execute_ conn ("drop table if exists " <> tname <> ";")) + ((Pool.withResource jobPool $ \conn -> (liftIO $ Migrations.createJobTables conn tnames)) >> (action tnames)) + (Pool.withResource jobPool $ \conn -> liftIO $ void $ Migrations.dropJobTables conn tnames) --- withNewJobMonitor :: (Pool Connection) -> (TableName -> Assertion) -> Assertion +--withNewJobMonitor :: (MonadBaseControl IO m, MonadUnliftIO m) => Pool Connection -> (TableNames -> IORef [Job.LogEvent] -> m a) -> m a +withNewJobMonitor :: Pool Connection -> (TableNames -> IORef [Job.LogEvent] -> IO a) -> IO a withNewJobMonitor jobPool actualTest = do - withRandomTable jobPool $ \tname -> do - withNamedJobMonitor tname jobPool (actualTest tname) + withRandomTable jobPool $ \tnames -> do + withNamedJobMonitor tnames jobPool (actualTest tnames) -withNamedJobMonitor tname jobPool actualTest = do +--withNamedJobMonitor :: (MonadBaseControl IO m, MonadUnliftIO m) => TableNames -> Pool Connection -> (IORef [Job.LogEvent] -> m a) -> m a +withNamedJobMonitor :: TableNames -> Pool Connection -> (IORef [Job.LogEvent] -> IO a) -> IO a +withNamedJobMonitor tnames jobPool actualTest = do logRef :: IORef [Job.LogEvent] <- newIORef [] tcache <- newTimeCache simpleTimeFormat' withTimedFastLogger tcache LogNone $ \tlogger -> do let flogger logLevel logEvent = do tlogger $ \t -> toLogStr t <> " | " <> (Job.defaultLogStr Job.defaultJobType logLevel logEvent) atomicModifyIORef' logRef (\logs -> (logEvent:logs, ())) - cfg = Job.mkConfig flogger tname jobPool Job.UnlimitedConcurrentJobs jobRunner (\cfg -> cfg{Job.cfgDefaultMaxAttempts=3}) + cfg = Job.mkResourceConfig flogger tnames jobPool Job.UnlimitedConcurrentJobs jobRunner (\cfg -> cfg{Job.cfgDefaultMaxAttempts=3}) withAsync (Job.startJobRunner cfg) (const $ actualTest logRef) payloadGen :: MonadGen m => m JobPayload @@ -236,61 +241,61 @@ payloadGen = Gen.recursive Gen.choice nonRecursive recursive recursive = [ PayloadFail <$> (Gen.element [1, 2, 3]) <*> payloadGen ] testJobCreation appPool jobPool = testCase "job creation" $ do - withNewJobMonitor jobPool $ \tname logRef -> do + withNewJobMonitor jobPool $ \tnames logRef -> do Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.createJob conn tname (PayloadSucceed 0) + Job{jobId} <- Job.createJob conn tnames (PayloadSucceed 0) delaySeconds $ Seconds 6 - assertJobIdStatus conn tname logRef "Expecting job to be successful by now" Job.Success jobId + assertJobIdStatus conn tnames logRef "Expecting job to be successful by now" Job.Success jobId testEnsureShutdown appPool jobPool = testCase "ensure shutdown" $ do - withRandomTable jobPool $ \tname -> do - (jid, logRef) <- scheduleJob tname + withRandomTable jobPool $ \tnames -> do + (jid, logRef) <- scheduleJob tnames delaySeconds (2 * Job.defaultPollingInterval) Pool.withResource appPool $ \conn -> do - assertJobIdStatus conn tname logRef "Job should still be in queued state if job-monitor is no longer running" Job.Queued jid + assertJobIdStatus conn tnames logRef "Job should still be in queued state if job-monitor is no longer running" Job.Queued jid where - scheduleJob tname = withNamedJobMonitor tname jobPool $ \logRef -> do + scheduleJob tnames = withNamedJobMonitor tnames jobPool $ \logRef -> do t <- getCurrentTime Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral (2 * (unSeconds Job.defaultPollingInterval))) t) - assertJobIdStatus conn tname logRef "Job is scheduled in future, should still be queueud" Job.Queued jobId + Job{jobId} <- Job.scheduleJob conn tnames (PayloadSucceed 0) (addUTCTime (fromIntegral (2 * (unSeconds Job.defaultPollingInterval))) t) + assertJobIdStatus conn tnames logRef "Job is scheduled in future, should still be queueud" Job.Queued jobId pure (jobId, logRef) testGracefulShutdown appPool jobPool = testCase "ensure graceful shutdown" $ do - withRandomTable jobPool $ \tname -> do - (j1, j2, logRef) <- withNamedJobMonitor tname jobPool $ \logRef -> do + withRandomTable jobPool $ \tnames -> do + (j1, j2, logRef) <- withNamedJobMonitor tnames jobPool $ \logRef -> do Pool.withResource appPool $ \conn -> do t <- getCurrentTime - j1 <- Job.createJob conn tname (PayloadSucceed $ 2 * Job.defaultPollingInterval) - j2 <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral $ unSeconds $ Job.defaultPollingInterval) t) + j1 <- Job.createJob conn tnames (PayloadSucceed $ 2 * Job.defaultPollingInterval) + j2 <- Job.scheduleJob conn tnames (PayloadSucceed 0) (addUTCTime (fromIntegral $ unSeconds $ Job.defaultPollingInterval) t) pure (j1, j2, logRef) Pool.withResource appPool $ \conn -> do delaySeconds 1 - assertJobIdStatus conn tname logRef "Expecting the first job to be in locked state because it should be running" Job.Locked (jobId j1) - assertJobIdStatus conn tname logRef "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) + assertJobIdStatus conn tnames logRef "Expecting the first job to be in locked state because it should be running" Job.Locked (jobId j1) + assertJobIdStatus conn tnames logRef "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) delaySeconds $ 3 * Job.defaultPollingInterval - assertJobIdStatus conn tname logRef "Expecting the first job to be completed successfully if graceful shutdown is implemented correctly" Job.Success (jobId j1) - assertJobIdStatus conn tname logRef "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) + assertJobIdStatus conn tnames logRef "Expecting the first job to be completed successfully if graceful shutdown is implemented correctly" Job.Success (jobId j1) + assertJobIdStatus conn tnames logRef "Expecting the second job to be queued because no new job should be picked up during graceful shutdown" Job.Queued (jobId j2) pure () testJobScheduling appPool jobPool = testCase "job scheduling" $ do - withNewJobMonitor jobPool $ \tname logRef -> do + withNewJobMonitor jobPool $ \tnames logRef -> do Pool.withResource appPool $ \conn -> do t <- getCurrentTime - job@Job{jobId} <- Job.scheduleJob conn tname (PayloadSucceed 0) (addUTCTime (fromIntegral 3600) t) + job@Job{jobId} <- Job.scheduleJob conn tnames (PayloadSucceed 0) (addUTCTime (fromIntegral 3600) t) delaySeconds $ Seconds 2 - assertJobIdStatus conn tname logRef "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId - j <- Job.saveJobIO conn tname job{jobRunAt = (addUTCTime (fromIntegral (-1)) t)} + assertJobIdStatus conn tnames logRef "Job is scheduled in the future. It should NOT have been successful by now" Job.Queued jobId + j <- Job.saveJobIO conn tnames job{jobRunAt = (addUTCTime (fromIntegral (-1)) t)} delaySeconds (Job.defaultPollingInterval + (Seconds 2)) - assertJobIdStatus conn tname logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId + assertJobIdStatus conn tnames logRef "Job had a runAt date in the past. It should have been successful by now" Job.Success jobId testJobFailure appPool jobPool = testCase "job retry" $ do - withNewJobMonitor jobPool $ \tname logRef -> do + withNewJobMonitor jobPool $ \tnames logRef -> do Pool.withResource appPool $ \conn -> do - Job{jobId} <- Job.createJob conn tname (PayloadAlwaysFail 0) + Job{jobId} <- Job.createJob conn tnames (PayloadAlwaysFail 0) delaySeconds $ Seconds 15 - Job{jobAttempts, jobStatus} <- ensureJobId conn tname jobId + Job{jobAttempts, jobStatus} <- ensureJobId conn tnames jobId assertEqual "Exepcting job to be in Failed status" Job.Failed jobStatus assertEqual ("Expecting job attempts to be 3. Found " <> show jobAttempts) 3 jobAttempts From 0b6738f893addb57082f89922009c9115a5f8220 Mon Sep 17 00:00:00 2001 From: David Ellis Date: Tue, 20 Oct 2020 11:38:09 -0600 Subject: [PATCH 13/14] Organized imports in `test/Test.hs` --- test/Test.hs | 81 ++++++++++++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 44 deletions(-) diff --git a/test/Test.hs b/test/Test.hs index ecb32aa..68284eb 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -1,47 +1,43 @@ {-# LANGUAGE TypeSynonymInstances, FlexibleInstances, NamedFieldPuns, DeriveGeneric, FlexibleContexts, TypeFamilies, StandaloneDeriving #-} module Test where -import Test.Tasty as Tasty -import qualified OddJobs.Migrations as Migrations -import qualified OddJobs.Job as Job -import Database.PostgreSQL.Simple as PGS -import Data.Functor (void) -import Data.Pool as Pool -import Test.Tasty.HUnit -import Debug.Trace --- import Control.Exception.Lifted (finally, catch, bracket) -import Control.Monad.Logger -import Control.Monad.Reader -import Data.Aeson as Aeson -import Data.Aeson.TH as Aeson --- import Control.Concurrent.Lifted --- import Control.Concurrent.Async.Lifted -import OddJobs.Job (Job(..), JobId, delaySeconds, Seconds(..)) -import System.Log.FastLogger ( fromLogStr, withFastLogger, LogType'(..) - , defaultBufSize, FastLogger, FileLogSpec(..), newTimedFastLogger - , withTimedFastLogger) -import System.Log.FastLogger.Date (newTimeCache, simpleTimeFormat') -import Data.String.Conv (toS) -import Data.Time -import GHC.Generics -import Hedgehog -import qualified Hedgehog.Gen as Gen -import qualified Hedgehog.Range as Range -import Test.Tasty.Hedgehog -import qualified System.Random as R -import Data.String (fromString) +import Control.Monad.Logger +import Control.Monad.Reader +import Data.Aeson as Aeson +import Data.Aeson.TH as Aeson +import Data.Functor (void) import qualified Data.IntMap.Strict as Map -import Control.Monad.Trans.Control (liftWith, restoreT) -import Control.Monad.Morph (hoist) -import Data.List as DL -import OddJobs.Web as Web -import qualified Data.Time.Convenience as Time +import Data.List as DL +import Data.Maybe (fromMaybe) +import Data.Ord (comparing, Down(..)) +import Data.Pool as Pool +import Data.String (fromString) +import Data.String.Conv (toS) import qualified Data.Text as T -import Data.Ord (comparing, Down(..)) -import Data.Maybe (fromMaybe) +import Data.Time +import qualified Data.Time.Convenience as Time +import Database.PostgreSQL.Simple as PGS +import GHC.Generics +import System.Log.FastLogger ( fromLogStr, withFastLogger, LogType'(..) + , defaultBufSize, FastLogger, FileLogSpec(..), newTimedFastLogger + , withTimedFastLogger) +import System.Log.FastLogger.Date (newTimeCache, simpleTimeFormat') +import qualified System.Random as R +import UnliftIO + +import Hedgehog +import qualified Hedgehog.Gen as Gen +import qualified Hedgehog.Range as Range +import Test.Tasty as Tasty +import Test.Tasty.Hedgehog +import Test.Tasty.HUnit + import qualified OddJobs.ConfigBuilder as Job -import UnliftIO -import OddJobs.Types +import OddJobs.Job (Job(..), JobId, delaySeconds, Seconds(..)) +import qualified OddJobs.Job as Job +import qualified OddJobs.Migrations as Migrations +import qualified OddJobs.Types as Job +import OddJobs.Web as Web $(Aeson.deriveJSON Aeson.defaultOptions ''Seconds) @@ -207,22 +203,19 @@ ensureJobId conn tnames jid = Job.findJobByIdIO conn tnames jid >>= \case Nothing -> error $ "Not expecting job to be deleted. JobId=" <> show jid Just j -> pure j ---withRandomTable :: (MonadBaseControl IO m, MonadUnliftIO m) => Pool Connection -> (Job.TableNames -> m a) -> m a withRandomTable :: Pool Connection -> (Job.TableNames -> IO a) -> IO a withRandomTable jobPool action = do - tnames <- simpleTableNames . ("jobs_" <>) . fromString <$> liftIO (replicateM 10 (R.randomRIO ('a', 'z'))) + tnames <- Job.simpleTableNames . ("jobs_" <>) . fromString <$> liftIO (replicateM 10 $ R.randomRIO ('a', 'z')) finally ((Pool.withResource jobPool $ \conn -> (liftIO $ Migrations.createJobTables conn tnames)) >> (action tnames)) (Pool.withResource jobPool $ \conn -> liftIO $ void $ Migrations.dropJobTables conn tnames) ---withNewJobMonitor :: (MonadBaseControl IO m, MonadUnliftIO m) => Pool Connection -> (TableNames -> IORef [Job.LogEvent] -> m a) -> m a -withNewJobMonitor :: Pool Connection -> (TableNames -> IORef [Job.LogEvent] -> IO a) -> IO a +withNewJobMonitor :: Pool Connection -> (Job.TableNames -> IORef [Job.LogEvent] -> IO a) -> IO a withNewJobMonitor jobPool actualTest = do withRandomTable jobPool $ \tnames -> do withNamedJobMonitor tnames jobPool (actualTest tnames) ---withNamedJobMonitor :: (MonadBaseControl IO m, MonadUnliftIO m) => TableNames -> Pool Connection -> (IORef [Job.LogEvent] -> m a) -> m a -withNamedJobMonitor :: TableNames -> Pool Connection -> (IORef [Job.LogEvent] -> IO a) -> IO a +withNamedJobMonitor :: Job.TableNames -> Pool Connection -> (IORef [Job.LogEvent] -> IO a) -> IO a withNamedJobMonitor tnames jobPool actualTest = do logRef :: IORef [Job.LogEvent] <- newIORef [] tcache <- newTimeCache simpleTimeFormat' From bb06cdfb193964dfb73ecf57902862ccd393975b Mon Sep 17 00:00:00 2001 From: David Ellis Date: Tue, 20 Oct 2020 12:10:44 -0600 Subject: [PATCH 14/14] Updated tests to allow env var specification of database connection Now, if the environment variable "ODD_JOBS_TEST_DB_CONNECT" is set, it will be used as a connection string for unit test. If not, the existing hard-coded default connection will be used. This is useful when developing in a containerized environment. --- test/Test.hs | 52 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/test/Test.hs b/test/Test.hs index 68284eb..805b734 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -5,6 +5,7 @@ import Control.Monad.Logger import Control.Monad.Reader import Data.Aeson as Aeson import Data.Aeson.TH as Aeson +import Data.ByteString.Char8 (pack) import Data.Functor (void) import qualified Data.IntMap.Strict as Map import Data.List as DL @@ -18,6 +19,7 @@ import Data.Time import qualified Data.Time.Convenience as Time import Database.PostgreSQL.Simple as PGS import GHC.Generics +import System.Environment (lookupEnv) import System.Log.FastLogger ( fromLogStr, withFastLogger, LogType'(..) , defaultBufSize, FastLogger, FileLogSpec(..), newTimedFastLogger , withTimedFastLogger) @@ -41,32 +43,38 @@ import OddJobs.Web as Web $(Aeson.deriveJSON Aeson.defaultOptions ''Seconds) +-- | Default database connection used in unit tests. +-- __Note:__ This value will be used unless the environment variable +-- "ODD_JOBS_TEST_DB_CONNECT" is defined, which must be set to a valid +-- PostgreSQL connection string to override this default. +defaultTestConnectInfo :: ConnectInfo +defaultTestConnectInfo = ConnectInfo + { connectHost = "localhost" + , connectPort = fromIntegral (5432 :: Int) + , connectUser = "jobs_test" + , connectPassword = "jobs_test" + , connectDatabase = "jobs_test" + } + main :: IO () main = do - bracket createAppPool destroyAllResources $ \appPool -> do - bracket createJobPool destroyAllResources $ \jobPool -> do + bracket createTestPool destroyAllResources $ \appPool -> do + bracket createTestPool destroyAllResources $ \jobPool -> do defaultMain $ tests appPool jobPool + +createTestPool :: IO (Pool Connection) +createTestPool = + createPool + openTestConnection -- create a new resource + PGS.close -- destroy resource + 1 -- stripes + (fromRational 10) -- number of seconds unused resources are kept around + 45 -- max resources open per stripe where - connInfo = ConnectInfo - { connectHost = "localhost" - , connectPort = fromIntegral (5432 :: Int) - , connectUser = "jobs_test" - , connectPassword = "jobs_test" - , connectDatabase = "jobs_test" - } - - createAppPool = createPool - (PGS.connect connInfo) -- cretea a new resource - (PGS.close) -- destroy resource - 1 -- stripes - (fromRational 10) -- number of seconds unused resources are kept around - 45 - createJobPool = createPool - (PGS.connect connInfo) -- cretea a new resource - (PGS.close) -- destroy resource - 1 -- stripes - (fromRational 10) -- number of seconds unused resources are kept around - 45 + openTestConnection = + maybe (PGS.connect defaultConnectInfo) + (PGS.connectPostgreSQL . pack) + =<< lookupEnv "ODD_JOBS_TEST_DB_CONNECT" tests appPool jobPool = testGroup "All tests" [