Skip to content

Commit

Permalink
Fix intersection negotiation on reconnections.
Browse files Browse the repository at this point in the history
  • Loading branch information
KtorZ committed Jan 26, 2023
1 parent 914cede commit 6cad2c9
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 36 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
### [2.3.4] - 2022-01-26

#### Added

N/A

#### Changed

- Fixed a restart issue where Kupo will continue synchronize back from the checkpoint known at startup instead of the latest known checkpoint after loosing (and recovering) connection from its block provider (cardano-node or ogmios).

#### Removed

N/A

### [2.3.3] - 2022-01-23

#### Added
Expand Down
2 changes: 1 addition & 1 deletion docs/api/v2.3.3.yaml → docs/api/v2.3.4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ servers:

info:
title: Kupo
version: v2.3.3
version: v2.3.4
license:
name: MPL-2.0
url: https://raw.githubusercontent.com/cardanosolutions/kupo/master/LICENSE
Expand Down
2 changes: 1 addition & 1 deletion docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@

<div id="versions">
<a href="#">v2.2.1</a>
<a href="#">v2.3.3</a>
<a href="#">v2.3.4</a>
<a href="#">latest</a>
<script>
;[1,2,3].forEach(ix => {
Expand Down
5 changes: 3 additions & 2 deletions kupo.cabal

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
_config: !include ".hpack.config.yaml"

name: kupo
version: 2.3.3
version: 2.3.4
stability: stable
github: "cardanosolutions/kupo"
license: MPL-2.0
Expand Down Expand Up @@ -132,6 +132,7 @@ tests:
- text
- wai
- wai-extra
- websockets
- yaml
build-tools:
- hspec-discover
Expand Down
2 changes: 1 addition & 1 deletion src/Kupo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ kupoWith tr withProducer withFetchBlock =

liftIO $ run $ \db -> do
patterns <- newPatternsCache (tracerConfiguration tr) config db
(mostRecentCheckpoint, checkpoints) <- startOrResume (tracerConfiguration tr) config db
let notifyTip = recordCheckpoint health
let statusToggle = connectionStatusToggle health
let tracerChainSync = contramap ConsumerChainSync . tracerConsumer
Expand Down Expand Up @@ -251,6 +250,7 @@ kupoWith tr withProducer withFetchBlock =

-- Block producer, fetching blocks from the network
( withChainSyncExceptionHandler (tracerChainSync tr) statusToggle $ do
(mostRecentCheckpoint, checkpoints) <- startOrResume (tracerConfiguration tr) config db
initializeHealth health mostRecentCheckpoint
producer
(tracerChainSync tr)
Expand Down
6 changes: 4 additions & 2 deletions src/Kupo/Data/Cardano/Tip.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{-# LANGUAGE PatternSynonyms #-}

module Kupo.Data.Cardano.Tip where
module Kupo.Data.Cardano.Tip
( module Kupo.Data.Cardano.Tip
, Ouroboros.getTipPoint
) where

import Kupo.Prelude

Expand Down Expand Up @@ -48,4 +51,3 @@ distanceToTip :: Tip -> SlotNo -> Word64
distanceToTip =
distanceToSlot . getTipSlotNo
{-# INLINABLE distanceToTip #-}

96 changes: 68 additions & 28 deletions test/Test/Kupo/AppSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import Control.Concurrent.STM.TChan
, tryReadTChan
, writeTChan
)
import Control.Concurrent.STM.TVar
( TVar
, readTVar
, newTVarIO
, writeTVar
)
import Control.Monad.Class.MonadAsync
( link
)
Expand Down Expand Up @@ -55,6 +61,9 @@ import Kupo.Control.MonadLog
, defaultTracers
, nullTracer
)
import Kupo.Control.MonadThrow
( throwIO
)
import Kupo.Control.MonadTime
( DiffTime
)
Expand Down Expand Up @@ -133,6 +142,9 @@ import Network.HTTP.Client
( defaultManagerSettings
, newManager
)
import Network.WebSockets
( ConnectionException (..)
)
import System.Environment
( lookupEnv
)
Expand Down Expand Up @@ -226,6 +238,7 @@ spec = do
prop "State-Machine" $ withMaxSuccess maxSuccess $
forAll genInputManagement $ \inputManagement -> do
forAll genServerPort $ \serverPort -> do
let httpClient = newHttpClientWith manager (serverHost, serverPort) (\_ -> pure ())
let stateMachine = StateMachine
initModel
transition
Expand All @@ -234,11 +247,7 @@ spec = do
Nothing
(generator inputManagement)
shrinker
(semantics
garbageCollectionInterval
(newHttpClientWith manager (serverHost, serverPort) (\_ -> pure ()))
chan
)
(semantics garbageCollectionInterval httpClient chan)
mock
(cleanup chan)
forAllCommands stateMachine Nothing $ \cmds -> monadicIO $ do
Expand All @@ -256,7 +265,7 @@ spec = do
, deferIndexes
}
env <- run (newEnvironment config)
producer <- run (newMockProducer <$> atomically (dupTChan chan))
producer <- run (newMockProducer httpClient <$> atomically (dupTChan chan))
fetchBlock <- run (newMockFetchBlock <$> atomically (dupTChan chan))
let kupo = kupoWith tracers producer fetchBlock `runWith` env
asyncId <- run (async kupo)
Expand Down Expand Up @@ -310,6 +319,7 @@ spec = do
data Event (r :: Type -> Type)
= DoRollForward !Tip !PartialBlock
| DoRollBackward !Tip !Point
| LoseConnection
| GetMostRecentCheckpoint
| GetPreviousCheckpoint !SlotNo
| GetUtxo
Expand All @@ -330,6 +340,8 @@ instance Show (Event r) where
toString $ "(DoRollForward " <> showPartialBlock ", " blk <> ")"
DoRollBackward _ pt ->
toString $ "(DoRollBackward " <> showPoint pt <> ")"
LoseConnection ->
"LoseConnection"
GetMostRecentCheckpoint ->
"GetMostRecentCheckpoint"
GetPreviousCheckpoint sl ->
Expand Down Expand Up @@ -471,6 +483,8 @@ precondition (LongestRollback k) model = \case
if point `elem` points && point /= tip && d <= k
then Top
else Bot
LoseConnection ->
Top
GetPreviousCheckpoint{} ->
Top
GetMostRecentCheckpoint ->
Expand Down Expand Up @@ -539,6 +553,8 @@ transition model cmd _res =
(hasConflictingInput unspentOutputReferences)
(mempool model <> foldMap blockBody (reverse dropped))
}
LoseConnection ->
model
GetMostRecentCheckpoint ->
model
GetPreviousCheckpoint{} ->
Expand Down Expand Up @@ -647,6 +663,7 @@ generator inputManagement model =
<*> selectPastPoint (blockPoint <$> blocks)
)
, (1, pure Pause)
, (1, pure LoseConnection)
]

