Skip to content

Commit

Permalink
cardano-node: threads with labels
Browse files Browse the repository at this point in the history
  • Loading branch information
jutaro committed Jan 29, 2025
1 parent c4d675b commit 2ed706d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 58 deletions.
73 changes: 37 additions & 36 deletions cardano-node/src/Cardano/Node/Configuration/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,6 @@ module Cardano.Node.Configuration.Logging
import Cardano.Api (textShow)
import qualified Cardano.Api as Api

import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.MVar (MVar, newMVar)
import Control.Concurrent.STM (STM)
import Control.Exception (IOException)
import Control.Exception.Safe (MonadCatch)
import Control.Monad (forM_, forever, void, when)
import Control.Monad.Except (ExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Trans.Except.Extra (catchIOExceptT)
import "contra-tracer" Control.Tracer
import Data.List (nub)
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Data.Text (Text, pack)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Version (showVersion)
import System.Metrics.Counter (Counter)
import System.Metrics.Gauge (Gauge)
import System.Metrics.Label (Label)
import qualified System.Remote.Monitoring.Wai as EKG

import Cardano.BM.Backend.Aggregation (plugin)
import Cardano.BM.Backend.EKGView (plugin)
import Cardano.BM.Backend.Monitoring (plugin)
Expand All @@ -65,17 +43,23 @@ import Cardano.BM.Data.LogItem (LOContent (..), LOMeta (..), LoggerNam
import qualified Cardano.BM.Observer.Monadic as Monadic
import qualified Cardano.BM.Observer.STM as Stm
import Cardano.BM.Plugin (loadPlugin)
#if defined(SYSTEMD)
#ifdef SYSTEMD
import Cardano.BM.Scribe.Systemd (plugin)
#endif
import Cardano.BM.Setup (setupTrace_, shutdown)
import Cardano.BM.Stats
import Cardano.BM.Stats.Resources
import qualified Cardano.BM.Trace as Trace
import Cardano.BM.Tracing

import qualified Cardano.Chain.Genesis as Gen
import Cardano.Git.Rev (gitRev)
import qualified Cardano.Ledger.Shelley.API as SL
import Cardano.Node.Configuration.POM (NodeConfiguration (..), ncProtocol)
import Cardano.Node.Protocol.Types (SomeConsensusProtocol (..))
import Cardano.Node.Types
import Cardano.Slotting.Slot (EpochSize (..))
import Cardano.Tracing.Config (TraceOptions (..))
import Cardano.Tracing.OrphanInstances.Common ()
import qualified Ouroboros.Consensus.BlockchainTime.WallClock.Types as WCT
import Ouroboros.Consensus.Byron.Ledger.Conversions
import Ouroboros.Consensus.Cardano.Block
Expand All @@ -86,13 +70,28 @@ import Ouroboros.Consensus.HardFork.Combinator.Degenerate
import Ouroboros.Consensus.Node.ProtocolInfo
import Ouroboros.Consensus.Shelley.Ledger.Ledger

import Cardano.Git.Rev (gitRev)
import Cardano.Node.Configuration.POM (NodeConfiguration (..), ncProtocol)
import Cardano.Node.Protocol.Types (SomeConsensusProtocol (..))
import Cardano.Node.Types
import Cardano.Slotting.Slot (EpochSize (..))
import Cardano.Tracing.Config (TraceOptions (..))
import Cardano.Tracing.OrphanInstances.Common ()
import qualified Control.Concurrent as Conc
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.MVar (MVar, newMVar)
import Control.Concurrent.STM (STM)
import Control.Exception (IOException)
import Control.Exception.Safe (MonadCatch)
import Control.Monad (forM_, forever, void, when)
import Control.Monad.Except (ExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Trans.Except.Extra (catchIOExceptT)
import Data.List (nub)
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Data.Text (Text, pack)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Version (showVersion)
import GHC.Conc (labelThread, myThreadId)
import System.Metrics.Counter (Counter)
import System.Metrics.Gauge (Gauge)
import System.Metrics.Label (Label)
import qualified System.Remote.Monitoring.Wai as EKG

import Paths_cardano_node (version)

--------------------------------
Expand Down Expand Up @@ -255,7 +254,7 @@ createLoggingLayer ver nodeConfig' p = do

when (ncLogMetrics nodeConfig) $
-- Record node metrics, if configured
startCapturingMetrics (ncTraceConfig nodeConfig) trace
startCapturingResources (ncTraceConfig nodeConfig) trace

mkLogLayer :: Configuration -> Switchboard Text -> Maybe EKGDirect -> Trace IO Text -> LoggingLayer
mkLogLayer logConfig switchBoard mbEkgDirect trace =
Expand All @@ -278,14 +277,16 @@ createLoggingLayer ver nodeConfig' p = do
, llEKGDirect = mbEkgDirect
}

startCapturingMetrics :: TraceOptions
startCapturingResources :: TraceOptions
-> Trace IO Text
-> IO ()
startCapturingMetrics (TraceDispatcher _) _tr = do
startCapturingResources (TraceDispatcher _) _tr = do
pure ()

startCapturingMetrics _ tr = do
void . Async.async . forever $ do
startCapturingResources _ tr = do
void . Async.async $ do
myThreadId >>= flip labelThread "Resource capturing (old tracing)"
forever $ do
readResourceStats
>>= maybe (pure ())
(traceResourceStats
Expand Down
32 changes: 21 additions & 11 deletions cardano-node/src/Cardano/Node/Tracing/Tracers/Peer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Cardano.Node.Orphans ()
import Cardano.Node.Queries
import Ouroboros.Consensus.Block (Header)
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client (ChainSyncClientHandle,
csCandidate, cschcMap, viewChainSyncState, )
csCandidate, cschcMap, viewChainSyncState)
import Ouroboros.Consensus.Util.Orphans ()
import qualified Ouroboros.Network.AnchoredFragment as Net
import Ouroboros.Network.Block (unSlotNo)
Expand All @@ -39,25 +39,35 @@ import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as Text
import GHC.Conc (labelThread, myThreadId)
import Text.Printf (printf)

{- HLINT ignore "Use =<<" -}
{- HLINT ignore "Use <=<" -}

-- | Starts a background thread to periodically trace the current peer list.
-- The thread is linked to the parent thread for proper error propagation
-- and labeled for easier debugging and identification.
startPeerTracer
  :: Tracer IO [PeerT blk] -- ^ Tracer for the peer list
  -> NodeKernelData blk    -- ^ Node kernel containing peer data
  -> Int                   -- ^ Delay in milliseconds between traces
  -> IO ()
startPeerTracer tracer nodeKernel delayMilliseconds = do
    thread <- async peersThread
    -- Link the thread to the parent to propagate exceptions properly.
    link thread
  where
    -- The background thread that periodically traces the peer list.
    -- (Plain comment: Haddock markers are not valid on local bindings.)
    peersThread :: IO ()
    peersThread = do
      -- Label the thread for easier debugging and identification.
      myThreadId >>= flip labelThread "Peer Tracer"
      forever $ do
        peers <- getCurrentPeers nodeKernel
        traceWith tracer peers
        -- 'threadDelay' takes microseconds, hence the conversion.
        threadDelay (delayMilliseconds * 1000)


data PeerT blk = PeerT
RemoteConnectionId
Expand Down
29 changes: 18 additions & 11 deletions cardano-node/src/Cardano/Node/Tracing/Tracers/Resources.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@ import Control.Concurrent.Async (async)
import Control.Monad (forM_, forever)
import Control.Monad.Class.MonadAsync (link)
import "contra-tracer" Control.Tracer
import GHC.Conc (labelThread, myThreadId)

-- | Starts a background thread to periodically trace resource statistics.
-- The thread reads resource stats and traces them using the given tracer.
-- It is linked to the parent thread to ensure proper error propagation.
startResourceTracer
  :: Tracer IO ResourceStats -- ^ Tracer receiving each resource sample
  -> Int                     -- ^ Delay in milliseconds between samples
  -> IO ()
startResourceTracer tracer delayMilliseconds = do
    thread <- async resourceThread
    -- Link the thread to the parent to propagate exceptions properly.
    link thread
  where
    -- The background thread that periodically traces resource stats.
    -- (Plain comment: Haddock markers are not valid on local bindings.)
    resourceThread :: IO ()
    resourceThread = do
      -- Label the thread for easier debugging and identification.
      myThreadId >>= flip labelThread "Resource Stats Tracer"
      forever $ do
        maybeStats <- readResourceStats
        -- If stats are available, trace them using the provided tracer.
        forM_ maybeStats $ traceWith tracer
        -- 'threadDelay' takes microseconds, hence the conversion.
        threadDelay (delayMilliseconds * 1000)

0 comments on commit 2ed706d

Please sign in to comment.