Skip to content

Commit

Permalink
Make explorer to be persistent
Browse files Browse the repository at this point in the history
  • Loading branch information
ffakenz committed Feb 2, 2024
1 parent 573dcf6 commit 7c02218
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 57 deletions.
62 changes: 54 additions & 8 deletions hydra-cluster/test/Test/HydraExplorerSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ module Test.HydraExplorerSpec where
import Hydra.Prelude hiding (get)
import Test.Hydra.Prelude

import CardanoClient (RunningNode (..))
import CardanoClient (RunningNode (..), queryTip)
import CardanoNode (NodeLog, withCardanoNodeDevnet)
import Control.Lens ((^.), (^?))
import Data.Aeson as Aeson
import Data.Aeson.Lens (key, nth, _Array, _String)
import Hydra.Cardano.Api (NetworkId (..), NetworkMagic (..), unFile)
import Hydra.Cardano.Api (ChainPoint (..), NetworkId (..), NetworkMagic (..), unFile)
import Hydra.Cluster.Faucet (FaucetLog, publishHydraScriptsAs, seedFromFaucet_)
import Hydra.Cluster.Fixture (Actor (..), aliceSk, bobSk, cperiod)
import Hydra.Cluster.Util (chainConfigFor, keysFor)
import Hydra.Logging (showLogsOnFailure)
import Hydra.Options qualified as Options
import HydraNode (HydraNodeLog, input, send, waitMatch, withHydraNode)
import Network.HTTP.Client (responseBody)
import Network.HTTP.Simple (httpJSON, parseRequestThrow)
import System.FilePath ((</>))
import System.Process (CreateProcess (..), StdStream (..), proc, withCreateProcess)

spec :: Spec
Expand Down Expand Up @@ -48,13 +50,12 @@ spec = do
seedFromFaucet_ cardanoNode bobCardanoVk 25_000_000 (contramap FromFaucet tracer)
bobHeadId <- withHydraNode hydraTracer bobChainConfig tmpDir 2 bobSk [] [2] initHead

withHydraExplorer cardanoNode $ \explorer -> do
withHydraExplorer cardanoNode tmpDir Nothing $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 2
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Initializing"

it "can query for all hydra heads observed" $
failAfter 60 $
Expand All @@ -63,7 +64,7 @@ spec = do
withCardanoNodeDevnet (contramap FromCardanoNode tracer) tmpDir $ \cardanoNode@RunningNode{nodeSocket} -> do
let hydraTracer = contramap FromHydraNode tracer
hydraScriptsTxId <- publishHydraScriptsAs cardanoNode Faucet
withHydraExplorer cardanoNode $ \explorer -> do
withHydraExplorer cardanoNode tmpDir Nothing $ \explorer -> do
(aliceCardanoVk, _aliceCardanoSk) <- keysFor Alice
aliceChainConfig <- chainConfigFor Alice tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode aliceCardanoVk 25_000_000 (contramap FromFaucet tracer)
Expand Down Expand Up @@ -99,6 +100,48 @@ spec = do
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Aborted"

it "is persistent" $
failAfter 60 $
showLogsOnFailure "HydraExplorerSpec" $ \tracer -> do
withTempDir "hydra-explorer-history" $ \tmpDir -> do
withCardanoNodeDevnet (contramap FromCardanoNode tracer) tmpDir $ \cardanoNode@RunningNode{networkId, nodeSocket} -> do
let hydraTracer = contramap FromHydraNode tracer
hydraScriptsTxId <- publishHydraScriptsAs cardanoNode Faucet

let initHead hydraNode = do
send hydraNode $ input "Init" []
waitMatch 5 hydraNode $ \v -> do
guard $ v ^? key "tag" == Just "HeadIsInitializing"
v ^? key "headId" . _String

tip <- queryTip networkId nodeSocket

(aliceCardanoVk, _aliceCardanoSk) <- keysFor Alice
aliceChainConfig <- chainConfigFor Alice tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode aliceCardanoVk 25_000_000 (contramap FromFaucet tracer)
aliceHeadId <- withHydraNode hydraTracer aliceChainConfig tmpDir 1 aliceSk [] [1] initHead

withHydraExplorer cardanoNode tmpDir (Just tip) $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 1
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"

tip' <- queryTip networkId nodeSocket

