Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: make hstream_enable_schema flag ready #1541

Merged
merged 3 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hstream-sql/src/HStream/SQL/Binder/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ instance Bind DataType where
bind TypeJson {} = return BTypeJsonb
bind (TypeArray _ t) = bind t >>= return . BTypeArray

----------------------------------------
-- hard-coded values
----------------------------------------
-- | The column name @"window_start"@.
--
-- The binder uses this value as a key of the window layer's
-- @layerColumnBindings@ (see "HStream.SQL.Binder.Select"), so any code that
-- needs to refer to that column should use this constant instead of
-- repeating the string literal.
winStartText :: Text
winStartText = "window_start"

-- | The column name @"window_end"@.
--
-- The binder uses this value as a key of the window layer's
-- @layerColumnBindings@ (see "HStream.SQL.Binder.Select"), so any code that
-- needs to refer to that column should use this constant instead of
-- repeating the string literal.
winEndText :: Text
winEndText = "window_end"

----------------------------------------
-- exceptions
----------------------------------------
Expand Down
8 changes: 4 additions & 4 deletions hstream-sql/src/HStream/SQL/Binder/Select.hs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ instance Bind TableRef where
pushLayer topLayer
let layerMaxColIndex = L.maximum . L.concat $ map (fmap fst . HM.elems) (HM.elems topHM)
let windowLayer = BindContextLayer
{ layerColumnBindings = HM.fromList [ ("winStartText", HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, ("winEndText" , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
{ layerColumnBindings = HM.fromList [ (winStartText, HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, (winEndText , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
]
}
pushLayer windowLayer
Expand All @@ -194,8 +194,8 @@ instance Bind TableRef where
pushLayer topLayer
let layerMaxColIndex = L.maximum . L.concat $ map (fmap fst . HM.elems) (HM.elems topHM)
let windowLayer = BindContextLayer
{ layerColumnBindings = HM.fromList [ ("winStartText", HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, ("winEndText" , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
{ layerColumnBindings = HM.fromList [ (winStartText, HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, (winEndText , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
]
}
pushLayer windowLayer
Expand Down
2 changes: 1 addition & 1 deletion hstream-sql/src/HStream/SQL/Codegen/CommonNew.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ scalarExprToFun :: ScalarExpr -> FlowObject -> Either ERROR_TYPE FlowValue
scalarExprToFun scalar o = case scalar of
ColumnRef si ci ->
case getField (si,ci) o of
Nothing -> Left . ERR $ "Can not get column: " <> T.pack (show si) <> "." <> T.pack (show ci)
Nothing -> Left . ERR $ "Can not get column: " <> T.pack (show si) <> "." <> T.pack (show ci) <> "in object: " <> T.pack (show o)
Just (_,v) -> Right v
Literal constant -> Right $ constantToFlowValue constant
CallUnary op scalar -> do
Expand Down
4 changes: 2 additions & 2 deletions hstream-sql/src/HStream/SQL/Codegen/V1New.hs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ relationExprToGraph relation builder = case relation of
s' <- HS.stream sourceConfig builder
return (EStream1 s', [schemaOwner schema], [], [])
LoopJoinOn schema r1 r2 expr typ t -> do
let joiner = \o1 o2 -> o1 <++> o2
let joiner = \o1 o2 -> setFlowObjectStreamId 0 (o1 <++> o2)
joinCond = \record1 record2 ->
case scalarExprToFun expr (recordValue record1 `HM.union` recordValue record2) of
Left e -> False -- FIXME: log error message
Expand Down Expand Up @@ -342,7 +342,7 @@ relationExprToGraph relation builder = case relation of
let aggregateR = \acc Record{..} ->
case aggregateF acc recordValue of
Left (e,v) -> v -- FIXME: log error message
Right v -> v
Right v -> setFlowObjectStreamId 0 v
aggregateMerge' = \k o1 o2 ->
case aggregateMergeF k o1 o2 of
Left (e,v) -> v
Expand Down
59 changes: 42 additions & 17 deletions hstream-sql/src/HStream/SQL/PlannerNew.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module HStream.SQL.PlannerNew (
import Control.Monad.Reader
import Control.Monad.State
import Data.Functor ((<&>))
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap as IntMap
import qualified Data.List as L
import Data.Maybe (fromMaybe)
Expand Down Expand Up @@ -50,7 +51,7 @@ instance Plan BoundTableRef where
Just schema -> do
-- 1-in/1-out
let schema' = setSchemaStreamId 0 schema
let ctx = PlanContext (IntMap.singleton 0 schema')
let ctx = PlanContext (IntMap.singleton 0 schema') mempty
put ctx
return $ StreamScan schema'

Expand All @@ -59,7 +60,7 @@ instance Plan BoundTableRef where
-- 1-in/1-out
let schema = setSchemaStream name $
setSchemaStreamId 0 (relationExprSchema relationExpr)
let ctx = PlanContext (IntMap.singleton 0 schema)
let ctx = PlanContext (IntMap.singleton 0 schema) mempty
put ctx
return relationExpr

Expand All @@ -83,20 +84,31 @@ instance Plan BoundTableRef where
ctx2 <- get

put $ PlanContext
{ planContextSchemas = planContextSchemas ctx1 <::> planContextSchemas ctx2
{ planContextSchemas = planContextSchemas ctx1 <::>
planContextSchemas ctx2
, planContextBasicRefs = mempty
}
(scalarExpr,_) <- plan expr

-- 1-out!
let schema = setSchemaStream name $
setSchemaStreamId 0 $ Schema
setSchemaStreamId 0 $ Schema
{ schemaOwner = name
, schemaColumns = schemaColumns schema1
<:+:> schemaColumns schema2
, schemaColumns = schemaColumns schema1 <:+:>
schemaColumns schema2
}
put $ PlanContext
{ planContextSchemas = IntMap.map (setSchemaStreamId 0)
(IntMap.singleton 0 schema <::> planContextSchemas ctx1 <::> planContextSchemas ctx2)
{ planContextSchemas = IntMap.map (setSchemaStreamId 0) (IntMap.singleton 0 schema)
, planContextBasicRefs = HM.fromList
(L.map (\(colAbs, colRel) ->
( (columnStream colAbs, columnId colAbs)
, (columnStream colRel, columnId colRel)
)
) ((IntMap.elems (schemaColumns schema1) ++
IntMap.elems (schemaColumns schema2))
`zip` (IntMap.elems (schemaColumns schema)))
) `HM.union` (planContextBasicRefs ctx1 `HM.union` planContextBasicRefs ctx2)
-- FIXME: clean up!!
}
let win = calendarDiffTimeToMs interval
return $ LoopJoinOn schema relationExpr1' relationExpr2' scalarExpr typ win
Expand Down Expand Up @@ -129,17 +141,16 @@ instance Plan BoundAgg where
(aggTups :: [(AggregateExpr, ColumnCatalog)])
<- (mapM (\(boundExpr,alias_m) -> do
-- Note: an expression can contain multiple aggregates, e.g. `SUM(a) + MAX(b)`
let boundAggExprs = exprAggregates boundExpr
mapM (\boundAggExpr -> do
let (BoundExprAggregate name boundAgg) = boundAggExpr
(agg,catalog) <- plan boundAgg
let catalog' = catalog { columnName = T.pack name } -- FIXME
mapM (\aggBoundExpr -> do
let (BoundExprAggregate name aggBound) = aggBoundExpr
(agg,catalog) <- plan aggBound
let catalog' = catalog { columnName = T.pack name } -- WARNING: do not use alias here
return (agg,catalog')
) boundAggExprs
) (exprAggregates boundExpr)
) selTups
) <&> L.concat

let grpsIntmap = IntMap.fromList $ L.map (\(i, (_,catalog)) -> (i, catalog{columnId = i})
let grpsIntmap = IntMap.fromList $ L.map (\(i, (_,catalog)) -> (i, catalog{columnId = i, columnStream = ""}) -- FIXME: stream name & clean up
) ([0..] `zip` grps)
aggsIntmap = IntMap.fromList $ L.map (\(i, (_,catalog)) -> (i, catalog{columnId = i})
) ([0..] `zip` aggTups)
Expand Down Expand Up @@ -168,7 +179,18 @@ instance Plan BoundAgg where
let new_schema = Schema { schemaOwner = "" -- FIXME
, schemaColumns = grpsIntmap <:+:> aggsIntmap <:+:> dummyTimewindowCols
}
let ctx_new = PlanContext (IntMap.singleton 0 new_schema)
let ctx_new = PlanContext
{ planContextSchemas = IntMap.singleton 0 new_schema
, planContextBasicRefs = HM.fromList
(L.map (\(colAbs, colRel) ->
( (columnStream colAbs, columnId colAbs)
, (columnStream colRel, columnId colRel)
)
) (((snd <$> grps)
`zip` (IntMap.elems grpsIntmap)))
) `HM.union` planContextBasicRefs ctx_old -- WARNING: order matters!
}
-- FIXME: clean up!!
put ctx_new
return $ \upstream -> Reduce new_schema upstream (fst <$> grps) (fst <$> aggTups) win_m

Expand Down Expand Up @@ -196,7 +218,10 @@ instance Plan BoundSel where
let new_schema = Schema { schemaOwner = "" -- FIXME
, schemaColumns = new_schema_cols
}
let ctx_new = PlanContext (IntMap.singleton 0 new_schema)
let ctx_new = PlanContext
{ planContextSchemas = IntMap.singleton 0 new_schema
, planContextBasicRefs = planContextBasicRefs ctx_old
}
put ctx_new
return $ \upstream -> Project new_schema upstream (fst <$> projTups)

Expand Down
9 changes: 6 additions & 3 deletions hstream-sql/src/HStream/SQL/PlannerNew/Expr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ instance Plan (Aggregate BoundExpr) where
Nullary nullary -> do
let catalog = ColumnCatalog { columnId = 0
, columnName = T.pack $ show agg
-- FIXME: only a placeholder. It will be replaced by the aggregate expression name, see `PlannerNew#BoundAgg`.
, columnStreamId = 0
, columnStream = ""
, columnType = BTypeInteger
Expand All @@ -34,12 +35,14 @@ instance Plan (Aggregate BoundExpr) where
return (Nullary nullary, catalog)
Unary op e -> do
(e',c') <- plan e
let catalog = c' { columnName = T.pack $ show agg } -- FIXME
let catalog = c' { columnName = T.pack $ show agg }
-- FIXME: only a placeholder. It will be replaced by the aggregate expression name, see `PlannerNew#BoundAgg`.
return (Unary op e', catalog)
Binary op e1 e2 -> do
(e1',c1') <- plan e1
(e2',_) <- plan e2
let catalog = c1' { columnName = T.pack $ show agg } -- FIXME
let catalog = c1' { columnName = T.pack $ show agg }
-- FIXME: only a placeholder. It will be replaced by the aggregate expression name, see `PlannerNew#BoundAgg`.
return (Binary op e1' e2', catalog)

type instance PlannedType BoundExpr = (ScalarExpr, ColumnCatalog)
Expand Down Expand Up @@ -90,7 +93,7 @@ instance Plan BoundExpr where
return (AccessArray e' rhs, catalog)
BoundExprCol name stream col colId -> do
ctx <- get
case lookupColumn ctx stream col of
case lookupColumn ctx stream colId of
Nothing -> throwSQLException PlanException Nothing $ "column " <> T.unpack stream <> ".#" <> show colId <> " not found in ctx " <> show ctx
Just (_,columnId,catalog) ->
return (ColumnRef (columnStreamId catalog) columnId, catalog)
Expand Down
43 changes: 25 additions & 18 deletions hstream-sql/src/HStream/SQL/PlannerNew/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import Control.Applicative ((<|>))
import Control.Monad.Reader
import Control.Monad.State
import qualified Data.Aeson as Aeson
import qualified Data.Bimap as Bimap
import Data.Function (on)
import Data.Hashable
import qualified Data.HashMap.Strict as HM
Expand Down Expand Up @@ -92,11 +91,15 @@ instance Show ScalarExpr where
-- planner context
----------------------------------------
data PlanContext = PlanContext
{ planContextSchemas :: IntMap Schema
{ planContextSchemas :: IntMap Schema
, planContextBasicRefs :: HM.HashMap (Text,Int) (Text,Int)
-- absolute -> relative e.g. s1.0 -> _join#0.1
-- this is only used in `lookupColumn`.
-- FIXME: design a better structure to handle relative schemas
} deriving (Show)

defaultPlanContext :: PlanContext
defaultPlanContext = PlanContext mempty
defaultPlanContext = PlanContext mempty mempty

-- | Compose two 'IntMap's of stream schemas with their indexes. It assumes
-- that the two 'IntMap's have contiguous indexes starting from 0 in their
Expand All @@ -114,28 +117,32 @@ defaultPlanContext = PlanContext mempty
in IntMap.fromList (tups1 <> tups2')

instance Semigroup PlanContext where
(PlanContext s1) <> (PlanContext s2) = PlanContext (s1 <::> s2)
(PlanContext s1 hm1) <> (PlanContext s2 hm2) = PlanContext (s1 <::> s2) (hm1 `HM.union` hm2)
instance Monoid PlanContext where
mempty = PlanContext mempty
mempty = PlanContext mempty mempty

-- | Lookup a certain column in the planning context with
-- stream name and column name. Return the index of
-- stream name and column index. Return the index of
-- matched stream, the real index of the column
-- and the catalog of matched column.
-- WARNING: The returned stream index may not be the same
-- as the `streamId` of the column! This should
-- be fixed in the future.
lookupColumn :: PlanContext -> Text -> Text -> Maybe (Int, Int, ColumnCatalog)
lookupColumn (PlanContext m) streamName colName =
L.foldr (\(i,Schema{..}) acc -> case acc of
Just _ -> acc
Nothing ->
let catalogTup_m = L.find (\(n, ColumnCatalog{..}) ->
columnStream == streamName &&
columnName == colName
) (IntMap.toList schemaColumns)
in (fmap (\(n,catalog) -> (i,n,catalog)) catalogTup_m) <|> acc
) Nothing (IntMap.toList m)
lookupColumn :: PlanContext -> Text -> Int -> Maybe (Int, Int, ColumnCatalog)
lookupColumn (PlanContext m baseRefs) streamName colId =
  -- First translate the requested (stream, column id) through the
  -- absolute -> relative reference table, then search the schemas for a
  -- column whose catalog carries the translated stream name and id.
  let (streamName', colId') = resolve [] (streamName, colId)
   in L.foldr (\(i,Schema{..}) acc -> case acc of
        -- A match found in a schema with a larger index wins ('foldr'
        -- evaluates the accumulator from the right).
        Just _  -> acc
        Nothing ->
          let catalogTup_m = L.find (\(_, ColumnCatalog{..}) ->
                                       columnStream == streamName' &&
                                       columnId     == colId'
                                    ) (IntMap.toList schemaColumns)
           in fmap (\(n,catalog) -> (i,n,catalog)) catalogTup_m <|> acc
      ) Nothing (IntMap.toList m)
  where
    -- Follow the chain of references in 'baseRefs' until a reference with
    -- no further mapping is reached.
    --
    -- 'seen' holds every reference already visited. Without it, an identity
    -- entry (k -> k) or any longer cycle in 'baseRefs' would make the
    -- resolution loop forever; with it, resolution stops at the first
    -- reference that repeats and that reference is used for the search.
    -- Chains are expected to be short, so a plain list is sufficient.
    resolve seen ref
      | ref `elem` seen = ref
      | otherwise       = case HM.lookup ref baseRefs of
          Just ref' -> resolve (ref : seen) ref'
          Nothing   -> ref

-- | Lookup a certain column name in the planning context. Return the index of
-- matched stream, the real index of the column
Expand All @@ -144,7 +151,7 @@ lookupColumn (PlanContext m) streamName colName =
-- as the `columnStreamId` of the column! This should
-- be fixed in the future.
lookupColumnName :: PlanContext -> Text -> Maybe (Int, Int, ColumnCatalog)
lookupColumnName (PlanContext m) k =
lookupColumnName (PlanContext m _) k =
L.foldr (\(i,Schema{..}) acc -> case acc of
Just _ -> acc
Nothing ->
Expand Down
35 changes: 20 additions & 15 deletions hstream-sql/src/HStream/SQL/Rts/New.hs
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,21 @@ jsonValueToFlowValue (columnType, v) = case v of
Aeson.String t -> case columnType of
BTypeDate -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowDate d
Nothing -> throwRuntimeException $ "Invalid date value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid date value " <> Text.unpack t
BTypeTime -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowTime d
Nothing -> throwRuntimeException $ "Invalid time value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid time value " <> Text.unpack t
BTypeTimestamp -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowTimestamp d
Nothing -> throwRuntimeException $ "Invalid timestamp value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid timestamp value " <> Text.unpack t
BTypeInterval -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowInterval d
Nothing -> throwRuntimeException $ "Invalid interval value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid interval value " <> Text.unpack t
_ -> FlowText t -- FIXME
Aeson.Number n -> case columnType of
BTypeFloat -> FlowFloat (Scientific.toRealFloat n)
BTypeInteger -> FlowInt (floor n)
_ -> throwRuntimeException $ "Invalid number value" <> show n
_ -> throwRuntimeException $ "Invalid number value " <> show n
Aeson.Array arr ->
let (BTypeArray elemType) = columnType
in FlowArray (V.toList $
Expand All @@ -176,7 +176,7 @@ jsonValueToFlowValue (columnType, v) = case v of
[("$binary", Aeson.Object obj')] -> case do
Aeson.String t <- HsAeson.lookup "base64" obj'
Base64.base64Decode (CB.toBytes $ textToCBytes t) of
Nothing -> throwRuntimeException $ "Invalid $binary value" <> show obj'
Nothing -> throwRuntimeException $ "Invalid $binary value " <> show obj'
Just bs -> FlowByte (CB.fromBytes bs)
_ -> FlowSubObject (jsonObjectToFlowObject' obj)
_ -> throwRuntimeException $ "Invalid json value: " <> show v
Expand Down Expand Up @@ -223,9 +223,15 @@ extractJsonObjectSchema json =
jsonObjectToFlowObject :: Schema -> Aeson.Object -> FlowObject
jsonObjectToFlowObject schema object =
let list = HsAeson.toList object
list' = L.map (\(catalog,(k,v)) ->
(catalog, jsonValueToFlowValue (columnType catalog, v))
) ((IntMap.elems (schemaColumns schema)) `zip` list)
list' = L.map (\(k,v) ->
let catalog = case L.find (\col -> columnName col == HsAeson.toText k)
(IntMap.elems $ schemaColumns schema) of
Just cata -> cata
Nothing -> throwRuntimeException $ -- FIXME: Just omit it instead of throwing exception?
"Invalid key encountered: " <> show k <>
". Schema=" <> show schema
in (catalog, jsonValueToFlowValue (columnType catalog, v))
) list
in HM.fromList list'

jsonObjectToFlowObject' :: Aeson.Object -> FlowObject
Expand All @@ -248,9 +254,8 @@ infixl 5 <++>
tups2' = L.map (\(cata, v) -> (cata { columnId = columnId cata + maxId1 + 1 }, v)) tups2
in HM.fromList (tups1 <> tups2')

--------------------------------------------------------------------------------
winStartText :: Text
winStartText = "window_start"

winEndText :: Text
winEndText = "window_end"
-- | Return a copy of the given 'FlowObject' in which the 'columnStreamId'
-- of every key catalog is replaced by @streamId@. The values are kept as
-- they are.
--
-- The object is rebuilt with 'HM.fromList', so keys that become equal after
-- the replacement end up as a single entry.
setFlowObjectStreamId :: Int -> FlowObject -> FlowObject
setFlowObjectStreamId streamId hm = HM.fromList (L.map retag (HM.toList hm))
  where
    -- Overwrite the stream id on the key, leave the value untouched.
    retag (catalog, value) = (catalog { columnStreamId = streamId }, value)
2 changes: 2 additions & 0 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,10 @@ test-suite hstream-test
HStream.AdminCommandSpec
HStream.ConfigSpec
HStream.HandlerSpec
HStream.RegressionNewSpec
HStream.RegressionSpec
HStream.RunQuerySpec
HStream.RunSQLNewSpec
HStream.RunSQLSpec
HStream.ShardSpec
HStream.SpecUtils
Expand Down
Loading