Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the explorer persistent #1281

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 54 additions & 8 deletions hydra-cluster/test/Test/HydraExplorerSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@ module Test.HydraExplorerSpec where
import Hydra.Prelude hiding (get)
import Test.Hydra.Prelude

import CardanoClient (RunningNode (..))
import CardanoClient (RunningNode (..), queryTip)
import CardanoNode (NodeLog, withCardanoNodeDevnet)
import Control.Lens ((^.), (^?))
import Data.Aeson as Aeson
import Data.Aeson.Lens (key, nth, _Array, _String)
import Hydra.Cardano.Api (NetworkId (..), NetworkMagic (..), unFile)
import Hydra.Cardano.Api (ChainPoint (..), NetworkId (..), NetworkMagic (..), unFile)
import Hydra.Cluster.Faucet (FaucetLog, publishHydraScriptsAs, seedFromFaucet_)
import Hydra.Cluster.Fixture (Actor (..), aliceSk, bobSk, cperiod)
import Hydra.Cluster.Util (chainConfigFor, keysFor)
import Hydra.Logging (showLogsOnFailure)
import Hydra.Options qualified as Options
import HydraNode (HydraNodeLog, input, send, waitMatch, withHydraNode)
import Network.HTTP.Client (responseBody)
import Network.HTTP.Simple (httpJSON, parseRequestThrow)
import System.FilePath ((</>))
import System.Process (CreateProcess (..), StdStream (..), proc, withCreateProcess)

spec :: Spec
Expand Down Expand Up @@ -48,13 +50,12 @@ spec = do
seedFromFaucet_ cardanoNode bobCardanoVk 25_000_000 (contramap FromFaucet tracer)
bobHeadId <- withHydraNode hydraTracer bobChainConfig tmpDir 2 bobSk [] [2] initHead

withHydraExplorer cardanoNode $ \explorer -> do
withHydraExplorer cardanoNode tmpDir Nothing $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 2
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Initializing"

it "can query for all hydra heads observed" $
failAfter 60 $
Expand All @@ -63,7 +64,7 @@ spec = do
withCardanoNodeDevnet (contramap FromCardanoNode tracer) tmpDir $ \cardanoNode@RunningNode{nodeSocket} -> do
let hydraTracer = contramap FromHydraNode tracer
hydraScriptsTxId <- publishHydraScriptsAs cardanoNode Faucet
withHydraExplorer cardanoNode $ \explorer -> do
withHydraExplorer cardanoNode tmpDir Nothing $ \explorer -> do
(aliceCardanoVk, _aliceCardanoSk) <- keysFor Alice
aliceChainConfig <- chainConfigFor Alice tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode aliceCardanoVk 25_000_000 (contramap FromFaucet tracer)
Expand Down Expand Up @@ -99,6 +100,48 @@ spec = do
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Aborted"

it "is persistent" $
failAfter 60 $
showLogsOnFailure "HydraExplorerSpec" $ \tracer -> do
withTempDir "hydra-explorer-history" $ \tmpDir -> do
withCardanoNodeDevnet (contramap FromCardanoNode tracer) tmpDir $ \cardanoNode@RunningNode{networkId, nodeSocket} -> do
let hydraTracer = contramap FromHydraNode tracer
hydraScriptsTxId <- publishHydraScriptsAs cardanoNode Faucet

let initHead hydraNode = do
send hydraNode $ input "Init" []
waitMatch 5 hydraNode $ \v -> do
guard $ v ^? key "tag" == Just "HeadIsInitializing"
v ^? key "headId" . _String

tip <- queryTip networkId nodeSocket

(aliceCardanoVk, _aliceCardanoSk) <- keysFor Alice
aliceChainConfig <- chainConfigFor Alice tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode aliceCardanoVk 25_000_000 (contramap FromFaucet tracer)
aliceHeadId <- withHydraNode hydraTracer aliceChainConfig tmpDir 1 aliceSk [] [1] initHead

withHydraExplorer cardanoNode tmpDir (Just tip) $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 1
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"

tip' <- queryTip networkId nodeSocket

(bobCardanoVk, _bobCardanoSk) <- keysFor Bob
bobChainConfig <- chainConfigFor Bob tmpDir nodeSocket hydraScriptsTxId [] cperiod
seedFromFaucet_ cardanoNode bobCardanoVk 25_000_000 (contramap FromFaucet tracer)
bobHeadId <- withHydraNode hydraTracer bobChainConfig tmpDir 2 bobSk [] [2] initHead

