diff --git a/.gitignore b/.gitignore index 3b9c2d0a..3fd028cb 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ Main.hs .stack-work TAGS stack*.yaml.lock +.vscode diff --git a/.travis.yml b/.travis.yml index 994cd526..2b8f448e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,13 @@ language: c # sudo: false -dist: bionic +dist: focal + +# addons: +# apt: +# sources: +# - sourceline: ppa:redislabs/redis +# packages: +# - redis cache: directories: @@ -8,28 +15,28 @@ cache: - $HOME/.cabal - $HOME/.stack +services: + - docker + before_install: + - sudo apt-get update + - sudo apt-get -y install redis-server - mkdir -p ~/.local/bin - mkdir -p ~/tmp - export PATH=~/.local/bin:$PATH - - curl -L https://github.com/commercialhaskell/stack/releases/download/v2.1.3/stack-2.1.3-linux-x86_64.tar.gz | tar xz -C ~/tmp - - mv ~/tmp/stack-2.1.3-linux-x86_64/stack ~/.local/bin/ - # - curl -L https://github.com/antirez/redis/archive/5.0.2.tar.gz | tar xz -C ~/tmp - # - cd ~/tmp/redis-5.0.2 && make - # - ~/tmp/redis-5.0.2/src/redis-server & - - sudo add-apt-repository -y ppa:chris-lea/redis-server - - sudo apt-get update - - sudo apt-get -y install redis-server + - curl -L https://github.com/commercialhaskell/stack/releases/download/v2.5.1/stack-2.5.1-linux-x86_64.tar.gz | tar xz -C ~/tmp + - mv ~/tmp/stack-2.5.1-linux-x86_64/stack ~/.local/bin/ - cd ${TRAVIS_BUILD_DIR} + - docker run -d -p 7000-7010:7000-7010 grokzen/redis-cluster:5.0.6 matrix: include: - # - env: GHCVER=7.10.3 STACK_YAML=stack-7.10.yaml - - env: GHCVER=8.0.1 STACK_YAML=stack-8.0.yaml - env: GHCVER=8.2.2 STACK_YAML=stack-8.2.yaml - env: GHCVER=8.4.1 STACK_YAML=stack-8.4.yaml - env: GHCVER=8.6.5 STACK_YAML=stack-8.6.yaml - env: GHCVER=8.8.1 STACK_YAML=stack-8.8.yaml + - env: GHCVER=8.10.4 STACK_YAML=stack-8.10.yaml + - env: GHCVER=8.10.6 STACK_YAML=stack-8.10.yaml allow_failures: - env: GHCVER=head STACK_YAML=stack-head.yaml diff --git a/CHANGELOG b/CHANGELOG index 016b26f5..228b0fe8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,52 @@ # Changelog for Hedis +## 0.15.2 +* PR #14. Updated with hedis upstream +* PR #13. Added timeout for commands +* PR #11. Send the command to another node while exception occurred on running the query. + +## 0.15.1 + +* PR #181. Add MonadUnliftIO instance + +## 0.15.0 + +* PR #174, Issue #173. Hedis fails to decode xstreamInfo response in case when the stream is empty + +## 0.14.3 + +* PR #171. Support GHC 9 + +## 0.14.2 + +* PR #163. support for redis 6.0 COMMAND format +* PR #164. remove invalid tests for Redis Cluster + +## 0.14.1 + +* PR #162. Improved documentation for EVALSHA + +## 0.14.0 + +* PR #157. Clustering support + +## 0.13.1 + +* PR #158. Upgrade to Redis 6.0.9 & Fix auth test +* PR #160. Fix GHC 8.0.1 compat + +## 0.13.0 + +* PR #159. Issue #152. Make HSET return integer instead of bool + +## 0.12.15 + +* PR #154. Implement Redis Sentinel support + +## 0.12.14 + +* PR #153. Publicly expose ConnectTimeout exception + ## 0.12.13 * PR #150, Issue #143. Leaking sockets when connection fails diff --git a/cabal.project b/cabal.project new file mode 100644 index 00000000..3b98a43d --- /dev/null +++ b/cabal.project @@ -0,0 +1,2 @@ +packages: + ./ diff --git a/codegen/commands.json b/codegen/commands.json index 8066249a..11382b8d 100644 --- a/codegen/commands.json +++ b/codegen/commands.json @@ -1203,7 +1203,7 @@ ], "since": "2.0.0", "group": "hash", - "returns": "bool" + "returns": "integer" }, "HSETNX": { "summary": "Set the value of a hash field, only if the field does not exist", diff --git a/hedis.cabal b/hedis.cabal index d0dcb519..2fb965dd 100644 --- a/hedis.cabal +++ b/hedis.cabal @@ -1,5 +1,5 @@ name: hedis -version: 0.12.13 +version: 0.15.2 synopsis: Client library for the Redis datastore: supports full command set, pipelining. @@ -43,7 +43,7 @@ maintainer: Kostiantyn Rybnikov copyright: Copyright (c) 2011 Falko Peters category: Database build-type: Simple -cabal-version: >=1.8 +cabal-version: >=1.10 homepage: https://github.com/informatikr/hedis bug-reports: https://github.com/informatikr/hedis/issues extra-source-files: CHANGELOG @@ -58,6 +58,7 @@ flag dev manual: True library + default-language: Haskell2010 hs-source-dirs: src ghc-options: -Wall -fwarn-tabs if impl(ghc >= 8.6.0) @@ -67,6 +68,7 @@ library if flag(dev) ghc-prof-options: -auto-all exposed-modules: Database.Redis + , Database.Redis.Sentinel , Database.Redis.Core.Internal build-depends: scanner >= 0.2, async >= 2.1, @@ -75,6 +77,7 @@ library bytestring-lexing >= 0.5, exceptions, unordered-containers, + containers, text, deepseq, mtl >= 2, @@ -86,12 +89,17 @@ library vector >= 0.9, HTTP, errors, - network-uri + network-uri, + unliftio-core if !impl(ghc >= 8.0) build-depends: semigroups >= 0.11 && < 0.19 other-modules: Database.Redis.Core, + Database.Redis.Connection, + Database.Redis.Cluster, + Database.Redis.Cluster.HashSlot, + Database.Redis.Cluster.Command, Database.Redis.ProtocolPipelining, Database.Redis.Protocol, Database.Redis.PubSub, @@ -99,9 +107,12 @@ library Database.Redis.Types Database.Redis.Commands, Database.Redis.ManualCommands, - Database.Redis.URL + Database.Redis.URL, + Database.Redis.ConnectionContext + other-extensions: StrictData benchmark hedis-benchmark + default-language: Haskell2010 type: exitcode-stdio-1.0 main-is: benchmark/Benchmark.hs build-depends: @@ -116,10 +127,38 @@ benchmark hedis-benchmark ghc-prof-options: -auto-all test-suite hedis-test + default-language: Haskell2010 type: exitcode-stdio-1.0 hs-source-dirs: test - main-is: Test.hs + main-is: Main.hs other-modules: PubSubTest + Tests + build-depends: + base == 4.*, + bytestring >= 0.10, + hedis, + HUnit, + async, + stm, + text, + mtl == 2.*, + test-framework, + test-framework-hunit, + time + -- We use -O0 here, since GHC takes *very* long to compile so many constants + ghc-options: -O0 -Wall -rtsopts -fno-warn-unused-do-bind + if flag(dev) + ghc-options: -Werror + if flag(dev) + ghc-prof-options: -auto-all + +test-suite hedis-test-cluster + default-language: Haskell2010 + type: exitcode-stdio-1.0 + hs-source-dirs: test + main-is: ClusterMain.hs + other-modules: PubSubTest + Tests build-depends: base == 4.*, bytestring >= 0.10, @@ -140,6 +179,7 @@ test-suite hedis-test ghc-prof-options: -auto-all test-suite doctest + default-language: Haskell2010 type: exitcode-stdio-1.0 main-is: DocTest.hs ghc-options: -O0 -rtsopts diff --git a/src/Database/Redis.hs b/src/Database/Redis.hs index f723bf96..37e04eb1 100644 --- a/src/Database/Redis.hs +++ b/src/Database/Redis.hs @@ -23,7 +23,7 @@ module Database.Redis ( -- @ -- -- Send commands to the server: - -- + -- -- @ -- {-\# LANGUAGE OverloadedStrings \#-} -- ... @@ -114,7 +114,7 @@ module Database.Redis ( -- The Redis Scripting website () -- documents the exact semantics of the scripting commands and value -- conversion. - + -- ** Automatic Pipelining -- |Commands are automatically pipelined as much as possible. For example, -- in the above \"hello world\" example, all four commands are pipelined. @@ -130,7 +130,7 @@ module Database.Redis ( -- sent only when at least one reply has been received. That means, command -- functions may block until there are less than 1000 outstanding replies. -- - + -- ** Error Behavior -- | -- [Operations against keys holding the wrong kind of value:] Outside of a @@ -155,7 +155,7 @@ module Database.Redis ( -- sure it is not left in an unusable state, e.g. closed or inside a -- transaction. -- - + -- * The Redis Monad Redis(), runRedis, unRedis, reRedis, @@ -164,22 +164,24 @@ module Database.Redis ( -- * Connection Connection, ConnectError(..), connect, checkedConnect, disconnect, withConnect, withCheckedConnect, - ConnectInfo(..), defaultConnectInfo, parseConnectInfo, + ConnectInfo(..), defaultConnectInfo, parseConnectInfo, connectCluster, PortID(..), -- * Commands module Database.Redis.Commands, - + -- * Transactions module Database.Redis.Transactions, - + -- * Pub\/Sub module Database.Redis.PubSub, -- * Low-Level Command API sendRequest, + sendToAllMasterNodes, Reply(..),Status(..),RedisResult(..),ConnectionLostException(..), - + ConnectTimeout(..), + -- |[Solution to Exercise] -- -- Type of 'expire' inside a transaction: @@ -190,15 +192,28 @@ module Database.Redis ( -- -- > lindex :: ByteString -> Integer -> Redis (Either Reply ByteString) -- + HashSlot, keyToSlot ) where import Database.Redis.Core +import Database.Redis.Connection + ( runRedis + , connectCluster + , defaultConnectInfo + , ConnectInfo(..) + , disconnect + , checkedConnect + , connect + , ConnectError(..) + , Connection(..) + , withConnect + , withCheckedConnect) +import Database.Redis.ConnectionContext(PortID(..), ConnectionLostException(..), ConnectTimeout(..)) import Database.Redis.PubSub import Database.Redis.Protocol -import Database.Redis.ProtocolPipelining - (PortID(..), ConnectionLostException(..)) import Database.Redis.Transactions import Database.Redis.Types import Database.Redis.URL import Database.Redis.Commands +import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot) diff --git a/src/Database/Redis/Cluster.hs b/src/Database/Redis/Cluster.hs new file mode 100644 index 00000000..e22bec04 --- /dev/null +++ b/src/Database/Redis/Cluster.hs @@ -0,0 +1,500 @@ +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE ScopedTypeVariables #-} +module Database.Redis.Cluster + ( Connection(..) + , NodeRole(..) + , NodeConnection(..) + , Node(..) + , ShardMap(..) + , HashSlot + , Shard(..) + , connect + , disconnect + , requestPipelined + , requestMasterNodes + , nodes +) where + +import qualified Data.ByteString as B +import qualified Data.ByteString.Char8 as Char8 +import qualified Data.IORef as IOR +import Data.Maybe(mapMaybe, fromMaybe) +import Data.List(nub, sortBy, find) +import Data.Map(fromListWith, assocs) +import Data.Function(on) +import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try) +import Control.Concurrent.Async(race) +import Control.Concurrent(threadDelay) +import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_) +import Control.Monad(zipWithM, when, replicateM) +import Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot) +import qualified Database.Redis.ConnectionContext as CC +import qualified Data.HashMap.Strict as HM +import qualified Data.IntMap.Strict as IntMap +import qualified Data.Time as Time +import Data.Typeable +import qualified Scanner +import System.Environment (lookupEnv) +import System.IO.Unsafe(unsafeInterleaveIO) +import Text.Read (readMaybe) + +import Database.Redis.Protocol(Reply(Error), renderRequest, reply) +import qualified Database.Redis.Cluster.Command as CMD + +-- This module implements a clustered connection whilst maintaining +-- compatibility with the original Hedis codebase. In particular it still +-- performs implicit pipelining using `unsafeInterleaveIO` as the single node +-- codebase does. To achieve this each connection carries around with it a +-- pipeline of commands. Every time `sendRequest` is called the command is +-- added to the pipeline and an IO action is returned which will, upon being +-- evaluated, execute the entire pipeline. If the pipeline is already executed +-- then it just looks up it's response in the executed pipeline. + +-- | A connection to a redis cluster, it is composed of a map from Node IDs to +-- | 'NodeConnection's, a 'Pipeline', and a 'ShardMap' +type IsReadOnly = Bool + +data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly + +-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection' +data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID + +instance Eq NodeConnection where + (NodeConnection _ _ id1) == (NodeConnection _ _ id2) = id1 == id2 + +instance Ord NodeConnection where + compare (NodeConnection _ _ id1) (NodeConnection _ _ id2) = compare id1 id2 + +data PipelineState = + -- Nothing in the pipeline has been evaluated yet so nothing has been + -- sent + Pending [[B.ByteString]] + -- This pipeline has been executed, the replies are contained within it + | Executed [Reply] + -- We're in a MULTI-EXEC transaction. All commands in the transaction + -- should go to the same node, but we won't know what node that is until + -- we see a command with a key. We're storing these transactions and will + -- send them all together when we see an EXEC. + | TransactionPending [[B.ByteString]] +-- A pipeline has an MVar for the current state, this state is actually always +-- `Pending` because the first thing the implementation does when executing a +-- pipeline is to take the current pipeline state out of the MVar and replace +-- it with a new `Pending` state. The executed state is held on to by the +-- replies within it. + +newtype Pipeline = Pipeline (MVar PipelineState) + +data NodeRole = Master | Slave deriving (Show, Eq, Ord) + +type Host = String +type Port = Int +type NodeID = B.ByteString +-- Represents a single node, note that this type does not include the +-- connection to the node because the shard map can be shared amongst multiple +-- connections +data Node = Node NodeID NodeRole Host Port deriving (Show, Eq, Ord) + +type MasterNode = Node +type SlaveNode = Node + +-- A 'shard' is a master node and 0 or more slaves, (the 'master', 'slave' +-- terminology is unfortunate but I felt it better to follow the documentation +-- until it changes). +data Shard = Shard MasterNode [SlaveNode] deriving (Show, Eq, Ord) + +-- A map from hashslot to shards +newtype ShardMap = ShardMap (IntMap.IntMap Shard) deriving (Show) + +newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Show, Typeable) +instance Exception MissingNodeException + +newtype UnsupportedClusterCommandException = UnsupportedClusterCommandException [B.ByteString] deriving (Show, Typeable) +instance Exception UnsupportedClusterCommandException + +newtype CrossSlotException = CrossSlotException [[B.ByteString]] deriving (Show, Typeable) +instance Exception CrossSlotException + +data NoNodeException = NoNodeException deriving (Show, Typeable) +instance Exception NoNodeException + +connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> (NodeConnection -> IO ShardMap) -> IO Connection +connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do + shardMap <- readMVar shardMapVar + stateVar <- newMVar $ Pending [] + pipelineVar <- newMVar $ Pipeline stateVar + (eNodeConns, shouldRetry) <- nodeConnections shardMap + -- whenever one of the node connection is not established, + -- will refresh the slots and retry node connections. + -- This would handle fail over, IP change use cases. + nodeConns <- + if shouldRetry + then if not (HM.null eNodeConns) + then do + newShardMap <- refreshShardMap (head $ HM.elems eNodeConns) + refreshShardMapVar newShardMap + simpleNodeConnections newShardMap + else + throwIO NoNodeException + else + return eNodeConns + return $ Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) isReadOnly where + simpleNodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection) + simpleNodeConnections shardMap = HM.fromList <$> mapM connectNode (nub $ nodes shardMap) + nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection, Bool) + nodeConnections shardMap = do + info <- mapM (try . connectNode) (nub $ nodes shardMap) + return $ + foldl (\(acc, accB) x -> case x of + Right (v, nc) -> (HM.insert v nc acc, accB) + Left (_ :: SomeException) -> (acc, True) + ) (mempty, False) info + connectNode :: Node -> IO (NodeID, NodeConnection) + connectNode (Node n _ host port) = do + ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt + ref <- IOR.newIORef Nothing + return (n, NodeConnection ctx ref n) + refreshShardMapVar :: ShardMap -> IO () + refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap)) + +disconnect :: Connection -> IO () +disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where + disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx + +-- Add a request to the current pipeline for this connection. The pipeline will +-- be executed implicitly as soon as any result returned from this function is +-- evaluated. +requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply +requestPipelined refreshAction conn@(Connection _ pipelineVar shardMapVar _ _) nextRequest = modifyMVar pipelineVar $ \(Pipeline stateVar) -> do + (newStateVar, repliesIndex) <- hasLocked $ modifyMVar stateVar $ \case + Pending requests | isMulti nextRequest -> do + replies <- evaluatePipeline shardMapVar refreshAction conn requests + s' <- newMVar $ TransactionPending [nextRequest] + return (Executed replies, (s', 0)) + Pending requests | length requests > 1000 -> do + replies <- evaluatePipeline shardMapVar refreshAction conn (nextRequest:requests) + return (Executed replies, (stateVar, length requests)) + Pending requests -> + return (Pending (nextRequest:requests), (stateVar, length requests)) + TransactionPending requests -> + if isExec nextRequest then do + replies <- evaluateTransactionPipeline shardMapVar refreshAction conn (nextRequest:requests) + return (Executed replies, (stateVar, length requests)) + else + return (TransactionPending (nextRequest:requests), (stateVar, length requests)) + e@(Executed _) -> do + s' <- newMVar $ + if isMulti nextRequest then + TransactionPending [nextRequest] + else + Pending [nextRequest] + return (e, (s', 0)) + evaluateAction <- unsafeInterleaveIO $ do + replies <- hasLocked $ modifyMVar newStateVar $ \case + Executed replies -> + return (Executed replies, replies) + Pending requests-> do + replies <- evaluatePipeline shardMapVar refreshAction conn requests + return (Executed replies, replies) + TransactionPending requests-> do + replies <- evaluateTransactionPipeline shardMapVar refreshAction conn requests + return (Executed replies, replies) + return $ replies !! repliesIndex + return (Pipeline newStateVar, evaluateAction) + +isMulti :: [B.ByteString] -> Bool +isMulti ("MULTI" : _) = True +isMulti _ = False + +isExec :: [B.ByteString] -> Bool +isExec ("EXEC" : _) = True +isExec _ = False + +data PendingRequest = PendingRequest Int [B.ByteString] +data CompletedRequest = CompletedRequest Int [B.ByteString] Reply + +rawRequest :: PendingRequest -> [B.ByteString] +rawRequest (PendingRequest _ r) = r + +responseIndex :: CompletedRequest -> Int +responseIndex (CompletedRequest i _ _) = i + +rawResponse :: CompletedRequest -> Reply +rawResponse (CompletedRequest _ _ r) = r + +-- The approach we take here is similar to that taken by the redis-py-cluster +-- library, which is described at https://redis-py-cluster.readthedocs.io/en/master/pipelines.html +-- +-- Essentially we group all the commands by node (based on the current shardmap) +-- and then execute a pipeline for each node (maintaining the order of commands +-- on a per node basis but not between nodes). Once we've done this, if any of +-- the commands have resulted in a MOVED error we refresh the shard map, then +-- we run through all the responses and retry any MOVED or ASK errors. This retry +-- step is not pipelined, there is a request per error. This is probably +-- acceptable in most cases as these errors should only occur in the case of +-- cluster reconfiguration events, which should be rare. +evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply] +evaluatePipeline shardMapVar refreshShardmapAction conn requests = do + shardMap <- hasLocked $ readMVar shardMapVar + requestsByNode <- getRequestsByNode shardMap + -- catch the exception thrown at each node level + -- send the command to random node. + -- merge the current responses with new responses. + eresps <- mapM (try . uncurry executeRequests) requestsByNode + -- take a random connection where there are no exceptions. + -- PERF_CONCERN: Since usually we send only one request at time, this won't be + -- heavy perf issue. but still should be evaluated and figured out with complete rewrite. + resps <- concat <$> mapM (\(resp, (cc, r)) -> case resp of + Right v -> return v + Left (_ :: SomeException) -> executeRequests (getRandomConnection cc conn) r + ) (zip eresps requestsByNode) + -- check for any moved in both responses and continue the flow. + when (any (moved . rawResponse) resps) refreshShardMapVar + retriedResps <- mapM (retry 0) resps + return $ map rawResponse $ sortBy (on compare responseIndex) retriedResps + where + getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])] + getRequestsByNode shardMap = do + commandsWithNodes <- zipWithM (requestWithNodes shardMap) (reverse [0..(length requests - 1)]) requests + return $ assocs $ fromListWith (++) (mconcat commandsWithNodes) + requestWithNodes :: ShardMap -> Int -> [B.ByteString] -> IO [(NodeConnection, [PendingRequest])] + requestWithNodes shardMap index request = do + nodeConns <- nodeConnectionForCommand conn shardMap request + return $ (, [PendingRequest index request]) <$> nodeConns + executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest] + executeRequests nodeConn nodeRequests = do + replies <- requestNode nodeConn $ map rawRequest nodeRequests + return $ zipWith (curry (\(PendingRequest i r, rep) -> CompletedRequest i r rep)) nodeRequests replies + retry :: Int -> CompletedRequest -> IO CompletedRequest + retry retryCount (CompletedRequest index request thisReply) = do + retryReply <- head <$> retryBatch shardMapVar refreshShardmapAction conn retryCount [request] [thisReply] + return (CompletedRequest index request retryReply) + refreshShardMapVar :: IO () + refreshShardMapVar = hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction) + +-- Retry a batch of requests if any of the responses is a redirect instruction. +-- If multiple requests are passed in they're assumed to be a MULTI..EXEC +-- transaction and will all be retried. +retryBatch :: MVar ShardMap -> IO ShardMap -> Connection -> Int -> [[B.ByteString]] -> [Reply] -> IO [Reply] +retryBatch shardMapVar refreshShardmapAction conn retryCount requests replies = + -- The last reply will be the `EXEC` reply containing the redirection, if + -- there is one. + case last replies of + (Error errString) | B.isPrefixOf "MOVED" errString -> do + let (Connection _ _ _ infoMap _) = conn + keys <- mconcat <$> mapM (requestKeys infoMap) requests + hashSlot <- hashSlotForKeys (CrossSlotException requests) keys + nodeConn <- nodeConnForHashSlot shardMapVar conn (MissingNodeException (head requests)) hashSlot + requestNode nodeConn requests + (askingRedirection -> Just (host, port)) -> do + shardMap <- hasLocked $ readMVar shardMapVar + let maybeAskNode = nodeConnWithHostAndPort shardMap conn host port + case maybeAskNode of + Just askNode -> tail <$> requestNode askNode (["ASKING"] : requests) + Nothing -> case retryCount of + 0 -> do + _ <- hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction) + retryBatch shardMapVar refreshShardmapAction conn (retryCount + 1) requests replies + _ -> throwIO $ MissingNodeException (head requests) + _ -> return replies + +-- Like `evaluateOnPipeline`, except we expect to be able to run all commands +-- on a single shard. Failing to meet this expectation is an error. +evaluateTransactionPipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply] +evaluateTransactionPipeline shardMapVar refreshShardmapAction conn requests' = do + let requests = reverse requests' + let (Connection _ _ _ infoMap _) = conn + keys <- mconcat <$> mapM (requestKeys infoMap) requests + -- In cluster mode Redis expects commands in transactions to all work on the + -- same hashslot. We find that hashslot here. + -- We could be more permissive and allow transactions that touch multiple + -- hashslots, as long as those hashslots are on the same node. This allows + -- a new failure case though: if some of the transactions hashslots are + -- moved to a different node we could end up in a situation where some of + -- the commands in a transaction are applied and some are not. Better to + -- fail early. + hashSlot <- hashSlotForKeys (CrossSlotException requests) keys + nodeConn <- nodeConnForHashSlot shardMapVar conn (MissingNodeException (head requests)) hashSlot + -- catch the exception thrown, send the command to random node. + -- This change is required to handle the cluster topology change. + eresps <- try $ requestNode nodeConn requests + resps <- + case eresps of + Right v -> return v + Left (_ :: SomeException) -> requestNode (getRandomConnection nodeConn conn) requests + -- The Redis documentation has the following to say on the effect of + -- resharding on multi-key operations: + -- + -- Multi-key operations may become unavailable when a resharding of the + -- hash slot the keys belong to is in progress. + -- + -- More specifically, even during a resharding the multi-key operations + -- targeting keys that all exist and all still hash to the same slot + -- (either the source or destination node) are still available. + -- + -- Operations on keys that don't exist or are - during the resharding - + -- split between the source and destination nodes, will generate a + -- -TRYAGAIN error. The client can try the operation after some time, + -- or report back the error. + -- + -- https://redis.io/topics/cluster-spec#multiple-keys-operations + -- + -- An important take-away here is that MULTI..EXEC transactions can fail + -- with a redirect in which case we need to repeat the full transaction on + -- the node we're redirected too. + -- + -- A second important takeway is that MULTI..EXEC transactions might + -- temporarily fail during resharding with a -TRYAGAIN error. We can only + -- make arbitrary decisions about how long to paus before the retry and how + -- often to retry, so instead we'll propagate the error to the library user + -- and let them decide how they would like to handle the error. + when (any moved resps) + (hasLocked $ modifyMVar_ shardMapVar (const refreshShardmapAction)) + retriedResps <- retryBatch shardMapVar refreshShardmapAction conn 0 requests resps + return retriedResps + +nodeConnForHashSlot :: Exception e => MVar ShardMap -> Connection -> e -> HashSlot -> IO NodeConnection +nodeConnForHashSlot shardMapVar conn exception hashSlot = do + let (Connection nodeConns _ _ _ _) = conn + (ShardMap shardMap) <- hasLocked $ readMVar shardMapVar + node <- + case IntMap.lookup (fromEnum hashSlot) shardMap of + Nothing -> throwIO exception + Just (Shard master _) -> return master + case HM.lookup (nodeId node) nodeConns of + Nothing -> throwIO exception + Just nodeConn' -> return nodeConn' + +hashSlotForKeys :: Exception e => e -> [B.ByteString] -> IO HashSlot +hashSlotForKeys exception keys = + case nub (keyToSlot <$> keys) of + -- If none of the commands contain a key we can send them to any + -- node. Let's pick the first one. + [] -> return 0 + [hashSlot] -> return hashSlot + _ -> throwIO $ exception + +requestKeys :: CMD.InfoMap -> [B.ByteString] -> IO [B.ByteString] +requestKeys infoMap request = + case CMD.keysForRequest infoMap request of + Nothing -> throwIO $ UnsupportedClusterCommandException request + Just k -> return k + +askingRedirection :: Reply -> Maybe (Host, Port) +askingRedirection (Error errString) = case Char8.words errString of + ["ASK", _, hostport] -> case Char8.split ':' hostport of + [host, portString] -> case Char8.readInt portString of + Just (port,"") -> Just (Char8.unpack host, port) + _ -> Nothing + _ -> Nothing + _ -> Nothing +askingRedirection _ = Nothing + +moved :: Reply -> Bool +moved (Error errString) = case Char8.words errString of + "MOVED":_ -> True + _ -> False +moved _ = False + + +nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection +nodeConnWithHostAndPort shardMap (Connection nodeConns _ _ _ _) host port = do + node <- nodeWithHostAndPort shardMap host port + HM.lookup (nodeId node) nodeConns + +nodeConnectionForCommand :: Connection -> ShardMap -> [B.ByteString] -> IO [NodeConnection] +nodeConnectionForCommand conn@(Connection nodeConns _ _ infoMap _) (ShardMap shardMap) request = + case request of + ("FLUSHALL" : _) -> allNodes + ("FLUSHDB" : _) -> allNodes + ("QUIT" : _) -> allNodes + ("UNWATCH" : _) -> allNodes + _ -> do + keys <- requestKeys infoMap request + hashSlot <- hashSlotForKeys (CrossSlotException [request]) keys + node <- case IntMap.lookup (fromEnum hashSlot) shardMap of + Nothing -> throwIO $ MissingNodeException request + Just (Shard master _) -> return master + maybe (throwIO $ MissingNodeException request) (return . return) (HM.lookup (nodeId node) nodeConns) + where + allNodes = + case allMasterNodes conn (ShardMap shardMap) of + Nothing -> throwIO $ MissingNodeException request + Just allNodes' -> return allNodes' + +allMasterNodes :: Connection -> ShardMap -> Maybe [NodeConnection] +allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) = + mapM (flip HM.lookup nodeConns . nodeId) onlyMasterNodes + where + onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap) + +requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply] +requestNode (NodeConnection ctx lastRecvRef _) requests = do + envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT" + eresp <- race requestNodeImpl (threadDelay envTimeout) + case eresp of + Left e -> return e + Right _ -> putStrLn "timeout happened" *> throwIO NoNodeException + + where + requestNodeImpl :: IO [Reply] + requestNodeImpl = do + mapM_ (sendNode . renderRequest) requests + _ <- CC.flush ctx + replicateM (length requests) recvNode + sendNode :: B.ByteString -> IO () + sendNode = CC.send ctx + recvNode :: IO Reply + recvNode = do + maybeLastRecv <- IOR.readIORef lastRecvRef + scanResult <- case maybeLastRecv of + Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv + Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty + + case scanResult of + Scanner.Fail{} -> CC.errConnClosed + Scanner.More{} -> error "Hedis: parseWith returned Partial" + Scanner.Done rest' r -> do + IOR.writeIORef lastRecvRef (Just rest') + return r + +{-# INLINE nodes #-} +nodes :: ShardMap -> [Node] +nodes (ShardMap shardMap) = concatMap snd $ IntMap.toList $ fmap shardNodes shardMap where + shardNodes :: Shard -> [Node] + shardNodes (Shard master slaves) = master:slaves + + +nodeWithHostAndPort :: ShardMap -> Host -> Port -> Maybe Node +nodeWithHostAndPort shardMap host port = find (\(Node _ _ nodeHost nodePort) -> port == nodePort && host == nodeHost) (nodes shardMap) + +nodeId :: Node -> NodeID +nodeId (Node theId _ _ _) = theId + +hasLocked :: IO a -> IO a +hasLocked action = + action `catches` + [ Handler $ \exc@BlockedIndefinitelyOnMVar -> throwIO exc + ] + + +requestMasterNodes :: Connection -> [B.ByteString] -> IO [Reply] +requestMasterNodes conn req = do + masterNodeConns <- masterNodes conn + concat <$> mapM (`requestNode` [req]) masterNodeConns + +masterNodes :: Connection -> IO [NodeConnection] +masterNodes (Connection nodeConns _ shardMapVar _ _) = do + (ShardMap shardMap) <- readMVar shardMapVar + let masters = map ((\(Shard m _) -> m) . snd) $ IntMap.toList shardMap + let masterNodeIds = map nodeId masters + return $ mapMaybe (`HM.lookup` nodeConns) masterNodeIds + +getRandomConnection :: NodeConnection -> Connection -> NodeConnection +getRandomConnection nc conn = + let (Connection hmn _ _ _ _) = conn + conns = HM.elems hmn + in fromMaybe (head conns) $ find (nc /= ) conns diff --git a/src/Database/Redis/Cluster/Command.hs b/src/Database/Redis/Cluster/Command.hs new file mode 100644 index 00000000..7324b82a --- /dev/null +++ b/src/Database/Redis/Cluster/Command.hs @@ -0,0 +1,188 @@ +{-# LANGUAGE OverloadedStrings, RecordWildCards #-} +module Database.Redis.Cluster.Command where + +import Data.Char(toLower) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Char8 as Char8 +import qualified Data.HashMap.Strict as HM +import Database.Redis.Types(RedisResult(decode)) +import Database.Redis.Protocol(Reply(..)) + +data Flag + = Write + | ReadOnly + | DenyOOM + | Admin + | PubSub + | NoScript + | Random + | SortForScript + | Loading + | Stale + | SkipMonitor + | Asking + | Fast + | MovableKeys + | Other BS.ByteString deriving (Show, Eq) + + +data AritySpec = Required Integer | MinimumRequired Integer deriving (Show) + +data LastKeyPositionSpec = LastKeyPosition Integer | UnlimitedKeys Integer deriving (Show) + +newtype InfoMap = InfoMap (HM.HashMap String CommandInfo) + +-- Represents the result of the COMMAND command, which returns information +-- about the position of keys in a request +data CommandInfo = CommandInfo + { name :: BS.ByteString + , arity :: AritySpec + , flags :: [Flag] + , firstKeyPosition :: Integer + , lastKeyPosition :: LastKeyPositionSpec + , stepCount :: Integer + } deriving (Show) + +instance RedisResult CommandInfo where + decode (MultiBulk (Just + [ Bulk (Just commandName) + , Integer aritySpec + , MultiBulk (Just replyFlags) + , Integer firstKeyPos + , Integer lastKeyPos + , Integer replyStepCount])) = do + parsedFlags <- mapM parseFlag replyFlags + lastKey <- parseLastKeyPos + return $ CommandInfo + { name = commandName + , arity = parseArity aritySpec + , flags = parsedFlags + , firstKeyPosition = firstKeyPos + , lastKeyPosition = lastKey + , stepCount = replyStepCount + } where + parseArity int = case int of + i | i >= 0 -> Required i + i -> MinimumRequired $ abs i + parseFlag :: Reply -> Either Reply Flag + parseFlag (SingleLine flag) = return $ case flag of + "write" -> Write + "readonly" -> ReadOnly + "denyoom" -> DenyOOM + "admin" -> Admin + "pubsub" -> PubSub + "noscript" -> NoScript + "random" -> Random + "sort_for_script" -> SortForScript + "loading" -> Loading + "stale" -> Stale + "skip_monitor" -> SkipMonitor + "asking" -> Asking + "fast" -> Fast + "movablekeys" -> MovableKeys + other -> Other other + parseFlag bad = Left bad + parseLastKeyPos :: Either Reply LastKeyPositionSpec + parseLastKeyPos = return $ case lastKeyPos of + i | i < 0 -> UnlimitedKeys (-i - 1) + i -> LastKeyPosition i + -- since redis 6.0 + decode (MultiBulk (Just + [ name@(Bulk (Just _)) + , arity@(Integer _) + , flags@(MultiBulk (Just _)) + , firstPos@(Integer _) + , lastPos@(Integer _) + , step@(Integer _) + , MultiBulk _ -- ACL categories + ])) = + decode (MultiBulk (Just [name, arity, flags, firstPos, lastPos, step])) + -- since redis 7.0 + decode (MultiBulk (Just + [ name@(Bulk (Just _)) + , arity@(Integer _) + , flags@(MultiBulk (Just _)) + , firstPos@(Integer _) + , lastPos@(Integer _) + , step@(Integer _) + , MultiBulk _ -- ACL categories + , MultiBulk _ -- Tips + , MultiBulk _ -- Key specifications + , MultiBulk _ -- Sub commands + ])) = + decode (MultiBulk (Just [name, arity, flags, firstPos, lastPos, step])) + + decode e = Left e + +newInfoMap :: [CommandInfo] -> InfoMap +newInfoMap = InfoMap . HM.fromList . map (\c -> (Char8.unpack $ name c, c)) + +keysForRequest :: InfoMap -> [BS.ByteString] -> Maybe [BS.ByteString] +keysForRequest _ ["DEBUG", "OBJECT", key] = + -- `COMMAND` output for `DEBUG` would let us believe it doesn't have any + -- keys, but the `DEBUG OBJECT` subcommand does. + Just [key] +keysForRequest _ ["QUIT"] = + -- The `QUIT` command is not listed in the `COMMAND` output. + Just [] +keysForRequest (InfoMap infoMap) request@(command:_) = do + info <- HM.lookup (map toLower $ Char8.unpack command) infoMap + keysForRequest' info request +keysForRequest _ [] = Nothing + +keysForRequest' :: CommandInfo -> [BS.ByteString] -> Maybe [BS.ByteString] +keysForRequest' info request + | isMovable info = + parseMovable request + | stepCount info == 0 = + Just [] + | otherwise = do + let possibleKeys = case lastKeyPosition info of + LastKeyPosition end -> take (fromEnum $ 1 + end - firstKeyPosition info) $ drop (fromEnum $ firstKeyPosition info) request + UnlimitedKeys end -> + drop (fromEnum $ firstKeyPosition info) $ + take (length request - fromEnum end) request + return $ takeEvery (fromEnum $ stepCount info) possibleKeys + +isMovable :: CommandInfo -> Bool +isMovable CommandInfo{..} = MovableKeys `elem` flags + +parseMovable :: [BS.ByteString] -> Maybe [BS.ByteString] +parseMovable ("SORT":key:_) = Just [key] +parseMovable ("EVAL":_:rest) = readNumKeys rest +parseMovable ("EVALSHA":_:rest) = readNumKeys rest +parseMovable ("ZUNIONSTORE":_:rest) = readNumKeys rest +parseMovable ("ZINTERSTORE":_:rest) = readNumKeys rest +parseMovable ("XREAD":rest) = readXreadKeys rest +parseMovable ("XREADGROUP":"GROUP":_:_:rest) = readXreadgroupKeys rest +parseMovable _ = Nothing + +readXreadKeys :: [BS.ByteString] -> Maybe [BS.ByteString] +readXreadKeys ("COUNT":_:rest) = readXreadKeys rest +readXreadKeys ("BLOCK":_:rest) = readXreadKeys rest +readXreadKeys ("STREAMS":rest) = Just $ take (length rest `div` 2) rest +readXreadKeys _ = Nothing + +readXreadgroupKeys :: [BS.ByteString] -> Maybe [BS.ByteString] +readXreadgroupKeys ("COUNT":_:rest) = readXreadKeys rest +readXreadgroupKeys ("BLOCK":_:rest) = readXreadKeys rest +readXreadgroupKeys ("NOACK":rest) = readXreadKeys rest +readXreadgroupKeys ("STREAMS":rest) = Just $ take (length rest `div` 2) rest +readXreadgroupKeys _ = Nothing + +readNumKeys :: [BS.ByteString] -> Maybe [BS.ByteString] +readNumKeys (rawNumKeys:rest) = do + numKeys <- readMaybe (Char8.unpack rawNumKeys) + return $ take numKeys rest +readNumKeys _ = Nothing +-- takeEvery 1 [1,2,3,4,5] ->[1,2,3,4,5] +-- takeEvery 2 [1,2,3,4,5] ->[1,3,5] +-- takeEvery 3 [1,2,3,4,5] ->[1,4] +takeEvery :: Int -> [a] -> [a] +takeEvery _ [] = [] +takeEvery n (x:xs) = x : takeEvery n (drop (n-1) xs) + +readMaybe :: Read a => String -> Maybe a +readMaybe s = case reads s of + [(val, "")] -> Just val + _ -> Nothing diff --git a/src/Database/Redis/Cluster/HashSlot.hs b/src/Database/Redis/Cluster/HashSlot.hs new file mode 100644 index 00000000..2db3a7ba --- /dev/null +++ b/src/Database/Redis/Cluster/HashSlot.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE OverloadedStrings #-} +module Database.Redis.Cluster.HashSlot(HashSlot, keyToSlot) where + +import Data.Bits((.&.), xor, shiftL) +import qualified Data.ByteString.Char8 as Char8 +import qualified Data.ByteString as BS +import Data.Word(Word8, Word16) + +newtype HashSlot = HashSlot Word16 deriving (Num, Eq, Ord, Real, Enum, Integral, Show) + +numHashSlots :: Word16 +numHashSlots = 16384 + +-- | Compute the hashslot associated with a key +keyToSlot :: BS.ByteString -> HashSlot +keyToSlot = HashSlot . (.&.) (numHashSlots - 1) . crc16 . findSubKey + +-- | Find the section of a key to compute the slot for. +findSubKey :: BS.ByteString -> BS.ByteString +findSubKey key = case Char8.break (=='{') key of + (whole, "") -> whole + (_, xs) -> case Char8.break (=='}') (Char8.tail xs) of + ("", _) -> key + (subKey, _) -> subKey + +crc16 :: BS.ByteString -> Word16 +crc16 = BS.foldl (crc16Update 0x1021) 0 + +-- Taken from crc16 package +crc16Update :: Word16 -- ^ polynomial + -> Word16 -- ^ initial crc + -> Word8 -- ^ data byte + -> Word16 -- ^ new crc +crc16Update poly crc b = + foldl crc16UpdateBit newCrc [1 :: Int .. 8] + where + newCrc = crc `xor` shiftL (fromIntegral b :: Word16) 8 + crc16UpdateBit crc' _ = + if (crc' .&. 0x8000) /= 0x0000 + then shiftL crc' 1 `xor` poly + else shiftL crc' 1 + diff --git a/src/Database/Redis/Commands.hs b/src/Database/Redis/Commands.hs index e26129b8..800acf0a 100644 --- a/src/Database/Redis/Commands.hs +++ b/src/Database/Redis/Commands.hs @@ -261,7 +261,21 @@ xinfoStream, -- |Get info about a stream. The Redis command @XINFO@ is split int xdel, -- |Delete messages from a stream. Since Redis 5.0.0 xtrim, -- |Set the upper bound for number of messages in a stream. Since Redis 5.0.0 inf, -- |Constructor for `inf` Redis argument values - +ClusterNodesResponse(..), +ClusterNodesResponseEntry(..), +ClusterNodesResponseSlotSpec(..), +clusterNodes, +ClusterSlotsResponse(..), +ClusterSlotsResponseEntry(..), +ClusterSlotsNode(..), +clusterSlots, +clusterSetSlotNode, +clusterSetSlotStable, +clusterSetSlotImporting, +clusterSetSlotMigrating, +clusterGetKeysInSlot, +command, +readOnly -- * Unimplemented Commands -- |These commands are not implemented, as of now. Library -- users can implement these or other commands from @@ -306,7 +320,7 @@ import Prelude hiding (min,max) import Data.ByteString (ByteString) import Database.Redis.ManualCommands import Database.Redis.Types -import Database.Redis.Core +import Database.Redis.Core(sendRequest, RedisCtx) ttl :: (RedisCtx m f) @@ -830,7 +844,7 @@ hset => ByteString -- ^ key -> ByteString -- ^ field -> ByteString -- ^ value - -> m (f Bool) + -> m (f Integer) hset key field value = sendRequest (["HSET"] ++ [encode key] ++ [encode field] ++ [encode value] ) brpoplpush @@ -1080,4 +1094,3 @@ sismember -> ByteString -- ^ member -> m (f Bool) sismember key member = sendRequest (["SISMEMBER"] ++ [encode key] ++ [encode member] ) - diff --git a/src/Database/Redis/Connection.hs b/src/Database/Redis/Connection.hs new file mode 100644 index 00000000..bce72674 --- /dev/null +++ b/src/Database/Redis/Connection.hs @@ -0,0 +1,283 @@ +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE OverloadedStrings #-} +module Database.Redis.Connection where + +import Control.Exception +import qualified Control.Monad.Catch as Catch +import Control.Monad.IO.Class(liftIO, MonadIO) +import Control.Monad(when) +import Control.Concurrent.MVar(MVar, newMVar) +import qualified Data.ByteString as B +import qualified Data.ByteString.Char8 as Char8 +import Data.Functor(void) +import qualified Data.IntMap.Strict as IntMap +import Data.Pool(Pool, withResource, createPool, destroyAllResources) +import Data.Typeable +import qualified Data.Time as Time +import Network.TLS (ClientParams) +import qualified Network.Socket as NS +import qualified Data.HashMap.Strict as HM + +import qualified Database.Redis.ProtocolPipelining as PP +import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal) +import Database.Redis.Protocol(Reply(..)) +import Database.Redis.Cluster(ShardMap(..), Node, Shard(..)) +import qualified Database.Redis.Cluster as Cluster +import qualified Database.Redis.ConnectionContext as CC +--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline +import Database.Redis.Commands + ( ping + , select + , auth + , clusterSlots + , command + , readOnly + , ClusterSlotsResponse(..) + , ClusterSlotsResponseEntry(..) + , ClusterSlotsNode(..)) + +-------------------------------------------------------------------------------- +-- Connection +-- + +-- |A threadsafe pool of network connections to a Redis server. Use the +-- 'connect' function to create one. +data Connection + = NonClusteredConnection (Pool PP.Connection) + | ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection) + +-- |Information for connnecting to a Redis server. +-- +-- It is recommended to not use the 'ConnInfo' data constructor directly. +-- Instead use 'defaultConnectInfo' and update it with record syntax. For +-- example to connect to a password protected Redis server running on localhost +-- and listening to the default port: +-- +-- @ +-- myConnectInfo :: ConnectInfo +-- myConnectInfo = defaultConnectInfo {connectAuth = Just \"secret\"} +-- @ +-- +data ConnectInfo = ConnInfo + { connectHost :: NS.HostName + , connectPort :: CC.PortID + , connectAuth :: Maybe B.ByteString + , connectReadOnly :: Bool + -- ^ When the server is protected by a password, set 'connectAuth' to 'Just' + -- the password. Each connection will then authenticate by the 'auth' + -- command. + , connectDatabase :: Integer + -- ^ Each connection will 'select' the database with the given index. + , connectMaxConnections :: Int + -- ^ Maximum number of connections to keep open. The smallest acceptable + -- value is 1. + , connectMaxIdleTime :: Time.NominalDiffTime + -- ^ Amount of time for which an unused connection is kept open. The + -- smallest acceptable value is 0.5 seconds. If the @timeout@ value in + -- your redis.conf file is non-zero, it should be larger than + -- 'connectMaxIdleTime'. + , connectTimeout :: Maybe Time.NominalDiffTime + -- ^ Optional timeout until connection to Redis gets + -- established. 'ConnectTimeoutException' gets thrown if no socket + -- get connected in this interval of time. + , connectTLSParams :: Maybe ClientParams + -- ^ Optional TLS parameters. TLS will be enabled if this is provided. + } deriving Show + +data ConnectError = ConnectAuthError Reply + | ConnectSelectError Reply + deriving (Eq, Show, Typeable) + +instance Exception ConnectError + +-- |Default information for connecting: +-- +-- @ +-- connectHost = \"localhost\" +-- connectPort = PortNumber 6379 -- Redis default port +-- connectAuth = Nothing -- No password +-- connectDatabase = 0 -- SELECT database 0 +-- connectMaxConnections = 50 -- Up to 50 connections +-- connectMaxIdleTime = 30 -- Keep open for 30 seconds +-- connectTimeout = Nothing -- Don't add timeout logic +-- connectTLSParams = Nothing -- Do not use TLS +-- @ +-- +defaultConnectInfo :: ConnectInfo +defaultConnectInfo = ConnInfo + { connectHost = "localhost" + , connectPort = CC.PortNumber 6379 + , connectAuth = Nothing + , connectReadOnly = False + , connectDatabase = 0 + , connectMaxConnections = 50 + , connectMaxIdleTime = 30 + , connectTimeout = Nothing + , connectTLSParams = Nothing + } + +createConnection :: ConnectInfo -> IO PP.Connection +createConnection ConnInfo{..} = do + let timeoutOptUs = + round . (1000000 *) <$> connectTimeout + conn <- PP.connect connectHost connectPort timeoutOptUs + conn' <- case connectTLSParams of + Nothing -> return conn + Just tlsParams -> PP.enableTLS tlsParams conn + PP.beginReceiving conn' + + runRedisInternal conn' $ do + -- AUTH + case connectAuth of + Nothing -> return () + Just pass -> do + resp <- auth pass + case resp of + Left r -> liftIO $ throwIO $ ConnectAuthError r + _ -> return () + -- SELECT + when (connectDatabase /= 0) $ do + resp <- select connectDatabase + case resp of + Left r -> liftIO $ throwIO $ ConnectSelectError r + _ -> return () + return conn' + +-- |Constructs a 'Connection' pool to a Redis server designated by the +-- given 'ConnectInfo'. The first connection is not actually established +-- until the first call to the server. +connect :: ConnectInfo -> IO Connection +connect cInfo@ConnInfo{..} = NonClusteredConnection <$> + createPool (createConnection cInfo) PP.disconnect 1 connectMaxIdleTime connectMaxConnections + +-- |Constructs a 'Connection' pool to a Redis server designated by the +-- given 'ConnectInfo', then tests if the server is actually there. +-- Throws an exception if the connection to the Redis server can't be +-- established. +checkedConnect :: ConnectInfo -> IO Connection +checkedConnect connInfo = do + conn <- connect connInfo + runRedis conn $ void ping + return conn + +-- |Destroy all idle resources in the pool. +disconnect :: Connection -> IO () +disconnect (NonClusteredConnection pool) = destroyAllResources pool +disconnect (ClusteredConnection _ pool) = destroyAllResources pool + +-- | Memory bracket around 'connect' and 'disconnect'. +withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c +withConnect connInfo = Catch.bracket (liftIO $ connect connInfo) (liftIO . disconnect) + +-- | Memory bracket around 'checkedConnect' and 'disconnect' +withCheckedConnect :: ConnectInfo -> (Connection -> IO c) -> IO c +withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect + +-- |Interact with a Redis datastore specified by the given 'Connection'. +-- +-- Each call of 'runRedis' takes a network connection from the 'Connection' +-- pool and runs the given 'Redis' action. Calls to 'runRedis' may thus block +-- while all connections from the pool are in use. +runRedis :: Connection -> Redis a -> IO a +runRedis (NonClusteredConnection pool) redis = + withResource pool $ \conn -> runRedisInternal conn redis +runRedis (ClusteredConnection _ pool) redis = + withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis + +newtype ClusterConnectError = ClusterConnectError Reply + deriving (Eq, Show, Typeable) + +instance Exception ClusterConnectError + +-- |Constructs a 'ShardMap' of connections to clustered nodes. The argument is +-- a 'ConnectInfo' for any node in the cluster +-- +-- Some Redis commands are currently not supported in cluster mode +-- - CONFIG, AUTH +-- - SCAN +-- - MOVE, SELECT +-- - PUBLISH, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, RESET +connectCluster :: ConnectInfo -> IO Connection +connectCluster bootstrapConnInfo = do + let timeoutOptUs = + round . (1000000 *) <$> connectTimeout bootstrapConnInfo + conn <- createConnection bootstrapConnInfo + slotsResponse <- runRedisInternal conn clusterSlots + shardMapVar <- case slotsResponse of + Left e -> throwIO $ ClusterConnectError e + Right slots -> do + shardMap <- shardMapFromClusterSlotsResponse slots + newMVar shardMap + commandInfos <- runRedisInternal conn command + case commandInfos of + Left e -> throwIO $ ClusterConnectError e + Right infos -> do + let + isConnectionReadOnly = connectReadOnly bootstrapConnInfo + clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn + pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) + return $ ClusteredConnection shardMapVar pool + where + withAuth host port timeout = do + conn <- PP.connect host port timeout + conn' <- case connectTLSParams bootstrapConnInfo of + Nothing -> return conn + Just tlsParams -> PP.enableTLS tlsParams conn + PP.beginReceiving conn' + + runRedisInternal conn' $ do + -- AUTH + case connectAuth bootstrapConnInfo of + Nothing -> return () + Just pass -> do + resp <- auth pass + case resp of + Left r -> liftIO $ throwIO $ ConnectAuthError r + _ -> return () + return $ PP.toCtx conn' + + clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection + clusterConnect readOnlyConnection connection = do + clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection + nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap) + when readOnlyConnection $ + mapM_ (\conn -> do + PP.beginReceiving conn + runRedisInternal conn readOnly + ) nodesConns + return clusterConn + +shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap +shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where + mkShardMap :: ClusterSlotsResponseEntry -> IO (IntMap.IntMap Shard) -> IO (IntMap.IntMap Shard) + mkShardMap ClusterSlotsResponseEntry{..} accumulator = do + accumulated <- accumulator + let master = nodeFromClusterSlotNode True clusterSlotsResponseEntryMaster + -- let replicas = map (nodeFromClusterSlotNode False) clusterSlotsResponseEntryReplicas + let shard = Shard master [] + let slotMap = IntMap.fromList $ map (, shard) [clusterSlotsResponseEntryStartSlot..clusterSlotsResponseEntryEndSlot] + return $ IntMap.union slotMap accumulated + nodeFromClusterSlotNode :: Bool -> ClusterSlotsNode -> Node + nodeFromClusterSlotNode isMaster ClusterSlotsNode{..} = + let hostname = Char8.unpack clusterSlotsNodeIP + role = if isMaster then Cluster.Master else Cluster.Slave + in + Cluster.Node clusterSlotsNodeID role hostname (toEnum clusterSlotsNodePort) + +refreshShardMap :: Cluster.Connection -> IO ShardMap +refreshShardMap (Cluster.Connection nodeConns _ _ _ _) = + refreshShardMapWithNodeConn (head $ HM.elems nodeConns) + +refreshShardMapWithNodeConn :: Cluster.NodeConnection -> IO ShardMap +refreshShardMapWithNodeConn (Cluster.NodeConnection ctx _ _) = do + pipelineConn <- PP.fromCtx ctx + refreshShardMapWithConn pipelineConn True + +refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap +refreshShardMapWithConn pipelineConn _ = do + _ <- PP.beginReceiving pipelineConn + slotsResponse <- runRedisInternal pipelineConn clusterSlots + case slotsResponse of + Left e -> throwIO $ ClusterConnectError e + Right slots -> shardMapFromClusterSlotsResponse slots diff --git a/src/Database/Redis/ConnectionContext.hs b/src/Database/Redis/ConnectionContext.hs new file mode 100644 index 00000000..db43891d --- /dev/null +++ b/src/Database/Redis/ConnectionContext.hs @@ -0,0 +1,164 @@ +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +module Database.Redis.ConnectionContext ( + ConnectionContext(..) + , ConnectTimeout(..) + , ConnectionLostException(..) + , PortID(..) + , connect + , disconnect + , send + , recv + , errConnClosed + , enableTLS + , flush + , ioErrorToConnLost +) where + +import Control.Concurrent (threadDelay) +import Control.Concurrent.Async (race) +import Control.Monad(when) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as LB +import qualified Data.IORef as IOR +import qualified Data.Time as Time +import Data.Maybe (fromMaybe) +import Control.Concurrent.MVar(newMVar, readMVar, swapMVar) +import Control.Exception(bracketOnError, Exception, throwIO, try) +import Data.Typeable +import Data.Functor(void) +import qualified Network.Socket as NS +import qualified Network.TLS as TLS +import System.Environment (lookupEnv) +import System.IO(Handle, hSetBinaryMode, hClose, IOMode(..), hFlush, hIsOpen) +import System.IO.Error(catchIOError) +import Text.Read (readMaybe) + +data ConnectionContext = NormalHandle Handle | TLSContext TLS.Context + +instance Show ConnectionContext where + show (NormalHandle _) = "NormalHandle" + show (TLSContext _) = "TLSContext" + +data Connection = Connection + { ctx :: ConnectionContext + , lastRecvRef :: IOR.IORef (Maybe B.ByteString) } + +instance Show Connection where + show Connection{..} = "Connection{ ctx = " ++ show ctx ++ ", lastRecvRef = IORef}" + +data ConnectPhase + = PhaseUnknown + | PhaseResolve + | PhaseOpenSocket + deriving (Show) + +newtype ConnectTimeout = ConnectTimeout ConnectPhase + deriving (Show, Typeable) + +instance Exception ConnectTimeout + +data ConnectionLostException = ConnectionLost deriving Show +instance Exception ConnectionLostException + +data PortID = PortNumber NS.PortNumber + | UnixSocket String + deriving (Eq, Show) + +connect :: NS.HostName -> PortID -> Maybe Int -> IO ConnectionContext +connect hostName portId timeoutOpt = + bracketOnError hConnect hClose $ \h -> do + hSetBinaryMode h True + return $ NormalHandle h + where + hConnect = do + phaseMVar <- newMVar PhaseUnknown + let doConnect = hConnect' phaseMVar + envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_CONNECT_TIMEOUT" + result <- race doConnect (threadDelay $ fromMaybe envTimeout timeoutOpt) + case result of + Left h -> return h + Right () -> do + phase <- readMVar phaseMVar + errConnectTimeout phase + hConnect' mvar = bracketOnError createSock NS.close $ \sock -> do + NS.setSocketOption sock NS.KeepAlive 1 + void $ swapMVar mvar PhaseResolve + void $ swapMVar mvar PhaseOpenSocket + NS.socketToHandle sock ReadWriteMode + where + createSock = case portId of + PortNumber portNumber -> do + addrInfo <- getHostAddrInfo hostName portNumber + connectSocket addrInfo + UnixSocket addr -> bracketOnError + (NS.socket NS.AF_UNIX NS.Stream NS.defaultProtocol) + NS.close + (\sock -> NS.connect sock (NS.SockAddrUnix addr) >> return sock) + +getHostAddrInfo :: NS.HostName -> NS.PortNumber -> IO [NS.AddrInfo] +getHostAddrInfo hostname port = + NS.getAddrInfo (Just hints) (Just hostname) (Just $ show port) + where + hints = NS.defaultHints + { NS.addrSocketType = NS.Stream } + +errConnectTimeout :: ConnectPhase -> IO a +errConnectTimeout phase = throwIO $ ConnectTimeout phase + +connectSocket :: [NS.AddrInfo] -> IO NS.Socket +connectSocket [] = error "connectSocket: unexpected empty list" +connectSocket (addr:rest) = tryConnect >>= \case + Right sock -> return sock + Left err -> if null rest + then throwIO err + else connectSocket rest + where + tryConnect :: IO (Either IOError NS.Socket) + tryConnect = bracketOnError createSock NS.close $ \sock -> + try (NS.connect sock $ NS.addrAddress addr) >>= \case + Right () -> return (Right sock) + Left err -> NS.close sock >> return (Left err) + where + createSock = NS.socket (NS.addrFamily addr) + (NS.addrSocketType addr) + (NS.addrProtocol addr) + +send :: ConnectionContext -> B.ByteString -> IO () +send (NormalHandle h) requestData = + ioErrorToConnLost (B.hPut h requestData) +send (TLSContext ctx) requestData = + ioErrorToConnLost (TLS.sendData ctx (LB.fromStrict requestData)) + +recv :: ConnectionContext -> IO B.ByteString +recv (NormalHandle h) = ioErrorToConnLost $ B.hGetSome h 4096 +recv (TLSContext ctx) = TLS.recvData ctx + + +ioErrorToConnLost :: IO a -> IO a +ioErrorToConnLost a = a `catchIOError` (\x -> putStrLn ("exception while running redis query: " <> show x) *> errConnClosed) + +errConnClosed :: IO a +errConnClosed = throwIO ConnectionLost + + +enableTLS :: TLS.ClientParams -> ConnectionContext -> IO ConnectionContext +enableTLS tlsParams (NormalHandle h) = do + ctx <- TLS.contextNew h tlsParams + TLS.handshake ctx + return $ TLSContext ctx +enableTLS _ c@(TLSContext _) = return c + +disconnect :: ConnectionContext -> IO () +disconnect (NormalHandle h) = do + open <- hIsOpen h + when open $ hClose h +disconnect (TLSContext ctx) = do + TLS.bye ctx + TLS.contextClose ctx + +flush :: ConnectionContext -> IO () +flush (NormalHandle h) = hFlush h +flush (TLSContext c) = TLS.contextFlush c diff --git a/src/Database/Redis/Core.hs b/src/Database/Redis/Core.hs index 3fe2f42b..c76f88fe 100644 --- a/src/Database/Redis/Core.hs +++ b/src/Database/Redis/Core.hs @@ -3,34 +3,27 @@ DeriveDataTypeable, StandaloneDeriving #-} module Database.Redis.Core ( - Connection(..), ConnectError(..), connect, checkedConnect, disconnect, - withConnect, withCheckedConnect, - ConnectInfo(..), defaultConnectInfo, - Redis(), runRedis, unRedis, reRedis, + Redis(), unRedis, reRedis, RedisCtx(..), MonadRedis(..), - send, recv, sendRequest, - auth, select, ping + send, recv, sendRequest, sendToAllMasterNodes, + runRedisInternal, + runRedisClusteredInternal, + RedisEnv(..), ) where import Prelude #if __GLASGOW_HASKELL__ < 710 import Control.Applicative #endif -import Control.Exception import Control.Monad.Reader -import qualified Control.Monad.Catch as Catch import qualified Data.ByteString as B import Data.IORef -import Data.Pool -import Data.Time -import Data.Typeable -import qualified Network.Socket as NS -import Network.TLS (ClientParams) import Database.Redis.Core.Internal import Database.Redis.Protocol import qualified Database.Redis.ProtocolPipelining as PP import Database.Redis.Types - +import Database.Redis.Cluster(ShardMap) +import qualified Database.Redis.Cluster as Cluster -------------------------------------------------------------------------------- -- The Redis Monad @@ -44,29 +37,21 @@ import Database.Redis.Types class (MonadRedis m) => RedisCtx m f | m -> f where returnDecode :: RedisResult a => Reply -> m (f a) -instance RedisCtx Redis (Either Reply) where - returnDecode = return . decode - class (Monad m) => MonadRedis m where liftRedis :: Redis a -> m a + +instance RedisCtx Redis (Either Reply) where + returnDecode = return . decode + instance MonadRedis Redis where liftRedis = id --- |Interact with a Redis datastore specified by the given 'Connection'. --- --- Each call of 'runRedis' takes a network connection from the 'Connection' --- pool and runs the given 'Redis' action. Calls to 'runRedis' may thus block --- while all connections from the pool are in use. -runRedis :: Connection -> Redis a -> IO a -runRedis (Conn pool) redis = - withResource pool $ \conn -> runRedisInternal conn redis - -- |Deconstruct Redis constructor. -- -- 'unRedis' and 'reRedis' can be used to define instances for -- arbitrary typeclasses. --- +-- -- WARNING! These functions are considered internal and no guarantee -- is given at this point that they will not break in future. unRedis :: Redis a -> ReaderT RedisEnv IO a @@ -82,11 +67,18 @@ runRedisInternal :: PP.Connection -> Redis a -> IO a runRedisInternal conn (Redis redis) = do -- Dummy reply in case no request is sent. ref <- newIORef (SingleLine "nobody will ever see this") - r <- runReaderT redis (Env conn ref) + r <- runReaderT redis (NonClusteredEnv conn ref) -- Evaluate last reply to keep lazy IO inside runRedis. readIORef ref >>= (`seq` return ()) return r +runRedisClusteredInternal :: Cluster.Connection -> IO ShardMap -> Redis a -> IO a +runRedisClusteredInternal connection refreshShardmapAction (Redis redis) = do + ref <- newIORef (SingleLine "nobody will ever see this") + r <- runReaderT redis (ClusteredEnv refreshShardmapAction connection ref) + readIORef ref >>= (`seq` return ()) + return r + setLastReply :: Reply -> ReaderT RedisEnv IO () setLastReply r = do ref <- asks envLastReply @@ -118,161 +110,27 @@ sendRequest :: (RedisCtx m f, RedisResult a) => [B.ByteString] -> m (f a) sendRequest req = do r' <- liftRedis $ Redis $ do - conn <- asks envConn - r <- liftIO $ PP.request conn (renderRequest req) - setLastReply r - return r + env <- ask + case env of + NonClusteredEnv{..} -> do + r <- liftIO $ PP.request envConn (renderRequest req) + setLastReply r + return r + ClusteredEnv{..} -> do + r <- liftIO $ Cluster.requestPipelined refreshAction connection req + lift (writeIORef clusteredLastReply r) + return r returnDecode r' - --------------------------------------------------------------------------------- --- Connection --- - --- |A threadsafe pool of network connections to a Redis server. Use the --- 'connect' function to create one. -newtype Connection = Conn (Pool PP.Connection) - --- |Information for connnecting to a Redis server. --- --- It is recommended to not use the 'ConnInfo' data constructor directly. --- Instead use 'defaultConnectInfo' and update it with record syntax. For --- example to connect to a password protected Redis server running on localhost --- and listening to the default port: --- --- @ --- myConnectInfo :: ConnectInfo --- myConnectInfo = defaultConnectInfo {connectAuth = Just \"secret\"} --- @ --- -data ConnectInfo = ConnInfo - { connectHost :: NS.HostName - , connectPort :: PP.PortID - , connectAuth :: Maybe B.ByteString - -- ^ When the server is protected by a password, set 'connectAuth' to 'Just' - -- the password. Each connection will then authenticate by the 'auth' - -- command. - , connectDatabase :: Integer - -- ^ Each connection will 'select' the database with the given index. - , connectMaxConnections :: Int - -- ^ Maximum number of connections to keep open. The smallest acceptable - -- value is 1. - , connectMaxIdleTime :: NominalDiffTime - -- ^ Amount of time for which an unused connection is kept open. The - -- smallest acceptable value is 0.5 seconds. If the @timeout@ value in - -- your redis.conf file is non-zero, it should be larger than - -- 'connectMaxIdleTime'. - , connectTimeout :: Maybe NominalDiffTime - -- ^ Optional timeout until connection to Redis gets - -- established. 'ConnectTimeoutException' gets thrown if no socket - -- get connected in this interval of time. - , connectTLSParams :: Maybe ClientParams - -- ^ Optional TLS parameters. TLS will be enabled if this is provided. - } deriving Show - -data ConnectError = ConnectAuthError Reply - | ConnectSelectError Reply - deriving (Eq, Show, Typeable) - -instance Exception ConnectError - --- |Default information for connecting: --- --- @ --- connectHost = \"localhost\" --- connectPort = PortNumber 6379 -- Redis default port --- connectAuth = Nothing -- No password --- connectDatabase = 0 -- SELECT database 0 --- connectMaxConnections = 50 -- Up to 50 connections --- connectMaxIdleTime = 30 -- Keep open for 30 seconds --- connectTimeout = Nothing -- Don't add timeout logic --- connectTLSParams = Nothing -- Do not use TLS --- @ --- -defaultConnectInfo :: ConnectInfo -defaultConnectInfo = ConnInfo - { connectHost = "localhost" - , connectPort = PP.PortNumber 6379 - , connectAuth = Nothing - , connectDatabase = 0 - , connectMaxConnections = 50 - , connectMaxIdleTime = 30 - , connectTimeout = Nothing - , connectTLSParams = Nothing - } - --- |Constructs a 'Connection' pool to a Redis server designated by the --- given 'ConnectInfo'. The first connection is not actually established --- until the first call to the server. -connect :: ConnectInfo -> IO Connection -connect ConnInfo{..} = Conn <$> - createPool create destroy 1 connectMaxIdleTime connectMaxConnections - where - create = do - let timeoutOptUs = - round . (1000000 *) <$> connectTimeout - conn <- PP.connect connectHost connectPort timeoutOptUs - conn' <- case connectTLSParams of - Nothing -> return conn - Just tlsParams -> PP.enableTLS tlsParams conn - PP.beginReceiving conn' - - runRedisInternal conn' $ do - -- AUTH - case connectAuth of - Nothing -> return () - Just pass -> do - resp <- auth pass - case resp of - Left r -> liftIO $ throwIO $ ConnectAuthError r - _ -> return () - -- SELECT - when (connectDatabase /= 0) $ do - resp <- select connectDatabase - case resp of - Left r -> liftIO $ throwIO $ ConnectSelectError r - _ -> return () - return conn' - - destroy = PP.disconnect - --- |Constructs a 'Connection' pool to a Redis server designated by the --- given 'ConnectInfo', then tests if the server is actually there. --- Throws an exception if the connection to the Redis server can't be --- established. -checkedConnect :: ConnectInfo -> IO Connection -checkedConnect connInfo = do - conn <- connect connInfo - runRedis conn $ void ping - return conn - --- |Destroy all idle resources in the pool. -disconnect :: Connection -> IO () -disconnect (Conn pool) = destroyAllResources pool - --- | Memory bracket around 'connect' and 'disconnect'. -withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c -withConnect connInfo = Catch.bracket (liftIO $ connect connInfo) (liftIO . disconnect) - --- | Memory bracket around 'checkedConnect' and 'disconnect' -withCheckedConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c -withCheckedConnect connInfo = Catch.bracket (liftIO $ checkedConnect connInfo) (liftIO . disconnect) - --- The AUTH command. It has to be here because it is used in 'connect'. -auth - :: B.ByteString -- ^ password - -> Redis (Either Reply Status) -auth password = sendRequest ["AUTH", password] - --- The SELECT command. Used in 'connect'. -select - :: RedisCtx m f - => Integer -- ^ index - -> m (f Status) -select ix = sendRequest ["SELECT", encode ix] - --- The PING command. Used in 'checkedConnect'. -ping - :: (RedisCtx m f) - => m (f Status) -ping = sendRequest (["PING"] ) +sendToAllMasterNodes :: (RedisResult a, MonadRedis m) => [B.ByteString] -> m [Either Reply a] +sendToAllMasterNodes req = do + r' <- liftRedis $ Redis $ do + env <- ask + case env of + NonClusteredEnv{..} -> do + r <- liftIO $ PP.request envConn (renderRequest req) + r `seq` return [r] + ClusteredEnv{..} -> do + r <- liftIO $ Cluster.requestMasterNodes connection req + return r + return $ map decode r' diff --git a/src/Database/Redis/Core/Internal.hs b/src/Database/Redis/Core/Internal.hs index 5ec938af..e39a8106 100644 --- a/src/Database/Redis/Core/Internal.hs +++ b/src/Database/Redis/Core/Internal.hs @@ -1,6 +1,7 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE RecordWildCards #-} module Database.Redis.Core.Internal where #if __GLASGOW_HASKELL__ > 711 && __GLASGOW_HASKELL__ < 808 @@ -9,7 +10,9 @@ import Control.Monad.Fail (MonadFail) import Control.Monad.Reader import Data.IORef import Database.Redis.Protocol +import Control.Monad.IO.Unlift (MonadUnliftIO) import qualified Database.Redis.ProtocolPipelining as PP +import qualified Database.Redis.Cluster as Cluster -- |Context for normal command execution, outside of transactions. Use -- 'runRedis' to run actions of this type. @@ -18,12 +21,18 @@ import qualified Database.Redis.ProtocolPipelining as PP -- possibility of Redis returning an 'Error' reply. newtype Redis a = Redis (ReaderT RedisEnv IO a) - deriving (Monad, MonadIO, Functor, Applicative) + deriving (Monad, MonadIO, Functor, Applicative, MonadUnliftIO) #if __GLASGOW_HASKELL__ > 711 deriving instance MonadFail Redis #endif -data RedisEnv = - Env - { envConn :: PP.Connection - , envLastReply :: IORef Reply - } +data RedisEnv + = NonClusteredEnv { envConn :: PP.Connection, nonClusteredLastReply :: IORef Reply } + | ClusteredEnv + { refreshAction :: IO Cluster.ShardMap + , connection :: Cluster.Connection + , clusteredLastReply :: IORef Reply + } + +envLastReply :: RedisEnv -> IORef Reply +envLastReply NonClusteredEnv{..} = nonClusteredLastReply +envLastReply ClusteredEnv{..} = clusteredLastReply diff --git a/src/Database/Redis/ManualCommands.hs b/src/Database/Redis/ManualCommands.hs index d4aaeb89..9c093321 100644 --- a/src/Database/Redis/ManualCommands.hs +++ b/src/Database/Redis/ManualCommands.hs @@ -1,13 +1,19 @@ -{-# LANGUAGE OverloadedStrings, RecordWildCards, FlexibleContexts #-} +{-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, FlexibleContexts #-} module Database.Redis.ManualCommands where import Prelude hiding (min, max) import Data.ByteString (ByteString, empty, append) -import Data.Maybe (maybeToList) +import qualified Data.ByteString.Char8 as Char8 +import qualified Data.ByteString as BS +import Data.Maybe (maybeToList, catMaybes) +#if __GLASGOW_HASKELL__ < 808 +import Data.Semigroup ((<>)) +#endif import Database.Redis.Core import Database.Redis.Protocol import Database.Redis.Types +import qualified Database.Redis.Cluster.Command as CMD objectRefcount @@ -354,9 +360,11 @@ eval script keys args = where numkeys = toInteger (length keys) +-- | Works like 'eval', but sends the SHA1 hash of the script instead of the script itself. +-- Fails if the server does not recognise the hash, in which case, 'eval' should be used instead. evalsha :: (RedisCtx m f, RedisResult a) - => ByteString -- ^ script + => ByteString -- ^ base16-encoded sha1 hash of the script -> [ByteString] -- ^ keys -> [ByteString] -- ^ args -> m (f a) @@ -847,19 +855,21 @@ instance RedisResult StreamsRecord where data XReadOpts = XReadOpts { block :: Maybe Integer , recordCount :: Maybe Integer + , noack :: Bool } deriving (Show, Eq) -- |Redis default 'XReadOpts'. Equivalent to omitting all optional parameters. -- -- @ -- XReadOpts --- { block = Nothing -- Don't block waiting for more records --- , recordCount = Nothing -- no record count +-- { block = Nothing -- Don't block waiting for more records +-- , recordCount = Nothing -- no record count +-- , noack = False -- Add read records to the PEL if acknowledgement is not received -- } -- @ -- defaultXreadOpts :: XReadOpts -defaultXreadOpts = XReadOpts { block = Nothing, recordCount = Nothing } +defaultXreadOpts = XReadOpts { block = Nothing, recordCount = Nothing, noack = False} data XReadResponse = XReadResponse { stream :: ByteString @@ -882,10 +892,11 @@ xreadOpts streamsAndIds opts = sendRequest $ internalXreadArgs :: [(ByteString, ByteString)] -> XReadOpts -> [ByteString] internalXreadArgs streamsAndIds XReadOpts{..} = - concat [blockArgs, countArgs, ["STREAMS"], streams, recordIds] + concat [blockArgs, countArgs, noackArgs, ["STREAMS"], streams, recordIds] where blockArgs = maybe [] (\blockMillis -> ["BLOCK", encode blockMillis]) block countArgs = maybe [] (\countRecords -> ["COUNT", encode countRecords]) recordCount + noackArgs = if noack == False then [] else ["NOACK"] -- NOACK supported only for xreadgroup calls streams = map (\(stream, _) -> stream) streamsAndIds recordIds = map (\(_, recordId) -> recordId) streamsAndIds @@ -920,7 +931,7 @@ xgroupCreate -> ByteString -- ^ group name -> ByteString -- ^ start ID -> m (f Status) -xgroupCreate stream groupName startId = sendRequest $ ["XGROUP", "CREATE", stream, groupName, startId] +xgroupCreate stream groupName startId = sendRequest $ ["XGROUP", "CREATE", stream, groupName, startId, "MKSTREAM"] xgroupSetId :: (RedisCtx m f) @@ -1146,7 +1157,8 @@ xinfoGroups -> m (f [XInfoGroupsResponse]) xinfoGroups stream = sendRequest ["XINFO", "GROUPS", stream] -data XInfoStreamResponse = XInfoStreamResponse +data XInfoStreamResponse + = XInfoStreamResponse { xinfoStreamLength :: Integer , xinfoStreamRadixTreeKeys :: Integer , xinfoStreamRadixTreeNodes :: Integer @@ -1154,21 +1166,62 @@ data XInfoStreamResponse = XInfoStreamResponse , xinfoStreamLastEntryId :: ByteString , xinfoStreamFirstEntry :: StreamsRecord , xinfoStreamLastEntry :: StreamsRecord - } deriving (Show, Eq) + } + | XInfoStreamEmptyResponse + { xinfoStreamLength :: Integer + , xinfoStreamRadixTreeKeys :: Integer + , xinfoStreamRadixTreeNodes :: Integer + , xinfoStreamNumGroups :: Integer + , xinfoStreamLastEntryId :: ByteString + } + deriving (Show, Eq) instance RedisResult XInfoStreamResponse where - decode (MultiBulk (Just [ - Bulk (Just "length"),Integer xinfoStreamLength, - Bulk (Just "radix-tree-keys"),Integer xinfoStreamRadixTreeKeys, - Bulk (Just "radix-tree-nodes"),Integer xinfoStreamRadixTreeNodes, - Bulk (Just "groups"),Integer xinfoStreamNumGroups, - Bulk (Just "last-generated-id"),Bulk (Just xinfoStreamLastEntryId), - Bulk (Just "first-entry"), rawFirstEntry , - Bulk (Just "last-entry"), rawLastEntry ])) = do - xinfoStreamFirstEntry <- decode rawFirstEntry - xinfoStreamLastEntry <- decode rawLastEntry - return XInfoStreamResponse{..} - decode a = Left a + decode = decodeRedis5 <> decodeRedis6 + where + decodeRedis5 (MultiBulk (Just [ + Bulk (Just "length"),Integer xinfoStreamLength, + Bulk (Just "radix-tree-keys"),Integer xinfoStreamRadixTreeKeys, + Bulk (Just "radix-tree-nodes"),Integer xinfoStreamRadixTreeNodes, + Bulk (Just "groups"),Integer xinfoStreamNumGroups, + Bulk (Just "last-generated-id"),Bulk (Just xinfoStreamLastEntryId), + Bulk (Just "first-entry"), Bulk Nothing , + Bulk (Just "last-entry"), Bulk Nothing ])) = do + return XInfoStreamEmptyResponse{..} + decodeRedis5 (MultiBulk (Just [ + Bulk (Just "length"),Integer xinfoStreamLength, + Bulk (Just "radix-tree-keys"),Integer xinfoStreamRadixTreeKeys, + Bulk (Just "radix-tree-nodes"),Integer xinfoStreamRadixTreeNodes, + Bulk (Just "groups"),Integer xinfoStreamNumGroups, + Bulk (Just "last-generated-id"),Bulk (Just xinfoStreamLastEntryId), + Bulk (Just "first-entry"), rawFirstEntry , + Bulk (Just "last-entry"), rawLastEntry ])) = do + xinfoStreamFirstEntry <- decode rawFirstEntry + xinfoStreamLastEntry <- decode rawLastEntry + return XInfoStreamResponse{..} + decodeRedis5 a = Left a + + decodeRedis6 (MultiBulk (Just [ + Bulk (Just "length"),Integer xinfoStreamLength, + Bulk (Just "radix-tree-keys"),Integer xinfoStreamRadixTreeKeys, + Bulk (Just "radix-tree-nodes"),Integer xinfoStreamRadixTreeNodes, + Bulk (Just "last-generated-id"),Bulk (Just xinfoStreamLastEntryId), + Bulk (Just "groups"),Integer xinfoStreamNumGroups, + Bulk (Just "first-entry"), Bulk Nothing , + Bulk (Just "last-entry"), Bulk Nothing ])) = do + return XInfoStreamEmptyResponse{..} + decodeRedis6 (MultiBulk (Just [ + Bulk (Just "length"),Integer xinfoStreamLength, + Bulk (Just "radix-tree-keys"),Integer xinfoStreamRadixTreeKeys, + Bulk (Just "radix-tree-nodes"),Integer xinfoStreamRadixTreeNodes, + Bulk (Just "last-generated-id"),Bulk (Just xinfoStreamLastEntryId), + Bulk (Just "groups"),Integer xinfoStreamNumGroups, + Bulk (Just "first-entry"), rawFirstEntry , + Bulk (Just "last-entry"), rawLastEntry ])) = do + xinfoStreamFirstEntry <- decode rawFirstEntry + xinfoStreamLastEntry <- decode rawLastEntry + return XInfoStreamResponse{..} + decodeRedis6 a = Left a xinfoStream :: (RedisCtx m f) @@ -1197,3 +1250,184 @@ xtrim stream opts = sendRequest $ ["XTRIM", stream] ++ optArgs inf :: RealFloat a => a inf = 1 / 0 + +auth + :: RedisCtx m f + => ByteString -- ^ password + -> m (f Status) +auth password = sendRequest ["AUTH", password] + +-- the select command. used in 'connect'. +select + :: RedisCtx m f + => Integer -- ^ index + -> m (f Status) +select ix = sendRequest ["SELECT", encode ix] + +-- the ping command. used in 'checkedconnect'. +ping + :: (RedisCtx m f) + => m (f Status) +ping = sendRequest (["PING"] ) + +data ClusterNodesResponse = ClusterNodesResponse + { clusterNodesResponseEntries :: [ClusterNodesResponseEntry] + } deriving (Show, Eq) + +data ClusterNodesResponseEntry = ClusterNodesResponseEntry { clusterNodesResponseNodeId :: ByteString + , clusterNodesResponseNodeIp :: ByteString + , clusterNodesResponseNodePort :: Integer + , clusterNodesResponseNodeFlags :: [ByteString] + , clusterNodesResponseMasterId :: Maybe ByteString + , clusterNodesResponsePingSent :: Integer + , clusterNodesResponsePongReceived :: Integer + , clusterNodesResponseConfigEpoch :: Integer + , clusterNodesResponseLinkState :: ByteString + , clusterNodesResponseSlots :: [ClusterNodesResponseSlotSpec] + } deriving (Show, Eq) + +data ClusterNodesResponseSlotSpec + = ClusterNodesResponseSingleSlot Integer + | ClusterNodesResponseSlotRange Integer Integer + | ClusterNodesResponseSlotImporting Integer ByteString + | ClusterNodesResponseSlotMigrating Integer ByteString deriving (Show, Eq) + + +instance RedisResult ClusterNodesResponse where + decode r@(Bulk (Just bulkData)) = maybe (Left r) Right $ do + infos <- mapM parseNodeInfo $ Char8.lines bulkData + return $ ClusterNodesResponse infos where + parseNodeInfo :: ByteString -> Maybe ClusterNodesResponseEntry + parseNodeInfo line = case Char8.words line of + (nodeId : hostNamePort : flags : masterNodeId : pingSent : pongRecv : epoch : linkState : slots) -> + case Char8.split ':' hostNamePort of + [hostName, port] -> ClusterNodesResponseEntry <$> pure nodeId + <*> pure hostName + <*> readInteger port + <*> pure (Char8.split ',' flags) + <*> pure (readMasterNodeId masterNodeId) + <*> readInteger pingSent + <*> readInteger pongRecv + <*> readInteger epoch + <*> pure linkState + <*> (pure . catMaybes $ map readNodeSlot slots) + _ -> Nothing + _ -> Nothing + readInteger :: ByteString -> Maybe Integer + readInteger = fmap fst . Char8.readInteger + + readMasterNodeId :: ByteString -> Maybe ByteString + readMasterNodeId "-" = Nothing + readMasterNodeId nodeId = Just nodeId + + readNodeSlot :: ByteString -> Maybe ClusterNodesResponseSlotSpec + readNodeSlot slotSpec = case '[' `Char8.elem` slotSpec of + True -> readSlotImportMigrate slotSpec + False -> case '-' `Char8.elem` slotSpec of + True -> readSlotRange slotSpec + False -> ClusterNodesResponseSingleSlot <$> readInteger slotSpec + readSlotImportMigrate :: ByteString -> Maybe ClusterNodesResponseSlotSpec + readSlotImportMigrate slotSpec = case BS.breakSubstring "->-" slotSpec of + (_, "") -> case BS.breakSubstring "-<-" slotSpec of + (_, "") -> Nothing + (leftPart, rightPart) -> ClusterNodesResponseSlotImporting + <$> (readInteger $ Char8.drop 1 leftPart) + <*> (pure $ BS.take (BS.length rightPart - 1) rightPart) + (leftPart, rightPart) -> ClusterNodesResponseSlotMigrating + <$> (readInteger $ Char8.drop 1 leftPart) + <*> (pure $ BS.take (BS.length rightPart - 1) rightPart) + readSlotRange :: ByteString -> Maybe ClusterNodesResponseSlotSpec + readSlotRange slotSpec = case BS.breakSubstring "-" slotSpec of + (_, "") -> Nothing + (leftPart, rightPart) -> ClusterNodesResponseSlotRange + <$> readInteger leftPart + <*> (readInteger $ BS.drop 1 rightPart) + + decode r = Left r + +clusterNodes + :: (RedisCtx m f) + => m (f ClusterNodesResponse) +clusterNodes = sendRequest $ ["CLUSTER", "NODES"] + +data ClusterSlotsResponse = ClusterSlotsResponse { clusterSlotsResponseEntries :: [ClusterSlotsResponseEntry] } deriving (Show) + +data ClusterSlotsNode = ClusterSlotsNode + { clusterSlotsNodeIP :: ByteString + , clusterSlotsNodePort :: Int + , clusterSlotsNodeID :: ByteString + } deriving (Show) + +data ClusterSlotsResponseEntry = ClusterSlotsResponseEntry + { clusterSlotsResponseEntryStartSlot :: Int + , clusterSlotsResponseEntryEndSlot :: Int + , clusterSlotsResponseEntryMaster :: ClusterSlotsNode + , clusterSlotsResponseEntryReplicas :: [ClusterSlotsNode] + } deriving (Show) + +instance RedisResult ClusterSlotsResponse where + decode (MultiBulk (Just bulkData)) = do + clusterSlotsResponseEntries <- mapM decode bulkData + return ClusterSlotsResponse{..} + decode a = Left a + +instance RedisResult ClusterSlotsResponseEntry where + decode (MultiBulk (Just + ((Integer startSlot):(Integer endSlot):masterData:replicas))) = do + clusterSlotsResponseEntryMaster <- decode masterData + clusterSlotsResponseEntryReplicas <- mapM decode replicas + let clusterSlotsResponseEntryStartSlot = fromInteger startSlot + let clusterSlotsResponseEntryEndSlot = fromInteger endSlot + return ClusterSlotsResponseEntry{..} + decode a = Left a + +instance RedisResult ClusterSlotsNode where + decode (MultiBulk (Just ((Bulk (Just clusterSlotsNodeIP)):(Integer port):(Bulk (Just clusterSlotsNodeID)):_))) = Right ClusterSlotsNode{..} + where clusterSlotsNodePort = fromInteger port + decode a = Left a + + +clusterSlots + :: (RedisCtx m f) + => m (f ClusterSlotsResponse) +clusterSlots = sendRequest $ ["CLUSTER", "SLOTS"] + +clusterSetSlotImporting + :: (RedisCtx m f) + => Integer + -> ByteString + -> m (f Status) +clusterSetSlotImporting slot sourceNodeId = sendRequest $ ["CLUSTER", "SETSLOT", (encode slot), "IMPORTING", sourceNodeId] + +clusterSetSlotMigrating + :: (RedisCtx m f) + => Integer + -> ByteString + -> m (f Status) +clusterSetSlotMigrating slot destinationNodeId = sendRequest $ ["CLUSTER", "SETSLOT", (encode slot), "MIGRATING", destinationNodeId] + +clusterSetSlotStable + :: (RedisCtx m f) + => Integer + -> m (f Status) +clusterSetSlotStable slot = sendRequest $ ["CLUSTER", "SETSLOT", "STABLE", (encode slot)] + +clusterSetSlotNode + :: (RedisCtx m f) + => Integer + -> ByteString + -> m (f Status) +clusterSetSlotNode slot node = sendRequest ["CLUSTER", "SETSLOT", (encode slot), "NODE", node] + +clusterGetKeysInSlot + :: (RedisCtx m f) + => Integer + -> Integer + -> m (f [ByteString]) +clusterGetKeysInSlot slot count = sendRequest ["CLUSTER", "GETKEYSINSLOT", (encode slot), (encode count)] + +command :: (RedisCtx m f) => m (f [CMD.CommandInfo]) +command = sendRequest ["COMMAND"] + +readOnly :: (RedisCtx m f) => m (f Status) +readOnly = sendRequest ["READONLY"] \ No newline at end of file diff --git a/src/Database/Redis/ProtocolPipelining.hs b/src/Database/Redis/ProtocolPipelining.hs index 1e6388d3..8ba3cc87 100644 --- a/src/Database/Redis/ProtocolPipelining.hs +++ b/src/Database/Redis/ProtocolPipelining.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} @@ -18,38 +16,23 @@ -- module Database.Redis.ProtocolPipelining ( Connection, - connect, enableTLS, beginReceiving, disconnect, request, send, recv, flush, - ConnectionLostException(..), - PortID(..) + connect, enableTLS, beginReceiving, disconnect, request, send, recv, flush, fromCtx, toCtx ) where import Prelude -import Control.Concurrent (threadDelay) -import Control.Concurrent.Async (race) -import Control.Concurrent.MVar -import Control.Exception import Control.Monad import qualified Scanner import qualified Data.ByteString as S -import qualified Data.ByteString.Lazy as L import Data.IORef -import Data.Typeable import qualified Network.Socket as NS import qualified Network.TLS as TLS -import System.IO -import System.IO.Error import System.IO.Unsafe import Database.Redis.Protocol - -data PortID = PortNumber NS.PortNumber - | UnixSocket String - deriving (Eq, Show) - -data ConnectionContext = NormalHandle Handle | TLSContext TLS.Context +import qualified Database.Redis.ConnectionContext as CC data Connection = Conn - { connCtx :: ConnectionContext -- ^ Connection socket-handle. + { connCtx :: CC.ConnectionContext -- ^ Connection socket-handle. , connReplies :: IORef [Reply] -- ^ Reply thunks for unsent requests. , connPending :: IORef [Reply] -- ^ Reply thunks for requests "in the pipeline". Refers to the same list as @@ -60,92 +43,25 @@ data Connection = Conn -- length connPending - pendingCount = length connReplies } -data ConnectionLostException = ConnectionLost - deriving (Show, Typeable) - -instance Exception ConnectionLostException -data ConnectPhase - = PhaseUnknown - | PhaseResolve - | PhaseOpenSocket - deriving (Show) +fromCtx :: CC.ConnectionContext -> IO Connection +fromCtx ctx = Conn ctx <$> newIORef [] <*> newIORef [] <*> newIORef 0 -data ConnectTimeout = ConnectTimeout ConnectPhase - deriving (Show, Typeable) +toCtx :: Connection -> CC.ConnectionContext +toCtx = connCtx -instance Exception ConnectTimeout - -getHostAddrInfo :: NS.HostName -> NS.PortNumber -> IO [NS.AddrInfo] -getHostAddrInfo hostname port = do - NS.getAddrInfo (Just hints) (Just hostname) (Just $ show port) - where - hints = NS.defaultHints - { NS.addrSocketType = NS.Stream } - -connectSocket :: [NS.AddrInfo] -> IO NS.Socket -connectSocket [] = error "connectSocket: unexpected empty list" -connectSocket (addr:rest) = tryConnect >>= \case - Right sock -> return sock - Left err -> if null rest - then throwIO err - else connectSocket rest - where - tryConnect :: IO (Either IOError NS.Socket) - tryConnect = bracketOnError createSock NS.close $ \sock -> do - try (NS.connect sock $ NS.addrAddress addr) >>= \case - Right () -> return (Right sock) - Left err -> NS.close sock >> return (Left err) - where - createSock = NS.socket (NS.addrFamily addr) - (NS.addrSocketType addr) - (NS.addrProtocol addr) - -connect :: NS.HostName -> PortID -> Maybe Int -> IO Connection -connect hostName portId timeoutOpt = - bracketOnError hConnect hClose $ \h -> do - hSetBinaryMode h True +connect :: NS.HostName -> CC.PortID -> Maybe Int -> IO Connection +connect hostName portId timeoutOpt = do + connCtx <- CC.connect hostName portId timeoutOpt connReplies <- newIORef [] connPending <- newIORef [] connPendingCnt <- newIORef 0 - let connCtx = NormalHandle h return Conn{..} - where - hConnect = do - phaseMVar <- newMVar PhaseUnknown - let doConnect = hConnect' phaseMVar - case timeoutOpt of - Nothing -> doConnect - Just micros -> do - result <- race doConnect (threadDelay micros) - case result of - Left h -> return h - Right () -> do - phase <- readMVar phaseMVar - errConnectTimeout phase - hConnect' mvar = bracketOnError createSock NS.close $ \sock -> do - NS.setSocketOption sock NS.KeepAlive 1 - void $ swapMVar mvar PhaseResolve - void $ swapMVar mvar PhaseOpenSocket - NS.socketToHandle sock ReadWriteMode - where - createSock = case portId of - PortNumber portNumber -> do - addrInfo <- getHostAddrInfo hostName portNumber - connectSocket addrInfo - UnixSocket addr -> bracketOnError - (NS.socket NS.AF_UNIX NS.Stream NS.defaultProtocol) - NS.close - (\sock -> NS.connect sock (NS.SockAddrUnix addr) >> return sock) enableTLS :: TLS.ClientParams -> Connection -> IO Connection enableTLS tlsParams conn@Conn{..} = do - case connCtx of - NormalHandle h -> do - ctx <- TLS.contextNew h tlsParams - TLS.handshake ctx - return $ conn { connCtx = TLSContext ctx } - TLSContext _ -> return conn + newCtx <- CC.enableTLS tlsParams connCtx + return conn{connCtx = newCtx} beginReceiving :: Connection -> IO () beginReceiving conn = do @@ -154,25 +70,13 @@ beginReceiving conn = do writeIORef (connPending conn) rs disconnect :: Connection -> IO () -disconnect Conn{..} = do - case connCtx of - NormalHandle h -> do - open <- hIsOpen h - when open $ hClose h - TLSContext ctx -> do - TLS.bye ctx - TLS.contextClose ctx +disconnect Conn{..} = CC.disconnect connCtx -- |Write the request to the socket output buffer, without actually sending. -- The 'Handle' is 'hFlush'ed when reading replies from the 'connCtx'. send :: Connection -> S.ByteString -> IO () send Conn{..} s = do - case connCtx of - NormalHandle h -> - ioErrorToConnLost $ S.hPut h s - - TLSContext ctx -> - ioErrorToConnLost $ TLS.sendData ctx (L.fromStrict s) + CC.send connCtx s -- Signal that we expect one more reply from Redis. n <- atomicModifyIORef' connPendingCnt $ \n -> let n' = n+1 in (n', n') @@ -195,10 +99,7 @@ recv Conn{..} = do -- for the multithreaded pub/sub code, the sending thread needs to explicitly flush the subscription -- change requests. flush :: Connection -> IO () -flush Conn{..} = - case connCtx of - NormalHandle h -> hFlush h - TLSContext ctx -> TLS.contextFlush ctx +flush Conn{..} = CC.flush connCtx -- |Send a request and receive the corresponding reply request :: Connection -> S.ByteString -> IO Reply @@ -226,7 +127,7 @@ connGetReplies conn@Conn{..} = go S.empty (SingleLine "previous of first") previous `seq` return () scanResult <- Scanner.scanWith readMore reply rest case scanResult of - Scanner.Fail{} -> errConnClosed + Scanner.Fail{} -> CC.errConnClosed Scanner.More{} -> error "Hedis: parseWith returned Partial" Scanner.Done rest' r -> do -- r is the same as 'head' of 'connPending'. Since we just @@ -239,17 +140,6 @@ connGetReplies conn@Conn{..} = go S.empty (SingleLine "previous of first") rs <- unsafeInterleaveIO (go rest' r) return (r:rs) - readMore = ioErrorToConnLost $ do + readMore = CC.ioErrorToConnLost $ do flush conn - case connCtx of - NormalHandle h -> S.hGetSome h 4096 - TLSContext ctx -> TLS.recvData ctx - -ioErrorToConnLost :: IO a -> IO a -ioErrorToConnLost a = a `catchIOError` const errConnClosed - -errConnClosed :: IO a -errConnClosed = throwIO ConnectionLost - -errConnectTimeout :: ConnectPhase -> IO a -errConnectTimeout phase = throwIO $ ConnectTimeout phase + CC.recv connCtx diff --git a/src/Database/Redis/PubSub.hs b/src/Database/Redis/PubSub.hs index cfd78c0d..71022b2c 100644 --- a/src/Database/Redis/PubSub.hs +++ b/src/Database/Redis/PubSub.hs @@ -38,6 +38,7 @@ import Data.Semigroup (Semigroup(..)) #endif import qualified Data.HashMap.Strict as HM import qualified Database.Redis.Core as Core +import qualified Database.Redis.Connection as Connection import qualified Database.Redis.ProtocolPipelining as PP import Database.Redis.Protocol (Reply(..), renderRequest) import Database.Redis.Types @@ -90,7 +91,7 @@ instance Semigroup (Cmd Subscribe a) where instance Monoid (Cmd Subscribe a) where mempty = DoNothing mappend = (<>) - + instance Semigroup (Cmd Unsubscribe a) where (<>) DoNothing x = x (<>) x DoNothing = x @@ -181,7 +182,7 @@ unsubscribe -> PubSub unsubscribe cs = mempty{ unsubs = Cmd cs } --- |Listen for messages published to channels matching the given patterns +-- |Listen for messages published to channels matching the given patterns -- (). psubscribe :: [ByteString] -- ^ pattern @@ -189,7 +190,7 @@ psubscribe psubscribe [] = mempty psubscribe ps = mempty{ psubs = Cmd ps } --- |Stop listening for messages posted to channels matching the given patterns +-- |Stop listening for messages posted to channels matching the given patterns -- (). punsubscribe :: [ByteString] -- ^ pattern @@ -199,11 +200,11 @@ punsubscribe ps = mempty{ punsubs = Cmd ps } -- |Listens to published messages on subscribed channels and channels matching -- the subscribed patterns. For documentation on the semantics of Redis -- Pub\/Sub see . --- --- The given callback function is called for each received message. +-- +-- The given callback function is called for each received message. -- Subscription changes are triggered by the returned 'PubSub'. To keep -- subscriptions unchanged, the callback can return 'mempty'. --- +-- -- Example: Subscribe to the \"news\" channel indefinitely. -- -- @ @@ -544,11 +545,13 @@ sendThread ctrl rawConn = forever $ do -- main = do -- conn <- connect defaultConnectInfo -- pubSubCtrl <- newPubSubController [("mychannel", myhandler)] [] --- forkIO $ forever $ +-- concurrently ( forever $ -- pubSubForever conn pubSubCtrl onInitialComplete -- \`catch\` (\\(e :: SomeException) -> do -- putStrLn $ "Got error: " ++ show e -- threadDelay $ 50*1000) -- TODO: use exponential backoff +-- ) $ restOfYourProgram +-- -- -- {- elsewhere in your program, use pubSubCtrl to change subscriptions -} -- @ @@ -560,13 +563,13 @@ sendThread ctrl rawConn = forever $ do -- and then create a Haskell thread bound to each capability each calling 'pubSubForever' in a loop. -- This will create one network connection per controller/capability and allow you to -- register separate channels and callbacks for each controller, spreading the load across the capabilities. -pubSubForever :: Core.Connection -- ^ The connection pool +pubSubForever :: Connection.Connection -- ^ The connection pool -> PubSubController -- ^ The controller which keeps track of all subscriptions and handlers -> IO () -- ^ This action is executed once Redis acknowledges that all the subscriptions in -- the controller are now subscribed. You can use this after an exception (such as -- 'ConnectionLost') to signal that all subscriptions are now reactivated. -> IO () -pubSubForever (Core.Conn pool) ctrl onInitialLoad = withResource pool $ \rawConn -> do +pubSubForever (Connection.NonClusteredConnection pool) ctrl onInitialLoad = withResource pool $ \rawConn -> do -- get initial subscriptions and write them into the queue. atomically $ do let loop = tryReadTBQueue (sendChanges ctrl) >>= @@ -597,6 +600,7 @@ pubSubForever (Core.Conn pool) ctrl onInitialLoad = withResource pool $ \rawConn (Right (Left err)) -> throwIO err (Left (Left err)) -> throwIO err _ -> return () -- should never happen, since threads exit only with an error +pubSubForever (Connection.ClusteredConnection _ _) _ _ = undefined ------------------------------------------------------------------------------ diff --git a/src/Database/Redis/Sentinel.hs b/src/Database/Redis/Sentinel.hs new file mode 100644 index 00000000..d3a4f0d8 --- /dev/null +++ b/src/Database/Redis/Sentinel.hs @@ -0,0 +1,221 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE StrictData #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE StandaloneDeriving #-} + +-- | "Database.Redis" like interface with connection through Redis Sentinel. +-- +-- More details here: . +-- +-- Example: +-- +-- @ +-- conn <- 'connect' 'SentinelConnectionInfo' (("localhost", PortNumber 26379) :| []) "mymaster" 'defaultConnectInfo' +-- +-- 'runRedis' conn $ do +-- 'set' "hello" "world" +-- @ +-- +-- When connection is opened, the Sentinels will be queried to get current master. Subsequent 'runRedis' +-- calls will talk to that master. +-- +-- If 'runRedis' call fails, the next call will choose a new master to talk to. +-- +-- This implementation is based on Gist by Emanuel Borsboom +-- at +module Database.Redis.Sentinel + ( + -- * Connection + SentinelConnectInfo(..) + , SentinelConnection + , connect + -- * runRedis with Sentinel support + , runRedis + , RedisSentinelException(..) + + -- * Re-export Database.Redis + , module Database.Redis + ) where + +import Control.Concurrent +import Control.Exception (Exception, IOException, evaluate, throwIO) +import Control.Monad +import Control.Monad.Catch (Handler (..), MonadCatch, catches, throwM) +import Control.Monad.Except +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Char8 as BS8 +import Data.Foldable (toList) +import Data.List (delete) +import Data.List.NonEmpty (NonEmpty (..)) +import Data.Typeable (Typeable) +import Data.Unique +import Network.Socket (HostName) + +import Database.Redis hiding (Connection, connect, runRedis) +import qualified Database.Redis as Redis + +-- | Interact with a Redis datastore. See 'Database.Redis.runRedis' for details. +runRedis :: SentinelConnection + -> Redis (Either Reply a) + -> IO (Either Reply a) +runRedis (SentinelConnection connMVar) action = do + (baseConn, preToken) <- modifyMVar connMVar $ \oldConnection@SentinelConnection' + { rcCheckFailover + , rcToken = oldToken + , rcSentinelConnectInfo = oldConnectInfo + , rcMasterConnectInfo = oldMasterConnectInfo + , rcBaseConnection = oldBaseConnection } -> + if rcCheckFailover + then do + (newConnectInfo, newMasterConnectInfo) <- updateMaster oldConnectInfo + newToken <- newUnique + (connInfo, conn) <- + if sameHost newMasterConnectInfo oldMasterConnectInfo + then return (oldMasterConnectInfo, oldBaseConnection) + else do + newConn <- Redis.connect newMasterConnectInfo + return (newMasterConnectInfo, newConn) + + return + ( SentinelConnection' + { rcCheckFailover = False + , rcToken = newToken + , rcSentinelConnectInfo = newConnectInfo + , rcMasterConnectInfo = connInfo + , rcBaseConnection = conn + } + , (conn, newToken) + ) + else return (oldConnection, (oldBaseConnection, oldToken)) + + -- Use evaluate to make sure we catch exceptions from 'runRedis'. + reply <- (Redis.runRedis baseConn action >>= evaluate) + `catchRedisRethrow` (\_ -> setCheckSentinel preToken) + case reply of + Left (Error e) | "READONLY " `BS.isPrefixOf` e -> + -- This means our connection has turned into a slave + setCheckSentinel preToken + _ -> return () + return reply + + where + sameHost :: Redis.ConnectInfo -> Redis.ConnectInfo -> Bool + sameHost l r = connectHost l == connectHost r && connectPort l == connectPort r + + setCheckSentinel preToken = modifyMVar_ connMVar $ \conn@SentinelConnection'{rcToken} -> + if preToken == rcToken + then do + newToken <- newUnique + return (conn{rcToken = newToken, rcCheckFailover = True}) + else return conn + + +connect :: SentinelConnectInfo -> IO SentinelConnection +connect origConnectInfo = do + (connectInfo, masterConnectInfo) <- updateMaster origConnectInfo + conn <- Redis.connect masterConnectInfo + token <- newUnique + + SentinelConnection <$> newMVar SentinelConnection' + { rcCheckFailover = False + , rcToken = token + , rcSentinelConnectInfo = connectInfo + , rcMasterConnectInfo = masterConnectInfo + , rcBaseConnection = conn + } + +updateMaster :: SentinelConnectInfo + -> IO (SentinelConnectInfo, Redis.ConnectInfo) +updateMaster sci@SentinelConnectInfo{..} = do + -- This is using the Either monad "backwards" -- Left means stop because we've made a connection, + -- Right means try again. + resultEither <- runExceptT $ forM_ connectSentinels $ \(host, port) -> do + trySentinel host port `catchRedis` (\_ -> return ()) + + + case resultEither of + Left (conn, sentinelPair) -> return + ( sci + { connectSentinels = sentinelPair :| delete sentinelPair (toList connectSentinels) + } + , conn + ) + Right () -> throwIO $ NoSentinels connectSentinels + where + trySentinel :: HostName -> PortID -> ExceptT (Redis.ConnectInfo, (HostName, PortID)) IO () + trySentinel sentinelHost sentinelPort = do + -- bang to ensure exceptions from runRedis get thrown immediately. + !replyE <- liftIO $ do + !sentinelConn <- Redis.connect $ Redis.defaultConnectInfo + { connectHost = sentinelHost + , connectPort = sentinelPort + , connectMaxConnections = 1 + } + Redis.runRedis sentinelConn $ sendRequest + ["SENTINEL", "get-master-addr-by-name", connectMasterName] + + case replyE of + Right [host, port] -> + throwError + ( connectBaseInfo + { connectHost = BS8.unpack host + , connectPort = + maybe + (PortNumber 26379) + (PortNumber . fromIntegral . fst) + $ BS8.readInt port + } + , (sentinelHost, sentinelPort) + ) + _ -> return () + +catchRedisRethrow :: MonadCatch m => m a -> (String -> m ()) -> m a +catchRedisRethrow action handler = + action `catches` + [ Handler $ \ex -> handler (show @IOException ex) >> throwM ex + , Handler $ \ex -> handler (show @ConnectionLostException ex) >> throwM ex + ] + +catchRedis :: MonadCatch m => m a -> (String -> m a) -> m a +catchRedis action handler = + action `catches` + [ Handler $ \ex -> handler (show @IOException ex) + , Handler $ \ex -> handler (show @ConnectionLostException ex) + ] + +newtype SentinelConnection = SentinelConnection (MVar SentinelConnection') + +data SentinelConnection' + = SentinelConnection' + { rcCheckFailover :: Bool + , rcToken :: Unique + , rcSentinelConnectInfo :: SentinelConnectInfo + , rcMasterConnectInfo :: Redis.ConnectInfo + , rcBaseConnection :: Redis.Connection + } + +-- | Configuration of Sentinel hosts. +data SentinelConnectInfo + = SentinelConnectInfo + { connectSentinels :: NonEmpty (HostName, PortID) + -- ^ List of sentinels. + , connectMasterName :: ByteString + -- ^ Name of master to connect to. + , connectBaseInfo :: Redis.ConnectInfo + -- ^ This is used to configure auth and other parameters for Redis connection, + -- but 'Redis.connectHost' and 'Redis.connectPort' are ignored. + } + deriving (Show) + +-- | Exception thrown by "Database.Redis.Sentinel". +data RedisSentinelException + = NoSentinels (NonEmpty (HostName, PortID)) + -- ^ Thrown if no sentinel can be reached. + deriving (Show, Typeable) + +deriving instance Exception RedisSentinelException diff --git a/src/Database/Redis/Transactions.hs b/src/Database/Redis/Transactions.hs index 56b7fee0..86d750f7 100644 --- a/src/Database/Redis/Transactions.hs +++ b/src/Database/Redis/Transactions.hs @@ -41,7 +41,7 @@ instance RedisCtx RedisTx Queued where -- future index in EXEC result list i <- get put (i+1) - return $ Queued (decode . (!i)) + return $ Queued (decode . (! i)) -- |A 'Queued' value represents the result of a command inside a transaction. It -- is a proxy object for the /actual/ result, which will only be available diff --git a/src/Database/Redis/URL.hs b/src/Database/Redis/URL.hs index e4ad80cd..ce64654a 100644 --- a/src/Database/Redis/URL.hs +++ b/src/Database/Redis/URL.hs @@ -11,8 +11,8 @@ import Control.Monad (guard) #if __GLASGOW_HASKELL__ < 808 import Data.Monoid ((<>)) #endif -import Database.Redis.Core (ConnectInfo(..), defaultConnectInfo) -import Database.Redis.ProtocolPipelining +import Database.Redis.Connection (ConnectInfo(..), defaultConnectInfo) +import qualified Database.Redis.ConnectionContext as CC import Network.HTTP.Base import Network.URI (parseURI, uriPath, uriScheme) import Text.Read (readMaybe) @@ -24,7 +24,7 @@ import qualified Data.ByteString.Char8 as C8 -- Username is ignored, path is used to specify the database: -- -- >>> parseConnectInfo "redis://username:password@host:42/2" --- Right (ConnInfo {connectHost = "host", connectPort = PortNumber 42, connectAuth = Just "password", connectDatabase = 2, connectMaxConnections = 50, connectMaxIdleTime = 30s, connectTimeout = Nothing, connectTLSParams = Nothing}) +-- Right (ConnInfo {connectHost = "host", connectPort = PortNumber 42, connectAuth = Just "password", connectReadOnly = False, connectDatabase = 2, connectMaxConnections = 50, connectMaxIdleTime = 30s, connectTimeout = Nothing, connectTLSParams = Nothing}) -- -- >>> parseConnectInfo "redis://username:password@host:42/db" -- Left "Invalid port: db" @@ -38,7 +38,7 @@ import qualified Data.ByteString.Char8 as C8 -- @'defaultConnectInfo'@: -- -- >>> parseConnectInfo "redis://" --- Right (ConnInfo {connectHost = "localhost", connectPort = PortNumber 6379, connectAuth = Nothing, connectDatabase = 0, connectMaxConnections = 50, connectMaxIdleTime = 30s, connectTimeout = Nothing, connectTLSParams = Nothing}) +-- Right (ConnInfo {connectHost = "localhost", connectPort = PortNumber 6379, connectAuth = Nothing, connectReadOnly = False, connectDatabase = 0, connectMaxConnections = 50, connectMaxIdleTime = 30s, connectTimeout = Nothing, connectTLSParams = Nothing}) -- parseConnectInfo :: String -> Either String ConnectInfo parseConnectInfo url = do @@ -59,7 +59,7 @@ parseConnectInfo url = do { connectHost = if null h then connectHost defaultConnectInfo else h - , connectPort = maybe (connectPort defaultConnectInfo) (PortNumber . fromIntegral) (port uriAuth) + , connectPort = maybe (connectPort defaultConnectInfo) (CC.PortNumber . fromIntegral) (port uriAuth) , connectAuth = C8.pack <$> password uriAuth , connectDatabase = db } diff --git a/stack-8.0.yaml b/stack-8.0.yaml deleted file mode 100644 index 3edb1eb8..00000000 --- a/stack-8.0.yaml +++ /dev/null @@ -1,10 +0,0 @@ -resolver: nightly-2016-05-31 -packages: -- '.' -extra-deps: -- slave-thread-1.0.1 -- partial-handler-1.0.1 -flags: - hedis: - dev: true -extra-package-dbs: [] diff --git a/stack-7.10.yaml b/stack-8.10.yaml similarity index 71% rename from stack-7.10.yaml rename to stack-8.10.yaml index 53b55670..169abd72 100644 --- a/stack-7.10.yaml +++ b/stack-8.10.yaml @@ -1,8 +1,7 @@ -resolver: lts-5.1 +resolver: lts-18.23 packages: - '.' extra-deps: -- scanner-0.2 flags: hedis: dev: true diff --git a/stack.yaml b/stack.yaml deleted file mode 100644 index 2b9f3345..00000000 --- a/stack.yaml +++ /dev/null @@ -1,8 +0,0 @@ -resolver: lts-14.12 -packages: -- '.' -extra-deps: -flags: - hedis: - dev: true -extra-package-dbs: [] diff --git a/stack.yaml b/stack.yaml new file mode 120000 index 00000000..1b471944 --- /dev/null +++ b/stack.yaml @@ -0,0 +1 @@ +stack-8.10.yaml \ No newline at end of file diff --git a/stack.yaml~HEAD b/stack.yaml~HEAD new file mode 100644 index 00000000..545605b7 --- /dev/null +++ b/stack.yaml~HEAD @@ -0,0 +1,7 @@ +resolver: lts-15.15 +packages: + - "." +flags: + hedis: + dev: true +extra-package-dbs: [] diff --git a/test/ClusterMain.hs b/test/ClusterMain.hs new file mode 100644 index 00000000..23820651 --- /dev/null +++ b/test/ClusterMain.hs @@ -0,0 +1,50 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} + +module Main (main) where + +import qualified Test.Framework as Test +import Database.Redis +import Tests + +main :: IO () +main = do + -- We're looking for the cluster on a non-default port to support running + -- this test in parallel witht the regular non-cluster tests. To quickly + -- spin up a cluster on this port using docker you can run: + -- + -- docker run -e "IP=0.0.0.0" -p 30001-30005:30001-30005 grokzen/redis-cluster:5.0.6 + conn <- connectCluster defaultConnectInfo { connectPort = PortNumber 30001 } + Test.defaultMain (tests conn) + +tests :: Connection -> [Test.Test] +tests conn = map ($conn) $ concat + [ testsMisc, testsKeys, testsStrings, [testHashes], testsLists, testsSets, [testHyperLogLog] + , testsZSets, [testTransaction], [testScripting] + , testsConnection, testsServer, [testSScan, testHScan, testZScan], [testZrangelex] + , [testXAddRead, testXReadGroup, testXRange, testXpending, testXClaim, testXInfo, testXDel, testXTrim] + -- should always be run last as connection gets closed after it + , [testQuit] + ] + +testsServer :: [Test] +testsServer = + [testBgrewriteaof, testFlushall, testSlowlog, testDebugObject] + +testsConnection :: [Test] +testsConnection = [ testConnectAuthUnexpected, testEcho, testPing + ] + +testsKeys :: [Test] +testsKeys = [ testKeys, testExpireAt, testSortCluster, testGetType, testObject ] + +testSortCluster :: Test +testSortCluster = testCase "sort" $ do + lpush "{same}ids" ["1","2","3"] >>=? 3 + sort "{same}ids" defaultSortOpts >>=? ["1","2","3"] + sortStore "{same}ids" "{same}anotherKey" defaultSortOpts >>=? 3 + let opts = defaultSortOpts { sortOrder = Desc, sortAlpha = True + , sortLimit = (1,2) + , sortBy = Nothing + , sortGet = [] } + sort "{same}ids" opts >>=? ["2", "1"] diff --git a/test/Main.hs b/test/Main.hs new file mode 100644 index 00000000..06151067 --- /dev/null +++ b/test/Main.hs @@ -0,0 +1,34 @@ +module Main (main) where + +import qualified Test.Framework as Test +import Database.Redis +import Tests +import PubSubTest + +main :: IO () +main = do + conn <- connect defaultConnectInfo + Test.defaultMain (tests conn) + +tests :: Connection -> [Test.Test] +tests conn = map ($conn) $ concat + [ testsMisc, testsKeys, testsStrings, [testHashes], testsLists, testsSets, [testHyperLogLog] + , testsZSets, [testPubSub], [testTransaction], [testScripting] + , testsConnection, testsServer, [testScans, testSScan, testHScan, testZScan], [testZrangelex] + , [testXAddRead, testXReadGroup, testXRange, testXpending, testXClaim, testXInfo, testXDel, testXTrim] + , testPubSubThreaded + -- should always be run last as connection gets closed after it + , [testQuit] + ] + +testsServer :: [Test] +testsServer = + [testServer, testBgrewriteaof, testFlushall, testInfo, testConfig + ,testSlowlog, testDebugObject] + +testsConnection :: [Test] +testsConnection = [ testConnectAuth, testConnectAuthUnexpected, testConnectDb + , testConnectDbUnexisting, testEcho, testPing, testSelect ] + +testsKeys :: [Test] +testsKeys = [ testKeys, testKeysNoncluster, testExpireAt, testSort, testGetType, testObject ] diff --git a/test/PubSubTest.hs b/test/PubSubTest.hs index eb147044..05596c98 100644 --- a/test/PubSubTest.hs +++ b/test/PubSubTest.hs @@ -70,35 +70,34 @@ removeAllTest conn = Test.testCase "Multithreaded Pub/Sub - basic" $ do ctrl <- newPubSubController [("foo1", handler "InitialFoo1" msgVar), ("foo2", handler "InitialFoo2" msgVar)] [("bar1:*", phandler "InitialBar1" msgVar), ("bar2:*", phandler "InitialBar2" msgVar)] withAsync (pubSubForever conn ctrl (atomically $ writeTVar initialComplete True)) $ \_ -> do + -- wait for initial + atomically $ readTVar initialComplete >>= \b -> if b then return () else retry + expectRedisChannels conn ["foo1", "foo2"] - -- wait for initial - atomically $ readTVar initialComplete >>= \b -> if b then return () else retry - expectRedisChannels conn ["foo1", "foo2"] - - runRedis conn $ publish "foo1" "Hello" - waitForMessage msgVar "InitialFoo1" "Hello" + runRedis conn $ publish "foo1" "Hello" + waitForMessage msgVar "InitialFoo1" "Hello" - runRedis conn $ publish "bar2:zzz" "World" - waitForPMessage msgVar "InitialBar2" "bar2:zzz" "World" + runRedis conn $ publish "bar2:zzz" "World" + waitForPMessage msgVar "InitialBar2" "bar2:zzz" "World" - -- subscribe to foo1 and bar1 again - addChannelsAndWait ctrl [("foo1", handler "NewFoo1" msgVar)] [("bar1:*", phandler "NewBar1" msgVar)] - expectRedisChannels conn ["foo1", "foo2"] + -- subscribe to foo1 and bar1 again + addChannelsAndWait ctrl [("foo1", handler "NewFoo1" msgVar)] [("bar1:*", phandler "NewBar1" msgVar)] + expectRedisChannels conn ["foo1", "foo2"] - runRedis conn $ publish "foo1" "abcdef" - waitForMessage msgVar "InitialFoo1" "abcdef" - waitForMessage msgVar "NewFoo1" "abcdef" + runRedis conn $ publish "foo1" "abcdef" + waitForMessage msgVar "InitialFoo1" "abcdef" + waitForMessage msgVar "NewFoo1" "abcdef" - -- unsubscribe from foo1 and bar1 - removeChannelsAndWait ctrl ["foo1", "unusued"] ["bar1:*", "unused:*"] - expectRedisChannels conn ["foo2"] + -- unsubscribe from foo1 and bar1 + removeChannelsAndWait ctrl ["foo1", "unusued"] ["bar1:*", "unused:*"] + expectRedisChannels conn ["foo2"] - -- foo2 and bar2 are still subscribed - runRedis conn $ publish "foo2" "12345" - waitForMessage msgVar "InitialFoo2" "12345" + -- foo2 and bar2 are still subscribed + runRedis conn $ publish "foo2" "12345" + waitForMessage msgVar "InitialFoo2" "12345" - runRedis conn $ publish "bar2:aaa" "0987" - waitForPMessage msgVar "InitialBar2" "bar2:aaa" "0987" + runRedis conn $ publish "bar2:aaa" "0987" + waitForPMessage msgVar "InitialBar2" "bar2:aaa" "0987" data TestError = TestError ByteString deriving (Eq, Show, Typeable) @@ -127,48 +126,48 @@ removeFromUnregister conn = Test.testCase "Multithreaded Pub/Sub - unregister ha initialComplete <- newTVarIO False ctrl <- newPubSubController [] [] withAsync (pubSubForever conn ctrl (atomically $ writeTVar initialComplete True)) $ \_ -> do - atomically $ readTVar initialComplete >>= \b -> if b then return () else retry + atomically $ readTVar initialComplete >>= \b -> if b then return () else retry - -- register to some channels - void $ addChannelsAndWait ctrl - [("abc", handler "InitialAbc" msgVar), ("xyz", handler "InitialXyz" msgVar)] - [("def:*", phandler "InitialDef" msgVar), ("uvw", phandler "InitialUvw" msgVar)] - expectRedisChannels conn ["abc", "xyz"] + -- register to some channels + void $ addChannelsAndWait ctrl + [("abc", handler "InitialAbc" msgVar), ("xyz", handler "InitialXyz" msgVar)] + [("def:*", phandler "InitialDef" msgVar), ("uvw", phandler "InitialUvw" msgVar)] + expectRedisChannels conn ["abc", "xyz"] - runRedis conn $ publish "abc" "Hello" - waitForMessage msgVar "InitialAbc" "Hello" + runRedis conn $ publish "abc" "Hello" + waitForMessage msgVar "InitialAbc" "Hello" - -- register to some more channels - unreg <- addChannelsAndWait ctrl - [("abc", handler "SecondAbc" msgVar), ("123", handler "Second123" msgVar)] - [("def:*", phandler "SecondDef" msgVar), ("890:*", phandler "Second890" msgVar)] - expectRedisChannels conn ["abc", "xyz", "123"] + -- register to some more channels + unreg <- addChannelsAndWait ctrl + [("abc", handler "SecondAbc" msgVar), ("123", handler "Second123" msgVar)] + [("def:*", phandler "SecondDef" msgVar), ("890:*", phandler "Second890" msgVar)] + expectRedisChannels conn ["abc", "xyz", "123"] - -- check messages on all channels - runRedis conn $ publish "abc" "World" - waitForMessage msgVar "InitialAbc" "World" - waitForMessage msgVar "SecondAbc" "World" + -- check messages on all channels + runRedis conn $ publish "abc" "World" + waitForMessage msgVar "InitialAbc" "World" + waitForMessage msgVar "SecondAbc" "World" - runRedis conn $ publish "123" "World2" - waitForMessage msgVar "Second123" "World2" + runRedis conn $ publish "123" "World2" + waitForMessage msgVar "Second123" "World2" - runRedis conn $ publish "def:bbbb" "World3" - waitForPMessage msgVar "InitialDef" "def:bbbb" "World3" - waitForPMessage msgVar "SecondDef" "def:bbbb" "World3" + runRedis conn $ publish "def:bbbb" "World3" + waitForPMessage msgVar "InitialDef" "def:bbbb" "World3" + waitForPMessage msgVar "SecondDef" "def:bbbb" "World3" - runRedis conn $ publish "890:tttt" "World4" - waitForPMessage msgVar "Second890" "890:tttt" "World4" + runRedis conn $ publish "890:tttt" "World4" + waitForPMessage msgVar "Second890" "890:tttt" "World4" - -- unregister - unreg + -- unregister + unreg - -- we have no way of waiting until unregister actually happened, so just delay and hope - threadDelay $ 1000*1000 -- 1 second - expectRedisChannels conn ["abc", "xyz"] + -- we have no way of waiting until unregister actually happened, so just delay and hope + threadDelay $ 1000*1000 -- 1 second + expectRedisChannels conn ["abc", "xyz"] - -- now only initial should be around. In particular, abc should still be subscribed - runRedis conn $ publish "abc" "World5" - waitForMessage msgVar "InitialAbc" "World5" + -- now only initial should be around. In particular, abc should still be subscribed + runRedis conn $ publish "abc" "World5" + waitForMessage msgVar "InitialAbc" "World5" - runRedis conn $ publish "def:cccc" "World6" - waitForPMessage msgVar "InitialDef" "def:cccc" "World6" + runRedis conn $ publish "def:cccc" "World6" + waitForPMessage msgVar "InitialDef" "def:cccc" "World6" diff --git a/test/Test.hs b/test/Tests.hs similarity index 81% rename from test/Test.hs rename to test/Tests.hs index 3287e560..b7223be1 100644 --- a/test/Test.hs +++ b/test/Tests.hs @@ -1,5 +1,5 @@ {-# LANGUAGE CPP, OverloadedStrings, RecordWildCards, LambdaCase #-} -module Main (main) where +module Tests where #if __GLASGOW_HASKELL__ < 710 import Control.Applicative @@ -13,21 +13,15 @@ import Control.Monad.Trans import qualified Data.List as L import Data.Time import Data.Time.Clock.POSIX -import qualified Test.Framework as Test (Test, defaultMain) +import qualified Test.Framework as Test (Test) import qualified Test.Framework.Providers.HUnit as Test (testCase) import qualified Test.HUnit as HUnit import Database.Redis -import PubSubTest ------------------------------------------------------------------------------ --- Main and helpers +-- helpers -- -main :: IO () -main = do - conn <- connect defaultConnectInfo - Test.defaultMain (tests conn) - type Test = Connection -> Test.Test testCase :: String -> Redis () -> Test @@ -51,20 +45,6 @@ redis >>=? expected = do assert :: Bool -> Redis () assert = liftIO . HUnit.assert ------------------------------------------------------------------------------- --- Tests --- -tests :: Connection -> [Test.Test] -tests conn = map ($conn) $ concat - [ testsMisc, testsKeys, testsStrings, [testHashes], testsLists, testsSets, [testHyperLogLog] - , testsZSets, [testPubSub], [testTransaction], [testScripting] - , testsConnection, testsServer, [testScans], [testZrangelex] - , [testXAddRead, testXReadGroup, testXRange, testXpending, testXClaim, testXInfo, testXDel, testXTrim] - , testPubSubThreaded - -- should always be run last as connection gets closed after it - , [testQuit] - ] - ------------------------------------------------------------------------------ -- Miscellaneous -- @@ -78,7 +58,7 @@ testConstantSpacePipelining :: Test testConstantSpacePipelining = testCase "constant-space pipelining" $ do -- This testcase should not exceed the maximum heap size, as set in -- the run-test.sh script. - replicateM_ 100000 ping + replicateM_ 100000 (set "key" "val") -- If the program didn't crash, pipelining takes constant memory. assert True @@ -95,13 +75,15 @@ testForceErrorReply = testCase "force error reply" $ do testPipelining :: Test testPipelining = testCase "pipelining" $ do - let n = 100 + let n = 200 tPipe <- deltaT $ do - pongs <- replicateM n ping - assert $ pongs == replicate n (Right Pong) + oks <- replicateM n (set "pipelinekey" "pipelineval") + assert $ oks == replicate n (Right Ok) - tNoPipe <- deltaT $ replicateM_ n (ping >>=? Pong) + tNoPipe <- deltaT $ replicateM_ n ((set "pipelinekey" "pipelineval") >>=? Ok) -- pipelining should at least be twice as fast. + liftIO $ putStrLn $ "tPipe: " ++ show tPipe + liftIO $ putStrLn $ "tNoPipe: " ++ show tNoPipe assert $ tNoPipe / tPipe > 2 where deltaT redis = do @@ -113,49 +95,51 @@ testEvalReplies :: Test testEvalReplies conn = testCase "eval unused replies" go conn where go = do - _ignored <- set "key" "value" - (liftIO $ do + _ <- liftIO $ runRedis conn $ set "key" "value" + result <- liftIO $ do threadDelay $ 10 ^ (5 :: Int) mvar <- newEmptyMVar _ <- (Async.wait =<< Async.async (runRedis conn (get "key"))) >>= putMVar mvar - takeMVar mvar) >>=? - Just "value" + takeMVar mvar + pure result >>=? Just "value" ------------------------------------------------------------------------------ -- Keys -- -testsKeys :: [Test] -testsKeys = [ testKeys, testExpireAt, testSort, testGetType, testObject ] - testKeys :: Test testKeys = testCase "keys" $ do - set "key" "value" >>=? Ok - get "key" >>=? Just "value" - exists "key" >>=? True - keys "*" >>=? ["key"] - randomkey >>=? Just "key" - move "key" 13 >>=? True - select 13 >>=? Ok - expire "key" 1 >>=? True - pexpire "key" 1000 >>=? True - ttl "key" >>= \case + set "{same}key" "value" >>=? Ok + get "{same}key" >>=? Just "value" + exists "{same}key" >>=? True + expire "{same}key" 1 >>=? True + pexpire "{same}key" 1000 >>=? True + ttl "{same}key" >>= \case Left _ -> error "error" Right t -> do assert $ t `elem` [0..1] - pttl "key" >>= \case + pttl "{same}key" >>= \case Left _ -> error "error" Right pt -> do assert $ pt `elem` [990..1000] - persist "key" >>=? True - dump "key" >>= \case + persist "{same}key" >>=? True + dump "{same}key" >>= \case Left _ -> error "impossible" Right s -> do - restore "key'" 0 s >>=? Ok - rename "key" "key'" >>=? Ok - renamenx "key'" "key" >>=? True - del ["key"] >>=? 1 - select 0 >>=? Ok + restore "{same}key'" 0 s >>=? Ok + rename "{same}key" "{same}key'" >>=? Ok + renamenx "{same}key'" "{same}key" >>=? True + del ["{same}key"] >>=? 1 + +testKeysNoncluster :: Test +testKeysNoncluster = testCase "keysNoncluster" $ do + set "key" "value" >>=? Ok + keys "*" >>=? ["key"] + randomkey >>=? Just "key" + move "key" 13 >>=? True + select 13 >>=? Ok + get "key" >>=? Just "value" + select 0 >>=? Ok testExpireAt :: Test testExpireAt = testCase "expireat" $ do @@ -196,7 +180,7 @@ testGetType = testCase "getType" $ do del ["key"] >>=? 1 where ts = [ (set "key" "value" >>=? Ok, String) - , (hset "key" "field" "value" >>=? True, Hash) + , (hset "key" "field" "value" >>=? 1, Hash) , (lpush "key" ["value"] >>=? 1, List) , (sadd "key" ["member"] >>=? 1, Set) , (zadd "key" [(42,"member"),(12.3,"value")] >>=? 2, ZSet) @@ -219,43 +203,45 @@ testsStrings = [testStrings, testBitops] testStrings :: Test testStrings = testCase "strings" $ do - setnx "key" "value" >>=? True - getset "key" "hello" >>=? Just "value" - append "key" "world" >>=? 10 - strlen "key" >>=? 10 - setrange "key" 0 "hello" >>=? 10 - getrange "key" 0 4 >>=? "hello" - mset [("k1","v1"), ("k2","v2")] >>=? Ok - msetnx [("k1","v1"), ("k2","v2")] >>=? False - mget ["key"] >>=? [Just "helloworld"] - setex "key" 1 "42" >>=? Ok - psetex "key" 1000 "42" >>=? Ok - decr "key" >>=? 41 - decrby "key" 1 >>=? 40 - incr "key" >>=? 41 - incrby "key" 1 >>=? 42 - incrbyfloat "key" 1 >>=? 43 - del ["key"] >>=? 1 - setbit "key" 42 "1" >>=? 0 - getbit "key" 42 >>=? 1 - bitcount "key" >>=? 1 - bitcountRange "key" 0 (-1) >>=? 1 + setnx "key" "value" >>=? True + getset "key" "hello" >>=? Just "value" + append "key" "world" >>=? 10 + strlen "key" >>=? 10 + setrange "key" 0 "hello" >>=? 10 + getrange "key" 0 4 >>=? "hello" + mset [("{same}k1","v1"), ("{same}k2","v2")] >>=? Ok + msetnx [("{same}k1","v1"), ("{same}k2","v2")] >>=? False + mget ["key"] >>=? [Just "helloworld"] + setex "key" 1 "42" >>=? Ok + psetex "key" 1000 "42" >>=? Ok + decr "key" >>=? 41 + decrby "key" 1 >>=? 40 + incr "key" >>=? 41 + incrby "key" 1 >>=? 42 + incrbyfloat "key" 1 >>=? 43 + del ["key"] >>=? 1 + setbit "key" 42 "1" >>=? 0 + getbit "key" 42 >>=? 1 + bitcount "key" >>=? 1 + bitcountRange "key" 0 (-1) >>=? 1 testBitops :: Test testBitops = testCase "bitops" $ do - set "k1" "a" >>=? Ok - set "k2" "b" >>=? Ok - bitopAnd "k3" ["k1", "k2"] >>=? 1 - bitopOr "k3" ["k1", "k2"] >>=? 1 - bitopXor "k3" ["k1", "k2"] >>=? 1 - bitopNot "k3" "k1" >>=? 1 + set "{same}k1" "a" >>=? Ok + set "{same}k2" "b" >>=? Ok + bitopAnd "{same}k3" ["{same}k1", "{same}k2"] >>=? 1 + bitopOr "{same}k3" ["{same}k1", "{same}k2"] >>=? 1 + bitopXor "{same}k3" ["{same}k1", "{same}k2"] >>=? 1 + bitopNot "{same}k3" "{same}k1" >>=? 1 ------------------------------------------------------------------------------ -- Hashes -- testHashes :: Test testHashes = testCase "hashes" $ do - hset "key" "field" "value" >>=? True + hset "key" "field" "another" >>=? 1 + hset "key" "field" "another" >>=? 0 + hset "key" "field" "value" >>=? 0 hsetnx "key" "field" "value" >>=? False hexists "key" "field" >>=? True hlen "key" >>=? 1 @@ -296,12 +282,12 @@ testLists = testCase "lists" $ do testBpop :: Test testBpop = testCase "blocking push/pop" $ do - lpush "key" ["v3","v2","v1"] >>=? 3 - blpop ["key"] 1 >>=? Just ("key","v1") - brpop ["key"] 1 >>=? Just ("key","v3") - rpush "k1" ["v1","v2"] >>=? 2 - brpoplpush "k1" "k2" 1 >>=? Just "v2" - rpoplpush "k1" "k2" >>=? Just "v1" + lpush "{same}key" ["v3","v2","v1"] >>=? 3 + blpop ["{same}key"] 1 >>=? Just ("{same}key","v1") + brpop ["{same}key"] 1 >>=? Just ("{same}key","v3") + rpush "{same}k1" ["v1","v2"] >>=? 2 + brpoplpush "{same}k1" "{same}k2" 1 >>=? Just "v2" + rpoplpush "{same}k1" "{same}k2" >>=? Just "v1" ------------------------------------------------------------------------------ -- Sets @@ -318,7 +304,7 @@ testSets = testCase "sets" $ do srandmember "set" >>=? Just "member" spop "set" >>=? Just "member" srem "set" ["member"] >>=? 0 - smove "set" "set'" "member" >>=? False + smove "{same}set" "{same}set'" "member" >>=? False _ <- sadd "set" ["member1", "member2"] (fmap L.sort <$> spopN "set" 2) >>=? ["member1", "member2"] _ <- sadd "set" ["member1", "member2"] @@ -326,13 +312,13 @@ testSets = testCase "sets" $ do testSetAlgebra :: Test testSetAlgebra = testCase "set algebra" $ do - sadd "s1" ["member"] >>=? 1 - sdiff ["s1", "s2"] >>=? ["member"] - sunion ["s1", "s2"] >>=? ["member"] - sinter ["s1", "s2"] >>=? [] - sdiffstore "s3" ["s1", "s2"] >>=? 1 - sunionstore "s3" ["s1", "s2"] >>=? 1 - sinterstore "s3" ["s1", "s2"] >>=? 0 + sadd "{same}s1" ["member"] >>=? 1 + sdiff ["{same}s1", "{same}s2"] >>=? ["member"] + sunion ["{same}s1", "{same}s2"] >>=? ["member"] + sinter ["{same}s1", "{same}s2"] >>=? [] + sdiffstore "{same}s3" ["{same}s1", "{same}s2"] >>=? 1 + sunionstore "{same}s3" ["{same}s1", "{same}s2"] >>=? 1 + sinterstore "{same}s3" ["{same}s1", "{same}s2"] >>=? 0 ------------------------------------------------------------------------------ -- Sorted Sets @@ -371,16 +357,16 @@ testZSets = testCase "sorted sets" $ do testZStore :: Test testZStore = testCase "zunionstore/zinterstore" $ do - zadd "k1" [(1, "v1"), (2, "v2")] >>= \case + zadd "{same}k1" [(1, "v1"), (2, "v2")] >>= \case Left _ -> error "error" _ -> return () - zadd "k2" [(2, "v2"), (3, "v3")] >>= \case + zadd "{same}k2" [(2, "v2"), (3, "v3")] >>= \case Left _ -> error "error" _ -> return () - zinterstore "newkey" ["k1","k2"] Sum >>=? 1 - zinterstoreWeights "newkey" [("k1",1),("k2",2)] Max >>=? 1 - zunionstore "newkey" ["k1","k2"] Sum >>=? 3 - zunionstoreWeights "newkey" [("k1",1),("k2",2)] Min >>=? 3 + zinterstore "{same}newkey" ["{same}k1","{same}k2"] Sum >>=? 1 + zinterstoreWeights "{same}newkey" [("{same}k1",1),("{same}k2",2)] Max >>=? 1 + zunionstore "{same}newkey" ["{same}k1","{same}k2"] Sum >>=? 3 + zunionstoreWeights "{same}newkey" [("{same}k1",1),("{same}k2",2)] Min >>=? 3 ------------------------------------------------------------------------------ -- HyperLogLog @@ -403,18 +389,18 @@ testHyperLogLog = testCase "hyperloglog" $ do _ -> return () pfcount ["hll1"] >>=? 5 -- test merge - pfadd "hll2" ["1", "2", "3"] >>= \case + pfadd "{same}hll2" ["1", "2", "3"] >>= \case Left _ -> error "error" _ -> return () - pfadd "hll3" ["4", "5", "6"] >>= \case + pfadd "{same}hll3" ["4", "5", "6"] >>= \case Left _ -> error "error" _ -> return () - pfmerge "hll4" ["hll2", "hll3"] >>= \case + pfmerge "{same}hll4" ["{same}hll2", "{same}hll3"] >>= \case Left _ -> error "error" _ -> return () - pfcount ["hll4"] >>=? 6 + pfcount ["{same}hll4"] >>=? 6 -- test union cardinality - pfcount ["hll2", "hll3"] >>=? 6 + pfcount ["{same}hll2", "{same}hll3"] >>=? 6 ------------------------------------------------------------------------------ -- Pub/Sub @@ -452,17 +438,17 @@ testPubSub conn = testCase "pubSub" go conn -- testTransaction :: Test testTransaction = testCase "transaction" $ do - watch ["k1", "k2"] >>=? Ok + watch ["{same}k1", "{same}k2"] >>=? Ok unwatch >>=? Ok - set "foo" "foo" >>= \case + set "{same}foo" "foo" >>= \case Left _ -> error "error" _ -> return () - set "bar" "bar" >>= \case + set "{same}bar" "bar" >>= \case Left _ -> error "error" _ -> return () foobar <- multiExec $ do - foo <- get "foo" - bar <- get "bar" + foo <- get "{same}foo" + bar <- get "{same}bar" return $ (,) <$> foo <*> bar assert $ foobar == TxSuccess (Just "foo", Just "bar") @@ -503,10 +489,6 @@ testScripting conn = testCase "scripting" go conn ------------------------------------------------------------------------------ -- Connection -- -testsConnection :: [Test] -testsConnection = [ testConnectAuth, testConnectAuthUnexpected, testConnectDb - , testConnectDbUnexisting, testEcho, testPing, testSelect ] - testConnectAuth :: Test testConnectAuth = testCase "connect/auth" $ do configSet "requirepass" "pass" >>=? Ok @@ -524,7 +506,7 @@ testConnectAuthUnexpected = testCase "connect/auth/unexpected" $ do where connInfo = defaultConnectInfo { connectAuth = Just "pass" } err = Left $ ConnectAuthError $ - Error "ERR Client sent AUTH, but no password is set" + Error "ERR AUTH called without any password configured for the default user. Are you sure your configuration is correct?" testConnectDb :: Test testConnectDb = testCase "connect/db" $ do @@ -563,11 +545,6 @@ testSelect = testCase "select" $ do ------------------------------------------------------------------------------ -- Server -- -testsServer :: [Test] -testsServer = - [testServer, testBgrewriteaof, testFlushall, testInfo, testConfig - ,testSlowlog, testDebugObject] - testServer :: Test testServer = testCase "server" $ do time >>= \case @@ -632,14 +609,23 @@ testScans = testCase "scans" $ do scan cursor0 >>=? (cursor0, ["key"]) scanOpts cursor0 sOpts1 >>=? (cursor0, ["key"]) scanOpts cursor0 sOpts2 >>=? (cursor0, []) + where sOpts1 = defaultScanOpts { scanMatch = Just "k*" } + sOpts2 = defaultScanOpts { scanMatch = Just "not*"} + +testSScan :: Test +testSScan = testCase "sscan" $ do sadd "set" ["1"] >>=? 1 sscan "set" cursor0 >>=? (cursor0, ["1"]) - hset "hash" "k" "v" >>=? True + +testHScan :: Test +testHScan = testCase "hscan" $ do + hset "hash" "k" "v" >>=? 1 hscan "hash" cursor0 >>=? (cursor0, [("k", "v")]) + +testZScan :: Test +testZScan = testCase "zscan" $ do zadd "zset" [(42, "2")] >>=? 1 zscan "zset" cursor0 >>=? (cursor0, [("2", 42)]) - where sOpts1 = defaultScanOpts { scanMatch = Just "k*" } - sOpts2 = defaultScanOpts { scanMatch = Just "not*"} testZrangelex ::Test testZrangelex = testCase "zrangebylex" $ do @@ -652,20 +638,20 @@ testZrangelex = testCase "zrangebylex" $ do testXAddRead ::Test testXAddRead = testCase "xadd/xread" $ do - xadd "somestream" "123" [("key", "value"), ("key2", "value2")] - xadd "otherstream" "456" [("key1", "value1")] - xaddOpts "thirdstream" "*" [("k", "v")] (Maxlen 1) - xaddOpts "thirdstream" "*" [("k", "v")] (ApproxMaxlen 1) - xread [("somestream", "0"), ("otherstream", "0")] >>=? Just [ + xadd "{same}somestream" "123" [("key", "value"), ("key2", "value2")] + xadd "{same}otherstream" "456" [("key1", "value1")] + xaddOpts "{same}thirdstream" "*" [("k", "v")] (Maxlen 1) + xaddOpts "{same}thirdstream" "*" [("k", "v")] (ApproxMaxlen 1) + xread [("{same}somestream", "0"), ("{same}otherstream", "0")] >>=? Just [ XReadResponse { - stream = "somestream", + stream = "{same}somestream", records = [StreamsRecord{recordId = "123-0", keyValues = [("key", "value"), ("key2", "value2")]}] }, XReadResponse { - stream = "otherstream", + stream = "{same}otherstream", records = [StreamsRecord{recordId = "456-0", keyValues = [("key1", "value1")]}] }] - xlen "somestream" >>=? 1 + xlen "{same}somestream" >>=? 1 testXReadGroup ::Test testXReadGroup = testCase "XGROUP */xreadgroup/xack" $ do