(bobCardanoVk, _bobCardanoSk) <- keysFor Bob
bobChainConfig <- chainConfigFor Bob tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode bobCardanoVk 25_000_000 (contramap FromFaucet tracer)
bobHeadId <- withHydraNode hydraTracer bobChainConfig tmpDir 2 bobSk [] [2] initHead

withHydraExplorer cardanoNode tmpDir (Just tip') $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 2
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Initializing"

newtype HydraExplorerHandle = HydraExplorerHandle {getHeads :: IO Value}

data HydraExplorerLog
Expand All @@ -109,14 +152,14 @@ data HydraExplorerLog
deriving anyclass (ToJSON)

-- | Starts a 'hydra-explorer' on some Cardano network.
withHydraExplorer :: RunningNode -> (HydraExplorerHandle -> IO ()) -> IO ()
withHydraExplorer cardanoNode action =
withHydraExplorer :: RunningNode -> FilePath -> Maybe ChainPoint -> (HydraExplorerHandle -> IO ()) -> IO ()
withHydraExplorer cardanoNode persistenceDir mStartChainFrom action =
withCreateProcess process{std_out = CreatePipe, std_err = CreatePipe} $
\_in _stdOut err processHandle ->
race
(checkProcessHasNotDied "hydra-explorer" processHandle err)
( -- XXX: wait for the http server to be listening on port
threadDelay 3
threadDelay 5
*> action HydraExplorerHandle{getHeads}
)
<&> either absurd id
Expand All @@ -130,5 +173,8 @@ withHydraExplorer cardanoNode action =
<> case networkId of
Mainnet -> ["--mainnet"]
Testnet (NetworkMagic magic) -> ["--testnet-magic", show magic]
<> ["--peer", "0.0.0.0:9090"]
<> ["--persistence-dir", show persistenceDir </> "explorer-state"]
<> Options.toArgStartChainFrom mStartChainFrom

RunningNode{nodeSocket, networkId} = cardanoNode
2 changes: 2 additions & 0 deletions hydra-explorer/hydra-explorer.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ library
, hydra-node
, hydra-prelude
, io-classes
, optparse-applicative
, servant
, servant-server
, wai
Expand All @@ -58,6 +59,7 @@ library
exposed-modules:
Hydra.Explorer
Hydra.Explorer.ExplorerState
Hydra.Explorer.Options

executable hydra-explorer
import: project-config
Expand Down
84 changes: 61 additions & 23 deletions hydra-explorer/src/Hydra/Explorer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@ import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO)
import Hydra.API.APIServerLog (APIServerLog (..), Method (..), PathInfo (..))
import Hydra.Chain.Direct.Tx (HeadObservation)
import Hydra.Explorer.ExplorerState (ExplorerState, HeadState, aggregateHeadObservations)
import Hydra.Cardano.Api (NetworkId (..), NetworkMagic (..), Tx, unFile)
import Hydra.Chain (OnChainTx)
import Hydra.Chain.Direct.Handlers (convertObservation)
import Hydra.Chain.Direct.Tx (
HeadObservation (..),
)
import Hydra.Explorer.ExplorerState (ExplorerState, HeadState, aggregateOnChainTx)
import Hydra.Explorer.Options (Options (..), hydraExplorerOptions)
import Hydra.Logging (Tracer, Verbosity (..), traceWith, withTracer)
import Hydra.Network (PortNumber)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (Host (..))
import Hydra.Node (HydraNodeLog (..))
import Hydra.Options qualified as Options
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncremental, loadAll)
import Network.Wai (Middleware, Request (..))
import Network.Wai.Handler.Warp qualified as Warp
import Options.Applicative (execParser)
import Servant (Server, throwError)
import Servant.API (Get, Header, JSON, addHeader, (:>))
import Servant.API.ResponseHeaders (Headers)
Expand All @@ -32,8 +43,8 @@ type API =

type GetHeads = IO [HeadState]

api :: Proxy API
api = Proxy
explorerAPI :: Proxy API
explorerAPI = Proxy

server :: GetHeads -> Server API
server = handleGetHeads
Expand Down Expand Up @@ -68,41 +79,68 @@ logMiddleware tracer app' req sendResponse = do

