Skip to content

Commit

Permalink
Merge pull request #67 from diogob/metadata-channel
Browse files Browse the repository at this point in the history
Add a configuration option to open a channel for metadata
  • Loading branch information
diogob authored Oct 16, 2020
2 parents 9f2cfc4 + 444fd89 commit 995683e
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 115 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# CHANGELOG

## 0.10.0.0

- Add `PGWS_META_CHANNEL` to configure optional metadata channel to send events from the server. Initially the oply event is `ConnectionOpen`.
- Add property `event` to message JSON. Two possible values so far: `ConnectionOpen` and `WebsocketMessage`.
- Breaking change: the property `channel` is not appended to claims anymore. If `channel` is in the original token claims it will still be present.

## 0.9.0.0

- Add @filename semantics to PGWS_DB_URI configiration variable to allow secret management to use a file instead of an environment variable.
- Add PGWS_RETRIES to limit the amount of times the server tries to open a database connection upon startup (defaults to 5). This breaks backward compatibility if you rely on the behaviour of the server to try infitite times.
- Add `PGWS_RETRIES` to limit the amount of times the server tries to open a database connection upon startup (defaults to 5). This breaks backward compatibility if you rely on the behaviour of the server to try infitite times.

## 0.8.0.1

Expand Down
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ To use a secure socket (`wss://`) you will need a proxy server like nginx to han
Every message received from the browser will be in JSON format as:
```javascript
{
"claims": { "message_delivered_at": 0.0, "a_custom_claim_from_the_jwt": "your_custom_value" },
"event": "WebsocketMessage",
"channel": "destination_channel",
"payload": "message content"
"payload": "message content",
"claims": { "message_delivered_at": 0.0, "a_custom_claim_from_the_jwt": "your_custom_value" }
}
```

Expand All @@ -109,8 +110,19 @@ To send a message to a particular channel on the browser one should notify the p
```sql
SELECT pg_notify(
'postgres-websockets-listener',
json_build_object('channel', 'chat', 'payload', 'test')::text
json_build_object('event', 'WebsocketMessage', 'channel', 'chat', 'payload', 'test')::text
);
```

Where `postgres-websockets-listener` is the database channel used by your instance of postgres-websockets and `chat` is the channel where the browser is connected (the same issued in the JWT used to connect).

## Monitoring Connections

To monitor connection opening one should set the variable `PGWS_META_CHANNEL` which will enable the meta-data messages generation in the server on the channel name specified.
For instamce, if we use the configuration in the [sample-env](./sample-env) we will see messages like the one bellow each time a connection is estabilished (only after the JWT is validated).

```javascript
{"event":"ConnectionOpen","channel":"server-info","payload":"server-info","claims":{"mode":"rw","message_delivered_at":1.602719440727465893e9}}
```

You can monitor these messages on another websocket connection with a proper read token for the channel `server-info` or also having an additional database listener on the `PGWS_LISTEN_CHANNEL`.
6 changes: 5 additions & 1 deletion client-example/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ function jwt() {
}