withHydraExplorer cardanoNode tmpDir (Just tip') $ \explorer -> do
allHeads <- getHeads explorer
length (allHeads ^. _Array) `shouldBe` 2
allHeads ^. nth 0 . key "headId" . _String `shouldBe` aliceHeadId
allHeads ^. nth 0 . key "status" . _String `shouldBe` "Initializing"
allHeads ^. nth 1 . key "headId" . _String `shouldBe` bobHeadId
allHeads ^. nth 1 . key "status" . _String `shouldBe` "Initializing"

newtype HydraExplorerHandle = HydraExplorerHandle {getHeads :: IO Value}

data HydraExplorerLog
Expand All @@ -109,14 +152,14 @@ data HydraExplorerLog
deriving anyclass (ToJSON)

-- | Starts a 'hydra-explorer' on some Cardano network.
withHydraExplorer :: RunningNode -> (HydraExplorerHandle -> IO ()) -> IO ()
withHydraExplorer cardanoNode action =
withHydraExplorer :: RunningNode -> FilePath -> Maybe ChainPoint -> (HydraExplorerHandle -> IO ()) -> IO ()
withHydraExplorer cardanoNode persistenceDir mStartChainFrom action =
withCreateProcess process{std_out = CreatePipe, std_err = CreatePipe} $
\_in _stdOut err processHandle ->
race
(checkProcessHasNotDied "hydra-explorer" processHandle err)
( -- XXX: wait for the http server to be listening on port
threadDelay 3
threadDelay 5
*> action HydraExplorerHandle{getHeads}
)
<&> either absurd id
Expand All @@ -130,5 +173,8 @@ withHydraExplorer cardanoNode action =
<> case networkId of
Mainnet -> ["--mainnet"]
Testnet (NetworkMagic magic) -> ["--testnet-magic", show magic]
<> ["--peer", "0.0.0.0:9090"]
<> ["--persistence-dir", show persistenceDir </> "explorer-state"]
<> Options.toArgStartChainFrom mStartChainFrom

RunningNode{nodeSocket, networkId} = cardanoNode
2 changes: 2 additions & 0 deletions hydra-explorer/hydra-explorer.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ library
, hydra-node
, hydra-prelude
, io-classes
, optparse-applicative
, servant
, servant-server
, wai
Expand All @@ -58,6 +59,7 @@ library
exposed-modules:
Hydra.Explorer
Hydra.Explorer.ExplorerState
Hydra.Explorer.Options

executable hydra-explorer
import: project-config
Expand Down
84 changes: 61 additions & 23 deletions hydra-explorer/src/Hydra/Explorer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,23 @@ import Hydra.Prelude

import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO, readTVarIO)
import Hydra.API.APIServerLog (APIServerLog (..), Method (..), PathInfo (..))
import Hydra.Chain.Direct.Tx (HeadObservation)
import Hydra.Explorer.ExplorerState (ExplorerState, HeadState, aggregateHeadObservations)
import Hydra.Cardano.Api (NetworkId (..), NetworkMagic (..), Tx, unFile)
import Hydra.Chain (OnChainTx)
import Hydra.Chain.Direct.Handlers (convertObservation)
import Hydra.Chain.Direct.Tx (
HeadObservation (..),
)
import Hydra.Explorer.ExplorerState (ExplorerState, HeadState, aggregateOnChainTx)
import Hydra.Explorer.Options (Options (..), hydraExplorerOptions)
import Hydra.Logging (Tracer, Verbosity (..), traceWith, withTracer)
import Hydra.Network (PortNumber)
import Hydra.Logging.Messages (HydraLog (..))
import Hydra.Network (Host (..))
import Hydra.Node (HydraNodeLog (..))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should: not re-use hydra-node library, but create new trace data types.

This makes it harder to decouple this later from it (and hydra-chain-observer). Also, we are not logging from a hydra-node here, but from the explorer.

import Hydra.Options qualified as Options
import Hydra.Persistence (PersistenceIncremental (..), createPersistenceIncremental, loadAll)
import Network.Wai (Middleware, Request (..))
import Network.Wai.Handler.Warp qualified as Warp
import Options.Applicative (execParser)
import Servant (Server, throwError)
import Servant.API (Get, Header, JSON, addHeader, (:>))
import Servant.API.ResponseHeaders (Headers)
Expand All @@ -32,8 +43,8 @@ type API =

type GetHeads = IO [HeadState]

api :: Proxy API
api = Proxy
explorerAPI :: Proxy API
explorerAPI = Proxy

server :: GetHeads -> Server API
server = handleGetHeads
Expand Down Expand Up @@ -68,41 +79,68 @@ logMiddleware tracer app' req sendResponse = do

