-
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[work-pool] Add work-pool subproject
* This is an extraction from my mutant-manager project.
- Loading branch information
Showing
9 changed files
with
580 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# work-pool | ||
|
||
Minimal fixed max size, fixed worker count work pool with per worker boot and dynamic job production. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
_common/package: !include "../common/package.yaml" | ||
|
||
name: work-pool | ||
synopsis: Introspectable work pool | ||
homepage: https://github.com/mbj/mhs#readme | ||
github: mbj/mhs | ||
version: 0.0.1 | ||
|
||
<<: *defaults | ||
|
||
dependencies: | ||
- base | ||
- mprelude | ||
- text | ||
- unliftio | ||
|
||
tests: | ||
test: | ||
<<: *test | ||
dependencies: | ||
- containers | ||
- devtools | ||
- tasty | ||
- tasty-hunit | ||
- work-pool |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
module WorkPool | ||
( Config(..) | ||
, Pool | ||
, pushJob | ||
, runPool | ||
) | ||
where | ||
|
||
import MPrelude | ||
import Prelude (succ) | ||
|
||
import qualified UnliftIO.Async as UnliftIO | ||
import qualified UnliftIO.STM as UnliftIO | ||
|
||
-- | Worker pool configuration | ||
data Config a = Config | ||
{ produceJobs :: forall m . MonadUnliftIO m => Pool a -> m () | ||
-- ^ function called from the main thread producing work, use `pushJob` to | ||
-- create workable jobs. | ||
, queueSize :: Natural | ||
-- ^ maximum size of the jobs queued | ||
, workerCount :: Natural | ||
-- ^ number of workers to boot | ||
, workerRun :: forall m . MonadUnliftIO m => Natural -> m (a -> m ()) | ||
-- ^ function called when a worker is booted, argument is the worker index, | ||
-- returns an action to be called per job assigned to this worker. | ||
} | ||
|
||
-- Internal queue event, supplying job or quitting the worker | ||
data Event a = Quit | Job a | ||
|
||
-- | Running pool | ||
newtype Pool a = Pool | ||
{ queue :: UnliftIO.TBQueue (Event a) | ||
} | ||
|
||
-- | Add (dynamically) created a job to the pool | ||
-- | ||
-- This function will block if the max queue size would be overflown. | ||
-- As the workers create space in the queue this function will unblock. | ||
pushJob :: MonadIO m => Pool a -> a -> m () | ||
pushJob Pool{..} item | ||
= UnliftIO.atomically | ||
$ UnliftIO.writeTBQueue queue (Job item) | ||
|
||
-- | Run worker pool with specified config | ||
-- | ||
-- The function will return if either: | ||
-- * the `produceJobs` function returns | ||
-- * a worker or throws an error | ||
-- * the producer throws an error. | ||
-- | ||
-- Care is taken to not leak threads via the use if `withAsync` from the `async` package. | ||
runPool :: forall a m . MonadUnliftIO m => Config a -> m () | ||
runPool Config{..} = do | ||
pool@Pool{..} <- UnliftIO.atomically $ do | ||
queue <- UnliftIO.newTBQueue queueSize | ||
pure Pool{..} | ||
|
||
boot pool $ \handlers -> do | ||
produceJobs pool -- supply jobs | ||
UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit -- signal workers to gracefully exit | ||
traverse_ UnliftIO.wait handlers -- wait for workers to gracefully exit | ||
where | ||
boot :: Pool a -> ([UnliftIO.Async ()] -> m ()) -> m () | ||
boot Pool{..} withAsyncHandlers = go 0 [] | ||
where | ||
go :: Natural -> [UnliftIO.Async ()] -> m () | ||
go index asyncHandlers = | ||
if index == workerCount | ||
then withAsyncHandlers asyncHandlers | ||
else | ||
UnliftIO.withAsync | ||
(workLoop =<< workerRun index) | ||
(\async -> go (succ index) (async:asyncHandlers)) | ||
|
||
workLoop action = | ||
pullJob >>= maybe (pure ()) (\item -> action item >> workLoop action) | ||
|
||
pullJob | ||
= UnliftIO.atomically | ||
$ UnliftIO.readTBQueue queue >>= \case | ||
(Job item) -> pure $ pure item | ||
Quit -> UnliftIO.writeTBQueue queue Quit $> empty |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
import Control.Arrow (left) | ||
import Control.Monad (when) | ||
import MPrelude | ||
import Test.Tasty | ||
import Test.Tasty.HUnit | ||
|
||
import qualified Data.List as List | ||
import qualified Data.Set as Set | ||
import qualified Data.String as String | ||
import qualified Devtools | ||
import qualified UnliftIO.Concurrent as UnliftIO | ||
import qualified UnliftIO.Exception as UnliftIO | ||
import qualified WorkPool | ||
|
||
main :: IO () | ||
main | ||
= defaultMain | ||
$ testGroup "work-pool" | ||
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"]) | ||
, mkSuccess 1 1 | ||
, mkSuccess 1 100 | ||
, mkSuccess 100 1 | ||
, mkSuccess 100 100 | ||
, mkSuccess 1000 1000 | ||
, producerFailure | ||
, workerFailure | ||
] | ||
where | ||
mkSuccess :: Natural -> Natural -> TestTree | ||
mkSuccess queueSize workerCount = | ||
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do | ||
output <- UnliftIO.newMVar [] | ||
WorkPool.runPool $ config output | ||
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output | ||
where | ||
config output = WorkPool.Config{..} | ||
where | ||
workerRun :: MonadUnliftIO m => Natural -> m (Natural -> m ()) | ||
workerRun _index = pure $ \value -> do | ||
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ()) | ||
|
||
workerFailure :: TestTree | ||
workerFailure = testCase "worker failure" $ do | ||
result <- UnliftIO.try (WorkPool.runPool config) | ||
assertEqual "" (Left "intentional error\n") (left formatException result) | ||
where | ||
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..} | ||
|
||
workerRun :: MonadIO m => Natural -> m (Natural -> m ()) | ||
workerRun _index = pure $ \value -> | ||
when (value == 100) $ UnliftIO.throwString "intentional error" | ||
|
||
producerFailure :: TestTree | ||
producerFailure = testCase "producer failure" $ do | ||
result <- UnliftIO.try (WorkPool.runPool config) | ||
assertEqual "" (Left "intentional error\n") (left formatException result) | ||
where | ||
config | ||
= WorkPool.Config | ||
{ produceJobs = produceJobsFailing | ||
, queueSize = 100 | ||
, workerCount = 100 | ||
, .. | ||
} | ||
|
||
produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m () | ||
produceJobsFailing pool = do | ||
WorkPool.pushJob pool 1 | ||
UnliftIO.throwString "intentional error" | ||
|
||
workerRun :: MonadIO m => Natural -> m (Natural -> m ()) | ||
workerRun _index = pure . const $ pure () | ||
|
||
formatException :: UnliftIO.SomeException -> String | ||
formatException | ||
= String.unlines | ||
. List.drop 2 | ||
. List.take 3 | ||
. String.lines | ||
. UnliftIO.displayException | ||
|
||
produceJobs :: MonadIO m => WorkPool.Pool Natural -> m () | ||
produceJobs pool = traverse_ (WorkPool.pushJob pool) values | ||
|
||
values :: [Natural] | ||
values = [0..1000] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
Diff 0.4.1 | ||
OneTuple 0.4.1.1 | ||
QuickCheck 2.14.3 | ||
StateVar 1.2.2 | ||
aeson 2.1.2.1 | ||
alex 3.3.0.0 | ||
ansi-terminal 0.11.5 | ||
ansi-terminal-types 0.11.5 | ||
ansi-wl-pprint 0.6.9 | ||
array 0.5.4.0 | ||
assoc 1.1 | ||
async 2.2.5 | ||
attoparsec 0.14.4 | ||
base 4.17.2.1 | ||
base-compat 0.12.3 | ||
base-compat-batteries 0.12.3 | ||
base-orphans 0.9.1 | ||
bifunctors 5.5.15 | ||
binary 0.8.9.1 | ||
bitvec 1.1.5.0 | ||
bytestring 0.11.5.3 | ||
call-stack 0.4.0 | ||
clock 0.8.4 | ||
cmdargs 0.10.22 | ||
colour 2.3.6 | ||
comonad 5.0.8 | ||
conduit 1.3.5 | ||
containers 0.6.7 | ||
contravariant 1.5.5 | ||
cpphs 1.20.9.1 | ||
data-default 0.7.1.1 | ||
data-default-class 0.1.2.0 | ||
data-default-instances-containers 0.0.1 | ||
data-default-instances-dlist 0.0.1 | ||
data-default-instances-old-locale 0.0.1 | ||
data-fix 0.3.2 | ||
deepseq 1.4.8.0 | ||
deriving-aeson 0.2.9 | ||
devtools 0.2.0 | ||
directory 1.3.7.1 | ||
distributive 0.6.2.1 | ||
dlist 1.0 | ||
exceptions 0.10.5 | ||
extra 1.7.14 | ||
file-embed 0.0.15.0 | ||
filepath 1.4.2.2 | ||
filepattern 0.1.3 | ||
foldable1-classes-compat 0.1 | ||
generically 0.1.1 | ||
ghc 9.4.8 | ||
ghc-bignum 1.3 | ||
ghc-boot 9.4.8 | ||
ghc-boot-th 9.4.8 | ||
ghc-heap 9.4.8 | ||
ghc-lib-parser 9.4.8.20231111 | ||
ghc-lib-parser-ex 9.4.0.0 | ||
ghc-prim 0.9.1 | ||
ghci 9.4.8 | ||
happy 1.20.1.1 | ||
hashable 1.4.3.0 | ||
hlint 3.5 | ||
hpc 0.6.1.0 | ||
hscolour 1.24.4 | ||
indexed-traversable 0.1.3 | ||
indexed-traversable-instances 0.1.1.2 | ||
integer-logarithms 1.0.3.1 | ||
libyaml 0.1.2 | ||
mono-traversable 1.0.15.3 | ||
mprelude 0.2.3 | ||
mtl 2.2.2 | ||
old-locale 1.0.0.7 | ||
optparse-applicative 0.17.1.0 | ||
parsec 3.1.16.1 | ||
polyparse 1.13 | ||
pretty 1.1.3.6 | ||
primitive 0.8.0.0 | ||
process 1.6.18.0 | ||
random 1.2.1.1 | ||
refact 0.3.0.2 | ||
resourcet 1.2.6 | ||
rts 1.0.2 | ||
safe-exceptions 0.1.7.4 | ||
scientific 0.3.7.0 | ||
semialign 1.3 | ||
semigroupoids 5.3.7 | ||
source-constraints 0.0.5 | ||
split 0.2.3.5 | ||
splitmix 0.1.0.5 | ||
stm 2.5.1.0 | ||
strict 0.5 | ||
syb 0.7.2.4 | ||
tagged 0.8.7 | ||
tasty 1.4.3 | ||
tasty-expected-failure 0.12.3 | ||
tasty-hunit 0.10.1 | ||
tasty-mgolden 0.0.2 | ||
template-haskell 2.19.0.0 | ||
terminfo 0.4.1.5 | ||
text 2.0.2 | ||
text-short 0.1.5 | ||
th-abstraction 0.4.5.0 | ||
th-lift 0.8.4 | ||
these 1.2 | ||
time 1.12.2 | ||
time-compat 1.9.6.1 | ||
transformers 0.5.6.2 | ||
transformers-compat 0.7.2 | ||
typed-process 0.2.11.1 | ||
unbounded-delays 0.1.1.1 | ||
uniplate 1.6.13 | ||
unix 2.7.3 | ||
unliftio 0.2.25.0 | ||
unliftio-core 0.2.1.0 | ||
unordered-containers 0.2.19.1 | ||
utf8-string 1.0.2 | ||
uuid-types 1.0.5.1 | ||
vector 0.13.1.0 | ||
vector-algorithms 0.9.0.1 | ||
vector-stream 0.1.0.0 | ||
witherable 0.4.2 | ||
work-pool 0.0.1 | ||
yaml 0.11.11.2 |
Oops, something went wrong.