Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hstream-sql: code clean up #1488

Merged
merged 7 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ grpc-hs: grpc-hs-deps
--includeDir . \
--proto HStream/Gossip/HStreamGossip.proto \
--out ../gen-hs)
(find common/api/gen-hs/ -type f -name '*.hs' -exec sed -i '1i{-# OPTIONS_GHC -w #-}\n' {} \;)

grpc-cpp:
(cd common/api && mkdir -p cpp/gen && \
Expand All @@ -75,6 +76,7 @@ sql:: sql-deps
hstream-sql/gen-sql/HStream/SQL/Lex.x > hstream-sql/gen-sql/HStream/SQL/Lex.x.new)
(diff hstream-sql/gen-sql/HStream/SQL/Lex.x hstream-sql/gen-sql/HStream/SQL/Lex.x.new || true)
(cd hstream-sql/gen-sql && mv HStream/SQL/Lex.x.new HStream/SQL/Lex.x && make)
(find hstream-sql/gen-sql/ -type f -name '*.hs' -exec sed -i '1i{-# OPTIONS_GHC -w #-}\n' {} \;)

sql-deps::
# Change to a temporary dir to avoid create hstream dists.
Expand Down
5 changes: 3 additions & 2 deletions hstream-sql/hstream-sql.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ flag hstream_use_v2_engine

common shared-properties
ghc-options:
-Wall -Wcompat -Widentities -Wincomplete-record-updates
-Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints
-Wall -Wextra -Wcompat
-Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns
-Wpartial-fields -Wredundant-constraints

if flag(releasebuild)
ghc-options:
Expand Down
38 changes: 21 additions & 17 deletions hstream-sql/src/HStream/SQL/AST.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@
module HStream.SQL.AST where