httpApp :: Tracer IO APIServerLog -> GetHeads -> Application
httpApp tracer getHeads =
logMiddleware tracer $ serve api $ server getHeads
logMiddleware tracer $ serve explorerAPI $ server getHeads

observerHandler :: TVar IO ExplorerState -> [HeadObservation] -> IO ()
observerHandler explorerState observations = do
observerHandler ::
TVar IO ExplorerState ->
PersistenceIncremental (OnChainTx Tx) IO ->
[HeadObservation] ->
IO ()
observerHandler explorerState PersistenceIncremental{append} observations = do
let onChainTxs = mapMaybe convertObservation observations
forM_ onChainTxs append
atomically $
modifyTVar' explorerState $
aggregateHeadObservations observations
modifyTVar' explorerState $ \currentState ->
foldl' aggregateOnChainTx currentState onChainTxs

readModelGetHeadIds :: TVar IO ExplorerState -> GetHeads
readModelGetHeadIds = readTVarIO

main :: IO ()
main = do
withTracer (Verbose "hydra-explorer") $ \tracer -> do
explorerState <- newTVarIO (mempty :: ExplorerState)
withTracer (Verbose "hydra-explorer") $ \(tracer :: Tracer IO (HydraLog Tx ())) -> do
opts <- execParser hydraExplorerOptions
let Options
{ networkId
, host = Host{hostname, port}
, nodeSocket
, startChainFrom
, persistenceDir
} = opts
persistence <- createPersistenceIncremental $ persistenceDir <> "/explorer-state"
explorerState <- do
let nodeTracer = contramap Node tracer
events <- loadAll persistence
traceWith nodeTracer LoadedState{numberOfEvents = fromIntegral $ length events}
let initialState = mempty
recoveredSt = foldl' aggregateOnChainTx initialState events
newTVarIO recoveredSt

let getHeads = readModelGetHeadIds explorerState
args <- getArgs

apiTracer = contramap APIServer tracer

chainObserverArgs =
["--node-socket", unFile nodeSocket]
<> case networkId of
Mainnet -> ["--mainnet"]
Testnet (NetworkMagic magic) -> ["--testnet-magic", show magic]
<> Options.toArgStartChainFrom startChainFrom
race
-- FIXME: this is going to be problematic on mainnet.
( withArgs (args <> ["--start-chain-from", "0"]) $
Hydra.ChainObserver.main (observerHandler explorerState)
( withArgs chainObserverArgs $
Hydra.ChainObserver.main (observerHandler explorerState persistence)
)
( traceWith tracer (APIServerStarted (fromIntegral port :: PortNumber))
*> Warp.runSettings (settings tracer) (httpApp tracer getHeads)
( traceWith apiTracer (APIServerStarted port)
*> Warp.runSettings (settings apiTracer port hostname) (httpApp apiTracer getHeads)
)
>>= \case
Left{} -> error "Something went wrong"
Right a -> pure a
where
port = 9090

settings tracer =
settings tracer port hostname =
Warp.defaultSettings
& Warp.setPort port
& Warp.setHost "0.0.0.0"
& Warp.setPort (fromIntegral port)
& Warp.setHost (fromString . toString $ hostname)
& Warp.setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})

addCorsHeaders ::
Expand Down
26 changes: 10 additions & 16 deletions hydra-explorer/src/Hydra/Explorer/ExplorerState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import Hydra.HeadId (HeadId (..), HeadSeed)
import Data.Aeson (Value (..))
import Hydra.Cardano.Api (Tx, TxIn, UTxO)
import Hydra.Chain (HeadParameters (..), OnChainTx (..))
import Hydra.Chain.Direct.Handlers (convertObservation)
import Hydra.Chain.Direct.Tx (
HeadObservation (..),
headSeedToTxIn,
)
import Hydra.ContestationPeriod (ContestationPeriod, toNominalDiffTime)
Expand Down Expand Up @@ -273,20 +271,16 @@ replaceHeadState newHeadState@HeadState{headId = newHeadStateId} explorerState =
then newHeadState : tailStates
else currentHeadState : replaceHeadState newHeadState tailStates

