router: implement partial map-reduce API
Introduce a partial ref-map-reduce API for vshard. It guarantees
that in case of success the function is executed exactly once on
the storages that contain the given buckets.

Algorithm.

Router:
1. Group buckets by replicasets based on the router's cache.
2. Ref stage. For each storage:
   a. Asynchronously send the ref id, the timeout and the group of the
   corresponding buckets to the storage. The aim is to reference this storage
   and to find out which of the buckets are absent from it.
Storage:
3. Ref the session with the ref id and timeout passed from the router.
4. Look up the passed buckets. If any of them are not found on the storage,
return these buckets back to the router in the response.
Router:
5. Await and collect the responses. If the timeout has expired, set an error
for the response.
6. If any of the responses contains an error, send unref to the refed storages
and return the error to the user.
7. If the collected results contain moved buckets, resolve them and update
the router's cache. Decrease the timeout and go to step 1.
8. Map stage. For each storage:
   a. Replace the bucket list with the group of buckets refed on the target storage.
   b. Asynchronously send the map function with the modified arguments and the
   ref id to the storage.
Storage:
9. Execute storage_map: if the ref id has expired, return an error. Otherwise,
perform ref.use -> execute -> ref.del inside storage_map() and return the results.
Router:
10. Reduce stage. Await the results (and optionally apply a callback to each result):
    if the timeout has expired, return an error to the user. Otherwise, return the
    results (see the usage sketch below).
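A minimal usage sketch of the new API through the static router module. The
bucket ids, the 'do_map' function name, its argument and the timeout are
example values mirroring the tests added in this commit, not part of the API:

    -- Group buckets by replicasets using the router's cache.
    local groups, err = vshard.router.group({2, 1, 1})
    -- On success `groups` looks like {[replicaset_uuid] = {1, 2}}.

    -- Call 'do_map' exactly once on every master storage that holds
    -- buckets 1 and 2. On success `res` maps replicaset UUIDs to the
    -- function results; on failure it is nil and `err`/`err_uuid`
    -- describe the error and, optionally, the failed replicaset.
    local res, err, err_uuid = vshard.router.partial_callrw(
        {1, 2}, 'do_map', {3}, {timeout = 10})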
darthunix committed Nov 29, 2023
1 parent 6f621cd commit 82d9669
Showing 2 changed files with 363 additions and 1 deletion.
127 changes: 127 additions & 0 deletions test/router-luatest/router_test.lua
@@ -284,6 +284,133 @@ g.test_map_callrw_raw = function(g)
    end)
end

g.test_group_buckets = function(g)
    local res = g.router:exec(function()
        local val, err = ivshard.router.group({2, 1, 1})
        return {
            val = val,
            err = err,
        }
    end)
    assert(res.err == nil)
    local rs2_uuid = g.replica_2_a:replicaset_uuid()
    local expected = {
        [rs2_uuid] = {1, 2},
    }
    t.assert_equals(res.val, expected)
end

g.test_partial_callrw = function(g)
    local expected, rs2_uuid, res

    local create_map_func_f = function(res1)
        rawset(_G, 'do_map', function(res2)
            return {res1, res2}
        end)
    end
    g.replica_1_a:exec(create_map_func_f, {1})
    g.replica_2_a:exec(create_map_func_f, {2})

    --
    -- Successful partial map.
    --
    local res = g.router:exec(function()
        local val, err, err_uuid = ivshard.router.partial_callrw({1, 2}, 'do_map', {3},
            {timeout = iwait_timeout})
        return {
            val = val,
            err = err,
            err_uuid = err_uuid,
        }
    end)
    t.assert(not res.err)
    t.assert(not res.err_uuid)
    rs2_uuid = g.replica_2_a:replicaset_uuid()
    expected = {
        [rs2_uuid] = {2, 3},
    }
    t.assert_equals(res.val, expected, 'partial callrw success')
    --
    -- Successful partial map with callback.
    --
    res = g.router:exec(function()
        local f = function(_, res, cb_res)
            if not cb_res then
                cb_res = 0
            end
            return cb_res + #res
        end
        local val, err, err_uuid = ivshard.router.partial_callrw({1, 2}, 'do_map', {3},
            {timeout = iwait_timeout}, f)
        return {
            val = val,
            err = err,
            err_uuid = err_uuid,
        }
    end)
    t.assert(not res.err)
    t.assert(not res.err_uuid)
    expected = {
        [rs2_uuid] = 2,
    }
    t.assert_equals(res.val, expected, 'partial callrw success')
    --
    -- Error in the callback.
    --
    res = g.router:exec(function()
        local f = function(_, _, _)
            return box.error(box.error.PROC_LUA, "cb_err")
        end
        local val, err, err_uuid = ivshard.router.partial_callrw({1, 2}, 'do_map', {3},
            {timeout = iwait_timeout}, f)
        return {
            val = val,
            err = err,
            err_uuid = err_uuid,
        }
    end)
    t.assert(res.val == nil)
    t.assert_covers(res.err, {
        code = box.error.PROC_LUA,
        type = 'ClientError',
        message = 'cb_err'
    })
    t.assert_equals(res.err_uuid, rs2_uuid)
    --
    -- Error at map stage.
    --
    g.replica_2_a:exec(function()
        _G.do_map = function()
            return box.error(box.error.PROC_LUA, "map_err")
        end
    end)
    res = g.router:exec(function()
        local val, err, err_uuid = ivshard.router.partial_callrw({2, 1}, 'do_map', {3},
            {timeout = iwait_timeout})
        return {
            val = val,
            err = err,
            err_uuid = err_uuid,
        }
    end)
    t.assert(res.val == nil)
    t.assert_covers(res.err, {
        code = box.error.PROC_LUA,
        type = 'ClientError',
        message = 'map_err'
    })
    t.assert_equals(res.err_uuid, rs2_uuid)
    --
    -- Cleanup.
    --
    g.replica_1_a:exec(function()
        _G.do_map = nil
    end)
    g.replica_2_a:exec(function()
        _G.do_map = nil
    end)
end

g.test_uri_compare_and_reuse = function(g)
    -- Multilisten itself is not used, but URI-table is supported since the same
    -- version.
237 changes: 236 additions & 1 deletion vshard/router/init.lua
@@ -224,6 +224,39 @@ local function bucket_resolve(router, bucket_id)
    return replicaset
end

-- Group bucket ids by replicasets according to the router cache.
local function buckets_group(router, bucket_ids)
    local replicaset, err
    local replicaset_buckets = {}
    local prev_id = nil

    -- Sort buckets to skip duplicates.
    table.sort(bucket_ids)
    for _, bucket_id in pairs(bucket_ids) do
        if bucket_id == prev_id then
            goto continue
        end

        replicaset, err = bucket_resolve(router, bucket_id)
        if err ~= nil then
            return nil, err
        end
        if replicaset == nil then
            return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id)
        end
        if replicaset_buckets[replicaset.uuid] then
            table.insert(replicaset_buckets[replicaset.uuid], bucket_id)
        else
            replicaset_buckets[replicaset.uuid] = {bucket_id}
        end

        ::continue::
        prev_id = bucket_id
    end

    return replicaset_buckets
end

--
-- Arrange downloaded buckets to the route map so as they
-- reference a given replicaset.
@@ -881,7 +914,7 @@ local function router_map_callrw(router, func, args, opts)
        futures[uuid] = res
    end
    --
-   -- Ref stage: collect.
+   -- Map stage: collect.
    --
    if do_return_raw then
        for uuid, f in pairs(futures) do
@@ -945,6 +978,206 @@ local function router_map_callrw(router, func, args, opts)
    return nil, err, err_uuid
end

--
-- Partial Ref-Map-Reduce. The given function is called on the masters
-- with a guarantee that in case of success it was executed exactly once
-- on all storages that contain the given buckets. Execution on masters
-- is chosen to allow the buckets to be accessed for reads and writes
-- during the call.
--
-- The execution consists of the following stages:
--
-- Ref stage. We ref all the storages with the given buckets. It is
-- an iterative process, because the router's cache may be outdated,
-- and we need to discover the moved buckets. At the end of this stage
-- we have refed all the storages containing the given buckets with the
-- given timeout.
--
-- Map stage. We call the given function on all the storages we have
-- refed. On the storage side we use a `storage_map` function, which
-- switches the timeout ref to the manual mode and unrefs the storage
-- after the function is called.
--
-- Reduce stage. We collect the results of the function call.
--
-- @param router Router instance to use.
-- @param bucket_ids List of bucket identifiers.
-- @param func Name of the function to call.
-- @param args Function arguments passed in netbox style (as an array).
-- @param opts Can only contain 'timeout' as a number of seconds. Note that the
--     refs may end up being kept on the storages during this entire timeout if
--     something goes wrong, for instance, if network issues appear. This means
--     it is better not to use a value bigger than necessary. A stuck infinite
--     ref can only be dropped by this router's restart/reconnect or by the
--     storage's restart.
-- @param callback Optional function to be called on the result of the function
--     call on each replicaset. The function should accept 3 arguments: UUID of
--     the replicaset, result of the function call on the replicaset, and the
--     result of the previous callback call or nil if it is the first call.
--
-- @return In case of success - a map with replicaset UUID keys and values being
--     what the function returned from the replicaset.
--
-- @return In case of an error - nil, error object, optional UUID of the
--     replicaset where the error happened. UUID may not be present if the error
--     is not about a concrete replicaset. For example, not all buckets were
--     found even though all replicasets were scanned.
--
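-- Below is a minimal usage sketch through the public `partial_callrw` API
-- registered at the end of this file. The bucket ids, the 'do_map' function
-- name, its arguments and the timeout are illustrative values mirroring the
-- tests added in this commit, not part of the API:
--
--     local res, err, err_uuid = vshard.router.partial_callrw(
--         {1, 2}, 'do_map', {3}, {timeout = 10})
--     if res == nil then
--         -- err_uuid may be nil if the error is not about a concrete
--         -- replicaset.
--         log.error('partial_callrw failed on %s: %s', err_uuid, err)
--     else
--         -- res[rs_uuid] is whatever 'do_map' returned on that replicaset.
--     end
--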
local function router_partial_callrw(router, bucket_ids, func, args, opts, callback)
    local replicaset
    local grouped_buckets
    local err, err_uuid, res, ok, map
    local cb_res = nil
    local call_args
    local replicasets = {}
    local preallocated = false
    local futures = {}
    local call_opts = {is_async = true}
    local rs_count = 0
    local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN
    local deadline = fiber_clock() + timeout
    local rid = M.ref_id
    M.ref_id = rid + 1

    -- Nil checks are done explicitly here (== nil instead of 'not'), because
    -- netbox requests return box.NULL instead of nils.

    -- Ref stage.
    while #bucket_ids > 0 do
        -- Group the buckets by replicasets according to the router cache.
        grouped_buckets, err = buckets_group(router, bucket_ids)
        if err ~= nil or grouped_buckets == nil then
            goto fail
        end

        -- Send ref requests with timeouts to the replicasets.
        futures = table_new(0, #grouped_buckets)
        for uuid, buckets in pairs(grouped_buckets) do
            replicaset = router.replicasets[uuid]
            -- Netbox async requests work only with active connections. Need
            -- to wait for the connection explicitly.
            timeout, err = replicaset:wait_connected(timeout)
            if timeout == nil then
                err_uuid = uuid
                goto fail
            end
            call_args = {'storage_ref_with_lookup', rid, timeout, buckets}
            res, err = replicaset:callrw('vshard.storage._call', call_args,
                                         call_opts)
            if res == nil then
                err_uuid = uuid
                goto fail
            end
            futures[uuid] = res
            rs_count = rs_count + 1
        end

        -- We hope that router's cache is up to date and only a single
        -- loop iteration is required.
        if not preallocated then
            replicasets = table_new(0, rs_count)
            preallocated = true
        end

        -- Wait for the refs to be done and collect moved buckets.
        bucket_ids = {}
        for uuid, future in pairs(futures) do
            res, err = future_wait(future, timeout)
            -- Handle netbox error first.
            if res == nil then
                err_uuid = uuid
                goto fail
            end
            -- Ref returns nil,err or moved buckets.
            res, err = res[1], res[2]
            if res == nil then
                err_uuid = uuid
                goto fail
            end
            for _, bucket_id in pairs(res) do
                bucket_reset(router, bucket_id)
                table.insert(bucket_ids, bucket_id)
            end
            replicasets[uuid] = router.replicasets[uuid]
            timeout = deadline - fiber_clock()
        end
    end

    -- Map stage.
    map = table_new(0, rs_count)
    futures = table_new(0, rs_count)
    args = {'storage_map', rid, func, args}
    -- Send map requests.
    for uuid, rs in pairs(replicasets) do
        res, err = rs:callrw('vshard.storage._call', args, call_opts)
        if res == nil then
            err_uuid = uuid
            goto fail
        end
        futures[uuid] = res
    end

    -- Reduce stage.
    -- Collect map responses (refs were deleted by the storages for non-error
    -- results).
    for uuid, future in pairs(futures) do
        res, err = future_wait(future, timeout)
        if res == nil then
            err_uuid = uuid
            goto fail
        end
        -- Map returns true,res or nil,err.
        ok, res = res[1], res[2]
        if ok == nil then
            err = res
            err_uuid = uuid
            goto fail
        end
        if res ~= nil then
            -- Execute callback on the replicaset result.
            if callback ~= nil then
                ok, cb_res = pcall(callback, uuid, res, cb_res)
                if not ok then
                    err_uuid = uuid
                    err = lerror.make(cb_res)
                    goto fail
                end
                map[uuid] = cb_res
            else
                map[uuid] = res
            end
        end
        timeout = deadline - fiber_clock()
    end
    do return map end

::fail::
    local f
    for uuid, future in pairs(futures) do
        future:discard()
        -- Best effort to remove the created refs before exiting. Can help if
        -- the timeout was big and the error happened early.
        call_args = {'storage_unref', rid}
        -- The local `replicasets` table may still be empty if the error
        -- happened during the ref stage, so fall back to the router's map.
        replicaset = replicasets[uuid] or router.replicasets[uuid]
        f = replicaset:callrw('vshard.storage._call', call_args, call_opts)
        if f ~= nil then
            -- Don't care waiting for a result - no time for this. But it won't
            -- affect the request sending if the connection is still alive.
            f:discard()
        end
    end
    err = lerror.make(err)
    return nil, err, err_uuid
end

--
-- Group bucket ids by replicasets.
-- @param bucket_ids List of bucket identifiers.
-- @retval Map of replicaset UUIDs to lists of bucket identifiers.
--
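-- An illustrative example (mirroring the test added in this commit, the
-- bucket ids are arbitrary): router.group({2, 1, 1}) returns
-- {[rs_uuid] = {1, 2}} when buckets 1 and 2 both live on the replicaset
-- with UUID rs_uuid.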
local function router_group(router, bucket_ids)
    if type(bucket_ids) ~= 'table' then
        error('Usage: router.group(bucket_ids)')
    end
    return buckets_group(router, bucket_ids)
end

--
-- Get replicaset object by bucket identifier.
-- @param bucket_id Bucket identifier.
@@ -1744,7 +1977,9 @@ local router_mt = {
        callrw = router_make_api(router_callrw),
        callre = router_make_api(router_callre),
        callbre = router_make_api(router_callbre),
        group = router_make_api(router_group),
        map_callrw = router_make_api(router_map_callrw),
        partial_callrw = router_make_api(router_partial_callrw),
        route = router_make_api(router_route),
        routeall = router_make_api(router_routeall),
        bucket_id = router_make_api(router_bucket_id),