Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow jobs to have return values, which get persisted in the DB #103

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
7 changes: 5 additions & 2 deletions examples/OddJobsCliExample.lhs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,22 @@ data MyJob
In this example, the core job-runner function is in the `IO` monad. In all probability, you application's code will be in a custom monad, and not IO. Pleae refer to TODO, on how to work with custom monads.

\begin{code}
myJobRunner :: Job -> IO ()
myJobRunner :: Job -> IO (Maybe Aeson.Value)
myJobRunner job = do
throwParsePayload job >>= \case
SendWelcomeEmail userId -> do
putStrLn $ "This should call the function that actually sends the welcome email. " <>
"\nWe are purposely waiting 60 seconds before completing this job so that graceful shutdown can be demonstrated."
delaySeconds (Seconds 60)
putStrLn $ "SendWelcomeEmail to user: " <> show userId <> " complete (60 second wait is now over...)"
SendPasswordResetEmail _tkn ->
pure Nothing
SendPasswordResetEmail _tkn -> do
putStrLn "This should call the function that actually sends the password-reset email"
pure Nothing
SetupSampleData _userId -> do
_ <- Prelude.error "User onboarding is incomplete"
putStrLn "This should call the function that actually sets up sample data in a newly registered user's account"
pure Nothing
\end{code}

=== 5. Write the main function using `OddJobs.Cli`
Expand Down
25 changes: 19 additions & 6 deletions odd-jobs.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ cabal-version: 1.12
-- This file has been generated from package.yaml by hpack version 0.35.2.
--
-- see: https://github.com/sol/hpack
--
-- hash: dff1ced317b40e54688799203ec3743b119bf71f2fc98109a5e79f12c6e10fba

name: odd-jobs
version: 0.2.3
version: 0.2.2
synopsis: A full-featured PostgreSQL-backed job queue (with an admin UI)
description: - Background jobs library for Haskell.
- Extracted from production code at [Vacation Labs](https://www.vacationlabs.com).
Expand Down Expand Up @@ -47,6 +45,11 @@ extra-source-files:
assets/js/logo-slider.js
assets/odd-jobs-color-logo.png

flag jobresult
description: Controls whether the job is extended to have a result/return value along with workflow features
manual: True
default: False

library
exposed-modules:
OddJobs.Job
Expand All @@ -57,8 +60,8 @@ library
OddJobs.Types
OddJobs.ConfigBuilder
other-modules:
UI
OddJobs.Job.Query
UI
Paths_odd_jobs
hs-source-dirs:
src
Expand Down Expand Up @@ -91,7 +94,7 @@ library
, mtl
, optparse-applicative
, postgresql-simple
, resource-pool >= 0.4.0.0 && < 0.5.0.0
, resource-pool
, safe
, servant
, servant-lucid
Expand All @@ -109,6 +112,8 @@ library
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT

executable devel
main-is: DevelMain.hs
Expand All @@ -117,8 +122,8 @@ executable devel
OddJobs.ConfigBuilder
OddJobs.Endpoints
OddJobs.Job
OddJobs.Migrations
OddJobs.Job.Query
OddJobs.Migrations
OddJobs.Types
OddJobs.Web
UI
Expand Down Expand Up @@ -175,6 +180,8 @@ executable devel
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT

executable odd-jobs-cli-example
main-is: OddJobsCliExample.lhs
Expand All @@ -196,6 +203,7 @@ executable odd-jobs-cli-example
, async ==2.2.4
, base >=4.7 && <5
, bytestring
, containers
, directory
, either
, fast-logger
Expand Down Expand Up @@ -229,6 +237,8 @@ executable odd-jobs-cli-example
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT

test-suite jobrunner
type: exitcode-stdio-1.0
Expand All @@ -239,6 +249,7 @@ test-suite jobrunner
OddJobs.ConfigBuilder
OddJobs.Endpoints
OddJobs.Job
OddJobs.Job.Query
OddJobs.Migrations
OddJobs.Types
OddJobs.Web
Expand Down Expand Up @@ -303,3 +314,5 @@ test-suite jobrunner
, wai
, warp
default-language: Haskell2010
if flag(jobresult)
cpp-options: -D JOB_RESULT
10 changes: 10 additions & 0 deletions package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ extra-source-files:
- assets/js/logo-slider.js
- assets/odd-jobs-color-logo.png

flags:
jobresult:
description: Controls whether the job is extended to have a result/return value along with workflow features
manual: true
default: false

when:
- condition: flag(jobresult)
cpp-options: -D JOB_RESULT


ghc-options:
- -Wall
Expand Down
3 changes: 2 additions & 1 deletion src/OddJobs/ConfigBuilder.hs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mkConfig :: (LogLevel -> LogEvent -> IO ())
-- ^ DB connection-pool to be used by job-runner. Ref: 'cfgDbPool'
-> ConcurrencyControl
-- ^ Concurrency configuration. Ref: 'cfgConcurrencyControl'
-> (Job -> IO ())
-> (Job -> IO (Maybe Aeson.Value))
-- ^ The actual "job runner" which contains your application code. Ref: 'cfgJobRunner'
-> (Config -> Config)
-- ^ A function that allows you to modify the \"interim config\". The
Expand Down Expand Up @@ -83,6 +83,7 @@ mkConfig logger tname dbpool ccControl jrunner configOverridesFn =
, cfgImmediateJobDeletion = defaultImmediateJobDeletion
, cfgDelayedJobDeletion = Nothing
, cfgDefaultRetryBackoff = \attempts -> pure $ Seconds $ 2 ^ attempts
-- , cfgEnableWorkflows = False
}
in cfg

