From ac4c5b1037f9ec8ccbbbc235d2520180acb87014 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Fri, 24 Nov 2023 12:14:42 +0700 Subject: [PATCH 1/3] storage: implement partial map-reduce API --- test/instances/storage.lua | 16 ++++ test/storage-luatest/storage_1_test.lua | 111 ++++++++++++++++++++++++ vshard/storage/init.lua | 67 ++++++++++++++ 3 files changed, 194 insertions(+) diff --git a/test/instances/storage.lua b/test/instances/storage.lua index 4bbff541..d696b21e 100755 --- a/test/instances/storage.lua +++ b/test/instances/storage.lua @@ -13,6 +13,7 @@ _G.ivconst = require('vshard.consts') _G.ivutil = require('vshard.util') _G.iverror = require('vshard.error') _G.ivtest = require('test.luatest_helpers.vtest') +_G.itable_new = require('table.new') _G.iwait_timeout = _G.ivtest.wait_timeout @@ -69,6 +70,20 @@ local function get_first_bucket() return res ~= nil and res.id or nil end +local function get_n_buckets(n) + if n <= 0 then + error('Invalid number of buckets') + end + local ids = _G.itable_new(0, n) + for _, tuple in box.space._bucket.index.status:pairs(vconst.BUCKET.ACTIVE) do + table.insert(ids, tuple.id) + if #ids == n then + return ids + end + end + error('Not enough buckets') +end + local function session_set(key, value) box.session.storage[key] = value return true @@ -167,6 +182,7 @@ _G.box_error = box_error _G.echo = echo _G.get_uuid = get_uuid _G.get_first_bucket = get_first_bucket +_G.get_n_buckets = get_n_buckets _G.session_set = session_set _G.session_get = session_get _G.bucket_gc_wait = bucket_gc_wait diff --git a/test/storage-luatest/storage_1_test.lua b/test/storage-luatest/storage_1_test.lua index f8e758e5..9c882809 100644 --- a/test/storage-luatest/storage_1_test.lua +++ b/test/storage-luatest/storage_1_test.lua @@ -208,3 +208,114 @@ test_group.test_named_hot_reload = function(g) _G.vshard.storage = storage end) end + +test_group.test_ref_with_lookup = function(g) + g.replica_1_a:exec(function() + local res, err + local timeout = 0.1 + local rid = 42 + local bids = _G.get_n_buckets(2) + local bid_extra = 3001 + + -- Check for a single bucket. + res, err = ivshard.storage._call( + 'storage_ref_with_lookup', + rid, + timeout, + {bids[1]} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {rid = rid, moved = {}}) + res, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + + -- Check for multiple buckets. + res, err = ivshard.storage._call( + 'storage_ref_with_lookup', + rid, + timeout, + {bids[1], bids[2]} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {rid = rid, moved = {}}) + res, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + + -- Check for double referencing. + res, err = ivshard.storage._call( + 'storage_ref_with_lookup', + rid, + timeout, + {bids[1], bids[1]} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {rid = rid, moved = {}}) + res, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + res, err = ivshard.storage._call('storage_unref', rid) + t.assert_str_contains(err.message, 'Can not delete a storage ref: no ref') + + -- Check for an absent bucket. + res, err = ivshard.storage._call( + 'storage_ref_with_lookup', + rid, + timeout, + {bids[1], bids[2], bid_extra} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {rid = rid, moved = {bid_extra}}) + ivshard.storage._call('storage_unref', rid) + + -- Check that we do not create a reference if there are no buckets. + res, err = ivshard.storage._call( + 'storage_ref_with_lookup', + rid, + timeout, + {bid_extra} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {rid = nil, moved = {bid_extra}}) + res, err = vshard.storage._call('storage_unref', rid) + t.assert_str_contains(err.message, 'Can not delete a storage ref: no ref') + ilt.assert_equals(res, nil) + + -- Check for a timeout. + -- Emulate a case when all buckets are not writable. + local func = ivshard.storage.internal.bucket_are_all_rw + ivshard.storage.internal.bucket_are_all_rw = function() return false end + res, err = ivshard.storage._call( + 'storage_ref_with_lookup', + rid, + timeout, + {bids[1]} + ) + ivshard.storage.internal.bucket_are_all_rw = func + t.assert_str_contains(err.message, 'Timeout exceeded') + ilt.assert_equals(res, nil) + -- Check that the reference was not created. + res, err = ivshard.storage._call('storage_unref', rid) + t.assert_str_contains(err.message, 'Can not delete a storage ref: no ref') + ilt.assert_equals(res, nil) + end) +end + +test_group.test_absent_buckets = function(g) + g.replica_1_a:exec(function() + local res, err = ivshard.storage._call( + 'storage_absent_buckets', + {_G.get_first_bucket()} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {}) + end) + + g.replica_1_a:exec(function() + local bid_extra = 3001 + local res, err = ivshard.storage._call( + 'storage_absent_buckets', + {_G.get_first_bucket(), bid_extra} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {bid_extra}) + end) +end diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 34560cb9..76472693 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3142,6 +3142,71 @@ local function storage_ref(rid, timeout) return bucket_count() end +-- +-- Lookup for absent active buckets. +-- +-- @param bucket_ids List of bucket identifiers. +-- @return A dictionary with a list of bucket identifiers which are +-- not present on the current instance. +-- +local function storage_absent_buckets(bucket_ids) + local bucket = nil + local status = consts.BUCKET + local moved_buckets = {} + for _, bucket_id in pairs(bucket_ids) do + bucket = box.space._bucket:get{bucket_id} + if not bucket or bucket.status ~= status.ACTIVE then + table.insert(moved_buckets, bucket_id) + end + end + return { moved = moved_buckets } +end + +-- +-- Bind a new storage ref to the current box session and check +-- that all the buckets are present. Is used as a part of the +-- partial Map-Reduce API. +-- +-- @param rid Unique reference ID. +-- @param timeout Timeout in seconds. +-- @param bucket_ids List of bucket identifiers. +-- @return A dictionary with reference id and a list of bucket identifiers +-- which are not present on the current instance. If all the buckets +-- are absent, the reference is not created and a nil reference id +-- with the list of absent buckets is returned. +-- +local function storage_ref_with_lookup(rid, timeout, bucket_ids) + local moved = storage_absent_buckets(bucket_ids).moved + if #moved == #bucket_ids then + -- Take an advantage that moved buckets are returned in the same + -- order as in the input list. + local do_match = true + local next_moved = next(moved) + local next_passed = next(bucket_ids) + ::continue:: + if next_moved then + if next_moved == next_passed then + next_moved = next(moved, next_moved) + next_passed = next(bucket_ids, next_passed) + goto continue + else + do_match = false + end + end + if do_match then + -- If all the passed buckets are absent, there is no need + -- to create a ref. + return {rid = nil, moved = moved} + end + end + + local ok, err = storage_ref(rid, timeout) + if not ok then + return nil, err + end + return {rid = rid, moved = moved} +end + -- -- Drop a storage ref from the current box session. Is used as a part of -- Map-Reduce API. @@ -3196,7 +3261,9 @@ service_call_api = setmetatable({ rebalancer_apply_routes = rebalancer_apply_routes, rebalancer_request_state = rebalancer_request_state, recovery_bucket_stat = recovery_bucket_stat, + storage_absent_buckets = storage_absent_buckets, storage_ref = storage_ref, + storage_ref_with_lookup = storage_ref_with_lookup, storage_unref = storage_unref, storage_map = storage_map, info = storage_service_info, From ce8ea5fb5c835b9a70b257c391b953d06709a2f6 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Fri, 24 Nov 2023 12:15:36 +0700 Subject: [PATCH 2/3] router: implement partial map-reduce API Introduce a partial ref-map-reduce API for vshard. It guarantees that in case of success the function is executed exactly once on the storages, that contain the given list of buckets. Algorithm. Router: 1. Group buckets by replicasets based on the router's cache. 2. Ref stage. For each storage: a. Async send ref id, timeout and a group of the corresponding buckets to the storage. The aim is to reference this storage and check what buckets are absent. Storage: 3. Refer session with the ref id and timeout, passed from the router. 4. Lookup for the passed buckets. If any of them were not found on the storage, return these buckets back in response to the router. Router: 5. Await and collect returned responses. If timeout has expired, set the error for this response. 6. If any of responses contains error,send unref to the refed storages and return the error to the user. 7. If the collected results contain moved buckets, search for them and update the router's cache. Decrease the timeout and goto 1. 8. Map stage. For each storage: a. Replace a bucket list with a group of buckets refed on the target storage. b. Async send a map function with modified arguments and a ref id to the storage. Storage: 9. Execute storage_map: if the ref id has expired, return error. Otherwise, ref.use -> execute -> ref.del from storage_map(). Return results. Router: 10. Reduce stage. Await results (and optionally apply a callback to each result): if timeout expired, return error to the user. Otherwise, return result. --- test/luatest_helpers/vtest.lua | 10 + test/router-luatest/map_part_test.lua | 299 ++++++++++++++++++++++++++ test/router-luatest/router_test.lua | 19 ++ vshard/router/init.lua | 234 +++++++++++++++++++- 4 files changed, 561 insertions(+), 1 deletion(-) create mode 100644 test/router-luatest/map_part_test.lua diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua index a4fc2091..123a7c8b 100644 --- a/test/luatest_helpers/vtest.lua +++ b/test/luatest_helpers/vtest.lua @@ -463,6 +463,15 @@ local function storage_first_bucket(storage) end) end +-- +-- Get n active buckets from the storage. +-- +local function storage_get_n_buckets(storage, n) + return storage:exec(function(n) + return _G.get_n_buckets(n) + end, {n}) +end + -- -- Disable rebalancer on all storages. -- @@ -851,6 +860,7 @@ return { cluster_wait_fullsync = cluster_wait_fullsync, cluster_rebalancer_find = cluster_rebalancer_find, storage_first_bucket = storage_first_bucket, + storage_get_n_buckets = storage_get_n_buckets, storage_stop = storage_stop, storage_start = storage_start, router_new = router_new, diff --git a/test/router-luatest/map_part_test.lua b/test/router-luatest/map_part_test.lua new file mode 100644 index 00000000..8e8e382d --- /dev/null +++ b/test/router-luatest/map_part_test.lua @@ -0,0 +1,299 @@ + +local t = require('luatest') +local vtest = require('test.luatest_helpers.vtest') + +local g = t.group('router') +local cfg_template = { + sharding = { + { + replicas = { + replica_1_a = { + master = true, + }, + replica_1_b = {}, + }, + }, + { + replicas = { + replica_2_a = { + master = true, + }, + replica_2_b = {}, + }, + }, + }, + bucket_count = 100, + test_user_grant_range = 'super', +} +local global_cfg = vtest.config_new(cfg_template) + +g.before_all(function() + vtest.cluster_new(g, global_cfg) + + t.assert_equals(g.replica_1_a:exec(function() + return #ivshard.storage.info().alerts + end), 0, 'no alerts after boot') + + local router = vtest.router_new(g, 'router', global_cfg) + g.router = router + local res, err = router:exec(function() + return ivshard.router.bootstrap({timeout = iwait_timeout}) + end) + t.assert(res and not err, 'bootstrap buckets') +end) + +g.after_all(function() + g.cluster:drop() +end) + +local function map_part_init() + local rs1_uuid = g.replica_1_a:replicaset_uuid() + local rs2_uuid = g.replica_2_a:replicaset_uuid() + + local create_map_func_f = function(res1) + rawset(_G, 'do_map', function(res2) + return {res1, res2} + end) + end + g.replica_1_a:exec(create_map_func_f, {1}) + g.replica_2_a:exec(create_map_func_f, {2}) + + local bids1 = vtest.storage_get_n_buckets(g.replica_1_a, 4) + local bids2 = vtest.storage_get_n_buckets(g.replica_2_a, 1) + + return { + rs1_uuid = rs1_uuid, + rs2_uuid = rs2_uuid, + bids1 = bids1, + bids2 = bids2, + } +end + +g.test_map_part_single_rs = function(g) + local expected, res + local init = map_part_init() + + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw( + {bid1, bid2}, + 'do_map', + {3}, + {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[3], init.bids1[2]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + expected = { + [init.rs1_uuid] = {{1, 3}}, + } + t.assert_equals(res.val, expected) +end + +g.test_map_part_multi_rs = function(g) + local expected, res + local init = map_part_init() + + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw({bid1, bid2}, 'do_map', {42}, + {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[1], init.bids2[1]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + expected = { + [init.rs1_uuid] = {{1, 42}}, + [init.rs2_uuid] = {{2, 42}}, + } + t.assert_equals(res.val, expected) +end + +g.test_map_part_ref = function(g) + local expected, res + local init = map_part_init() + + -- First move some buckets from rs1 to rs2 and then pause gc on rs1. + -- As a result, the buckets will be in the SENT state on rs1 and + -- in the ACTIVE state on rs2. + g.replica_1_a:exec(function(bid1, bid2, to) + ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = true + ivshard.storage.bucket_send(bid1, to) + ivshard.storage.bucket_send(bid2, to) + end, {init.bids1[1], init.bids1[2], init.rs2_uuid}) + -- The buckets are ACTIVE on rs2, so the partial map should succeed. + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw( + {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[1], init.bids1[2]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + expected = { + [init.rs2_uuid] = {{2, 42}}, + } + t.assert_equals(res.val, expected) + -- But if we use some active bucket from rs1, the partial map should fail. + -- The reason is that the moved buckets are still in the SENT state and + -- we can't take a ref. + res = g.router:exec(function(bid1) + local val, err, err_uuid = ivshard.router.map_part_callrw( + {bid1}, 'do_map', {42}, {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[3]}) + t.assert_equals(res.val, nil) + t.assert(res.err) + t.assert_equals(res.err_uuid, init.rs1_uuid) + -- The moved buckets still exist on the rs1 with non-active status. + -- Let's remove them and re-enable gc on rs1. + g.replica_1_a:exec(function() + ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = false + _G.bucket_gc_wait() + end) + -- Now move the buckets back to rs1 and pause gc on rs2. + -- The buckets will be ACTIVE on rs1 and SENT on rs2, + -- so the partial map should succeed. + res = g.replica_2_a:exec(function(bid1, bid2, to) + ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = true + ivshard.storage.bucket_send(bid1, to) + local res, err = ivshard.storage.bucket_send(bid2, to) + return { + res = res, + err = err, + } + end, {init.bids1[1], init.bids1[2], init.rs1_uuid}) + t.assert_equals(res.err, nil) + + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw( + {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[1], init.bids1[2]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + expected = { + [init.rs1_uuid] = {{1, 42}}, + } + t.assert_equals(res.val, expected) + -- Re-enable gc on rs2. + g.replica_2_a:exec(function() + ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = false + _G.bucket_gc_wait() + end) +end + +g.test_map_part_double_ref = function(g) + local expected, res + local init = map_part_init() + + -- First, disable discovery on the router to disable route cache update. + g.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true + end) + -- Then, move the bucket form rs1 to rs2. Now the router has an outdated + -- route cache. + g.replica_1_a:exec(function(bid, to) + ivshard.storage.bucket_send(bid, to) + end, {init.bids1[4], init.rs2_uuid}) + -- Call a partial map for the moved bucket and some bucket from rs2. The ref stage + -- should be done in two steps: + -- 1. ref rs2 and returns the moved bucket; + -- 2. discover the moved bucket on rs2 and avoid double reference; + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw( + {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[4], init.bids2[1]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + expected = { + [init.rs2_uuid] = {{2, 42}}, + } + t.assert_equals(res.val, expected) + -- Call a partial map one more time to make sure there are no references left. + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw( + {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[4], init.bids2[1]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, expected) + -- Return the bucket back and re-enable discovery on the router. + g.replica_2_a:exec(function(bid, to) + ivshard.storage.bucket_send(bid, to) + end, {init.bids1[4], init.rs1_uuid}) + g.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false + end) +end + +g.test_map_part_map = function(g) + local res + local init = map_part_init() + + g.replica_2_a:exec(function() + _G.do_map = function() + return box.error(box.error.PROC_LUA, "map_err") + end + end) + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw({bid2, bid1}, 'do_map', {3}, + {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[1], init.bids2[1]}) + t.assert_equals(res.val, nil) + t.assert_covers(res.err, { + code = box.error.PROC_LUA, + type = 'ClientError', + message = 'map_err' + }) + t.assert_equals(res.err_uuid, init.rs2_uuid) + -- Check that there is no dangling references after the error. + init = map_part_init() + res = g.router:exec(function(bid1, bid2) + local val, err, err_uuid = ivshard.router.map_part_callrw({bid1, bid2}, 'do_map', {3}, + {timeout = iwait_timeout}) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end, {init.bids1[1], init.bids2[1]}) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [init.rs1_uuid] = {{1, 3}}, + [init.rs2_uuid] = {{2, 3}}, + }) +end diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua index db96b00a..acfefe98 100644 --- a/test/router-luatest/router_test.lua +++ b/test/router-luatest/router_test.lua @@ -285,6 +285,25 @@ g.test_map_callrw_raw = function(g) end) end +g.test_group_buckets = function(g) + local bids = vtest.storage_get_n_buckets(g.replica_1_a, 2) + + local res = g.router:exec(function(bid1, bid2) + local val, err = ivshard.router.group({bid2, bid1, bid1}) + return { + val = val, + err = err, + } + end, {bids[1], bids[2]}) + assert(res.err == nil) + local rs1_uuid = g.replica_1_a:replicaset_uuid() + local expected = { + [rs1_uuid] = {bids[1], bids[2]}, + } + table.sort(expected[rs1_uuid]) + t.assert_equals(res.val, expected) +end + g.test_uri_compare_and_reuse = function(g) -- Multilisten itself is not used, but URI-table is supported since the same -- version. diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 46f2da20..2fbf489e 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -222,6 +222,49 @@ local function bucket_resolve(router, bucket_id) return replicaset end +-- Group bucket ids by replicasets according to the router cache. +local function buckets_group(router, bucket_ids, timeout) + timeout = timeout or consts.CALL_TIMEOUT_MIN + local deadline = fiber_clock() + timeout + local replicaset, err + local replicaset_buckets = {} + local prev_id = nil + local buckets, id + + -- Sort buckets to skip duplicates. + table.sort(bucket_ids) + for _, bucket_id in pairs(bucket_ids) do + if bucket_id == prev_id then + goto continue + end + + if fiber_clock() > deadline then + return nil, lerror.timeout() + end + + replicaset, err = bucket_resolve(router, bucket_id) + if err ~= nil then + return nil, err + end + if replicaset == nil then + return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id) + end + + id = replicaset.id + buckets = replicaset_buckets[id] + if buckets then + table.insert(buckets, bucket_id) + else + replicaset_buckets[id] = {bucket_id} + end + + ::continue:: + prev_id = bucket_id + end + + return replicaset_buckets +end + -- -- Arrange downloaded buckets to the route map so as they -- reference a given replicaset. @@ -881,7 +924,7 @@ local function router_map_callrw(router, func, args, opts) futures[id] = res end -- - -- Ref stage: collect. + -- Map stage: collect. -- if do_return_raw then for id, f in pairs(futures) do @@ -945,6 +988,194 @@ local function router_map_callrw(router, func, args, opts) return nil, err, err_id end +-- +-- Partial Ref-Map-Reduce. The given function is called on the masters +-- with a guarantee that in case of success it was executed exactly once +-- on all storages that contain the given buckets. Execution on masters +-- is chosen to allow buckets be accessed for reads and writes during the +-- call. +-- +-- The execution consists of the following stages: +-- +-- Ref stage. We ref all the storages with the given buckets. It is +-- an iterative process, because the router's cache may be outdated, +-- and we need to discover the moved buckets. At the end of this stage +-- we have refed all the storages containing the given buckets with the +-- given timeout. +-- +-- Map stage. We call the given function on all the storages we have +-- refed. On the storage side we use a `storage_map` function, which +-- switches the timeout ref to the manual mode and unrefs the storage +-- after the function is called. +-- +-- Reduce stage. We collect the results of the function call. +-- +-- @param router Router instance to use. +-- @param bucket_ids List of bucket identifiers. +-- @param func Name of the function to call. +-- @param args Function arguments passed in netbox style (as an array). +-- @param opts Can only contain 'timeout' as a number of seconds. Note that the +-- refs may end up being kept on the storages during this entire timeout if +-- something goes wrong. For instance, network issues appear. This means +-- better not use a value bigger than necessary. A stuck infinite ref can +-- only be dropped by this router restart/reconnect or the storage restart. +-- +-- @return In case of success - a map with replicaset UUID keys and values being +-- what the function returned from the replicaset. +-- +-- @return In case of an error - nil, error object, optional UUID of the +-- replicaset where the error happened. UUID may be not present if it wasn't +-- about concrete replicaset. For example, not all buckets were found even +-- though all replicasets were scanned. +-- +local function router_map_part_callrw(router, bucket_ids, func, args, opts) + local replicaset + local grouped_buckets + local err, err_id, res, ok, map + local call_args + local replicasets = {} + local preallocated = false + local futures = {} + local call_opts = {is_async = true} + local rs_count = 0 + local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN + local deadline = fiber_clock() + timeout + local rid = M.ref_id + M.ref_id = rid + 1 + + -- Nil checks are done explicitly here (== nil instead of 'not'), because + -- netbox requests return box.NULL instead of nils. + + -- Ref stage. + while #bucket_ids > 0 do + -- Group the buckets by replicasets according to the router cache. + grouped_buckets, err = buckets_group(router, bucket_ids, timeout) + if grouped_buckets == nil then + goto fail + end + timeout = deadline - fiber_clock() + + -- Send ref requests with timeouts to the replicasets. + futures = table_new(0, #grouped_buckets) + for id, buckets in pairs(grouped_buckets) do + replicaset = router.replicasets[id] + -- Netbox async requests work only with active connections. Need to wait + -- for the connection explicitly. + timeout, err = replicaset:wait_connected(timeout) + if timeout == nil then + err_id = id + goto fail + end + if replicasets[id] then + -- Replicaset is already referred on the previous iteration. + -- Simply get the moved buckets without double referencing. + call_args = {'storage_absent_buckets', buckets} + else + call_args = {'storage_ref_with_lookup', rid, timeout, buckets} + rs_count = rs_count + 1 + end + res, err = replicaset:callrw('vshard.storage._call', call_args, call_opts) + if res == nil then + err_id = id + goto fail + end + futures[id] = res + end + + if not preallocated then + -- Current preallocation works only for the first iteration + -- (i.e. router cache is not outdated). + replicasets = table_new(0, rs_count) + preallocated = true + end + + -- Wait for the refs to be done and collect moved buckets. + bucket_ids = {} + for id, future in pairs(futures) do + res, err = future_wait(future, timeout) + -- Handle netbox error first. + if res == nil then + err_id = id + goto fail + end + -- Ref returns nil,err or {rid, moved}. + res, err = res[1], res[2] + if res == nil then + err_id = id + goto fail + end + for _, bucket_id in pairs(res.moved) do + bucket_reset(router, bucket_id) + table.insert(bucket_ids, bucket_id) + end + if res.rid then + -- If there are no buckets on the replicaset, + -- it would not be referred. + replicasets[id] = router.replicasets[id] + end + timeout = deadline - fiber_clock() + end + end + + -- Map stage. + map = table_new(0, rs_count) + futures = table_new(0, rs_count) + args = {'storage_map', rid, func, args} + -- Send map requests. + for id, rs in pairs(replicasets) do + res, err = rs:callrw('vshard.storage._call', args, call_opts) + if res == nil then + err_id = id + goto fail + end + futures[id] = res + end + + -- Reduce stage. + -- Collect map responses (refs were deleted by the storages for non-error results). + for id, future in pairs(futures) do + res, err = future_wait(future, timeout) + if res == nil then + err_id = id + goto fail + end + -- Map returns true,res or nil,err. + ok, res = res[1], res[2] + if ok == nil then + err = res + err_id = id + goto fail + end + if res ~= nil then + -- Store as a table so in future it could be extended for + -- multireturn. + map[id] = {res} + end + timeout = deadline - fiber_clock() + end + do return map end + +::fail:: + local f + for id, future in pairs(futures) do + future:discard() + -- Best effort to remove the created refs before exiting. Can help if + -- the timeout was big and the error happened early. + call_args = {'storage_unref', rid} + replicaset = router.replicasets[id] + if replicaset then + f = replicaset:callrw('vshard.storage._call', call_args, call_opts) + if f ~= nil then + -- Don't care waiting for a result - no time for this. But it won't + -- affect the request sending if the connection is still alive. + f:discard() + end + end + end + err = lerror.make(err) + return nil, err, err_id +end + -- -- Get replicaset object by bucket identifier. -- @param bucket_id Bucket identifier. @@ -1750,6 +1981,7 @@ local router_mt = { callre = router_make_api(router_callre), callbre = router_make_api(router_callbre), map_callrw = router_make_api(router_map_callrw), + map_part_callrw = router_make_api(router_map_part_callrw), route = router_make_api(router_route), routeall = router_make_api(router_routeall), bucket_id = router_make_api(router_bucket_id), From d4c4c4a1ee1b8ac6c5b5c8c49180e0b3520433ca Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Thu, 30 Nov 2023 12:17:00 +0700 Subject: [PATCH 3/3] router: improve master connection parallelism in map-reduce --- vshard/replicaset.lua | 1 + vshard/router/init.lua | 71 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index 363037e7..a4b798c7 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -1177,6 +1177,7 @@ local replicaset_mt = { up_replica_priority = replicaset_up_replica_priority; wait_connected = replicaset_wait_connected, wait_connected_all = replicaset_wait_connected_all, + wait_master = replicaset_wait_master, call = replicaset_master_call; callrw = replicaset_master_call; callro = replicaset_template_multicallro(false, false); diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 2fbf489e..dbfc3450 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -799,6 +799,47 @@ local function router_call(router, bucket_id, opts, ...) ...) end +-- +-- Wait until all masters in the list are connected. +-- +-- @param replicasets List of replicasets. +-- @param timeout Timeout in seconds. +-- +-- @return In case of success - remaining timeout. +-- +-- @return In case of error - nil, err, UUID of the failed replicaset. +-- +local function wait_connected_to_masters(replicasets, timeout) + local master, id + local err, err_id + + -- Start connecting to all masters in parallel. + local deadline = fiber_clock() + timeout + for _, replicaset in pairs(replicasets) do + master, err = replicaset:wait_master(timeout) + id = replicaset.id + if not master then + err_id = id + goto fail + end + replicaset:connect_replica(master) + timeout = deadline - fiber_clock() + end + + -- Wait until all connections are established. + for _, replicaset in pairs(replicasets) do + timeout, err = replicaset:wait_connected(timeout) + if not timeout then + err_id = replicaset.id + goto fail + end + end + do return timeout end + + ::fail:: + return nil, err, err_id +end + -- -- Consistent Map-Reduce. The given function is called on all masters in the -- cluster with a guarantee that in case of success it was executed with all @@ -863,14 +904,17 @@ local function router_map_callrw(router, func, args, opts) -- -- Ref stage: send. -- + -- Netbox async requests work only with active connections. Need to wait + -- for the connection explicitly. + local rs_list = table_new(0, #replicasets) + for _, rs in pairs(replicasets) do + table.insert(rs_list, rs) + end + timeout, err, err_id = wait_connected_to_masters(rs_list, timeout) + if not timeout then + goto fail + end for id, rs in pairs(replicasets) do - -- Netbox async requests work only with active connections. Need to wait - -- for the connection explicitly. - timeout, err = rs:wait_connected(timeout) - if timeout == nil then - err_id = id - goto fail - end res, err = rs:callrw('vshard.storage._call', {'storage_ref', rid, timeout}, opts_ref) if res == nil then @@ -1033,6 +1077,7 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) local grouped_buckets local err, err_id, res, ok, map local call_args + local rs_list local replicasets = {} local preallocated = false local futures = {} @@ -1055,6 +1100,18 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) end timeout = deadline - fiber_clock() + -- Netbox async requests work only with active connections. + -- So, first need to wait for the master connection explicitly. + rs_list = table_new(0, #grouped_buckets) + for uuid, _ in pairs(grouped_buckets) do + replicaset = router.replicasets[uuid] + table.insert(rs_list, replicaset) + end + timeout, err, err_id = wait_connected_to_masters(rs_list, timeout) + if not timeout then + goto fail + end + -- Send ref requests with timeouts to the replicasets. futures = table_new(0, #grouped_buckets) for id, buckets in pairs(grouped_buckets) do