httpApp :: Tracer IO APIServerLog -> GetHeads -> Application
httpApp tracer getHeads =
logMiddleware tracer $ serve api $ server getHeads
logMiddleware tracer $ serve explorerAPI $ server getHeads

observerHandler :: TVar IO ExplorerState -> [HeadObservation] -> IO ()
observerHandler explorerState observations = do
observerHandler ::
TVar IO ExplorerState ->
PersistenceIncremental (OnChainTx Tx) IO ->
[HeadObservation] ->
IO ()
observerHandler explorerState PersistenceIncremental{append} observations = do
let onChainTxs = mapMaybe convertObservation observations
forM_ onChainTxs append
atomically $
modifyTVar' explorerState $
aggregateHeadObservations observations
modifyTVar' explorerState $ \currentState ->
foldl' aggregateOnChainTx currentState onChainTxs

readModelGetHeadIds :: TVar IO ExplorerState -> GetHeads
readModelGetHeadIds = readTVarIO

main :: IO ()
main = do
withTracer (Verbose "hydra-explorer") $ \tracer -> do
explorerState <- newTVarIO (mempty :: ExplorerState)
withTracer (Verbose "hydra-explorer") $ \(tracer :: Tracer IO (HydraLog Tx ())) -> do
opts <- execParser hydraExplorerOptions
let Options
{ networkId
, host = Host{hostname, port}
, nodeSocket
, startChainFrom
, persistenceDir
} = opts
persistence <- createPersistenceIncremental $ persistenceDir <> "/explorer-state"
explorerState <- do
let nodeTracer = contramap Node tracer
events <- loadAll persistence
traceWith nodeTracer LoadedState{numberOfEvents = fromIntegral $ length events}
let initialState = mempty
recoveredSt = foldl' aggregateOnChainTx initialState events
newTVarIO recoveredSt
Copy link
Member

@ch1bo ch1bo Feb 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must: include chain information in persisted data to avoid inconsistencies by design

This explorer-state seems not to include any information about which point on the chain was last seen. Hence, we can easily "miss heads" if we start the explorer, stop it and then start it again after a few minutes (e.g. when doing maintenance)

This must be avoided! Having an explorer synchronizing everything from a fixed point (--start-chain-from or even genesis) is strictly more correct than this half-way persisting. Also, it is not a big problem if starting the explorer takes a few minutes before it is fully live (can be controlled by the --start-chain-from option).

I would like to discuss the premise of this whole PR please.


let getHeads = readModelGetHeadIds explorerState
args <- getArgs

apiTracer = contramap APIServer tracer

chainObserverArgs =
["--node-socket", unFile nodeSocket]
<> case networkId of
Mainnet -> ["--mainnet"]
Testnet (NetworkMagic magic) -> ["--testnet-magic", show magic]
<> Options.toArgStartChainFrom startChainFrom
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could: make this consistent

Either use all the toArg functions or none.

Eventually we need to get rid of using them anyway as we can not depend on hydra-node or hydra-chain-observer when we do #1282