import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import qualified Data.ByteString.Lazy as BL
import Data.Default
import Data.Hashable
import qualified Data.HashMap.Strict as HM
import Data.Int (Int64)
import Data.Kind (Type)
import qualified Data.List as L
import Data.List.Extra (anySame)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust)
import qualified Data.Scientific as Scientific
import Data.Text (Text)
Expand All @@ -43,8 +39,7 @@ import HStream.SQL.Abs
import HStream.SQL.Exception (SomeSQLException (..),
throwSQLException)
import HStream.SQL.Extra
import HStream.SQL.Print (printTree)
import HStream.Utils (cBytesToText, textToCBytes)
import HStream.Utils (textToCBytes)
import qualified HStream.Utils.Aeson as HsAeson
import Text.Read (readMaybe)
import qualified Z.Data.CBytes as CB
Expand Down Expand Up @@ -160,9 +155,9 @@ jsonValueToFlowValue v = case v of
Aeson.Null -> FlowNull
Aeson.Bool b -> FlowBoolean b
Aeson.String t -> FlowText t
Aeson.Number n -> case Scientific.floatingOrInteger n of
Aeson.Number n -> case Scientific.floatingOrInteger @Double @Int n of
Left f -> FlowFloat f
Right i -> FlowInt (fromIntegral i)
Right i -> FlowInt i
Aeson.Array arr -> FlowArray (V.toList $ jsonValueToFlowValue <$> arr)
Aeson.Object obj -> case HsAeson.toList obj of
[("$numberLong", Aeson.String t)] ->
Expand Down Expand Up @@ -195,7 +190,6 @@ jsonValueToFlowValue v = case v of
Just i -> FlowInterval i
Nothing -> throwSQLException RefineException Nothing ("Invalid $interval value" <> Text.unpack t)
_ -> FlowSubObject (jsonObjectToFlowObject' obj)
_ -> throwSQLException RefineException Nothing ("Invalid json value: " <> show v)

flowObjectToJsonObject :: FlowObject -> Aeson.Object
flowObjectToJsonObject hm =
Expand Down Expand Up @@ -407,7 +401,7 @@ data UnaryOp = OpSin | OpSinh | OpAsin | OpAsinh | OpCos | OpCosh
| OpAbs | OpCeil | OpFloor | OpRound | OpSign
| OpSqrt | OpLog | OpLog2 | OpLog10 | OpExp
| OpIsInt | OpIsFloat | OpIsBool | OpIsStr
| OpIsArr | OpIsDate | OpIsTime
| OpIsArr | OpIsDate | OpIsTime | OpIsNum
| OpToStr
| OpToLower | OpToUpper | OpTrim | OpLTrim | OpRTrim
| OpReverse | OpStrLen
Expand Down Expand Up @@ -608,6 +602,7 @@ instance Refine ScalarFunc where
ScalarFuncIsArr _ e -> RExprUnaryOp (trimSpacesPrint func) OpIsArr (refine e)
ScalarFuncIsDate _ e -> RExprUnaryOp (trimSpacesPrint func) OpIsDate (refine e)
ScalarFuncIsTime _ e -> RExprUnaryOp (trimSpacesPrint func) OpIsTime (refine e)
ScalarFuncIsNum _ e -> RExprUnaryOp (trimSpacesPrint func) OpIsNum (refine e)
ScalarFuncToStr _ e -> RExprUnaryOp (trimSpacesPrint func) OpToStr (refine e)
ScalarFuncToLower _ e -> RExprUnaryOp (trimSpacesPrint func) OpToLower (refine e)
ScalarFuncToUpper _ e -> RExprUnaryOp (trimSpacesPrint func) OpToUpper (refine e)
Expand All @@ -623,6 +618,7 @@ instance Refine ScalarFunc where
ArrayFuncMin _ e -> RExprUnaryOp (trimSpacesPrint func) OpArrMin (refine e)
ArrayFuncSort _ e -> RExprUnaryOp (trimSpacesPrint func) OpSort (refine e)


type instance RefinedType SetFunc = RValueExpr
instance Refine SetFunc where
refine func = case func of
Expand Down Expand Up @@ -755,7 +751,7 @@ instance Refine TableRef where
refine (TableRefJoinUsing _ r1 typ r2 cols interval) = RTableRefJoinUsing (refine r1) (refine typ) (refine r2) (extractStreamNameFromColName <$> cols) (refine interval)
where extractStreamNameFromColName col = case col of
ColNameSimple _ colIdent -> refine colIdent
ColNameStream pos _ _ -> throwImpossible
ColNameStream _ _ _ -> throwImpossible
refine (TableRefTumbling _ ref interval) = RTableRefWindowed (refine ref) (Tumbling (refine interval))
refine (TableRefHopping _ ref len hop) = RTableRefWindowed (refine ref) (Hopping (refine len) (refine hop))
refine (TableRefSession _ ref interval) = RTableRefWindowed (refine ref) (Session (refine interval))
Expand Down Expand Up @@ -801,9 +797,11 @@ data RGroupBy = RGroupByEmpty
type instance RefinedType GroupBy = RGroupBy
instance Refine GroupBy where
refine (DGroupByEmpty _) = RGroupByEmpty
refine (DGroupBy _ cols) = RGroupBy
(L.map (\col -> let (RExprCol _ m_stream field) = refine col
in (m_stream, field)) cols
refine (DGroupBy pos cols) = RGroupBy
(L.map (\col -> case refine col of
RExprCol _ m_stream field -> (m_stream, field)
_ -> throwSQLException RefineException pos "Group By Columns should be RExprCol"
) cols
) Nothing
-- refine (DGroupByWin pos cols win) =
-- let (RGroupBy tups Nothing) = refine (DGroupBy pos cols)
Expand Down Expand Up @@ -847,7 +845,7 @@ instance Refine Explain where
refine (ExplainCreate _ (CreateAs _ _ select)) = refine select
refine (ExplainCreate _ (CreateAsOp _ _ select _)) = refine select
refine (ExplainCreate _ (CreateView _ _ select)) = refine select
refine (ExplainCreate pos _) = throwImpossible
refine (ExplainCreate _ _) = throwImpossible

---- CREATE
data RStreamOptions = RStreamOptions
Expand Down Expand Up @@ -883,11 +881,16 @@ instance Refine [StreamOption] where
_ -> False
) options
factor = maybe (rRepFactor def)
(\(OptionRepFactor _ n') -> fromInteger $ extractPNInteger n')
(\case
OptionRepFactor _ n' -> fromInteger $ extractPNInteger n'
_ -> throwSQLException RefineException Nothing "Invalid StreamOption OptionRepFactor"
)
factor_m
duration = maybe (rBacklogDuration def)
(\(OptionDuration _ interval) ->
(\case
OptionDuration _ interval ->
fromIntegral (calendarDiffTimeToMs (refine (interval :: Interval))) `div` 1000
_ -> throwSQLException RefineException Nothing "Invalid StreamOption OptionDuration"
)
duration_m
in RStreamOptions { rRepFactor = factor
Expand All @@ -901,6 +904,7 @@ instance Refine [ConnectorOption] where
toPair :: ConnectorOption -> (Text, Aeson.Value)
toPair (ConnectorProperty _ key expr) = (extractHIdent key, toValue (refine expr))
toValue (RExprConst _ c) = Aeson.toJSON c
toValue _ = throwSQLException RefineException Nothing "Connector Property should be const literal expression"

type instance RefinedType Create = RCreate
instance Refine Create where
Expand Down
105 changes: 51 additions & 54 deletions hstream-sql/src/HStream/SQL/Codegen/AggOp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ module HStream.SQL.Codegen.AggOp
, aggMergeOnValue
) where

import qualified Data.List as L
import Data.Scientific
import qualified Data.Text as T
import qualified Data.List as L
#ifdef HStreamUseV2Engine
import DiffFlow.Error
#else
import HStream.Processing.Error
#endif
import HStream.SQL.AST
import HStream.SQL.Codegen.Utils

#ifdef HStreamUseV2Engine
#define ERROR_TYPE DiffFlowError
Expand Down Expand Up @@ -61,7 +58,7 @@ unaryAggOpOnValue AggCount acc row = op_count acc row
unaryAggOpOnValue AggSum acc row = op_sum acc row
unaryAggOpOnValue AggMax acc row = op_max acc row
unaryAggOpOnValue AggMin acc row = op_min acc row
unaryAggOpOnValue AggAvg acc row = Left $ ERR "Avg: not supported now"
unaryAggOpOnValue AggAvg _ _ = Left $ ERR "Avg: not supported now"

op_count :: FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
op_count (FlowInt acc_x) _ = Right $ FlowInt (acc_x + 1)
Expand Down Expand Up @@ -100,61 +97,61 @@ op_min _ _ = Left $ ERR "Min: internal error. Please report this as a bug"

----
binaryAggOpOnValue :: BinaryAggregate -> FlowValue -> FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
binaryAggOpOnValue agg acc row1 row2 = undefined

op_topk :: FlowValue -> FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
op_topk (FlowArray acc_x) (FlowInt k) (FlowInt row_x) =
let arr = (L.take k) . (L.sortBy (flip compare)) $ (FlowFloat (fromIntegral row_x)) : acc_x
in Right $ FlowArray arr
op_topk (FlowArray acc_x) (FlowInt k) (FlowFloat row_x) =
let arr = (L.take k) . (L.sortBy (flip compare)) $ (FlowFloat row_x) : acc_x
in Right $ FlowArray arr
op_topk (FlowArray acc_x) (FlowInt k) _ = Left $ ERR "TopK: type mismatch (expect a numeral value)"
op_topk (FlowArray acc_x) _ _ = Left $ ERR "TopK: type mismatch (expect an integer value)"
op_topk FlowNull _ _ = Right FlowNull
op_topk _ _ _ = Left $ ERR "TopK: internal error. Please report this as a bug"

op_topkdistinct :: FlowValue -> FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
op_topkdistinct (FlowArray acc_x) (FlowInt k) (FlowInt row_x) =
let arr = (L.take k) . (L.sortBy (flip compare)) . L.nub $ (FlowFloat (fromIntegral row_x)) : acc_x
in Right $ FlowArray arr
op_topkdistinct (FlowArray acc_x) (FlowInt k) (FlowFloat row_x) =
let arr = (L.take k) . (L.sortBy (flip compare)) . L.nub $ (FlowFloat row_x) : acc_x
in Right $ FlowArray arr
op_topkdistinct (FlowArray acc_x) (FlowInt k) _ = Left $ ERR "TopKDistinct: type mismatch (expect a numeral value)"
op_topkdistinct (FlowArray acc_x) _ _ = Left $ ERR "TopKDistinct: type mismatch (expect an integer value)"
op_topkdistinct FlowNull _ _ = Right FlowNull
op_topkdistinct _ _ _ = Left $ ERR "TopKDistinct: internal error. Please report this as a bug"
binaryAggOpOnValue _agg _acc _row1 _row2 = undefined

-- op_topk :: FlowValue -> FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
-- op_topk (FlowArray acc_x) (FlowInt k) (FlowInt row_x) =
-- let arr = (L.take k) . (L.sortBy (flip compare)) $ (FlowFloat (fromIntegral row_x)) : acc_x
-- in Right $ FlowArray arr
-- op_topk (FlowArray acc_x) (FlowInt k) (FlowFloat row_x) =
-- let arr = (L.take k) . (L.sortBy (flip compare)) $ (FlowFloat row_x) : acc_x
-- in Right $ FlowArray arr
-- op_topk (FlowArray _) (FlowInt _) _ = Left $ ERR "TopK: type mismatch (expect a numeral value)"
-- op_topk (FlowArray _) _ _ = Left $ ERR "TopK: type mismatch (expect an integer value)"
-- op_topk FlowNull _ _ = Right FlowNull
-- op_topk _ _ _ = Left $ ERR "TopK: internal error. Please report this as a bug"

-- op_topkdistinct :: FlowValue -> FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
-- op_topkdistinct (FlowArray acc_x) (FlowInt k) (FlowInt row_x) =
-- let arr = (L.take k) . (L.sortBy (flip compare)) . L.nub $ (FlowFloat (fromIntegral row_x)) : acc_x
-- in Right $ FlowArray arr
-- op_topkdistinct (FlowArray acc_x) (FlowInt k) (FlowFloat row_x) =
-- let arr = (L.take k) . (L.sortBy (flip compare)) . L.nub $ (FlowFloat row_x) : acc_x
-- in Right $ FlowArray arr
-- op_topkdistinct (FlowArray _) (FlowInt _) _ = Left $ ERR "TopKDistinct: type mismatch (expect a numeral value)"
-- op_topkdistinct (FlowArray _) _ _ = Left $ ERR "TopKDistinct: type mismatch (expect an integer value)"
-- op_topkdistinct FlowNull _ _ = Right FlowNull
-- op_topkdistinct _ _ _ = Left $ ERR "TopKDistinct: internal error. Please report this as a bug"

--------------------------------------------------------------------------------
aggMergeOnValue :: Aggregate expr -> FlowObject -> FlowValue -> FlowValue -> Either ERROR_TYPE FlowValue
aggMergeOnValue (Nullary AggCountAll) k (FlowInt n1) (FlowInt n2) = Right $ FlowInt (n1 + n2)
aggMergeOnValue (Nullary AggCountAll) k _ _ = Left $ ERR "CountAll: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggCount _) k (FlowInt n1) (FlowInt n2) = Right $ FlowInt (n1 + n2)
aggMergeOnValue (Unary AggCount _) k _ _ = Left $ ERR "Count: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggSum _) k (FlowInt n1) (FlowInt n2) = Right $ FlowInt (n1 + n2)
aggMergeOnValue (Unary AggSum _) k (FlowInt n1) (FlowFloat n2) = Right $ FlowFloat (fromIntegral n1 + n2)
aggMergeOnValue (Unary AggSum _) k (FlowFloat n1) (FlowInt n2) = Right $ FlowFloat (n1 + fromIntegral n2)
aggMergeOnValue (Unary AggSum _) k (FlowFloat n1) (FlowFloat n2) = Right $ FlowFloat (n1 + n2)
aggMergeOnValue (Unary AggSum _) k _ _ = Left $ ERR "Sum: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggMax _) k (FlowInt n1) (FlowInt n2) = Right $ FlowInt (max n1 n2)
aggMergeOnValue (Unary AggMax _) k (FlowInt n1) (FlowFloat n2) = Right $ FlowFloat (max (fromIntegral n1) n2)
aggMergeOnValue (Unary AggMax _) k (FlowFloat n1) (FlowInt n2) = Right $ FlowFloat (max n1 (fromIntegral n2))
aggMergeOnValue (Unary AggMax _) k (FlowFloat n1) (FlowFloat n2) = Right $ FlowFloat (max n1 n2)
aggMergeOnValue (Unary AggMax _) k _ _ = Left $ ERR "Max: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggMin _) k (FlowInt n1) (FlowInt n2) = Right $ FlowInt (min n1 n2)
aggMergeOnValue (Unary AggMin _) k (FlowInt n1) (FlowFloat n2) = Right $ FlowFloat (min (fromIntegral n1) n2)
aggMergeOnValue (Unary AggMin _) k (FlowFloat n1) (FlowInt n2) = Right $ FlowFloat (min n1 (fromIntegral n2))
aggMergeOnValue (Unary AggMin _) k (FlowFloat n1) (FlowFloat n2) = Right $ FlowFloat (min n1 n2)
aggMergeOnValue (Unary AggMin _) k _ _ = Left $ ERR "Min: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggAvg _) k _ _ = Left $ ERR "Avg: not supported now"
aggMergeOnValue (Binary AggTopK _ _) k (FlowArray x1) (FlowArray x2) =
aggMergeOnValue (Nullary AggCountAll) _ (FlowInt n1) (FlowInt n2) = Right $ FlowInt (n1 + n2)
aggMergeOnValue (Nullary AggCountAll) _ _ _ = Left $ ERR "CountAll: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggCount _) _ (FlowInt n1) (FlowInt n2) = Right $ FlowInt (n1 + n2)
aggMergeOnValue (Unary AggCount _) _ _ _ = Left $ ERR "Count: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggSum _) _ (FlowInt n1) (FlowInt n2) = Right $ FlowInt (n1 + n2)
aggMergeOnValue (Unary AggSum _) _ (FlowInt n1) (FlowFloat n2) = Right $ FlowFloat (fromIntegral n1 + n2)
aggMergeOnValue (Unary AggSum _) _ (FlowFloat n1) (FlowInt n2) = Right $ FlowFloat (n1 + fromIntegral n2)
aggMergeOnValue (Unary AggSum _) _ (FlowFloat n1) (FlowFloat n2) = Right $ FlowFloat (n1 + n2)
aggMergeOnValue (Unary AggSum _) _ _ _ = Left $ ERR "Sum: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggMax _) _ (FlowInt n1) (FlowInt n2) = Right $ FlowInt (max n1 n2)
aggMergeOnValue (Unary AggMax _) _ (FlowInt n1) (FlowFloat n2) = Right $ FlowFloat (max (fromIntegral n1) n2)
aggMergeOnValue (Unary AggMax _) _ (FlowFloat n1) (FlowInt n2) = Right $ FlowFloat (max n1 (fromIntegral n2))
aggMergeOnValue (Unary AggMax _) _ (FlowFloat n1) (FlowFloat n2) = Right $ FlowFloat (max n1 n2)
aggMergeOnValue (Unary AggMax _) _ _ _ = Left $ ERR "Max: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggMin _) _ (FlowInt n1) (FlowInt n2) = Right $ FlowInt (min n1 n2)
aggMergeOnValue (Unary AggMin _) _ (FlowInt n1) (FlowFloat n2) = Right $ FlowFloat (min (fromIntegral n1) n2)
aggMergeOnValue (Unary AggMin _) _ (FlowFloat n1) (FlowInt n2) = Right $ FlowFloat (min n1 (fromIntegral n2))
aggMergeOnValue (Unary AggMin _) _ (FlowFloat n1) (FlowFloat n2) = Right $ FlowFloat (min n1 n2)
aggMergeOnValue (Unary AggMin _) _ _ _ = Left $ ERR "Min: internal error. Please report this as a bug"
aggMergeOnValue (Unary AggAvg _) _ _ _ = Left $ ERR "Avg: not supported now"
aggMergeOnValue (Binary AggTopK _ _) _ (FlowArray x1) (FlowArray x2) =
let len = L.length x1
arr = (L.take len) . (L.sortBy (flip compare)) $ x1 <> x2
in Right $ FlowArray arr
aggMergeOnValue (Binary AggTopK _ _) k _ _ = Left $ ERR "TopK: internal error. Please report this as a bug"
aggMergeOnValue (Binary AggTopKDistinct _ _) k (FlowArray x1) (FlowArray x2) =
aggMergeOnValue (Binary AggTopK _ _) _ _ _ = Left $ ERR "TopK: internal error. Please report this as a bug"
aggMergeOnValue (Binary AggTopKDistinct _ _) _ (FlowArray x1) (FlowArray x2) =
let len = L.length x1
arr = (L.take len) . (L.sortBy (flip compare)) . (L.nub) $ x1 <> x2
in Right $ FlowArray arr
aggMergeOnValue (Binary AggTopKDistinct _ _) k _ _ = Left $ ERR "TopKDistinct: internal error. Please report this as a bug"
aggMergeOnValue (Binary AggTopKDistinct _ _) _ _ _ = Left $ ERR "TopKDistinct: internal error. Please report this as a bug"
1 change: 0 additions & 1 deletion hstream-sql/src/HStream/SQL/Codegen/BinOp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import HStream.SQL.AST
import HStream.SQL.Codegen.Utils

import Data.Typeable
import Debug.Trace

#ifdef HStreamUseV2Engine
#define ERROR_TYPE DiffFlowError
Expand Down
4 changes: 1 addition & 3 deletions hstream-sql/src/HStream/SQL/Codegen/Cast.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ module HStream.SQL.Codegen.Cast
( castOnValue
) where

import qualified Data.Map.Strict as Map
import Data.Scientific
import qualified Data.Text as T
import Data.Time as Time
import Data.Time.Calendar.OrdinalDate (fromOrdinalDate)
Expand All @@ -20,7 +18,6 @@ import HStream.Processing.Error
#endif
import Data.Time.Format.ISO8601 (iso8601ParseM)
import HStream.SQL.AST
import HStream.SQL.Exception
import qualified Z.Data.CBytes as CB

#ifdef HStreamUseV2Engine
Expand Down Expand Up @@ -119,6 +116,7 @@ castToBoolean x =
"T" -> mkOk True
"F" -> mkOk False
_ -> mkCanNotParseTextErr x typName
_ -> mkErr

castToByte :: FlowValue -> Either ERROR_TYPE FlowValue
castToByte x =
Expand Down
4 changes: 2 additions & 2 deletions hstream-sql/src/HStream/SQL/Codegen/ColumnCatalog.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ getField :: ColumnCatalog -> FlowObject -> Maybe (ColumnCatalog, FlowValue)
getField (ColumnCatalog k stream_m) o =
let filterCond = case stream_m of
Nothing -> \(ColumnCatalog f _) _ -> f == k
Just s -> \(ColumnCatalog f s_m) _ -> f == k && s_m == stream_m
Just _ -> \(ColumnCatalog f s_m) _ -> f == k && s_m == stream_m
in case HM.toList (HM.filterWithKey filterCond o) of
[] -> Nothing
[(k,v)] -> Just (k, v)
Expand All @@ -39,5 +39,5 @@ streamRenamer :: Text -> FlowObject -> FlowObject
streamRenamer newName =
HM.mapKeys (\cata@(ColumnCatalog f s_m) -> case s_m of
Nothing -> cata
Just s -> ColumnCatalog f (Just newName)
Just _ -> ColumnCatalog f (Just newName)
)
Loading