Skip to content

Commit

Permalink
Merge pull request #57 from diogob/shutdown-on-connection-failure
Browse files Browse the repository at this point in the history
Shutdown server if listener connection dies
  • Loading branch information
diogob authored Jun 12, 2020
2 parents a4c0e55 + 63c7e08 commit b0f9405
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
9 changes: 8 additions & 1 deletion app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,32 @@ main = do
<> " / Connects websockets to PostgreSQL asynchronous notifications."

conf <- loadSecretFile =<< readOptions
shutdownSignal <- newEmptyMVar
let host = configHost conf
port = configPort conf
listenChannel = toS $ configListenChannel conf
pgSettings = toS (configDatabase conf)
waitForShutdown cl = void $ forkIO (takeMVar shutdownSignal >> cl >> die "Shutting server down...")

appSettings = setHost ((fromString . toS) host)
. setPort port
. setServerName (toS $ "postgres-websockets/" <> prettyVersion)
. setTimeout 3600
. setInstallShutdownHandler waitForShutdown
. setGracefulShutdownTimeout (Just 5)
$ defaultSettings

putStrLn $ ("Listening on port " :: Text) <> show (configPort conf)

let shutdown = putErrLn ("Broadcaster connection is dead" :: Text) >> putMVar shutdownSignal ()
pool <- P.acquire (configPool conf, 10, pgSettings)
multi <- newHasqlBroadcaster listenChannel pgSettings
multi <- newHasqlBroadcaster shutdown listenChannel pgSettings
getTime <- mkGetTime

runSettings appSettings $
postgresWsMiddleware getTime listenChannel (configJwtSecret conf) pool multi $
logStdout $ maybe dummyApp staticApp' (configPath conf)

where
mkGetTime :: IO (IO UTCTime)
mkGetTime = mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}
Expand Down
19 changes: 9 additions & 10 deletions src/PostgresWebsockets/HasqlBroadcast.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,19 @@ import PostgresWebsockets.Broadcast
{- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error.
This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners
-}
newHasqlBroadcaster :: Text -> ByteString -> IO Multiplexer
newHasqlBroadcaster ch = newHasqlBroadcasterForConnection . tryUntilConnected
newHasqlBroadcaster :: IO () -> Text -> ByteString -> IO Multiplexer
newHasqlBroadcaster onConnectionFailure ch = newHasqlBroadcasterForConnection . tryUntilConnected
where
newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel ch
newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch

{- | Returns a multiplexer from a connection URI or an error message on the left case
This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners
-}
newHasqlBroadcasterOrError :: Text -> ByteString -> IO (Either ByteString Multiplexer)
newHasqlBroadcasterOrError ch =
newHasqlBroadcasterOrError :: IO () -> Text -> ByteString -> IO (Either ByteString Multiplexer)
newHasqlBroadcasterOrError onConnectionFailure ch =
acquire >=> (sequence . mapBoth show (newHasqlBroadcasterForConnection . return))
where
newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel ch
newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch

tryUntilConnected :: ByteString -> IO Connection
tryUntilConnected =
Expand Down Expand Up @@ -78,13 +78,12 @@ tryUntilConnected =
@
-}
newHasqlBroadcasterForChannel :: Text -> IO Connection -> IO Multiplexer
newHasqlBroadcasterForChannel ch getCon = do
multi <- newMultiplexer openProducer closeProducer
newHasqlBroadcasterForChannel :: IO () -> Text -> IO Connection -> IO Multiplexer
newHasqlBroadcasterForChannel onConnectionFailure ch getCon = do
multi <- newMultiplexer openProducer $ const onConnectionFailure
void $ relayMessagesForever multi
return multi
where
closeProducer _ = putErrLn "Broadcaster is dead"
toMsg :: ByteString -> ByteString -> Message
toMsg c m = case decode (toS m) of
Just v -> Message (channelDef c v) m
Expand Down
2 changes: 1 addition & 1 deletion test/HasqlBroadcastSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec = describe "newHasqlBroadcaster" $ do
<$> acquire connStr

it "relay messages sent to the appropriate database channel" $ do
multi <- either (panic .show) id <$> newHasqlBroadcasterOrError "postgres-websockets" "postgres://localhost/postgres_ws_test"
multi <- either (panic .show) id <$> newHasqlBroadcasterOrError (pure ()) "postgres-websockets" "postgres://localhost/postgres_ws_test"
msg <- liftIO newEmptyMVar
onMessage multi "test" $ putMVar msg

Expand Down

0 comments on commit b0f9405

Please sign in to comment.