From f9c1f2666d392632d9ae9c04efd8a86847961cbc Mon Sep 17 00:00:00 2001 From: Commelina Date: Fri, 28 Apr 2023 17:21:06 +0800 Subject: [PATCH] processing: fix a bug that may cause task recovering failure (#1392) --- .../src/HStream/Processing/Connector.hs | 1 + .../src/HStream/Processing/Processor.hs | 15 ++++++++++----- hstream/src/HStream/Server/ConnectorTypes.hs | 1 + hstream/src/HStream/Server/Core/Subscription.hs | 4 ++++ hstream/src/HStream/Server/HStore.hs | 8 ++++++++ .../src/HStream/Server/Handler/Subscription.hs | 9 ++++----- 6 files changed, 28 insertions(+), 10 deletions(-) diff --git a/hstream-processing/src/HStream/Processing/Connector.hs b/hstream-processing/src/HStream/Processing/Connector.hs index 38ae9a864..ed6e1fb46 100644 --- a/hstream-processing/src/HStream/Processing/Connector.hs +++ b/hstream-processing/src/HStream/Processing/Connector.hs @@ -33,6 +33,7 @@ data SourceConnector = SourceConnector data SourceConnectorWithoutCkp = SourceConnectorWithoutCkp { subscribeToStreamWithoutCkp :: StreamName -> API.SpecialOffset -> IO (), unSubscribeToStreamWithoutCkp :: StreamName -> IO (), + isSubscribedToStreamWithoutCkp :: StreamName -> IO Bool, -- readRecordsWithoutCkp :: StreamName -> IO [SourceRecord] withReadRecordsWithoutCkp :: StreamName -> diff --git a/hstream-processing/src/HStream/Processing/Processor.hs b/hstream-processing/src/HStream/Processing/Processor.hs index 8f8fa5d2e..035b79526 100644 --- a/hstream-processing/src/HStream/Processing/Processor.hs +++ b/hstream-processing/src/HStream/Processing/Processor.hs @@ -167,14 +167,19 @@ runTask statsHolder SourceConnectorWithoutCkp {..} sinkConnector taskBuilder@Tas logOptions <- logOptionsHandle stderr True withLogFunc logOptions $ \lf -> do ctx <- buildTaskContext task lf changeLogger snapshotter - let offset = API.SpecialOffsetLATEST - forM_ sourceStreamNames (flip subscribeToStreamWithoutCkp offset) + forM_ sourceStreamNames $ \stream -> do + isSubscribedToStreamWithoutCkp stream >>= \case + True -> return () + False -> subscribeToStreamWithoutCkp stream API.SpecialOffsetLATEST chan <- newTChanIO - withAsync (forConcurrently_ sourceStreamNames (f chan connectorClosed)) $ \a -> - withAsync (g task ctx chan) $ \b -> do - waitEither_ a b `finally` forM_ sourceStreamNames unSubscribeToStreamWithoutCkp + withAsync (g task ctx chan) $ \b -> + waitEither_ a b `finally` do + forM_ sourceStreamNames (\stream -> do + isSubscribedToStreamWithoutCkp stream >>= \case + True -> unSubscribeToStreamWithoutCkp stream + False -> return () ) where qid = getTaskName taskBuilder f :: TChan ([SourceRecord], MVar ()) -> TVar Bool -> T.Text -> IO () diff --git a/hstream/src/HStream/Server/ConnectorTypes.hs b/hstream/src/HStream/Server/ConnectorTypes.hs index 12d5b7788..a802c72c3 100644 --- a/hstream/src/HStream/Server/ConnectorTypes.hs +++ b/hstream/src/HStream/Server/ConnectorTypes.hs @@ -81,6 +81,7 @@ data SourceConnector = SourceConnector data SourceConnectorWithoutCkp = SourceConnectorWithoutCkp { subscribeToStreamWithoutCkp :: StreamName -> API.SpecialOffset -> IO (), unSubscribeToStreamWithoutCkp :: StreamName -> IO (), + isSubscribedToStreamWithoutCkp :: StreamName -> IO Bool, --readRecordsWithoutCkp :: StreamName -> IO [SourceRecord] withReadRecordsWithoutCkp :: StreamName -> (BL.ByteString -> Maybe BL.ByteString) diff --git a/hstream/src/HStream/Server/Core/Subscription.hs b/hstream/src/HStream/Server/Core/Subscription.hs index 7db959a1d..8ecc915ae 100644 --- a/hstream/src/HStream/Server/Core/Subscription.hs +++ b/hstream/src/HStream/Server/Core/Subscription.hs @@ -54,6 +54,10 @@ import HStream.Utils (decompressBatchedRecord, -------------------------------------------------------------------------------- +checkSubscriptionExist :: ServerContext -> Text -> IO Bool +checkSubscriptionExist ServerContext{..} sid = + M.checkMetaExists @SubscriptionWrap sid metaHandle + listSubscriptions :: ServerContext -> IO (V.Vector Subscription) listSubscriptions sc = CC.listSubscriptions sc Nothing diff --git a/hstream/src/HStream/Server/HStore.hs b/hstream/src/HStream/Server/HStore.hs index 599f5bc74..fa995c315 100644 --- a/hstream/src/HStream/Server/HStore.hs +++ b/hstream/src/HStream/Server/HStore.hs @@ -72,6 +72,7 @@ hstoreSourceConnectorWithoutCkp :: ServerContext -> T.Text -> TVar Bool -> Sourc hstoreSourceConnectorWithoutCkp ctx consumerName consumerClosed = SourceConnectorWithoutCkp { subscribeToStreamWithoutCkp = subscribeToHStoreStream' ctx consumerName, unSubscribeToStreamWithoutCkp = unSubscribeToHStoreStream' ctx consumerName, + isSubscribedToStreamWithoutCkp = isSubscribedToHStoreStream' ctx consumerName, withReadRecordsWithoutCkp = withReadRecordsFromHStore' ctx consumerName, connectorClosed = consumerClosed } @@ -134,6 +135,13 @@ unSubscribeToHStoreStream' ctx consumerName streamName = do } Core.deleteSubscription ctx req +isSubscribedToHStoreStream' :: ServerContext + -> T.Text + -> HCT.StreamName + -> IO Bool +isSubscribedToHStoreStream' ctx consumerName streamName = + Core.checkSubscriptionExist ctx (hstoreSubscriptionPrefix <> streamName <> "_" <> consumerName) + dataRecordToSourceRecord :: S.LDClient -> Payload -> IO SourceRecord dataRecordToSourceRecord ldclient Payload {..} = do streamName <- S.streamName . fst <$> S.getStreamIdFromLogId ldclient pLogID diff --git a/hstream/src/HStream/Server/Handler/Subscription.hs b/hstream/src/HStream/Server/Handler/Subscription.hs index 43701a782..17bb0a78a 100644 --- a/hstream/src/HStream/Server/Handler/Subscription.hs +++ b/hstream/src/HStream/Server/Handler/Subscription.hs @@ -151,18 +151,17 @@ checkSubscriptionExistHandler :: ServerContext -> ServerRequest 'Normal CheckSubscriptionExistRequest CheckSubscriptionExistResponse -> IO (ServerResponse 'Normal CheckSubscriptionExistResponse) -checkSubscriptionExistHandler ServerContext {..} (ServerNormalRequest _metadata req@CheckSubscriptionExistRequest {..}) = defaultExceptionHandle $ do +checkSubscriptionExistHandler sc (ServerNormalRequest _metadata req@CheckSubscriptionExistRequest {..}) = defaultExceptionHandle $ do Log.debug $ "Receive checkSubscriptionExistHandler request: " <> Log.buildString (show req) - let sid = checkSubscriptionExistRequestSubscriptionId - res <- M.checkMetaExists @SubscriptionWrap sid metaHandle + res <- Core.checkSubscriptionExist sc checkSubscriptionExistRequestSubscriptionId returnResp . CheckSubscriptionExistResponse $ res handleCheckSubscriptionExist :: ServerContext -> G.UnaryHandler CheckSubscriptionExistRequest CheckSubscriptionExistResponse -handleCheckSubscriptionExist ServerContext{..} _ req = catchDefaultEx $ do +handleCheckSubscriptionExist sc _ req = catchDefaultEx $ do let sid = checkSubscriptionExistRequestSubscriptionId req - res <- M.checkMetaExists @SubscriptionWrap sid metaHandle + res <- Core.checkSubscriptionExist sc sid pure $ CheckSubscriptionExistResponse res -------------------------------------------------------------------------------