Skip to content

Commit

Permalink
backend:drainer-logic-to-handle-db-creates
Browse files Browse the repository at this point in the history
  • Loading branch information
vijaygupta18 committed Dec 19, 2024
1 parent c43d0b3 commit 0f50e15
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
24 changes: 14 additions & 10 deletions src/EulerHS/KVConnector/DBSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Tag = Text

type DBName = Text

getCreateQuery :: (KVConnector (table Identity), ToJSON (table Identity)) => Text -> Tag -> Double -> DBName -> table Identity -> [(String, String)] -> A.Value
getCreateQuery model tag timestamp dbName dbObject mappings = do
getCreateQuery :: (KVConnector (table Identity), ToJSON (table Identity)) => Text -> Tag -> Double -> DBName -> table Identity -> [(String, String)] -> Bool -> A.Value
getCreateQuery model tag timestamp dbName dbObject mappings forceDrainToDB = do
A.object
[ "contents_v2"
.= A.object
Expand All @@ -44,7 +44,8 @@ getCreateQuery model tag timestamp dbName dbObject mappings = do
],
"mappings" .= A.toJSON (AKM.fromList $ (\(k, v) -> (AKey.fromText $ T.pack k, v)) <$> mappings),
"modelObject" .= dbObject,
"tag" .= ("Create" :: Text)
"tag" .= ("Create" :: Text),
"forceDrainToDB" .= forceDrainToDB
]

getCreateQueryWithCompression ::
Expand All @@ -55,16 +56,17 @@ getCreateQueryWithCompression ::
DBName ->
table Identity ->
[(String, String)] ->
Bool ->
m ByteString
getCreateQueryWithCompression model tag timestamp dbName dbObject mappings =
getCreateQueryWithCompression model tag timestamp dbName dbObject mappings forceDrainToDB =
compressWithoutError (Just model) $
BSL.toStrict $
A.encode $
getCreateQuery model tag timestamp dbName dbObject mappings
getCreateQuery model tag timestamp dbName dbObject mappings forceDrainToDB

-- | This will take updateCommand from getDbUpdateCommandJson and returns Aeson value of Update DBCommand
getUpdateQuery :: Tag -> Double -> DBName -> A.Value -> [(String, String)] -> A.Value -> A.Value
getUpdateQuery tag timestamp dbName updateCommandV2 mappings updatedModel =
getUpdateQuery :: Tag -> Double -> DBName -> A.Value -> [(String, String)] -> A.Value -> Bool -> A.Value
getUpdateQuery tag timestamp dbName updateCommandV2 mappings updatedModel forceDrainToDB =
A.object
[ "contents_v2"
.= A.object
Expand All @@ -76,7 +78,8 @@ getUpdateQuery tag timestamp dbName updateCommandV2 mappings updatedModel =
],
"mappings" .= A.toJSON (AKM.fromList $ (\(k, v) -> (AKey.fromText $ T.pack k, v)) <$> mappings),
"updatedModel" .= A.toJSON updatedModel,
"tag" .= ("Update" :: Text)
"tag" .= ("Update" :: Text),
"forceDrainToDB" .= forceDrainToDB
]

getUpdateQueryWithCompression ::
Expand All @@ -88,12 +91,13 @@ getUpdateQueryWithCompression ::
[(String, String)] ->
A.Value ->
Text ->
Bool ->
m ByteString
getUpdateQueryWithCompression tag timestamp dbName updateCommandV2 mappings updatedModel modelName =
getUpdateQueryWithCompression tag timestamp dbName updateCommandV2 mappings updatedModel modelName forceDrainToDB =
compressWithoutError (Just modelName) $
BSL.toStrict $
A.encode $
getUpdateQuery tag timestamp dbName updateCommandV2 mappings updatedModel
getUpdateQuery tag timestamp dbName updateCommandV2 mappings updatedModel forceDrainToDB

getDbUpdateCommandJson :: forall be table. (Model be table, MeshMeta be table) => Text -> [Set be table] -> Where be table -> A.Value
getDbUpdateCommandJson model setClauses whereClause =
Expand Down
8 changes: 4 additions & 4 deletions src/EulerHS/KVConnector/Flow.hs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ createKV meshCfg value = do
time <- fromIntegral <$> L.getCurrentDateInMillis

qCmd <- if isCompressionAllowed
then do getCreateQueryWithCompression (getTableName @(table Identity)) (pKeyText <> shard) time meshCfg.meshDBName val (getTableMappings @(table Identity))
else do pure $ BSL.toStrict $ A.encode $ getCreateQuery (getTableName @(table Identity)) (pKeyText <> shard) time meshCfg.meshDBName val (getTableMappings @(table Identity))
then do getCreateQueryWithCompression (getTableName @(table Identity)) (pKeyText <> shard) time meshCfg.meshDBName val (getTableMappings @(table Identity)) meshCfg.forceDrainToDB
else do pure $ BSL.toStrict $ A.encode $ getCreateQuery (getTableName @(table Identity)) (pKeyText <> shard) time meshCfg.meshDBName val (getTableMappings @(table Identity)) meshCfg.forceDrainToDB

revMappingRes <- mapM (\secIdx -> do
let sKey = fromString . T.unpack $ secIdx
Expand Down Expand Up @@ -459,8 +459,8 @@ updateObjectRedis meshCfg updVals setClauses addPrimaryKeyToWhereClause whereCla
else getDbUpdateCommandJson (getTableName @(table Identity)) setClauses whereClause

qCmd <- if isCompressionAllowed
then do getUpdateQueryWithCompression (pKeyText <> shard) time meshCfg.meshDBName updateCmd (getTableMappings @(table Identity)) updatedModel (getTableName @(table Identity))
else do pure $ BSL.toStrict $ A.encode $ getUpdateQuery (pKeyText <> shard) time meshCfg.meshDBName updateCmd (getTableMappings @(table Identity)) updatedModel
then do getUpdateQueryWithCompression (pKeyText <> shard) time meshCfg.meshDBName updateCmd (getTableMappings @(table Identity)) updatedModel (getTableName @(table Identity)) meshCfg.forceDrainToDB
else do pure $ BSL.toStrict $ A.encode $ getUpdateQuery (pKeyText <> shard) time meshCfg.meshDBName updateCmd (getTableMappings @(table Identity)) updatedModel meshCfg.forceDrainToDB

case resultToEither $ A.fromJSON updatedModel of
Right value -> do
Expand Down
1 change: 1 addition & 0 deletions src/EulerHS/KVConnector/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ data MeshConfig = MeshConfig
, kvHardKilled :: Bool
, tableShardModRange :: (Int, Int)
, redisKeyPrefix :: Text
, forceDrainToDB :: Bool
}
deriving (Generic, Eq, Show, A.ToJSON)

Expand Down

0 comments on commit 0f50e15

Please sign in to comment.