router: improve master connection parallelism in map-reduce
darthunix committed Mar 6, 2024
1 parent e629884 commit d261e12
Showing 2 changed files with 67 additions and 9 deletions.
1 change: 1 addition & 0 deletions vshard/replicaset.lua
@@ -1177,6 +1177,7 @@ local replicaset_mt = {
        up_replica_priority = replicaset_up_replica_priority;
        wait_connected = replicaset_wait_connected,
        wait_connected_all = replicaset_wait_connected_all,
        wait_master = replicaset_wait_master,
        call = replicaset_master_call;
        callrw = replicaset_master_call;
        callro = replicaset_template_multicallro(false, false);
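The body of replicaset_wait_master is outside this hunk; judging from the call sites added in vshard/router/init.lua below, the newly exported method is used roughly as follows (a sketch under that assumption, where rs stands for any replicaset object of a configured router):

-- Sketch only: 'rs' is assumed to be one value of router.replicasets.
local master, err = rs:wait_master(timeout)
if master == nil then
    -- The master did not become known within the timeout.
    return nil, err
end
-- Initiate the connection; it is awaited later via rs:wait_connected().
rs:connect_replica(master)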
75 changes: 66 additions & 9 deletions vshard/router/init.lua
@@ -799,6 +799,47 @@ local function router_call(router, bucket_id, opts, ...)
...)
end

--
-- Wait until all masters in the list are connected.
--
-- @param replicasets List of replicasets.
-- @param timeout Timeout in seconds.
--
-- @return In case of success - remaining timeout.
--
-- @return In case of error - nil, err, UUID of the failed replicaset.
--
local function wait_connected_to_masters(replicasets, timeout)
    local master, uuid
    local err, err_uuid

    -- Start connecting to all masters in parallel.
    local deadline = fiber_clock() + timeout
    for _, replicaset in pairs(replicasets) do
        master, err = replicaset:wait_master(timeout)
        uuid = replicaset.uuid
        if not master then
            err_uuid = uuid
            goto fail
        end
        replicaset:connect_replica(master)
        timeout = deadline - fiber_clock()
    end

    -- Wait until all connections are established.
    for _, replicaset in pairs(replicasets) do
        timeout, err = replicaset:wait_connected(timeout)
        if not timeout then
            err_uuid = replicaset.uuid
            goto fail
        end
    end
    do return timeout end

::fail::
    return nil, err, err_uuid
end
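Both call sites updated below follow the same pattern; a condensed sketch of the intended use (the router handle and the 10-second budget are illustrative, not part of this commit):

-- Gather the replicaset objects into an array and wait for all of their
-- masters in parallel. On success the remaining timeout is returned; on
-- failure: nil, the error and the UUID of the failed replicaset.
local rs_list = {}
for _, rs in pairs(router.replicasets) do
    table.insert(rs_list, rs)
end
local timeout, err, err_uuid = wait_connected_to_masters(rs_list, 10)
if not timeout then
    return nil, err, err_uuid
end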

--
-- Consistent Map-Reduce. The given function is called on all masters in the
-- cluster with a guarantee that in case of success it was executed with all
@@ -863,21 +904,24 @@ local function router_map_callrw(router, func, args, opts)
    --
    -- Ref stage: send.
    --
    for id, rs in pairs(replicasets) do
        -- Netbox async requests work only with active connections. Need to wait
        -- for the connection explicitly.
        timeout, err = rs:wait_connected(timeout)
        if timeout == nil then
            err_id = id
            goto fail
        end
    -- Netbox async requests work only with active connections. Need to wait
    -- for the connection explicitly.
    local rs_list = table_new(0, #replicasets)
    for _, rs in pairs(replicasets) do
        table.insert(rs_list, rs)
    end
    timeout, err, err_uuid = wait_connected_to_masters(rs_list, timeout)
    if not timeout then
        goto fail
    end
    for uuid, rs in pairs(replicasets) do
        res, err = rs:callrw('vshard.storage._call',
                             {'storage_ref', rid, timeout}, opts_ref)
        if res == nil then
            err_uuid = uuid
            goto fail
        end
        futures[id] = res
        futures[uuid] = res
        rs_count = rs_count + 1
    end
    map = table_new(0, rs_count)
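For context, this ref stage runs inside the map-reduce entry point exposed by the router; assuming the standard vshard wrapper, a call looks roughly like this ('do_work' names a function defined on the storages, and all argument values are illustrative):

-- 'do_work' runs on the master of every replicaset, with a guarantee that,
-- on success, it observed all buckets of the cluster. On success 'res' maps
-- replicaset UUID to the function's results; on failure it is nil, and
-- err/err_uuid describe what went wrong and on which replicaset.
local res, err, err_uuid = vshard.router.map_callrw('do_work', {1, 2}, {timeout = 10})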
@@ -1033,6 +1077,7 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts)
    local grouped_buckets
    local err, err_uuid, res, ok, map
    local call_args
    local rs_list
    local replicasets = {}
    local preallocated = false
    local futures = {}
@@ -1055,6 +1100,18 @@
    end
    timeout = deadline - fiber_clock()

    -- Netbox async requests work only with active connections.
    -- So, first need to wait for the master connection explicitly.
    rs_list = table_new(0, #grouped_buckets)
    for uuid, _ in pairs(grouped_buckets) do
        replicaset = router.replicasets[uuid]
        table.insert(rs_list, replicaset)
    end
    timeout, err, err_uuid = wait_connected_to_masters(rs_list, timeout)
    if not timeout then
        goto fail
    end
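The loop above needs only the keys of grouped_buckets; judging from this hunk and the ref loop that follows, the table is assumed to map each replicaset UUID to the bucket ids routed to it, for example:

-- Assumed shape (UUIDs and bucket ids are made up for illustration):
local grouped_buckets = {
    ['ac522f65-aa94-4134-9f64-51ee384f1a54'] = {1, 5, 12},
    ['1e02ae8a-afc0-4e91-ba34-843a356b8ed7'] = {3, 7},
}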

    -- Send ref requests with timeouts to the replicasets.
    futures = table_new(0, #grouped_buckets)
    for uuid, buckets in pairs(grouped_buckets) do