Expand Down
99 changes: 57 additions & 42 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ module OddJobs.Job
, throwParsePayload
, eitherParsePayloadWith
, throwParsePayloadWith
, noJobResult
)
where

Expand All @@ -87,6 +88,7 @@ import qualified Data.Pool as Pool
import Data.Pool(Pool)
import Data.Text as T
import Database.PostgreSQL.Simple as PGS
import Database.PostgreSQL.Simple.ToRow as PGS
import Database.PostgreSQL.Simple.Notification
import UnliftIO.Async hiding (poll)
import UnliftIO.Concurrent (threadDelay, myThreadId)
Expand Down Expand Up @@ -147,12 +149,13 @@ import Data.Aeson.Internal (iparse, IResult(..), formatError)
-- type-class based interface as well (similar to what
-- 'Yesod.JobQueue.YesodJobQueue' provides).
class (MonadUnliftIO m, MonadBaseControl IO m) => HasJobRunner m where
-- isWorkflowEnabled :: m Bool
getPollingInterval :: m Seconds
onJobSuccess :: Job -> m ()
immediateJobDeletion :: m (Job -> IO Bool)
delayedJobDeletion :: m (Maybe (PGS.Connection -> IO Int64))
onJobFailed :: m [JobErrHandler]
getJobRunner :: m (Job -> IO ())
getJobRunner :: m (Job -> IO (Maybe Aeson.Value))
getDbPool :: m (Pool Connection)
getTableName :: m TableName
onJobStart :: Job -> m ()
Expand Down Expand Up @@ -188,6 +191,7 @@ logCallbackErrors :: (HasJobRunner m) => JobId -> Text -> m () -> m ()
logCallbackErrors jid msg action = catchAny action $ \e -> log LevelError $ LogText $ msg <> " Job ID=" <> toS (show jid) <> ": " <> toS (show e)

instance HasJobRunner RunnerM where
-- isWorkflowEnabled = asks (cfgEnableWorkflows . envConfig)
getPollingInterval = asks (cfgPollingInterval . envConfig)
onJobFailed = asks (cfgOnJobFailed . envConfig)
onJobSuccess job = do
Expand Down Expand Up @@ -281,30 +285,29 @@ findJobByIdIO conn tname jid = PGS.query conn findJobByIdQuery (tname, jid) >>=
_js -> Prelude.error $ "Not expecting to find multiple jobs by id=" <> show jid


saveJobQuery :: PGS.Query
saveJobQuery = "UPDATE ? set run_at = ?, status = ?, payload = ?, last_error = ?, attempts = ?, locked_at = ?, locked_by = ? WHERE id = ? RETURNING " <> concatJobDbColumns

deleteJobQuery :: PGS.Query
deleteJobQuery = "DELETE FROM ? WHERE id = ?"

saveJob :: (HasJobRunner m) => Job -> m Job
saveJob j = do
tname <- getTableName
-- workflowEnabled <- isWorkflowEnabled
withDbConnection $ \conn -> liftIO $ saveJobIO conn tname j

saveJobIO :: Connection -> TableName -> Job -> IO Job
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do
rs <- PGS.query conn saveJobQuery
( tname
, jobRunAt
, jobStatus
, jobPayload
, jobLastError
, jobAttempts
, jobLockedAt
, jobLockedBy
, jobId
)
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobResult, jobId, jobParentId} = do
let args = ( tname
, jobRunAt
, jobStatus
, jobPayload
, jobLastError
, jobAttempts
, jobLockedAt
, jobLockedBy
, jobResult
, jobParentId
)
rs <- PGS.query conn saveJobQuery args
case rs of
[] -> Prelude.error $ "Could not find job while updating it id=" <> show jobId
[j] -> pure j
Expand Down Expand Up @@ -362,29 +365,30 @@ instance Exception TimeoutException
runJobWithTimeout :: (HasJobRunner m)
=> Seconds
-> Job
-> m ()
-> m (Maybe Aeson.Value)
runJobWithTimeout timeoutSec job@Job{jobId} = do
threadsRef <- envJobThreadsRef <$> getRunnerEnv
jobRunner_ <- getJobRunner

a <- async $ liftIO $ jobRunner_ job

_x <- atomicModifyIORef' threadsRef $ \threads ->
( DM.insert jobId a threads
, DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads
)
atomicModifyIORef' threadsRef $ \threads -> ( DM.insert jobId (void a) threads, () )
-- ( DM.insert jobId a threads
-- , DL.map asyncThreadId $ DM.elems $ DM.insert jobId a threads
-- )

