From b80d17ca45ccb2aa9984d7b2e2bacbd291a4b81c Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Thu, 30 Nov 2023 12:17:00 +0700 Subject: [PATCH] router: improve master connection parallelism in map-reduce --- vshard/router/init.lua | 75 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 9 deletions(-) diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 154dd013..abadd5f9 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -799,6 +799,47 @@ local function router_call(router, bucket_id, opts, ...) ...) end +-- +-- Wait until all masters in the list are connected. +-- +-- @param replicasets List of replicasets. +-- @param timeout Timeout in seconds. +-- +-- @return In case of success - remaining timeout. +-- +-- @return In case of error - nil, err, UUID of the failed replicaset. +-- +local function wait_connected_to_masters(replicasets, timeout) + local master, uuid + local err, err_uuid + + -- Start connecting to all masters in parallel. + local deadline = fiber_clock() + timeout + for _, replicaset in pairs(replicasets) do + master, err = replicaset:wait_master(timeout) + uuid = replicaset.uuid + if not master then + err_uuid = uuid + goto fail + end + replicaset:connect_replica(master) + timeout = deadline - fiber_clock() + end + + -- Wait until all connections are established. + for _, replicaset in pairs(replicasets) do + timeout, err = replicaset:wait_connected(timeout) + if not timeout then + err_uuid = replicaset.uuid + goto fail + end + end + do return timeout end + + ::fail:: + return nil, err, err_uuid +end + -- -- Consistent Map-Reduce. The given function is called on all masters in the -- cluster with a guarantee that in case of success it was executed with all @@ -863,21 +904,24 @@ local function router_map_callrw(router, func, args, opts) -- -- Ref stage: send. -- - for id, rs in pairs(replicasets) do - -- Netbox async requests work only with active connections. Need to wait - -- for the connection explicitly. - timeout, err = rs:wait_connected(timeout) - if timeout == nil then - err_id = id - goto fail - end + -- Netbox async requests work only with active connections. Need to wait + -- for the connection explicitly. + local rs_list = table_new(0, #replicasets) + for _, rs in pairs(replicasets) do + table.insert(rs_list, rs) + end + timeout, err, err_uuid = wait_connected_to_masters(rs_list, timeout) + if not timeout then + goto fail + end + for uuid, rs in pairs(replicasets) do res, err = rs:callrw('vshard.storage._call', {'storage_ref', rid, timeout}, opts_ref) if res == nil then err_uuid = uuid goto fail end - futures[id] = res + futures[uuid] = res rs_count = rs_count + 1 end map = table_new(0, rs_count) @@ -1033,6 +1077,7 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) local grouped_buckets local err, err_uuid, res, ok, map local call_args + local rs_list local replicasets = {} local preallocated = false local futures = {} @@ -1055,6 +1100,18 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) end timeout = deadline - fiber_clock() + -- Netbox async requests work only with active connections. + -- So, first need to wait for the master connection explicitly. + rs_list = table_new(0, #grouped_buckets) + for uuid, _ in pairs(grouped_buckets) do + replicaset = router.replicasets[uuid] + table.insert(rs_list, replicaset) + end + timeout, err, err_uuid = wait_connected_to_masters(rs_list, timeout) + if not timeout then + goto fail + end + -- Send ref requests with timeouts to the replicasets. futures = table_new(0, #grouped_buckets) for uuid, buckets in pairs(grouped_buckets) do