diff --git a/app/Main.hs b/app/Main.hs index 636b052..be4cb89 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -52,25 +52,32 @@ main = do <> " / Connects websockets to PostgreSQL asynchronous notifications." conf <- loadSecretFile =<< readOptions + shutdownSignal <- newEmptyMVar let host = configHost conf port = configPort conf listenChannel = toS $ configListenChannel conf pgSettings = toS (configDatabase conf) + waitForShutdown cl = void $ forkIO (takeMVar shutdownSignal >> cl >> die "Shutting server down...") + appSettings = setHost ((fromString . toS) host) . setPort port . setServerName (toS $ "postgres-websockets/" <> prettyVersion) . setTimeout 3600 + . setInstallShutdownHandler waitForShutdown + . setGracefulShutdownTimeout (Just 5) $ defaultSettings putStrLn $ ("Listening on port " :: Text) <> show (configPort conf) + let shutdown = putErrLn ("Broadcaster connection is dead" :: Text) >> putMVar shutdownSignal () pool <- P.acquire (configPool conf, 10, pgSettings) - multi <- newHasqlBroadcaster listenChannel pgSettings + multi <- newHasqlBroadcaster shutdown listenChannel pgSettings getTime <- mkGetTime runSettings appSettings $ postgresWsMiddleware getTime listenChannel (configJwtSecret conf) pool multi $ logStdout $ maybe dummyApp staticApp' (configPath conf) + where mkGetTime :: IO (IO UTCTime) mkGetTime = mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime} diff --git a/src/PostgresWebsockets/HasqlBroadcast.hs b/src/PostgresWebsockets/HasqlBroadcast.hs index 844724c..66b933e 100644 --- a/src/PostgresWebsockets/HasqlBroadcast.hs +++ b/src/PostgresWebsockets/HasqlBroadcast.hs @@ -26,19 +26,19 @@ import PostgresWebsockets.Broadcast {- | Returns a multiplexer from a connection URI, keeps trying to connect in case there is any error. This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners -} -newHasqlBroadcaster :: Text -> ByteString -> IO Multiplexer -newHasqlBroadcaster ch = newHasqlBroadcasterForConnection . tryUntilConnected +newHasqlBroadcaster :: IO () -> Text -> ByteString -> IO Multiplexer +newHasqlBroadcaster onConnectionFailure ch = newHasqlBroadcasterForConnection . tryUntilConnected where - newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel ch + newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch {- | Returns a multiplexer from a connection URI or an error message on the left case This function also spawns a thread that keeps relaying the messages from the database to the multiplexer's listeners -} -newHasqlBroadcasterOrError :: Text -> ByteString -> IO (Either ByteString Multiplexer) -newHasqlBroadcasterOrError ch = +newHasqlBroadcasterOrError :: IO () -> Text -> ByteString -> IO (Either ByteString Multiplexer) +newHasqlBroadcasterOrError onConnectionFailure ch = acquire >=> (sequence . mapBoth show (newHasqlBroadcasterForConnection . return)) where - newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel ch + newHasqlBroadcasterForConnection = newHasqlBroadcasterForChannel onConnectionFailure ch tryUntilConnected :: ByteString -> IO Connection tryUntilConnected = @@ -78,13 +78,12 @@ tryUntilConnected = @ -} -newHasqlBroadcasterForChannel :: Text -> IO Connection -> IO Multiplexer -newHasqlBroadcasterForChannel ch getCon = do - multi <- newMultiplexer openProducer closeProducer +newHasqlBroadcasterForChannel :: IO () -> Text -> IO Connection -> IO Multiplexer +newHasqlBroadcasterForChannel onConnectionFailure ch getCon = do + multi <- newMultiplexer openProducer $ const onConnectionFailure void $ relayMessagesForever multi return multi where - closeProducer _ = putErrLn "Broadcaster is dead" toMsg :: ByteString -> ByteString -> Message toMsg c m = case decode (toS m) of Just v -> Message (channelDef c v) m diff --git a/test/HasqlBroadcastSpec.hs b/test/HasqlBroadcastSpec.hs index ef59a8d..49e5121 100644 --- a/test/HasqlBroadcastSpec.hs +++ b/test/HasqlBroadcastSpec.hs @@ -15,7 +15,7 @@ spec = describe "newHasqlBroadcaster" $ do <$> acquire connStr it "relay messages sent to the appropriate database channel" $ do - multi <- either (panic .show) id <$> newHasqlBroadcasterOrError "postgres-websockets" "postgres://localhost/postgres_ws_test" + multi <- either (panic .show) id <$> newHasqlBroadcasterOrError (pure ()) "postgres-websockets" "postgres://localhost/postgres_ws_test" msg <- liftIO newEmptyMVar onMessage multi "test" $ putMVar msg