-- liftIO $ putStrLn $ "Threads: " <> show x
log LevelDebug $ LogText $ toS $ "Spawned job in " <> show (asyncThreadId a)

t <- async $ do
delaySeconds timeoutSec
throwIO TimeoutException
t <- async $ delaySeconds timeoutSec

void $ finally
(waitEitherCancel a t)

finally
(waitEitherCancel t a >>= either (const $ throwIO TimeoutException) pure)
(atomicModifyIORef' threadsRef $ \threads -> (DM.delete jobId threads, ()))




-- | runs a job, blocks for as long as it's in progress
runJob :: (HasJobRunner m) => JobId -> m ()
Expand All @@ -397,9 +401,9 @@ runJob jid = do
log LevelInfo $ LogJobStart job
flip catch (exceptionHandler job startTime) $ do
onJobStart job
runJobWithTimeout lockTimeout job
jresult <- runJobWithTimeout lockTimeout job
endTime <- liftIO getCurrentTime
let newJob = job{jobStatus=OddJobs.Types.Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime}
let newJob = job{jobStatus=OddJobs.Types.Success, jobLockedBy=Nothing, jobLockedAt=Nothing, jobUpdatedAt = endTime, jobResult = jresult}
shouldDeleteJob <- immediateJobDeletion >>= (\fn -> liftIO $ fn newJob)
if shouldDeleteJob
then deleteJob jid
Expand Down Expand Up @@ -470,7 +474,6 @@ restartUponCrash name_ action = do
case x of
Left (e :: SomeException) -> log LevelError $ LogText $ name_ <> " seems to have exited with an error. Restarting: " <> toS (show e)
Right r -> log LevelError $ LogText $ name_ <> " seems to have exited with the folloing result: " <> toS (show r) <> ". Restaring."
traceM "CRASH OCCURRED"
restartUponCrash name_ action

-- | Spawns 'jobPoller' and 'jobEventListener' in separate threads and restarts
Expand Down Expand Up @@ -729,7 +732,6 @@ jobEventListener = do
jobDeletionPoller :: (HasJobRunner m) => (Connection -> IO Int64) -> m ()
jobDeletionPoller deletionFn = do
i <- getPollingInterval
dbPool <- getDbPool
withDbConnection $ \conn -> do
forever $ do
n <- liftIO $ deletionFn conn
Expand Down Expand Up @@ -783,14 +785,14 @@ scheduleJob conn tname payload runAt = do

type ResourceList = [(ResourceId, Int)]

createJobWithResources
:: ToJSON p
=> Connection
-> TableName
-> ResourceCfg
-> p
-> ResourceList
-> IO Job
createJobWithResources ::
ToJSON p =>
Connection ->
TableName ->
ResourceCfg ->
p ->
ResourceList ->
IO Job
createJobWithResources conn tname resCfg payload resources = do
t <- getCurrentTime
scheduleJobWithResources conn tname resCfg payload resources t
Expand All @@ -804,10 +806,9 @@ scheduleJobWithResources
-> ResourceList
-> UTCTime
-> IO Job
scheduleJobWithResources conn tname ResourceCfg{..} payload resources runAt = do
scheduleJobWithResources conn tname ResourceCfg{..} payload resources runAt = PGS.withTransaction conn $ do
-- We insert everything in a single transaction to delay @NOTIFY@ calls,
-- so a job isn't picked up before its resources are inserted.
PGS.begin conn
let args = ( tname, runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text )
queryFormatter = toS <$> PGS.formatQuery conn createJobQuery args
rs <- PGS.query conn createJobQuery args
Expand All @@ -821,8 +822,6 @@ scheduleJobWithResources conn tname ResourceCfg{..} payload resources runAt = do
void $ PGS.execute conn ensureResource (resCfgResourceTable, rawResourceId resourceId, resCfgDefaultLimit)
void $ PGS.execute conn registerResourceUsage (resCfgUsageTable, jobId job, rawResourceId resourceId, usage)

PGS.commit conn

pure job

-- getRunnerEnv :: (HasJobRunner m) => m RunnerEnv
Expand Down Expand Up @@ -856,6 +855,22 @@ throwParsePayloadWith parser job =
either throwString pure (eitherParsePayloadWith parser job)


-- | If you aren't interesting in storing 'jobResults' then use this in every
-- branch of the @case@ statement in your 'jobRunner', like this:
--
-- @
-- myJobRunner :: Job -> IO (Maybe Aeson.Value)
-- myJobRunner job = do
--
-- throwParsePayload job >>= \case
-- SendConfirmationEmail uid -> noJobResult $ sendConfirmationEmail uid
-- SetupCustomerAccount cid -> noJobResult $ setupCustomerAccount cid
-- @
--
noJobResult :: (Functor f) => f a -> f (Maybe Aeson.Value)
noJobResult = (Nothing <$)


-- | Used by the web\/admin UI to fetch a \"master list\" of all known
-- job-types. Ref: 'cfgAllJobTypes'
fetchAllJobTypes :: (MonadIO m)
Expand Down
Loading
Loading