aggregateHeadObservations :: [HeadObservation] -> ExplorerState -> ExplorerState
aggregateHeadObservations observations currentState =
foldl' aggregateOnChainTx currentState (mapMaybe convertObservation observations)
where
aggregateOnChainTx :: ExplorerState -> OnChainTx Tx -> ExplorerState
aggregateOnChainTx explorerState =
\case
OnInitTx{headId, headSeed, headParameters, participants} -> aggregateInitObservation headId headSeed headParameters participants explorerState
OnAbortTx{headId} -> aggregateAbortObservation headId explorerState
OnCommitTx{headId, party, committed} -> aggregateCommitObservation headId party committed explorerState
OnCollectComTx{headId} -> aggregateCollectComObservation headId explorerState
OnCloseTx{headId, snapshotNumber, contestationDeadline} -> aggregateCloseObservation headId snapshotNumber contestationDeadline explorerState
OnContestTx{headId, snapshotNumber} -> aggregateContestObservation headId snapshotNumber explorerState
OnFanoutTx{headId} -> aggregateFanoutObservation headId explorerState
aggregateOnChainTx :: ExplorerState -> OnChainTx Tx -> ExplorerState
aggregateOnChainTx explorerState =
\case
OnInitTx{headId, headSeed, headParameters, participants} -> aggregateInitObservation headId headSeed headParameters participants explorerState
OnAbortTx{headId} -> aggregateAbortObservation headId explorerState
OnCommitTx{headId, party, committed} -> aggregateCommitObservation headId party committed explorerState
OnCollectComTx{headId} -> aggregateCollectComObservation headId explorerState
OnCloseTx{headId, snapshotNumber, contestationDeadline} -> aggregateCloseObservation headId snapshotNumber contestationDeadline explorerState
OnContestTx{headId, snapshotNumber} -> aggregateContestObservation headId snapshotNumber explorerState
OnFanoutTx{headId} -> aggregateFanoutObservation headId explorerState

findHeadState :: HeadId -> ExplorerState -> Maybe HeadState
findHeadState idToFind = find (\HeadState{headId} -> headId == idToFind)
44 changes: 44 additions & 0 deletions hydra-explorer/src/Hydra/Explorer/Options.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module Hydra.Explorer.Options where

import Hydra.Prelude

import Hydra.Cardano.Api (ChainPoint (..), NetworkId, SocketPath)
import Hydra.Network (Host)
import Hydra.Options (
networkIdParser,
nodeSocketParser,
peerParser,
persistenceDirParser,
startChainFromParser,
)
import Options.Applicative (Parser, ParserInfo, fullDesc, header, helper, info, progDesc)

type Options :: Type
data Options = Options
{ networkId :: NetworkId
, host :: Host
, nodeSocket :: SocketPath
, startChainFrom :: Maybe ChainPoint
, persistenceDir :: FilePath
}
deriving stock (Show, Eq)

optionsParser :: Parser Options
optionsParser =
Options
<$> networkIdParser
<*> peerParser
<*> nodeSocketParser
<*> optional startChainFromParser
<*> persistenceDirParser

hydraExplorerOptions :: ParserInfo Options
hydraExplorerOptions =
info
( optionsParser
<**> helper
)
( fullDesc
<> progDesc "Explore hydra heads from chain."
<> header "hydra-explorer"
)
7 changes: 6 additions & 1 deletion hydra-explorer/test/Hydra/Explorer/ExplorerStateSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module Hydra.Explorer.ExplorerStateSpec where
import Hydra.Prelude
import Test.Hydra.Prelude

import Hydra.Chain.Direct.Handlers (convertObservation)
import Hydra.Chain.Direct.Tx (HeadObservation (..))
import Hydra.Explorer.ExplorerState (ExplorerState, aggregateHeadObservations, headId)
import Hydra.Explorer.ExplorerState (ExplorerState, aggregateOnChainTx, headId)
import Hydra.HeadId (HeadId)
import Hydra.OnChainId ()
import Test.QuickCheck (forAll, suchThat, (=/=))
Expand All @@ -28,3 +29,7 @@ spec = do

getHeadIds :: ExplorerState -> [HeadId]
getHeadIds = fmap headId

aggregateHeadObservations :: [HeadObservation] -> ExplorerState -> ExplorerState
aggregateHeadObservations observations currentState =
foldl' aggregateOnChainTx currentState (mapMaybe convertObservation observations)
Loading

0 comments on commit 7c02218

Please sign in to comment.