-- | Generate a new network tip based on the current network tip and a local
Expand Down Expand Up @@ -836,22 +853,24 @@ cleanup chan _ = do
semantics
:: DiffTime
-> HttpClient IO
-> TChan RequestNextResponse
-> TChan (Either ConnectionException RequestNextResponse)
-> Event Concrete
-> IO (Response Concrete)
semantics pause HttpClient{..} chan = \case
DoRollForward tip block -> do
cp <- getMostRecentCheckpoint
atomically (writeTChan chan (RollForward tip block))
atomically $ writeTChan chan (Right $ RollForward tip block)
fmap Unit $ waitUntilM $ do
cp' <- getMostRecentCheckpoint
pure (cp' > cp)
DoRollBackward tip point -> do
cp <- getMostRecentCheckpoint
atomically (writeTChan chan (RollBackward tip point))
atomically $ writeTChan chan (Right $ RollBackward tip point)
fmap Unit $ waitUntilM $ do
cp' <- getMostRecentCheckpoint
pure (cp' < cp)
LoseConnection ->
Unit <$> atomically (writeTChan chan (Left ConnectionClosed))
GetMostRecentCheckpoint -> do
Checkpoint . Just <$> getMostRecentCheckpoint
GetPreviousCheckpoint sl -> do
Expand Down Expand Up @@ -884,6 +903,8 @@ mock model = \case
pure (Unit ())
DoRollBackward{} ->
pure (Unit ())
LoseConnection{} ->
pure (Unit ())
GetMostRecentCheckpoint ->
pure $ Checkpoint $ case currentChain model of
[] -> Just GenesisPoint
Expand Down Expand Up @@ -918,36 +939,53 @@ mock model = \case
-- bounded queue of events that are generated by the quickcheck machinery. It
-- does nothing more than passing information around in the mailbox.
newMockProducer
:: TChan RequestNextResponse
:: HttpClient IO
-> TChan (Either ConnectionException RequestNextResponse)
-> ( (Point -> ForcedRollbackHandler IO -> IO ())
-> Mailbox IO (Tip, PartialBlock) (Tip, Point)
-> ChainSyncClient IO PartialBlock
-> IO ()
)
-> IO ()
newMockProducer chan callback = do
newMockProducer HttpClient{..} chan callback = do
lastKnownTipVar <- newTVarIO GenesisPoint
mailbox <- atomically (newMailbox mailboxCapacity)
callback forcedRollbackCallback mailbox $ \_ -> \case
[GenesisPoint] -> do
const $ forever $ atomically $ do
readTChan chan >>= \case
RollForward tip block ->
putHighFrequencyMessage mailbox (tip, block)
RollBackward tip point ->
putIntermittentMessage mailbox (tip, point)
_notGenesis -> do
const $ fail
"Mock producer cannot start from a list of checkpoints. \
\Indeed, the goal is to test arbitrary sequences \
\of execution, for which starting 'from scratch' is \
\therefore not only sufficient but necessary."
requestedTip:_ -> \_ -> do
lastKnownTip <- atomically (readTVar lastKnownTipVar)
when (requestedTip /= lastKnownTip) $ do
putStrLn $ "RequestedTip: " <> show requestedTip
putStrLn $ "LastKnowntip: " <> show lastKnownTip
fail "Tip out of sync"
forever $ do
result <- atomically $ readTChan chan >>= \case
Right (RollForward tip block) ->
Right <$> putHighFrequencyMessage mailbox (tip, block)
Right (RollBackward tip point) ->
Right <$> putIntermittentMessage mailbox (tip, point)
Left e -> do
pure (Left e)
either
(\e -> do
atomically . writeTVar lastKnownTipVar =<< getMostRecentCheckpoint
throwIO e
)
return
result
[] -> do
const $ fail "Empty list requested for mock producer"
where
forcedRollbackCallback _point _handler =
fail "Mock producer cannot force rollback."

getMostRecentCheckpoint =
listCheckpoints <&> \case
[] -> GenesisPoint
h:_ -> h

-- | Mock a request to the node which returns the block immediately following the given point.
newMockFetchBlock
:: TChan RequestNextResponse
:: TChan (Either e RequestNextResponse)
-> (FetchBlockClient IO PartialBlock -> IO ())
-> IO ()
newMockFetchBlock chan callback =
Expand All @@ -956,13 +994,15 @@ newMockFetchBlock chan callback =
blocks <- applyBlocks [] <$> atomically (cloneTChan chan >>= flushTChan)
reply $ find ((== slotNo) . getPointSlotNo . blockPoint) blocks
where
applyBlocks :: [PartialBlock] -> [RequestNextResponse] -> [PartialBlock]
applyBlocks :: [PartialBlock] -> [Either e RequestNextResponse] -> [PartialBlock]
applyBlocks blocks = \case
[] ->
reverse blocks
(RollForward _ block):rest ->
Left{}:rest ->
applyBlocks blocks rest
(Right (RollForward _ block)):rest ->
applyBlocks (block:blocks) rest
(RollBackward _ point):rest ->
(Right (RollBackward _ point)):rest ->
let blocks' = dropWhile ((/= point) . blockPoint) blocks
in applyBlocks blocks' rest

Expand Down

0 comments on commit 6cad2c9

Please sign in to comment.