race
-- FIXME: this is going to be problematic on mainnet.
( withArgs (args <> ["--start-chain-from", "0"]) $
Hydra.ChainObserver.main (observerHandler explorerState)
( withArgs chainObserverArgs $
Hydra.ChainObserver.main (observerHandler explorerState persistence)
)
( traceWith tracer (APIServerStarted (fromIntegral port :: PortNumber))
*> Warp.runSettings (settings tracer) (httpApp tracer getHeads)
( traceWith apiTracer (APIServerStarted port)
*> Warp.runSettings (settings apiTracer port hostname) (httpApp apiTracer getHeads)
)
>>= \case
Left{} -> error "Something went wrong"
Right a -> pure a
where
port = 9090

settings tracer =
settings tracer port hostname =
Warp.defaultSettings
& Warp.setPort port
& Warp.setHost "0.0.0.0"
& Warp.setPort (fromIntegral port)
& Warp.setHost (fromString . toString $ hostname)
& Warp.setOnException (\_ e -> traceWith tracer $ APIConnectionError{reason = show e})

addCorsHeaders ::
Expand Down
26 changes: 10 additions & 16 deletions hydra-explorer/src/Hydra/Explorer/ExplorerState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import Hydra.HeadId (HeadId (..), HeadSeed)
import Data.Aeson (Value (..))
import Hydra.Cardano.Api (Tx, TxIn, UTxO)
import Hydra.Chain (HeadParameters (..), OnChainTx (..))
import Hydra.Chain.Direct.Handlers (convertObservation)
import Hydra.Chain.Direct.Tx (
HeadObservation (..),
headSeedToTxIn,
)
import Hydra.ContestationPeriod (ContestationPeriod, toNominalDiffTime)
Expand Down Expand Up @@ -273,20 +271,16 @@ replaceHeadState newHeadState@HeadState{headId = newHeadStateId} explorerState =
then newHeadState : tailStates
else currentHeadState : replaceHeadState newHeadState tailStates

aggregateHeadObservations :: [HeadObservation] -> ExplorerState -> ExplorerState
aggregateHeadObservations observations currentState =
foldl' aggregateOnChainTx currentState (mapMaybe convertObservation observations)
where
aggregateOnChainTx :: ExplorerState -> OnChainTx Tx -> ExplorerState
aggregateOnChainTx explorerState =
\case
OnInitTx{headId, headSeed, headParameters, participants} -> aggregateInitObservation headId headSeed headParameters participants explorerState
OnAbortTx{headId} -> aggregateAbortObservation headId explorerState
OnCommitTx{headId, party, committed} -> aggregateCommitObservation headId party committed explorerState
OnCollectComTx{headId} -> aggregateCollectComObservation headId explorerState
OnCloseTx{headId, snapshotNumber, contestationDeadline} -> aggregateCloseObservation headId snapshotNumber contestationDeadline explorerState
OnContestTx{headId, snapshotNumber} -> aggregateContestObservation headId snapshotNumber explorerState
OnFanoutTx{headId} -> aggregateFanoutObservation headId explorerState
aggregateOnChainTx :: ExplorerState -> OnChainTx Tx -> ExplorerState
aggregateOnChainTx explorerState =
\case
OnInitTx{headId, headSeed, headParameters, participants} -> aggregateInitObservation headId headSeed headParameters participants explorerState
OnAbortTx{headId} -> aggregateAbortObservation headId explorerState
OnCommitTx{headId, party, committed} -> aggregateCommitObservation headId party committed explorerState
OnCollectComTx{headId} -> aggregateCollectComObservation headId explorerState
OnCloseTx{headId, snapshotNumber, contestationDeadline} -> aggregateCloseObservation headId snapshotNumber contestationDeadline explorerState
OnContestTx{headId, snapshotNumber} -> aggregateContestObservation headId snapshotNumber explorerState
OnFanoutTx{headId} -> aggregateFanoutObservation headId explorerState

findHeadState :: HeadId -> ExplorerState -> Maybe HeadState
findHeadState idToFind = find (\HeadState{headId} -> headId == idToFind)
44 changes: 44 additions & 0 deletions hydra-explorer/src/Hydra/Explorer/Options.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module Hydra.Explorer.Options where

import Hydra.Prelude

import Hydra.Cardano.Api (ChainPoint (..), NetworkId, SocketPath)
import Hydra.Network (Host)
import Hydra.Options (
networkIdParser,
nodeSocketParser,
peerParser,
persistenceDirParser,
startChainFromParser,
)
import Options.Applicative (Parser, ParserInfo, fullDesc, header, helper, info, progDesc)

type Options :: Type
data Options = Options
{ networkId :: NetworkId
, host :: Host
, nodeSocket :: SocketPath
, startChainFrom :: Maybe ChainPoint
, persistenceDir :: FilePath
}
deriving stock (Show, Eq)

optionsParser :: Parser Options
optionsParser =
Options
<$> networkIdParser
<*> peerParser
<*> nodeSocketParser
<*> optional startChainFromParser
<*> persistenceDirParser

hydraExplorerOptions :: ParserInfo Options
hydraExplorerOptions =
info
( optionsParser
<**> helper
)
( fullDesc
<> progDesc "Explore hydra heads from chain."
<> header "hydra-explorer"
)
7 changes: 6 additions & 1 deletion hydra-explorer/test/Hydra/Explorer/ExplorerStateSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module Hydra.Explorer.ExplorerStateSpec where
import Hydra.Prelude
import Test.Hydra.Prelude

import Hydra.Chain.Direct.Handlers (convertObservation)
import Hydra.Chain.Direct.Tx (HeadObservation (..))
import Hydra.Explorer.ExplorerState (ExplorerState, aggregateHeadObservations, headId)
import Hydra.Explorer.ExplorerState (ExplorerState, aggregateOnChainTx, headId)
import Hydra.HeadId (HeadId)
import Hydra.OnChainId ()
import Test.QuickCheck (forAll, suchThat, (=/=))
Expand All @@ -28,3 +29,7 @@ spec = do

getHeadIds :: ExplorerState -> [HeadId]
getHeadIds = fmap headId

aggregateHeadObservations :: [HeadObservation] -> ExplorerState -> ExplorerState
aggregateHeadObservations observations currentState =
foldl' aggregateOnChainTx currentState (mapMaybe convertObservation observations)
Loading
Loading