Skip to content

Commit

Permalink
fix connector lookup (#1210)
Browse files Browse the repository at this point in the history
  • Loading branch information
Time-Hu authored Dec 29, 2022
1 parent d9e9675 commit 655207c
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 56 deletions.
1 change: 0 additions & 1 deletion hstream-io/HStream/IO/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ data HStreamConfig = HStreamConfig
data Worker = Worker
{ hsConfig :: HStreamConfig
, options :: IOOptions
, checkNode :: T.Text -> IO Bool
, ioTasksM :: C.MVar (HM.HashMap T.Text IOTask)
, monitorTid :: IORef C.ThreadId
, workerHandle :: MetaHandle
Expand Down
42 changes: 2 additions & 40 deletions hstream-io/HStream/IO/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import Data.IORef (newIORef, readIORef)
import qualified Data.IORef as C
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import GHC.Stack (HasCallStack)

import qualified HStream.IO.IOTask as IOTask
Expand All @@ -27,8 +25,8 @@ import qualified HStream.Server.HStreamApi as API
import qualified HStream.SQL.Codegen as CG
import HStream.Utils.Validation (validateNameAndThrow)

newWorker :: MetaHandle -> HStreamConfig -> IOOptions -> (T.Text -> IO Bool) -> IO Worker
newWorker mHandle hsConfig options checkNode = do
newWorker :: MetaHandle -> HStreamConfig -> IOOptions -> IO Worker
newWorker mHandle hsConfig options = do
Log.info $ "new Worker with hsConfig:" <> Log.buildString (show hsConfig)
ioTasksM <- C.newMVar HM.empty
monitorTid <- newIORef undefined
Expand All @@ -54,34 +52,6 @@ monitor worker@Worker{..} = do
ioTasks <- C.readMVar ioTasksM
forM_ ioTasks IOTask.checkProcess

createIOTaskFromSql :: Worker -> T.Text -> IO API.Connector
createIOTaskFromSql worker@Worker{..} sql = do
(CG.CreateConnectorPlan cType cName cTarget ifNotExist cfg) <- CG.streamCodegen sql
validateNameAndThrow cName
Log.info $ "CreateConnector CodeGen"
<> ", connector type: " <> Log.buildText cType
<> ", connector name: " <> Log.buildText cName
<> ", config: " <> Log.buildString (show cfg)
checkNode_ worker cName
taskId <- UUID.toText <$> UUID.nextRandom
let IOOptions {..} = options
taskType = if cType == "SOURCE" then SOURCE else SINK
image = makeImage taskType cTarget options
connectorConfig =
J.object
[ "hstream" J..= toTaskJson hsConfig taskId
, "connector" J..= cfg
]
taskInfo = TaskInfo
{ taskName = cName
, taskType = if cType == "SOURCE" then SOURCE else SINK
, taskConfig = TaskConfig image optTasksNetwork
, connectorConfig = connectorConfig
, originSql = sql
}
createIOTask worker taskId taskInfo
return $ mkConnector cName (ioTaskStatusToText NEW)

createIOTask :: HasCallStack => Worker -> T.Text -> TaskInfo -> IO ()
createIOTask Worker{..} taskId taskInfo@TaskInfo {..} = do
let taskPath = optTasksPath options <> "/" <> taskId
Expand All @@ -105,13 +75,11 @@ listIOTasks Worker{..} = M.listIOTaskMeta workerHandle

stopIOTask :: Worker -> T.Text -> Bool -> Bool-> IO ()
stopIOTask worker name ifIsRunning force = do
checkNode_ worker name
ioTask <- getIOTask worker name
IOTask.stopIOTask ioTask ifIsRunning force

startIOTask :: Worker -> T.Text -> IO ()
startIOTask worker name = do
checkNode_ worker name
getIOTask worker name >>= IOTask.startIOTask

getIOTask :: Worker -> T.Text -> IO IOTask
Expand All @@ -123,16 +91,10 @@ getIOTask Worker{..} name = do

deleteIOTask :: Worker -> T.Text -> IO ()
deleteIOTask worker@Worker{..} taskName = do
checkNode_ worker taskName
stopIOTask worker taskName True False
M.deleteIOTaskMeta workerHandle taskName
C.modifyMVar_ ioTasksM $ return . HM.delete taskName

checkNode_ :: Worker -> T.Text -> IO ()
checkNode_ Worker{..} name = do
res <- checkNode name
unless res . throwIO $ WrongNodeException "send HStream IO request to wrong node"

makeImage :: IOTaskType -> T.Text -> IOOptions -> T.Text
makeImage typ name IOOptions{..} =
fromMaybe
Expand Down
3 changes: 2 additions & 1 deletion hstream/src/HStream/Server/Core/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import HStream.Server.Core.Common
import qualified HStream.Server.Core.Stream as Core
import qualified HStream.Server.Core.View as Core
import HStream.Server.Handler.Common
import HStream.Server.Handler.Connector
import qualified HStream.Server.HStore as HStore
import HStream.Server.HStreamApi
import qualified HStream.Server.HStreamApi as API
Expand Down Expand Up @@ -127,7 +128,7 @@ executeQuery sc@ServerContext{..} CommandQuery{..} = do
pure $ API.CommandQueryResponse (mkVectorStruct s "created_stream")
CreateConnectorPlan _ cName _ _ _ -> do
validateNameAndThrow cName
void $ IO.createIOTaskFromSql scIOWorker commandQueryStmtText
void $ createIOTaskFromSql sc commandQueryStmtText
pure $ CommandQueryResponse V.empty
InsertPlan {} -> discard "Append"
DropPlan checkIfExist dropObject ->
Expand Down
80 changes: 70 additions & 10 deletions hstream/src/HStream/Server/Handler/Connector.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand All @@ -20,23 +21,34 @@ module HStream.Server.Handler.Connector
, handleDeleteConnector
, handleResumeConnector
, handlePauseConnector

, createIOTaskFromSql
) where

import Control.Exception (throwIO)
import Control.Monad (unless)
import qualified Data.Aeson as A
import qualified Data.Text as T
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Data.Vector as V
import qualified HsGrpc.Server as G
import Network.GRPC.HighLevel.Generated

import qualified HStream.Exception as HE
import qualified HStream.IO.Types as IO
import qualified HStream.IO.Worker as IO
import qualified HStream.Logger as Log
import HStream.Server.Core.Common (lookupResource')
import HStream.Server.Exception (catchDefaultEx,
defaultExceptionHandle)
import HStream.Server.HStreamApi
import HStream.Server.Types
import qualified HStream.SQL.Codegen as CG
import HStream.ThirdParty.Protobuf (Empty (..))
import HStream.Utils (returnResp)
import HStream.Utils (ResourceType (..),
returnResp,
validateNameAndThrow)

createConnectorHandler
:: ServerContext
Expand All @@ -45,11 +57,11 @@ createConnectorHandler
createConnectorHandler sc
(ServerNormalRequest _ CreateConnectorRequest{..}) = defaultExceptionHandle $ do
Log.debug "Receive Create Sink Connector Request"
IO.createIOTaskFromSql (scIOWorker sc) createConnectorRequestSql >>= returnResp
createIOTaskFromSql sc createConnectorRequestSql >>= returnResp

handleCreateConnector :: ServerContext -> G.UnaryHandler CreateConnectorRequest Connector
handleCreateConnector sc _ CreateConnectorRequest{..} = catchDefaultEx $
IO.createIOTaskFromSql (scIOWorker sc) createConnectorRequestSql
createIOTaskFromSql sc createConnectorRequestSql

listConnectorsHandler
:: ServerContext
Expand Down Expand Up @@ -87,43 +99,91 @@ deleteConnectorHandler
:: ServerContext
-> ServerRequest 'Normal DeleteConnectorRequest Empty
-> IO (ServerResponse 'Normal Empty)
deleteConnectorHandler ServerContext{..}
deleteConnectorHandler sc@ServerContext{..}
(ServerNormalRequest _metadata DeleteConnectorRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive Delete Connector Request. "
<> "Connector Name: " <> Log.buildText deleteConnectorRequestName
ServerNode{..} <- lookupResource' sc ResConnector deleteConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.deleteIOTask scIOWorker deleteConnectorRequestName
returnResp Empty

handleDeleteConnector :: ServerContext -> G.UnaryHandler DeleteConnectorRequest Empty
handleDeleteConnector ServerContext{..} _ DeleteConnectorRequest{..} = catchDefaultEx $
IO.deleteIOTask scIOWorker deleteConnectorRequestName >> pure Empty
handleDeleteConnector sc@ServerContext{..} _ DeleteConnectorRequest{..} = catchDefaultEx $ do
ServerNode{..} <- lookupResource' sc ResConnector deleteConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.deleteIOTask scIOWorker deleteConnectorRequestName >> pure Empty

resumeConnectorHandler
:: ServerContext
-> ServerRequest 'Normal ResumeConnectorRequest Empty
-> IO (ServerResponse 'Normal Empty)
resumeConnectorHandler ServerContext{..}
resumeConnectorHandler sc@ServerContext{..}
(ServerNormalRequest _metadata ResumeConnectorRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive ResumeConnectorRequest. "
<> "Connector Name: " <> Log.buildText resumeConnectorRequestName
ServerNode{..} <- lookupResource' sc ResConnector resumeConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.startIOTask scIOWorker resumeConnectorRequestName
returnResp Empty

handleResumeConnector :: ServerContext -> G.UnaryHandler ResumeConnectorRequest Empty
handleResumeConnector ServerContext{..} _ ResumeConnectorRequest{..} = catchDefaultEx $
handleResumeConnector sc@ServerContext{..} _ ResumeConnectorRequest{..} = catchDefaultEx $ do
ServerNode{..} <- lookupResource' sc ResConnector resumeConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.startIOTask scIOWorker resumeConnectorRequestName >> pure Empty

pauseConnectorHandler
:: ServerContext
-> ServerRequest 'Normal PauseConnectorRequest Empty
-> IO (ServerResponse 'Normal Empty)
pauseConnectorHandler ServerContext{..}
pauseConnectorHandler sc@ServerContext{..}
(ServerNormalRequest _metadata PauseConnectorRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive Terminate Connector Request. "
<> "Connector ID: " <> Log.buildText pauseConnectorRequestName
ServerNode{..} <- lookupResource' sc ResConnector pauseConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.stopIOTask scIOWorker pauseConnectorRequestName False False
returnResp Empty

handlePauseConnector :: ServerContext -> G.UnaryHandler PauseConnectorRequest Empty
handlePauseConnector ServerContext{..} _ PauseConnectorRequest{..} = catchDefaultEx $
handlePauseConnector sc@ServerContext{..} _ PauseConnectorRequest{..} = catchDefaultEx $ do
ServerNode{..} <- lookupResource' sc ResConnector pauseConnectorRequestName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
IO.stopIOTask scIOWorker pauseConnectorRequestName False False >> pure Empty

createIOTaskFromSql :: ServerContext -> T.Text -> IO Connector
createIOTaskFromSql sc@ServerContext{scIOWorker = worker@IO.Worker{..}, ..} sql = do
(CG.CreateConnectorPlan cType cName cTarget ifNotExist cfg) <- CG.streamCodegen sql
validateNameAndThrow cName
ServerNode{..} <- lookupResource' sc ResConnector cName
unless (serverNodeId == serverID) $
throwIO $ HE.WrongServer "Connector is bound to a different node"
Log.info $ "CreateConnector CodeGen"
<> ", connector type: " <> Log.buildText cType
<> ", connector name: " <> Log.buildText cName
<> ", config: " <> Log.buildString (show cfg)
taskId <- UUID.toText <$> UUID.nextRandom
let IO.IOOptions {..} = options
taskType = if cType == "SOURCE" then IO.SOURCE else IO.SINK
image = IO.makeImage taskType cTarget options
connectorConfig =
A.object
[ "hstream" A..= IO.toTaskJson hsConfig taskId
, "connector" A..= cfg
]
taskInfo = IO.TaskInfo
{ taskName = cName
, taskType = if cType == "SOURCE" then IO.SOURCE else IO.SINK
, taskConfig = IO.TaskConfig image optTasksNetwork
, connectorConfig = connectorConfig
, originSql = sql
}
IO.createIOTask worker taskId taskInfo
return $ IO.mkConnector cName (IO.ioTaskStatusToText IO.NEW)
4 changes: 0 additions & 4 deletions hstream/src/HStream/Server/Initialization.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ initializeServer opts@ServerOpts{..} gossipContext hh serverState = do
hh
(IO.HStreamConfig (cBytesToText (CB.pack _serverAddress <> ":" <> CB.pack (show _serverPort))))
_ioOptions
(\k -> do
(_e, hr) <- readTVarIO epochHashRing
return $ getAllocatedNodeId hr k == _serverID
)

shardInfo <- newMVar HM.empty
shardTable <- newMVar HM.empty
Expand Down

0 comments on commit 655207c

Please sign in to comment.