Skip to content

Commit

Permalink
sql: make hstream_enable_schema flag ready (#1541)
Browse files Browse the repository at this point in the history
* hstream: schemaful sql tests

* sql: fix incorrect window start & end name

* sql: fix incorrect column in planner caused by absolute ref
  • Loading branch information
Commelina authored Aug 3, 2023
1 parent aec998f commit 08ce1a9
Show file tree
Hide file tree
Showing 15 changed files with 471 additions and 68 deletions.
9 changes: 9 additions & 0 deletions hstream-sql/src/HStream/SQL/Binder/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ instance Bind DataType where
bind TypeJson {} = return BTypeJsonb
bind (TypeArray _ t) = bind t >>= return . BTypeArray

----------------------------------------
-- hard-coded values
----------------------------------------
winStartText :: Text
winStartText = "window_start"

winEndText :: Text
winEndText = "window_end"

----------------------------------------
-- exceptions
----------------------------------------
Expand Down
8 changes: 4 additions & 4 deletions hstream-sql/src/HStream/SQL/Binder/Select.hs
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ instance Bind TableRef where
pushLayer topLayer
let layerMaxColIndex = L.maximum . L.concat $ map (fmap fst . HM.elems) (HM.elems topHM)
let windowLayer = BindContextLayer
{ layerColumnBindings = HM.fromList [ ("winStartText", HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, ("winEndText" , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
{ layerColumnBindings = HM.fromList [ (winStartText, HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, (winEndText , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
]
}
pushLayer windowLayer
Expand All @@ -194,8 +194,8 @@ instance Bind TableRef where
pushLayer topLayer
let layerMaxColIndex = L.maximum . L.concat $ map (fmap fst . HM.elems) (HM.elems topHM)
let windowLayer = BindContextLayer
{ layerColumnBindings = HM.fromList [ ("winStartText", HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, ("winEndText" , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
{ layerColumnBindings = HM.fromList [ (winStartText, HM.fromList [("", (layerMaxColIndex+1,BindUnitBase))])
, (winEndText , HM.fromList [("", (layerMaxColIndex+2,BindUnitBase))])
]
}
pushLayer windowLayer
Expand Down
2 changes: 1 addition & 1 deletion hstream-sql/src/HStream/SQL/Codegen/CommonNew.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ scalarExprToFun :: ScalarExpr -> FlowObject -> Either ERROR_TYPE FlowValue
scalarExprToFun scalar o = case scalar of
ColumnRef si ci ->
case getField (si,ci) o of
Nothing -> Left . ERR $ "Can not get column: " <> T.pack (show si) <> "." <> T.pack (show ci)
Nothing -> Left . ERR $ "Can not get column: " <> T.pack (show si) <> "." <> T.pack (show ci) <> "in object: " <> T.pack (show o)
Just (_,v) -> Right v
Literal constant -> Right $ constantToFlowValue constant
CallUnary op scalar -> do
Expand Down
4 changes: 2 additions & 2 deletions hstream-sql/src/HStream/SQL/Codegen/V1New.hs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ relationExprToGraph relation builder = case relation of
s' <- HS.stream sourceConfig builder
return (EStream1 s', [schemaOwner schema], [], [])
LoopJoinOn schema r1 r2 expr typ t -> do
let joiner = \o1 o2 -> o1 <++> o2
let joiner = \o1 o2 -> setFlowObjectStreamId 0 (o1 <++> o2)
joinCond = \record1 record2 ->
case scalarExprToFun expr (recordValue record1 `HM.union` recordValue record2) of
Left e -> False -- FIXME: log error message
Expand Down Expand Up @@ -342,7 +342,7 @@ relationExprToGraph relation builder = case relation of
let aggregateR = \acc Record{..} ->
case aggregateF acc recordValue of
Left (e,v) -> v -- FIXME: log error message
Right v -> v
Right v -> setFlowObjectStreamId 0 v
aggregateMerge' = \k o1 o2 ->
case aggregateMergeF k o1 o2 of
Left (e,v) -> v
Expand Down
59 changes: 42 additions & 17 deletions hstream-sql/src/HStream/SQL/PlannerNew.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module HStream.SQL.PlannerNew (
import Control.Monad.Reader
import Control.Monad.State
import Data.Functor ((<&>))
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap as IntMap
import qualified Data.List as L
import Data.Maybe (fromMaybe)
Expand Down Expand Up @@ -50,7 +51,7 @@ instance Plan BoundTableRef where
Just schema -> do
-- 1-in/1-out
let schema' = setSchemaStreamId 0 schema
let ctx = PlanContext (IntMap.singleton 0 schema')
let ctx = PlanContext (IntMap.singleton 0 schema') mempty
put ctx
return $ StreamScan schema'

Expand All @@ -59,7 +60,7 @@ instance Plan BoundTableRef where
-- 1-in/1-out
let schema = setSchemaStream name $
setSchemaStreamId 0 (relationExprSchema relationExpr)
let ctx = PlanContext (IntMap.singleton 0 schema)
let ctx = PlanContext (IntMap.singleton 0 schema) mempty
put ctx
return relationExpr

Expand All @@ -83,20 +84,31 @@ instance Plan BoundTableRef where
ctx2 <- get

put $ PlanContext
{ planContextSchemas = planContextSchemas ctx1 <::> planContextSchemas ctx2
{ planContextSchemas = planContextSchemas ctx1 <::>
planContextSchemas ctx2
, planContextBasicRefs = mempty
}
(scalarExpr,_) <- plan expr

-- 1-out!
let schema = setSchemaStream name $
setSchemaStreamId 0 $ Schema
setSchemaStreamId 0 $ Schema
{ schemaOwner = name
, schemaColumns = schemaColumns schema1
<:+:> schemaColumns schema2
, schemaColumns = schemaColumns schema1 <:+:>
schemaColumns schema2
}
put $ PlanContext
{ planContextSchemas = IntMap.map (setSchemaStreamId 0)
(IntMap.singleton 0 schema <::> planContextSchemas ctx1 <::> planContextSchemas ctx2)
{ planContextSchemas = IntMap.map (setSchemaStreamId 0) (IntMap.singleton 0 schema)
, planContextBasicRefs = HM.fromList
(L.map (\(colAbs, colRel) ->
( (columnStream colAbs, columnId colAbs)
, (columnStream colRel, columnId colRel)
)
) ((IntMap.elems (schemaColumns schema1) ++
IntMap.elems (schemaColumns schema2))
`zip` (IntMap.elems (schemaColumns schema)))
) `HM.union` (planContextBasicRefs ctx1 `HM.union` planContextBasicRefs ctx2)
-- FIXME: clean up!!
}
let win = calendarDiffTimeToMs interval
return $ LoopJoinOn schema relationExpr1' relationExpr2' scalarExpr typ win
Expand Down Expand Up @@ -129,17 +141,16 @@ instance Plan BoundAgg where
(aggTups :: [(AggregateExpr, ColumnCatalog)])
<- (mapM (\(boundExpr,alias_m) -> do
-- Note: a expr can have multiple aggs: `SUM(a) + MAX(b)`
let boundAggExprs = exprAggregates boundExpr
mapM (\boundAggExpr -> do
let (BoundExprAggregate name boundAgg) = boundAggExpr
(agg,catalog) <- plan boundAgg
let catalog' = catalog { columnName = T.pack name } -- FIXME
mapM (\aggBoundExpr -> do
let (BoundExprAggregate name aggBound) = aggBoundExpr
(agg,catalog) <- plan aggBound
let catalog' = catalog { columnName = T.pack name } -- WARNING: do not use alias here
return (agg,catalog')
) boundAggExprs
) (exprAggregates boundExpr)
) selTups
) <&> L.concat

let grpsIntmap = IntMap.fromList $ L.map (\(i, (_,catalog)) -> (i, catalog{columnId = i})
let grpsIntmap = IntMap.fromList $ L.map (\(i, (_,catalog)) -> (i, catalog{columnId = i, columnStream = ""}) -- FIXME: stream name & clean up
) ([0..] `zip` grps)
aggsIntmap = IntMap.fromList $ L.map (\(i, (_,catalog)) -> (i, catalog{columnId = i})
) ([0..] `zip` aggTups)
Expand Down Expand Up @@ -168,7 +179,18 @@ instance Plan BoundAgg where
let new_schema = Schema { schemaOwner = "" -- FIXME
, schemaColumns = grpsIntmap <:+:> aggsIntmap <:+:> dummyTimewindowCols
}
let ctx_new = PlanContext (IntMap.singleton 0 new_schema)
let ctx_new = PlanContext
{ planContextSchemas = IntMap.singleton 0 new_schema
, planContextBasicRefs = HM.fromList
(L.map (\(colAbs, colRel) ->
( (columnStream colAbs, columnId colAbs)
, (columnStream colRel, columnId colRel)
)
) (((snd <$> grps)
`zip` (IntMap.elems grpsIntmap)))
) `HM.union` planContextBasicRefs ctx_old -- WARNING: order matters!
}
-- FIXME: clean up!!
put ctx_new
return $ \upstream -> Reduce new_schema upstream (fst <$> grps) (fst <$> aggTups) win_m

Expand Down Expand Up @@ -196,7 +218,10 @@ instance Plan BoundSel where
let new_schema = Schema { schemaOwner = "" -- FIXME
, schemaColumns = new_schema_cols
}
let ctx_new = PlanContext (IntMap.singleton 0 new_schema)
let ctx_new = PlanContext
{ planContextSchemas = IntMap.singleton 0 new_schema
, planContextBasicRefs = planContextBasicRefs ctx_old
}
put ctx_new
return $ \upstream -> Project new_schema upstream (fst <$> projTups)

Expand Down
9 changes: 6 additions & 3 deletions hstream-sql/src/HStream/SQL/PlannerNew/Expr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ instance Plan (Aggregate BoundExpr) where
Nullary nullary -> do
let catalog = ColumnCatalog { columnId = 0
, columnName = T.pack $ show agg
-- FIXME: only a placeholder. It will be replaced by the aggregate expression name, see `PlannerNew#BoundAgg`.
, columnStreamId = 0
, columnStream = ""
, columnType = BTypeInteger
Expand All @@ -34,12 +35,14 @@ instance Plan (Aggregate BoundExpr) where
return (Nullary nullary, catalog)
Unary op e -> do
(e',c') <- plan e
let catalog = c' { columnName = T.pack $ show agg } -- FIXME
let catalog = c' { columnName = T.pack $ show agg }
-- FIXME: only a placeholder. It will be replaced by the aggregate expression name, see `PlannerNew#BoundAgg`.
return (Unary op e', catalog)
Binary op e1 e2 -> do
(e1',c1') <- plan e1
(e2',_) <- plan e2
let catalog = c1' { columnName = T.pack $ show agg } -- FIXME
let catalog = c1' { columnName = T.pack $ show agg }
-- FIXME: only a placeholder. It will be replaced by the aggregate expression name, see `PlannerNew#BoundAgg`.
return (Binary op e1' e2', catalog)

type instance PlannedType BoundExpr = (ScalarExpr, ColumnCatalog)
Expand Down Expand Up @@ -90,7 +93,7 @@ instance Plan BoundExpr where
return (AccessArray e' rhs, catalog)
BoundExprCol name stream col colId -> do
ctx <- get
case lookupColumn ctx stream col of
case lookupColumn ctx stream colId of
Nothing -> throwSQLException PlanException Nothing $ "column " <> T.unpack stream <> ".#" <> show colId <> " not found in ctx " <> show ctx
Just (_,columnId,catalog) ->
return (ColumnRef (columnStreamId catalog) columnId, catalog)
Expand Down
43 changes: 25 additions & 18 deletions hstream-sql/src/HStream/SQL/PlannerNew/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import Control.Applicative ((<|>))
import Control.Monad.Reader
import Control.Monad.State
import qualified Data.Aeson as Aeson
import qualified Data.Bimap as Bimap
import Data.Function (on)
import Data.Hashable
import qualified Data.HashMap.Strict as HM
Expand Down Expand Up @@ -92,11 +91,15 @@ instance Show ScalarExpr where
-- planner context
----------------------------------------
data PlanContext = PlanContext
{ planContextSchemas :: IntMap Schema
{ planContextSchemas :: IntMap Schema
, planContextBasicRefs :: HM.HashMap (Text,Int) (Text,Int)
-- absolute -> relative e.g. s1.0 -> _join#0.1
-- this is only used in `lookupColumn`.
-- FIXME: design a better structure to handle relative schemas
} deriving (Show)

defaultPlanContext :: PlanContext
defaultPlanContext = PlanContext mempty
defaultPlanContext = PlanContext mempty mempty

-- | Compose two 'IntMap's of stream schemas with their indexes. It assumes
-- that the two 'IntMap's have contiguous indexes starting from 0 in their
Expand All @@ -114,28 +117,32 @@ defaultPlanContext = PlanContext mempty
in IntMap.fromList (tups1 <> tups2')

instance Semigroup PlanContext where
(PlanContext s1) <> (PlanContext s2) = PlanContext (s1 <::> s2)
(PlanContext s1 hm1) <> (PlanContext s2 hm2) = PlanContext (s1 <::> s2) (hm1 `HM.union` hm2)
instance Monoid PlanContext where
mempty = PlanContext mempty
mempty = PlanContext mempty mempty

-- | Lookup a certain column in the planning context with
-- stream name and column name. Return the index of
-- stream name and column index. Return the index of
-- matched stream, the real index of the column
-- and the catalog of matched column.
-- WARNING: The returned stream index may not be the same
-- as the `streamId` of the column! This should
-- be fixed in the future.
lookupColumn :: PlanContext -> Text -> Text -> Maybe (Int, Int, ColumnCatalog)
lookupColumn (PlanContext m) streamName colName =
L.foldr (\(i,Schema{..}) acc -> case acc of
Just _ -> acc
Nothing ->
let catalogTup_m = L.find (\(n, ColumnCatalog{..}) ->
columnStream == streamName &&
columnName == colName
) (IntMap.toList schemaColumns)
in (fmap (\(n,catalog) -> (i,n,catalog)) catalogTup_m) <|> acc
) Nothing (IntMap.toList m)
lookupColumn :: PlanContext -> Text -> Int -> Maybe (Int, Int, ColumnCatalog)
lookupColumn (PlanContext m baseRefs) streamName colId =
let (streamName', colId') = go (streamName, colId)
in L.foldr (\(i,Schema{..}) acc -> case acc of
Just _ -> acc
Nothing ->
let catalogTup_m = L.find (\(n, ColumnCatalog{..}) ->
columnStream == streamName' &&
columnId == colId'
) (IntMap.toList schemaColumns)
in (fmap (\(n,catalog) -> (i,n,catalog)) catalogTup_m) <|> acc
) Nothing (IntMap.toList m)
where go (s,i) = case HM.lookup (s,i) baseRefs of
Just (s',i') -> go (s',i')
Nothing -> (s,i)

-- | Lookup a certain column name in the planning context. Return the index of
-- matched stream, the real index of the column
Expand All @@ -144,7 +151,7 @@ lookupColumn (PlanContext m) streamName colName =
-- as the `columnStreamId` of the column! This should
-- be fixed in the future.
lookupColumnName :: PlanContext -> Text -> Maybe (Int, Int, ColumnCatalog)
lookupColumnName (PlanContext m) k =
lookupColumnName (PlanContext m _) k =
L.foldr (\(i,Schema{..}) acc -> case acc of
Just _ -> acc
Nothing ->
Expand Down
35 changes: 20 additions & 15 deletions hstream-sql/src/HStream/SQL/Rts/New.hs
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,21 @@ jsonValueToFlowValue (columnType, v) = case v of
Aeson.String t -> case columnType of
BTypeDate -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowDate d
Nothing -> throwRuntimeException $ "Invalid date value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid date value " <> Text.unpack t
BTypeTime -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowTime d
Nothing -> throwRuntimeException $ "Invalid time value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid time value " <> Text.unpack t
BTypeTimestamp -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowTimestamp d
Nothing -> throwRuntimeException $ "Invalid timestamp value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid timestamp value " <> Text.unpack t
BTypeInterval -> case iso8601ParseM (Text.unpack t) of
Just d -> FlowInterval d
Nothing -> throwRuntimeException $ "Invalid interval value" <> Text.unpack t
Nothing -> throwRuntimeException $ "Invalid interval value " <> Text.unpack t
_ -> FlowText t -- FIXME
Aeson.Number n -> case columnType of
BTypeFloat -> FlowFloat (Scientific.toRealFloat n)
BTypeInteger -> FlowInt (floor n)
_ -> throwRuntimeException $ "Invalid number value" <> show n
_ -> throwRuntimeException $ "Invalid number value " <> show n
Aeson.Array arr ->
let (BTypeArray elemType) = columnType
in FlowArray (V.toList $
Expand All @@ -176,7 +176,7 @@ jsonValueToFlowValue (columnType, v) = case v of
[("$binary", Aeson.Object obj')] -> case do
Aeson.String t <- HsAeson.lookup "base64" obj'
Base64.base64Decode (CB.toBytes $ textToCBytes t) of
Nothing -> throwRuntimeException $ "Invalid $binary value" <> show obj'
Nothing -> throwRuntimeException $ "Invalid $binary value " <> show obj'
Just bs -> FlowByte (CB.fromBytes bs)
_ -> FlowSubObject (jsonObjectToFlowObject' obj)
_ -> throwRuntimeException $ "Invalid json value: " <> show v
Expand Down Expand Up @@ -223,9 +223,15 @@ extractJsonObjectSchema json =
jsonObjectToFlowObject :: Schema -> Aeson.Object -> FlowObject
jsonObjectToFlowObject schema object =
let list = HsAeson.toList object
list' = L.map (\(catalog,(k,v)) ->
(catalog, jsonValueToFlowValue (columnType catalog, v))
) ((IntMap.elems (schemaColumns schema)) `zip` list)
list' = L.map (\(k,v) ->
let catalog = case L.find (\col -> columnName col == HsAeson.toText k)
(IntMap.elems $ schemaColumns schema) of
Just cata -> cata
Nothing -> throwRuntimeException $ -- FIXME: Just omit it instead of throwing exception?
"Invalid key encountered: " <> show k <>
". Schema=" <> show schema
in (catalog, jsonValueToFlowValue (columnType catalog, v))
) list
in HM.fromList list'

jsonObjectToFlowObject' :: Aeson.Object -> FlowObject
Expand All @@ -248,9 +254,8 @@ infixl 5 <++>
tups2' = L.map (\(cata, v) -> (cata { columnId = columnId cata + maxId1 + 1 }, v)) tups2
in HM.fromList (tups1 <> tups2')

--------------------------------------------------------------------------------
winStartText :: Text
winStartText = "window_start"

winEndText :: Text
winEndText = "window_end"
setFlowObjectStreamId :: Int -> FlowObject -> FlowObject
setFlowObjectStreamId streamId hm =
let tups = HM.toList hm
tups' = L.map (\(cata, v) -> (cata { columnStreamId = streamId }, v)) tups
in HM.fromList tups'
2 changes: 2 additions & 0 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,10 @@ test-suite hstream-test
HStream.AdminCommandSpec
HStream.ConfigSpec
HStream.HandlerSpec
HStream.RegressionNewSpec
HStream.RegressionSpec
HStream.RunQuerySpec
HStream.RunSQLNewSpec
HStream.RunSQLSpec
HStream.ShardSpec
HStream.SpecUtils
Expand Down
Loading

0 comments on commit 08ce1a9

Please sign in to comment.