$(document).ready(function () {
var ws = null;
var ws = null, meta = null;

$('#channel').keyup(updateJWT);
updateJWT();
Expand All @@ -74,6 +74,10 @@ $(document).ready(function () {
if(ws === null){
var jwt = $('#jwt').val();
var channel = $('#channel').val();

meta = createWebSocket('/server-info/' + jwt);
meta.onmessage = onMessage('#meta-messages');

if(channel == ""){
ws = createWebSocket('/' + jwt);
} else {
Expand Down
3 changes: 3 additions & 0 deletions client-example/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ <h2>Chat</h2>
<h2>Messages sent to chat channel</h2>
<div id="messages">
</div>
<h2>Messages sent to meta channel</h2>
<div id="meta-messages">
</div>
</div>
</div>
</body>
Expand Down
3 changes: 2 additions & 1 deletion postgres-websockets.cabal
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: postgres-websockets
version: 0.9.0.0
version: 0.10.0.0
synopsis: Middleware to map LISTEN/NOTIFY messages to Websockets
description: Please see README.md
homepage: https://github.com/diogob/postgres-websockets#readme
Expand All @@ -25,6 +25,7 @@ library
other-modules: Paths_postgres_websockets
, PostgresWebsockets.Server
, PostgresWebsockets.Middleware
, PostgresWebsockets.Context
build-depends: base >= 4.7 && < 5
, hasql-pool >= 0.5 && < 0.6
, text >= 1.2 && < 1.3
Expand Down
3 changes: 3 additions & 0 deletions sample-env
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ export PGWS_ROOT_PATH="./client-example"
## Sends a copy of every message received from websocket clients to the channel specified bellow as an aditional NOTIFY command.
export PGWS_LISTEN_CHANNEL="postgres-websockets-listener"

## Send postgres-websockets server events to this channel (will be sent both to the database and the connected websocket clients)
export PGWS_META_CHANNEL="server-info"

## Host and port on which the websockets server (and the static files server) will be listening.
export PGWS_HOST="*4"
export PGWS_PORT=3000
Expand Down
6 changes: 3 additions & 3 deletions src/PostgresWebsockets.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ module PostgresWebsockets
, postgresWsMiddleware
) where

import PostgresWebsockets.Middleware
import PostgresWebsockets.Server
import PostgresWebsockets.Config
import PostgresWebsockets.Middleware ( postgresWsMiddleware )
import PostgresWebsockets.Server ( serve )
import PostgresWebsockets.Config ( prettyVersion, loadConfig )
11 changes: 6 additions & 5 deletions src/PostgresWebsockets/Claims.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ module PostgresWebsockets.Claims
( ConnectionInfo,validateClaims
) where

import Control.Lens
import qualified Crypto.JOSE.Types as JOSE.Types
import Crypto.JWT
import qualified Data.HashMap.Strict as M
import Protolude
import Protolude
import Control.Lens
import Crypto.JWT
import Data.List
import Data.Time.Clock (UTCTime)
import qualified Crypto.JOSE.Types as JOSE.Types
import qualified Data.HashMap.Strict as M
import qualified Data.Aeson as JSON


type Claims = M.HashMap Text JSON.Value
type ConnectionInfo = ([ByteString], ByteString, Claims)

Expand Down
2 changes: 2 additions & 0 deletions src/PostgresWebsockets/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ data AppConfig = AppConfig {
, configHost :: Text
, configPort :: Int
, configListenChannel :: Text
, configMetaChannel :: Maybe Text
, configJwtSecret :: ByteString
, configJwtSecretIsBase64 :: Bool
, configPool :: Int
Expand Down Expand Up @@ -68,6 +69,7 @@ readOptions =
<*> var str "PGWS_HOST" (def "*4" <> helpDef show <> help "Address the server will listen for websocket connections")
<*> var auto "PGWS_PORT" (def 3000 <> helpDef show <> help "Port the server will listen for websocket connections")
<*> var str "PGWS_LISTEN_CHANNEL" (def "postgres-websockets-listener" <> helpDef show <> help "Master channel used in the database to send or read messages in any notification channel")
<*> optional (var str "PGWS_META_CHANNEL" (help "Websockets channel used to send events about the server state changes."))
<*> var str "PGWS_JWT_SECRET" (help "Secret used to sign JWT tokens used to open communications channels")
<*> var auto "PGWS_JWT_SECRET_BASE64" (def False <> helpDef show <> help "Indicate whether the JWT secret should be decoded from a base64 encoded string")
<*> var auto "PGWS_POOL_SIZE" (def 10 <> helpDef show <> help "How many connection to the database should be used by the connection pool")
Expand Down
39 changes: 39 additions & 0 deletions src/PostgresWebsockets/Context.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{-|
Module : PostgresWebsockets.Context
Description : Produce a context capable of running postgres-websockets sessions
-}
module PostgresWebsockets.Context
( Context (..)
, mkContext
) where

import Protolude
import Data.Time.Clock (UTCTime, getCurrentTime)
import Control.AutoUpdate ( defaultUpdateSettings
, mkAutoUpdate
, updateAction
)
import qualified Hasql.Pool as P

import PostgresWebsockets.Config ( AppConfig(..) )
import PostgresWebsockets.HasqlBroadcast (newHasqlBroadcaster)
import PostgresWebsockets.Broadcast (Multiplexer)

data Context = Context {
ctxConfig :: AppConfig
, ctxPool :: P.Pool
, ctxMulti :: Multiplexer
, ctxGetTime :: IO UTCTime
}

-- | Given a configuration and a shutdown action (performed when the Multiplexer's listen connection dies) produces the context necessary to run sessions
mkContext :: AppConfig -> IO () -> IO Context
mkContext conf@AppConfig{..} shutdown = do
Context conf
<$> P.acquire (configPool, 10, pgSettings)
<*> newHasqlBroadcaster shutdown (toS configListenChannel) configRetries pgSettings
<*> mkGetTime
where
mkGetTime :: IO (IO UTCTime)
mkGetTime = mkAutoUpdate defaultUpdateSettings {updateAction = getCurrentTime}
pgSettings = toS configDatabase
12 changes: 6 additions & 6 deletions src/PostgresWebsockets/HasqlBroadcast.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import Protolude hiding (putErrLn)

import Hasql.Connection
import Hasql.Notifications
import Data.Aeson (decode, Value(..))
import Data.HashMap.Lazy (lookupDefault)
import Data.Aeson (decode, Value(..))
import Data.HashMap.Lazy (lookupDefault)
import Data.Either.Combinators (mapBoth)
import Data.Function (id)
import Control.Retry (RetryStatus(..), retrying, capDelay, exponentialBackoff)
import Data.Function (id)
import Control.Retry (RetryStatus(..), retrying, capDelay, exponentialBackoff)

import PostgresWebsockets.Broadcast

Expand Down Expand Up @@ -99,11 +99,11 @@ newHasqlBroadcasterForChannel onConnectionFailure ch getCon = do
_ -> d
lookupStringDef _ d _ = d
channelDef = lookupStringDef "channel"
openProducer msgs = do
openProducer msgQ = do
con <- getCon
listen con $ toPgIdentifier ch
waitForNotifications
(\c m-> atomically $ writeTQueue msgs $ toMsg c m)
(\c m-> atomically $ writeTQueue msgQ $ toMsg c m)
con

putErrLn :: Text -> IO ()
Expand Down
110 changes: 60 additions & 50 deletions src/PostgresWebsockets/Middleware.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,49 +10,58 @@ module PostgresWebsockets.Middleware
( postgresWsMiddleware
) where

import qualified Hasql.Pool as H
import qualified Hasql.Notifications as H
import qualified Network.Wai as Wai
import Protolude
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime)
import Control.Concurrent.AlarmClock (newAlarmClock, setAlarm)
import qualified Hasql.Notifications as H
import qualified Hasql.Pool as H
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.WebSockets as WS
import qualified Network.WebSockets as WS
import Protolude

import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as M
import qualified Data.Text.Encoding.Error as T
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime)
import Control.Concurrent.AlarmClock (newAlarmClock, setAlarm)
import PostgresWebsockets.Broadcast (Multiplexer, onMessage)
import qualified PostgresWebsockets.Broadcast as B
import PostgresWebsockets.Claims
import qualified Network.WebSockets as WS

import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.HashMap.Strict as M
import qualified Data.Text.Encoding.Error as T

import PostgresWebsockets.Broadcast (onMessage)
import PostgresWebsockets.Claims ( ConnectionInfo, validateClaims )
import PostgresWebsockets.Context ( Context(..) )
import PostgresWebsockets.Config (AppConfig(..))
import qualified PostgresWebsockets.Broadcast as B


data Event =
WebsocketMessage
| ConnectionOpen
deriving (Show, Eq, Generic)

data Message = Message
{ claims :: A.Object
, channel :: Text
, event :: Event
, payload :: Text
, channel :: Text
} deriving (Show, Eq, Generic)

instance A.ToJSON Event
instance A.ToJSON Message

-- | Given a secret, a function to fetch the system time, a Hasql Pool and a Multiplexer this will give you a WAI middleware.
postgresWsMiddleware :: IO UTCTime -> Text -> ByteString -> H.Pool -> Multiplexer -> Wai.Application -> Wai.Application
postgresWsMiddleware :: Context -> Wai.Middleware
postgresWsMiddleware =
WS.websocketsOr WS.defaultConnectionOptions `compose` wsApp
where
compose = (.) . (.) . (.) . (.) . (.)
WS.websocketsOr WS.defaultConnectionOptions . wsApp

-- private functions
jwtExpirationStatusCode :: Word16
jwtExpirationStatusCode = 3001

-- when the websocket is closed a ConnectionClosed Exception is triggered
-- this kills all children and frees resources for us
wsApp :: IO UTCTime -> Text -> ByteString -> H.Pool -> Multiplexer -> WS.ServerApp
wsApp getTime dbChannel secret pool multi pendingConn =
getTime >>= validateClaims requestChannel secret (toS jwtToken) >>= either rejectRequest forkSessions
wsApp :: Context -> WS.ServerApp
wsApp Context{..} pendingConn =
ctxGetTime >>= validateClaims requestChannel (configJwtSecret ctxConfig) (toS jwtToken) >>= either rejectRequest forkSessions
where
hasRead m = m == ("r" :: ByteString) || m == ("rw" :: ByteString)
hasWrite m = m == ("w" :: ByteString) || m == ("rw" :: ByteString)
Expand Down Expand Up @@ -85,43 +94,44 @@ wsApp getTime dbChannel secret pool multi pendingConn =
Just _ -> pure ()
Nothing -> pure ()

let sendNotification msg channel = sendMessageWithTimestamp $ websocketMessageForChannel msg channel
sendMessageToDatabase = sendToDatabase ctxPool (configListenChannel ctxConfig)
sendMessageWithTimestamp = timestampMessage ctxGetTime >=> sendMessageToDatabase
websocketMessageForChannel = Message validClaims WebsocketMessage
connectionOpenMessage = Message validClaims ConnectionOpen

case configMetaChannel ctxConfig of
Nothing -> pure ()
Just ch -> sendMessageWithTimestamp $ connectionOpenMessage (toS $ BS.intercalate "," chs) ch

when (hasRead mode) $
forM_ chs $ flip (onMessage multi) $ WS.sendTextData conn . B.payload
forM_ chs $ flip (onMessage ctxMulti) $ WS.sendTextData conn . B.payload

when (hasWrite mode) $
let sendNotifications = void . H.notifyPool pool dbChannel . toS
in notifySession validClaims conn getTime sendNotifications chs
notifySession conn sendNotification chs

waitForever <- newEmptyMVar
void $ takeMVar waitForever

-- Having both channel and claims as parameters seem redundant
-- But it allows the function to ignore the claims structure and the source
-- of the channel, so all claims decoding can be coded in the caller
notifySession :: A.Object
-> WS.Connection
-> IO UTCTime
-> (ByteString -> IO ())
-> [ByteString]
-> IO ()
notifySession claimsToSend wsCon getTime send chs =
notifySession :: WS.Connection -> (Text -> Text -> IO ()) -> [ByteString] -> IO ()
notifySession wsCon sendToChannel chs =
withAsync (forever relayData) wait
where
relayData = do
relayData = do
msg <- WS.receiveData wsCon
forM_ chs (relayChannelData msg . toS)
forM_ chs (sendToChannel msg . toS)

relayChannelData msg ch = do
claims' <- claimsWithTime ch
send $ jsonMsg ch claims' msg

-- we need to decode the bytestring to re-encode valid JSON for the notification
jsonMsg :: Text -> M.HashMap Text A.Value -> ByteString -> ByteString
jsonMsg ch cl = BL.toStrict . A.encode . Message cl ch . decodeUtf8With T.lenientDecode

claimsWithTime :: Text -> IO (M.HashMap Text A.Value)
claimsWithTime ch = do
time <- utcTimeToPOSIXSeconds <$> getTime
return $ M.insert "message_delivered_at" (A.Number $ realToFrac time) (claimsWithChannel ch)
sendToDatabase :: H.Pool -> Text -> Message -> IO ()
sendToDatabase pool dbChannel =
notify . jsonMsg
where
notify = void . H.notifyPool pool dbChannel . toS
jsonMsg = BL.toStrict . A.encode

claimsWithChannel ch = M.insert "channel" (A.String ch) claimsToSend
timestampMessage :: IO UTCTime -> Message -> IO Message
timestampMessage getTime msg@Message{..} = do
time <- utcTimeToPOSIXSeconds <$> getTime
return $ msg{ claims = M.insert "message_delivered_at" (A.Number $ realToFrac time) claims}
Loading

0 comments on commit 995683e

Please sign in to comment.