Skip to content

Commit

Permalink
fix: ReadStreamByKey rpc should close server stream when until offset…
Browse files Browse the repository at this point in the history
… reached (#1532)
  • Loading branch information
YangKian authored Jul 27, 2023
1 parent c3173b7 commit 5140610
Showing 1 changed file with 79 additions and 41 deletions.
120 changes: 79 additions & 41 deletions hstream/src/HStream/Server/Core/ShardReader.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}

module HStream.Server.Core.ShardReader
( createShardReader
Expand Down Expand Up @@ -277,7 +278,7 @@ readStreamByKey ServerContext{..} streamWriter streamReader =
Log.info $ "Create shardReader " <> Log.build readStreamByKeyRequestReaderId
-- Logdevice reader will blocked when no data returned by store
S.readerSetWaitOnlyWhenNoData reader
(sTimestamp, eTimestamp) <- startReadingShard scLDClient reader readStreamByKeyRequestReaderId shardId (toOffset <$> readStreamByKeyRequestFrom) (toOffset <$> readStreamByKeyRequestUntil)
(sTimestamp, eTimestamp) <- startReading reader readStreamByKeyRequestReaderId shardId (toOffset <$> readStreamByKeyRequestFrom) (toOffset <$> readStreamByKeyRequestUntil)

recordBuffer <- newIORef V.empty
let biReader = BiStreamReader { biStreamReader = reader
Expand All @@ -301,9 +302,9 @@ readStreamByKey ServerContext{..} streamWriter streamReader =
Log.info $ "shard reader " <> Log.build biStreamReaderId <> " stop reading, destroy reader"

readRecords :: (BiStreamReader, Maybe Word64) -> IO ()
{- First round read, read from hstore directly -}
readRecords (reader@BiStreamReader{..}, Just cnt) = do
-- First round read, read from hstore directly
isSuccess <- readLoop reader (fromIntegral cnt)
isSuccess <- readLoopInternal reader (fromIntegral cnt)
if isSuccess
then do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId
Expand All @@ -312,57 +313,73 @@ readStreamByKey ServerContext{..} streamWriter streamReader =
<> ", shard " <> Log.build bistreamReaderTargetShard
readRecords (reader, Nothing)
else Log.info $ "BiStreamReader " <> Log.build biStreamReaderId <> " exit read because send records to client failed."
readRecords (reader@BiStreamReader{..}, Nothing) =
{-
- Later read loop. Waiting for the client to send the total number of messages for the next round of reads,
then cyclically reading data from the store to deliver to the client.
- Records that exceeds the total number of client requests is cached and prioritized for delivery in the
next round of delivery.
- Any failure to send causes the read loop to exit, and the biStreamReader will be destroyed
-}
{-
- Later read loop. Waiting for the client to send the total number of messages for the next round of reads,
then cyclically reading data from the store to deliver to the client.
- Records that exceeds the total number of client requests is cached and prioritized for delivery in the
next round of delivery.
- Any failure to send causes the read loop to exit, and the biStreamReader will be destroyed
-}
readRecords (reader@BiStreamReader{..}, Nothing) = do
whileM $ do
nextRoundReads <- handleClientRequest reader
case nextRoundReads of
Just cnt -> do
sends <- sendCachedRecords reader cnt
let cnt' = fromIntegral cnt - V.length sends
if V.all id sends
then do
isSuccess <- readLoop reader cnt'
when isSuccess $
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId
<> " finish reading " <> Log.build (show cnt)
<> " records from stream " <> Log.build biStreamReaderTargetStream
<> ", shard " <> Log.build bistreamReaderTargetShard
return isSuccess
else do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId <> " exit read because send records to client failed."
return False
Nothing -> do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId <> " exit read because no more data request by client."
return False

readLoop :: BiStreamReader -> Int -> IO Bool
readLoop reader@BiStreamReader{..} cnt
-- check if until offset is reached before holding on next client request
isReading <- S.readerIsReadingAny biStreamReader
if isReading
then readLoop reader
else do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId
<> " reached until offset for stream " <> Log.build biStreamReaderTargetStream
<> ", shard " <> Log.build bistreamReaderTargetShard
return False
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId
<> " read stream " <> Log.build biStreamReaderTargetStream
<> ", shard " <> Log.build bistreamReaderTargetShard
<> " done."

readLoop :: BiStreamReader -> IO Bool
readLoop reader@BiStreamReader{..} = do
nextRoundReads <- handleClientRequest reader
case nextRoundReads of
Just cnt -> do
sends <- sendCachedRecords reader cnt
let cnt' = fromIntegral cnt - V.length sends
if V.all id sends
then do
isSuccess <- readLoopInternal reader cnt'
when isSuccess $
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId
<> " finish reading " <> Log.build (show cnt)
<> " records from stream " <> Log.build biStreamReaderTargetStream
<> ", shard " <> Log.build bistreamReaderTargetShard
return isSuccess
else do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId <> " exit read because send records to client failed."
return False
Nothing -> do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId <> " exit read because no more data request by client."
return False

readLoopInternal :: BiStreamReader -> Int -> IO Bool
readLoopInternal reader@BiStreamReader{..} cnt
| cnt == 0 = return True
| otherwise = do
records <- S.readerRead biStreamReader maxReadBatch
if null records
then do
-- check if until offset is reached.
isReading <- S.readerIsReadingAny biStreamReader
if isReading then readLoop reader cnt
else do Log.fatal $ "BiStreamReader " <> Log.build biStreamReaderId
<> " stop reading stream " <> Log.build biStreamReaderTargetStream
<> ", shard " <> Log.build bistreamReaderTargetShard
<> " unexpectedly."
throwIO $ HE.UnexpectedError $ "BiStreamReader " <> show biStreamReaderId <> " stop reading unexpected"
if isReading then readLoopInternal reader cnt
else do Log.info $ "BiStreamReader " <> Log.build biStreamReaderId
<> " reached until offset for stream " <> Log.build biStreamReaderTargetStream
<> ", shard " <> Log.build bistreamReaderTargetShard
return False
else do
res <- getResponseRecords biStreamReader bistreamReaderTargetShard records biStreamReaderId biStreamReaderStartTs biStreamReaderEndTs
let (res', remains) = V.splitAt cnt $ V.map (filterReceivedRecordByKey biStreamReaderTargetKey) res
_ <- atomicModifyIORef' biStreamRecordBuffer $ \buffer -> (buffer <> remains, V.empty)
successSends <- sendRecords reader (V.unzip <$> res')
if V.all id successSends
then readLoop reader (cnt - V.length successSends)
then readLoopInternal reader (cnt - V.length successSends)
else do
Log.info $ "BiStreamReader " <> Log.build biStreamReaderId <> " exit read because send records to client failed."
return False
Expand Down Expand Up @@ -410,6 +427,27 @@ readStreamByKey ServerContext{..} streamWriter streamReader =
filterHStreamRecords :: T.Text -> API.HStreamRecord -> Bool
filterHStreamRecords key record = getRecordKey record == key

startReading
:: S.LDReader
-> T.Text -- readerId, use for logging
-> S.C_LogID
-> Maybe ServerInternalOffset -- startOffset
-> Maybe ServerInternalOffset -- endOffset
-> IO (Maybe Int64, Maybe Int64)
startReading reader readerId rShardId rStart rEnd = do
(startLSN, sTimestamp) <- maybe (return (S.LSN_MIN, Nothing)) (getLogLSN scLDClient rShardId False) rStart
-- set default until LSN to tailLSN
(endLSN, eTimestamp) <- maybe ((, Nothing) <$> S.getTailLSN scLDClient rShardId) (getLogLSN scLDClient rShardId True) rEnd
when (endLSN < startLSN) $
throwIO . HE.ConflictShardReaderOffset $ "startLSN(" <> show startLSN <>") should less than and equal to endLSN(" <> show endLSN <> ")"
-- Since the LSN obtained by timestamp is not accurate, for scenarios where the endLSN is determined using a timestamp,
-- set the endLSN to LSN_MAX and do not rely on the underlying reader mechanism to determine the end of the read
let endLSN' = if isJust eTimestamp then S.LSN_MAX else endLSN
S.readerStartReading reader rShardId startLSN endLSN'
Log.info $ "ShardReader " <> Log.build readerId <> " start reading shard " <> Log.build rShardId
<> " from = " <> Log.build (show startLSN) <> ", to = " <> Log.build (show endLSN')
return (sTimestamp, eTimestamp)

----------------------------------------------------------------------------------------------------------------------------------
-- helper

Expand Down

0 comments on commit 5140610

Please sign in to comment.