Skip to content

Commit

Permalink
router: add callback to the map_callrw
Browse files Browse the repository at this point in the history
  • Loading branch information
darthunix committed Dec 4, 2023
1 parent 3734c0d commit 776757a
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 6 deletions.
50 changes: 50 additions & 0 deletions test/router-luatest/router_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,56 @@ g.test_map_callrw_raw = function(g)
t.assert_equals(res.map_type, 'userdata', 'values are msgpacks')
t.assert(not res.err, 'no error')
--
-- Successful map with callback.
--
res = g.router:exec(function()
local f = function(_, _, cb_res)
if not cb_res then
cb_res = 42
end
return cb_res
end
local val, err, err_uuid = ivshard.router.map_callrw('do_map', {3},
{timeout = iwait_timeout}, f)
return {
val = val,
err = err,
err_uuid = err_uuid,
}
end)
t.assert(not res.err)
t.assert(not res.err_uuid)
expected = {
[rs1_uuid] = {42},
[rs2_uuid] = {42},
}
t.assert_equals(res.val, expected, 'callrw success')
--
-- Successful raw map with callback.
--
res = g.router:exec(function()
local f = function(_, _, cb_res)
if not cb_res then
cb_res = 1
end
return cb_res
end
local val, err, err_uuid = ivshard.router.map_callrw('do_map', {3},
{timeout = iwait_timeout, return_raw = true}, f)
return {
val = val,
err = err,
err_uuid = err_uuid,
}
end)
t.assert(not res.err)
t.assert(not res.err_uuid)
expected = {
[rs1_uuid] = 1,
[rs2_uuid] = 1,
}
t.assert_equals(res.val, expected, 'callrw success')
--
-- Successful map, but one of the storages returns nothing.
--
g.replica_2_a:exec(function()
Expand Down
36 changes: 30 additions & 6 deletions vshard/router/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -898,9 +898,10 @@ end
-- about concrete replicaset. For example, not all buckets were found even
-- though all replicasets were scanned.
--
local function router_map_callrw(router, func, args, opts)
local function router_map_callrw(router, func, args, opts, callback)
local replicasets = router.replicasets
local timeout
local cb_res
local do_return_raw
if opts then
timeout = opts.timeout or consts.CALL_TIMEOUT_MIN
Expand Down Expand Up @@ -1006,7 +1007,19 @@ local function router_map_callrw(router, func, args, opts)
goto fail
end
if count > 1 then
map[uuid] = res:take_array(count - 1)
res = res:take_array(count - 1)
-- execute callback on the replicaset result.
if callback ~= nil then
ok, cb_res = pcall(callback, uuid, res, cb_res)
if not ok then
err_uuid = uuid
err = lerror.make(cb_res)
goto fail
end
map[uuid] = cb_res
else
map[uuid] = res
end
end
timeout = deadline - fiber_clock()
end
Expand All @@ -1025,9 +1038,20 @@ local function router_map_callrw(router, func, args, opts)
goto fail
end
if res ~= nil then
-- Store as a table so in future it could be extended for
-- multireturn.
map[uuid] = {res}
-- execute callback on the replicaset result.
if callback ~= nil then
ok, cb_res = pcall(callback, uuid, res, cb_res)
if not ok then
err_uuid = uuid
err = lerror.make(cb_res)
goto fail
end
map[uuid] = {cb_res}
else
-- Store as a table so in future it could be extended for
-- multireturn.
map[uuid] = {res}
end
end
timeout = deadline - fiber_clock()
end
Expand Down Expand Up @@ -1211,7 +1235,7 @@ local function router_partial_callrw(router, bucket_ids, func, args, opts, callb
goto fail
end
if res ~= nil then
-- Execute callback on the replicaset result.
-- execute callback on the replicaset result.
if callback ~= nil then
ok, cb_res = pcall(callback, uuid, res, cb_res)
if not ok then
Expand Down

0 comments on commit 776757a

Please sign in to comment.