Skip to content

Commit

Permalink
Add resource-based concurrency control
Browse files Browse the repository at this point in the history
This is based on the system in saurabhnanda#55, and aims to
address saurabhnanda#38, where jobs need limited access to
resources that control how many of them can run simultaneously.

Unlike that PR, this implements a system where jobs can require access
to 0 or more resources, with different amounts of usage. This is because
of a business need for jobs to be able to require multiple resources.

The implementation is intended to have no performance impact on existing
code wherever the user has not opted in to resource-based concurrency.
This is done by having parallel implementations wherever necessary that
switch based on the concurrency control chosen.
  • Loading branch information
ivb-supercede committed Feb 1, 2023
1 parent f921230 commit 181cda2
Show file tree
Hide file tree
Showing 4 changed files with 396 additions and 53 deletions.
220 changes: 169 additions & 51 deletions src/OddJobs/Job.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE RankNTypes, FlexibleInstances, FlexibleContexts, PartialTypeSignatures, UndecidableInstances #-}
{-# LANGUAGE ExistentialQuantification, RecordWildCards, ScopedTypeVariables #-}

module OddJobs.Job
(
Expand All @@ -12,13 +13,17 @@ module OddJobs.Job
-- $config
, Config(..)
, ConcurrencyControl(..)
, ResourceCfg(..)

-- * Creating/scheduling jobs
--
-- $createJobs
, createJob
, scheduleJob

, createJobWithResources
, scheduleJobWithResources

-- * @Job@ and associated data-types
--
-- $dataTypes
Expand All @@ -31,6 +36,8 @@ module OddJobs.Job
, Seconds(..)
, JobErrHandler(..)
, AllJobTypes(..)
, ResourceId(..)
, FunctionName

-- ** Structured logging
--
Expand Down Expand Up @@ -78,7 +85,7 @@ import Data.Pool(Pool)
import Data.Text as T
import Database.PostgreSQL.Simple as PGS
import Database.PostgreSQL.Simple.Notification
import UnliftIO.Async
import UnliftIO.Async hiding (poll)
import UnliftIO.Concurrent (threadDelay, myThreadId)
import Data.String
import System.Posix.Process (getProcessID)
Expand All @@ -89,7 +96,8 @@ import Control.Monad.Logger as MLogger (LogLevel(..), LogStr, toLogStr)
import UnliftIO.IORef
import UnliftIO.Exception ( SomeException(..), try, catch, finally
, catchAny, bracket, Exception(..), throwIO
, catches, Handler(..), mask_, throwString
, catches, Handler(..), mask_, onException
, throwString
)
import Data.Proxy
import Control.Monad.Trans.Control
Expand Down Expand Up @@ -309,7 +317,7 @@ saveJob j = do
saveJobIO :: Connection -> TableName -> Job -> IO Job
saveJobIO conn tname Job{jobRunAt, jobStatus, jobPayload, jobLastError, jobAttempts, jobLockedBy, jobLockedAt, jobId} = do
rs <- PGS.query conn saveJobQuery
( tname
( tname
, jobRunAt
, jobStatus
, jobPayload
Expand Down Expand Up @@ -476,7 +484,18 @@ jobMonitor = do

-- | Ref: 'jobPoller'
jobPollingSql :: Query
jobPollingSql = "update ? set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1 WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) ORDER BY attempts ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING id"
jobPollingSql =
"update ? set status = ?, locked_at = ?, locked_by = ?, attempts=attempts+1\
\ WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) \
\ ORDER BY attempts ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING id"

-- | Resource-aware variant of 'jobPollingSql'. Ref: 'jobPoller'
--
-- Placeholders, in order: jobs table, new status, new @locked_at@, new
-- @locked_by@, jobs table, @run_at@ upper bound, set of pollable statuses,
-- the status that is subject to the lock timeout, the @locked_at@ cutoff for
-- that status, and finally the name of the SQL function that is applied to the
-- candidate job's @id@ as an additional filter.
--
-- NOTE(review): unlike 'jobPollingSql', the inner @select@ here carries no
-- @FOR UPDATE@ clause -- confirm that this is intentional.
jobPollingWithResourceSql :: Query
jobPollingWithResourceSql =
  " UPDATE ? SET status = ?, locked_at = ?, locked_by = ?, attempts = attempts + 1 "
    <> " WHERE id in (select id from ? where (run_at<=? AND ((status in ?) OR (status = ? and locked_at<?))) "
    <> " AND ?(id) "
    <> " ORDER BY attempts ASC, run_at ASC LIMIT 1) "
    <> " RETURNING id"

waitForJobs :: (HasJobRunner m)
=> m ()
Expand All @@ -491,15 +510,23 @@ waitForJobs = do
delaySeconds (Seconds 1)
waitForJobs

-- | Decision produced by the concurrency-control function each time the
-- poller or the event listener is about to pick up a job.
data ConcurrencyAction
  -- | Do not pick up any job right now.
  = DontPoll
  -- | Pick up a job using the plain (resource-unaware) queries.
  | PollAny
  -- | Pick up a job using the resource-aware queries, which additionally
  -- filter candidates through the check function of the given 'ResourceCfg'.
  | PollWithResources ResourceCfg

getConcurrencyControlFn :: (HasJobRunner m)
=> m (m Bool)
=> m (Connection -> m ConcurrencyAction)
getConcurrencyControlFn = getConcurrencyControl >>= \case
UnlimitedConcurrentJobs -> pure $ pure True
MaxConcurrentJobs maxJobs -> pure $ do
UnlimitedConcurrentJobs -> pure $ const $ pure PollAny
MaxConcurrentJobs maxJobs -> pure $ const $ do
curJobs <- getRunnerEnv >>= (readIORef . envJobThreadsRef)
pure $ DL.length curJobs < maxJobs
DynamicConcurrency fn -> pure $ liftIO fn
pure $ pollIf $ DL.length curJobs < maxJobs
ResourceLimits resCfg -> pure $ const $ pure $ PollWithResources resCfg
DynamicConcurrency fn -> pure $ const $ pollIf <$> liftIO fn

where
pollIf cond = if cond then PollAny else DontPoll

jobPollingIO :: Connection -> String -> TableName -> Seconds -> IO [Only JobId]
jobPollingIO pollerDbConn processName tname lockTimeout = do
Expand Down Expand Up @@ -537,27 +564,55 @@ jobPoller = do
lockTimeout <- getDefaultJobTimeout
traceM "jobPoller just before log statement"
log LevelInfo $ LogText $ toS $ "Starting the job monitor via DB polling with processName=" <> processName

let poll conn mResCfg = join $ mask_ $ do
log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.."
t <- liftIO getCurrentTime
r <- case mResCfg of
Nothing -> liftIO $
PGS.query conn jobPollingSql
( tname
, Locked
, t
, processName
, tname
, t
, In [Queued, Retry]
, Locked
, addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t)
Just ResourceCfg{..} -> liftIO $
PGS.query conn jobPollingWithResourceSql
( tname
, Locked
, t
, processName
, tname
, t
, In [Queued, Retry]
, Locked
, addUTCTime (fromIntegral $ negate $ unSeconds lockTimeout) t
, resCfgCheckResourceFunction
)
case r of
-- When we don't have any jobs to run, we can relax a bit...
[] -> pure delayAction

-- When we find a job to run, fork and try to find the next job without any delay...
[Only (jid :: JobId)] -> do
void $ async $ runJob jid
pure noDelayAction

x -> error $ "WTF just happened? I was supposed to get only a single row, but got: " ++ show x

concurrencyControlFn <- getConcurrencyControlFn
withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn >>= \case
False -> do
withResource pool $ \pollerDbConn -> forever $ concurrencyControlFn pollerDbConn >>= \case
DontPoll -> do
log LevelWarn $ LogText "NOT polling the job queue due to concurrency control"
-- If we can't run any jobs ATM, relax and wait for resources to free up
delayAction
True -> do
join
(mask_ $ do
log LevelDebug $ LogText $ toS $ "[" <> processName <> "] Polling the job queue.."
r <- liftIO $ jobPollingIO pollerDbConn processName tname lockTimeout
case r of
-- When we don't have any jobs to run, we can relax a bit...
[] -> pure delayAction

-- When we find a job to run, fork and try to find the next job without any delay...
[Only (jid :: JobId)] -> do
void $ async $ runJob jid
pure noDelayAction

x -> error $ "WTF just happened? I was supposed to get only a single row, but got: " ++ show x)
PollAny -> poll pollerDbConn Nothing
PollWithResources resCfg -> poll pollerDbConn (Just resCfg)

where
delayAction = delaySeconds =<< getPollingInterval
noDelayAction = pure ()
Expand All @@ -573,9 +628,17 @@ jobEventListener = do
jwName <- liftIO jobWorkerName
concurrencyControlFn <- getConcurrencyControlFn

let tryLockingJob jid = do
let tryLockingJob jid mResCfg = withDbConnection $ \conn -> do
let q = "UPDATE ? SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 WHERE id=? AND status in ? RETURNING id"
withDbConnection (\conn -> liftIO $ PGS.query conn q (tname, Locked, jwName, jid, In [Queued, Retry])) >>= \case
qWithResources =
"UPDATE ? SET status=?, locked_at=now(), locked_by=?, attempts=attempts+1 \
\ WHERE id=? AND status in ? AND ?(id) RETURNING id"
result <- case mResCfg of
Nothing -> liftIO $ PGS.query conn q (tname, Locked, jwName, jid, In [Queued, Retry])
Just ResourceCfg{..} -> liftIO $ PGS.query conn qWithResources
(tname, Locked, jwName, jid, In [Queued, Retry], resCfgCheckResourceFunction)

case result of
[] -> do
log LevelDebug $ LogText $ toS $ "Job was locked by someone else before I could start. Skipping it. JobId=" <> show jid
pure Nothing
Expand All @@ -587,29 +650,34 @@ jobEventListener = do
forever $ do
log LevelDebug $ LogText "[LISTEN/NOTIFY] Event loop"
notif <- liftIO $ getNotification monitorDbConn
concurrencyControlFn >>= \case
False -> log LevelWarn $ LogText "Received job event, but ignoring it due to concurrency control"
True -> do
let pload = notificationData notif
log LevelDebug $ LogText $ toS $ "NOTIFY | " <> show pload
case eitherDecode $ toS pload of
Left e -> log LevelError $ LogText $ toS $ "Unable to decode notification payload received from Postgres. Payload=" <> show pload <> " Error=" <> show e

-- Checking if job needs to be fired immediately AND it is not already
-- taken by some other thread, by the time it got to us
Right (v :: Value) -> case Aeson.parseMaybe parser v of
Nothing -> log LevelError $ LogText $ toS $ "Unable to extract id/run_at/locked_at from " <> show pload
Just (jid, runAt_, mLockedAt_) -> do
t <- liftIO getCurrentTime
if (runAt_ <= t) && isNothing mLockedAt_
then do log LevelDebug $ LogText $ toS $ "Job needs needs to be run immediately. Attempting to fork in background. JobId=" <> show jid
void $ async $ do
-- Let's try to lock the job first... it is possible that it has already
-- been picked up by the poller by the time we get here.
tryLockingJob jid >>= \case
Nothing -> pure ()
Just lockedJid -> runJob lockedJid
else log LevelDebug $ LogText $ toS $ "Job is either for future, or is already locked. Skipping. JobId=" <> show jid

let pload = notificationData notif
runNotifWithFilter :: HasJobRunner m => Maybe ResourceCfg -> m ()
runNotifWithFilter mResCfg = do
log LevelDebug $ LogText $ toS $ "NOTIFY | " <> show pload
case eitherDecode $ toS pload of
Left e -> log LevelError $ LogText $ toS $ "Unable to decode notification payload received from Postgres. Payload=" <> show pload <> " Error=" <> show e

-- Checking if job needs to be fired immediately AND it is not already
-- taken by some other thread, by the time it got to us
Right (v :: Value) -> case Aeson.parseMaybe parser v of
Nothing -> log LevelError $ LogText $ toS $ "Unable to extract id/run_at/locked_at from " <> show pload
Just (jid, runAt_, mLockedAt_) -> do
t <- liftIO getCurrentTime
if (runAt_ <= t) && isNothing mLockedAt_
then do log LevelDebug $ LogText $ toS $ "Job needs needs to be run immediately. Attempting to fork in background. JobId=" <> show jid
void $ async $ do
-- Let's try to lock the job first... it is possible that it has already
-- been picked up by the poller by the time we get here.
tryLockingJob jid mResCfg >>= \case
Nothing -> pure ()
Just lockedJid -> runJob lockedJid
else log LevelDebug $ LogText $ toS $ "Job is either for future, is already locked, or would violate concurrency constraints. Skipping. JobId=" <> show jid

concurrencyControlFn monitorDbConn >>= \case
DontPoll -> log LevelWarn $ LogText "Received job event, but ignoring it due to concurrency control"
PollAny -> runNotifWithFilter Nothing
PollWithResources resCfg -> runNotifWithFilter (Just resCfg)
where
parser :: Value -> Aeson.Parser (JobId, UTCTime, Maybe UTCTime)
parser = withObject "expecting an object to parse job.run_at and job.locked_at" $ \o -> do
Expand All @@ -623,6 +691,12 @@ jobEventListener = do
-- | Inserts a single job row and returns all of the job's columns.
--
-- Placeholders, in order: jobs table, @run_at@, @status@, @payload@,
-- @last_error@, @attempts@, @locked_at@, @locked_by@.
createJobQuery :: PGS.Query
createJobQuery =
  "INSERT INTO ? (run_at, status, payload, last_error, attempts, locked_at, locked_by)"
    <> " VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING "
    <> concatJobDbColumns

-- | Inserts a resource row unless a conflicting row already exists, in which
-- case nothing is changed.
--
-- Placeholders, in order: resource table, resource id, usage limit.
ensureResource :: PGS.Query
ensureResource =
  "INSERT INTO ? (id, usage_limit) VALUES (?, ?)"
    <> " ON CONFLICT DO NOTHING"

-- | Records how much of a resource a given job uses.
--
-- Placeholders, in order: usage table, job id, resource id, usage.
registerResourceUsage :: PGS.Query
registerResourceUsage =
  "INSERT INTO ? (job_id, resource_id, usage)"
    <> " VALUES (?, ?, ?)"

-- $createJobs
--
-- Ideally you'd want to create wrappers for 'createJob' and 'scheduleJob' in
Expand Down Expand Up @@ -668,6 +742,50 @@ scheduleJob conn tname payload runAt = do
[r] -> pure r
_ -> Prelude.error . (<> "Not expecting multiple rows when creating a single job. Query=") <$> queryFormatter

-- | The resources a job requires, each paired with the amount of usage that
-- is registered for the job against that resource.
type ResourceList = [(ResourceId, Int)]

-- | Create a job, together with its resource requirements, whose @run_at@ is
-- the current time. This is 'scheduleJobWithResources' applied to the result
-- of 'getCurrentTime'.
createJobWithResources
  :: ToJSON p
  => Connection
  -> TableName
  -> ResourceCfg
  -> p
  -> ResourceList
  -> IO Job
createJobWithResources conn tname resCfg payload resources =
  getCurrentTime
    >>= scheduleJobWithResources conn tname resCfg payload resources

-- | Create a job that should run at the given time, and register its
-- resource requirements.
--
-- For every entry of the 'ResourceList' the resource row is created (with the
-- configured default limit) if it does not exist yet, and the job's usage of
-- that resource is recorded.
--
-- The job row and all resource rows are written inside one transaction. If
-- anything fails -- including the job insert not returning exactly one row --
-- the transaction is rolled back and the exception is re-thrown, so the
-- connection is never left inside an open transaction.
--
-- Throws (via 'throwString') when the insert does not return exactly one row.
scheduleJobWithResources
  :: ToJSON p
  => Connection
  -> TableName
  -> ResourceCfg
  -> p
  -> ResourceList
  -> UTCTime
  -> IO Job
scheduleJobWithResources conn tname ResourceCfg{..} payload resources runAt =
  -- We insert everything in a single transaction to delay @NOTIFY@ calls,
  -- so a job isn't picked up before its resources are inserted.
  -- 'PGS.withTransaction' issues the BEGIN, rolls back on any exception
  -- raised by the body, and commits otherwise.
  PGS.withTransaction conn $ do
    job <- PGS.query conn createJobQuery args >>= \case
      [r] -> pure r
      [] -> failWith "Not expecting a blank result set when creating a job. Query="
      _ -> failWith "Not expecting multiple rows when creating a single job. Query="

    forM_ resources $ \(resourceId, usage) -> do
      void $ PGS.execute conn ensureResource (resCfgResourceTable, rawResourceId resourceId, resCfgDefaultLimit)
      void $ PGS.execute conn registerResourceUsage (resCfgUsageTable, jobId job, rawResourceId resourceId, usage)

    pure job
  where
    args = ( tname, runAt, Queued, toJSON payload, Nothing :: Maybe Value, 0 :: Int, Nothing :: Maybe Text, Nothing :: Maybe Text )

    -- Throw in IO (so the enclosing transaction is rolled back) with the
    -- message followed by the fully formatted query.
    failWith :: String -> IO a
    failWith msg = do
      formatted <- PGS.formatQuery conn createJobQuery args
      throwString $ msg <> toS formatted


-- getRunnerEnv :: (HasJobRunner m) => m RunnerEnv
-- getRunnerEnv = ask
Expand Down
56 changes: 56 additions & 0 deletions src/OddJobs/Migrations.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE RecordWildCards #-}
module OddJobs.Migrations
( module OddJobs.Migrations
, module OddJobs.Types
Expand Down Expand Up @@ -72,3 +73,58 @@ createJobTable conn tname = void $ do
fnName = PGS.Identifier $ "notify_job_monitor_for_" <> getTnameTxt tname
trgName = PGS.Identifier $ "trg_notify_job_monitor_for_" <> getTnameTxt tname
getTnameTxt (PGS.QualifiedIdentifier _ tname') = tname'

-- | DDL for the resource table: one row per resource id with its usage limit.
-- The table is only created if it does not exist already.
--
-- Placeholder: the resource table name.
createResourceTableQuery :: Query
createResourceTableQuery =
  mconcat
    [ "CREATE TABLE IF NOT EXISTS ?"
    , "( id text primary key"
    , ", usage_limit int not null"
    , ")"
    ]

-- | DDL for the usage table, which records how much of each resource a job
-- uses. Rows are removed automatically when the referenced job or resource is
-- deleted. The table is only created if it does not exist already.
--
-- Placeholders, in order: usage table, jobs table, resource table.
createUsageTableQuery :: Query
createUsageTableQuery = "CREATE TABLE IF NOT EXISTS ?" <>
  -- @job_id@ is a plain integer foreign key: it is always supplied explicitly
  -- on insert, so it must not get its own sequence/default the way a
  -- @serial@ column would.
  "( job_id int not null REFERENCES ? ON DELETE CASCADE" <>
  ", resource_id text not null REFERENCES ? ON DELETE CASCADE" <>
  ", usage int not null" <>
  ", PRIMARY KEY (job_id, resource_id)" <>
  ");"

-- | Defines the SQL function that computes the current usage of a resource:
-- the total @usage@ registered by jobs in the given status for that resource.
--
-- The total is reported as @0@ (not @NULL@) when no such job uses the
-- resource, so that the comparison against the limit in
-- 'createCheckResourceFunction' is actually evaluated for idle resources
-- instead of collapsing to @NULL@. The result is cast explicitly because
-- @sum@ over an @int@ column yields @bigint@ while the function is declared
-- to return @int@.
--
-- Placeholders, in order: function name, jobs table, usage table, job status
-- to count.
createUsageFunction :: Query
createUsageFunction = "CREATE OR REPLACE FUNCTION ?(resourceId text) RETURNS int as $$" <>
  " SELECT coalesce(sum(usage), 0)::int FROM ? AS j INNER JOIN ? AS jr ON j.id = jr.job_id " <>
  " WHERE jr.resource_id = $1 AND j.status = ? " <>
  " $$ LANGUAGE SQL;"

-- | Defines the SQL function that decides whether a job may run: for every
-- resource the job uses, the resource's current usage plus the job's own
-- usage must not exceed the resource's limit. A job with no usage rows is
-- allowed (the aggregate is coalesced to @true@).
--
-- Placeholders, in order: check function name, usage function name, usage
-- table, resource table.
createCheckResourceFunction :: Query
createCheckResourceFunction =
  mconcat
    [ "CREATE OR REPLACE FUNCTION ?(jobId int) RETURNS bool as $$"
    , " SELECT coalesce(bool_and(?(resource.id) + job_resource.usage <= resource.usage_limit), true) FROM "
    , " ? AS job_resource INNER JOIN ? AS resource ON job_resource.resource_id = resource.id "
    , " WHERE job_resource.job_id = $1"
    , " $$ LANGUAGE SQL;"
    ]

-- | Creates everything the resource-based concurrency control needs in the
-- database, in this order: the resource table, the usage table, the function
-- that calculates a resource's current usage (counting jobs whose status is
-- 'Locked'), and the function that checks whether a job may run.
createResourceTables
  :: Connection
  -> TableName -- ^ Name of the jobs table
  -> ResourceCfg
  -> IO ()
createResourceTables conn jobTableName ResourceCfg{..} =
  sequence_
    [ PGS.execute conn createResourceTableQuery (PGS.Only resCfgResourceTable)
    , PGS.execute conn createUsageTableQuery
        ( resCfgUsageTable
        , jobTableName
        , resCfgResourceTable
        )
    , PGS.execute conn createUsageFunction
        ( usageFnName
        , jobTableName
        , resCfgUsageTable
        , Locked
        )
    , PGS.execute conn createCheckResourceFunction
        ( resCfgCheckResourceFunction
        , usageFnName
        , resCfgUsageTable
        , resCfgResourceTable
        )
    ]
  where
    -- The usage function is named after the unqualified name of the usage
    -- table; the qualifier component of the identifier is ignored.
    usageFnName = case resCfgUsageTable of
      PGS.QualifiedIdentifier _ usageTableTxt ->
        PGS.Identifier $ "calculate_usage_for_resource_from_" <> usageTableTxt
Loading

0 comments on commit 181cda2

Please sign in to comment.