diff --git a/test/router-luatest/map_part_test.lua b/test/router-luatest/map_part_test.lua index 8a9dc3d3..5841e41e 100644 --- a/test/router-luatest/map_part_test.lua +++ b/test/router-luatest/map_part_test.lua @@ -1,4 +1,3 @@ - local t = require('luatest') local vtest = require('test.luatest_helpers.vtest') @@ -22,279 +21,209 @@ local cfg_template = { }, }, }, - bucket_count = 100, + bucket_count = 10, test_user_grant_range = 'super', } local global_cfg = vtest.config_new(cfg_template) -g.before_all(function() - vtest.cluster_new(g, global_cfg) - - t.assert_equals(g.replica_1_a:exec(function() +g.before_all(function(cg) + vtest.cluster_new(cg, global_cfg) + t.assert_equals(cg.replica_1_a:exec(function() return #ivshard.storage.info().alerts end), 0, 'no alerts after boot') - local router = vtest.router_new(g, 'router', global_cfg) - g.router = router + local router = vtest.router_new(cg, 'router', global_cfg) + cg.router = router local res, err = router:exec(function() - return ivshard.router.bootstrap({timeout = iwait_timeout}) + local res, err = ivshard.router.bootstrap({timeout = iwait_timeout}) + rawset(_G, 'do_map', function(bids, args, opts) + local val, err, err_uuid = ivshard.router.map_part_callrw( + bids, 'do_map', args, opts) + return { + val = val, + err = err, + err_uuid = err_uuid, + } + end) + return res, err end) t.assert(res and not err, 'bootstrap buckets') + vtest.cluster_exec_each(cg, function() + rawset(_G, 'do_map', function(res) + ilt.assert_gt(require('vshard.storage.ref').count, 0) + return {ivutil.replicaset_uuid(), res} + end) + rawset(_G, 'bucket_send', function(bid, dst) + local _, err = ivshard.storage.bucket_send( + bid, dst, {timeout = iwait_timeout}) + ilt.assert_equals(err, nil) + end) + end) + cg.rs1_uuid = cg.replica_1_a:replicaset_uuid() + cg.rs2_uuid = cg.replica_2_a:replicaset_uuid() end) -g.after_all(function() - g.cluster:drop() +g.after_all(function(cg) + cg.cluster:drop() end) -local function map_part_init() - local rs1_uuid = g.replica_1_a:replicaset_uuid() - local rs2_uuid = g.replica_2_a:replicaset_uuid() - - local create_map_func_f = function(res1) - rawset(_G, 'do_map', function(res2) - return {res1, res2} - end) - end - g.replica_1_a:exec(create_map_func_f, {1}) - g.replica_2_a:exec(create_map_func_f, {2}) - - local bids1 = vtest.storage_get_n_buckets(g.replica_1_a, 4) - local bids2 = vtest.storage_get_n_buckets(g.replica_2_a, 1) - - return { - rs1_uuid = rs1_uuid, - rs2_uuid = rs2_uuid, - bids1 = bids1, - bids2 = bids2, - } +local function router_do_map(router, bids, args, opts) + return router:exec(function(bids, args, opts) + return _G.do_map(bids, args, opts) + end, {bids, args, opts}) end -g.test_map_part_single_rs = function(g) - local expected, res - local init = map_part_init() - - res = g.router:exec(function(bid1, bid2) - local val, err, err_uuid = ivshard.router.map_part_callrw( - {bid1, bid2}, - 'do_map', - {3}, - {timeout = iwait_timeout}) - return { - val = val, - err = err, - err_uuid = err_uuid, - } - end, {init.bids1[3], init.bids1[2]}) +g.test_map_part_single_rs = function(cg) + local bids = vtest.storage_get_n_buckets(cg.replica_1_a, 4) + local res = router_do_map(cg.router, {bids[3], bids[2]}, + {123}, {timeout = iwait_timeout}) t.assert_equals(res.err, nil) t.assert_equals(res.err_uuid, nil) - expected = { - [init.rs1_uuid] = {{1, 3}}, - } - t.assert_equals(res.val, expected) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 123}}, + }) end -g.test_map_part_multi_rs = function(g) - local expected, res - 
local init = map_part_init()
-
-    res = g.router:exec(function(bid1, bid2)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[1], init.bids2[1]})
+g.test_map_part_multi_rs = function(cg)
+    local bid1 = vtest.storage_first_bucket(cg.replica_1_a)
+    local bid2 = vtest.storage_first_bucket(cg.replica_2_a)
+    local res = router_do_map(cg.router, {bid1, bid2},
+                              {123}, {timeout = iwait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
-    expected = {
-        [init.rs1_uuid] = {{1, 42}},
-        [init.rs2_uuid] = {{2, 42}},
-    }
-    t.assert_equals(res.val, expected)
+    t.assert_equals(res.val, {
+        [cg.rs1_uuid] = {{cg.rs1_uuid, 123}},
+        [cg.rs2_uuid] = {{cg.rs2_uuid, 123}},
+    })
 end
 
-g.test_map_part_ref = function(g)
-    local expected, res
-    local init = map_part_init()
-
+g.test_map_part_ref = function(cg)
     -- First move some buckets from rs1 to rs2 and then pause gc on rs1.
     -- As a result, the buckets will be in the SENT state on rs1 and
     -- in the ACTIVE state on rs2.
-    g.replica_1_a:exec(function(bid1, bid2, to)
-        ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = true
-        ivshard.storage.bucket_send(bid1, to)
-        ivshard.storage.bucket_send(bid2, to)
-    end, {init.bids1[1], init.bids1[2], init.rs2_uuid})
+    local bids1 = vtest.storage_get_n_buckets(cg.replica_1_a, 3)
+    cg.replica_1_a:exec(function(bid1, bid2, to)
+        _G.bucket_gc_pause()
+        _G.bucket_send(bid1, to)
+        _G.bucket_send(bid2, to)
+    end, {bids1[1], bids1[2], cg.rs2_uuid})
     -- The buckets are ACTIVE on rs2, so the partial map should succeed.
-    res = g.router:exec(function(bid1, bid2)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[1], init.bids1[2]})
+    local res = router_do_map(cg.router, {bids1[1], bids1[2]},
+                              {42}, {timeout = iwait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
-    expected = {
-        [init.rs2_uuid] = {{2, 42}},
-    }
-    t.assert_equals(res.val, expected)
+    t.assert_equals(res.val, {
+        [cg.rs2_uuid] = {{cg.rs2_uuid, 42}},
+    })
     -- But if we use some active bucket from rs1, the partial map should fail.
     -- The reason is that the moved buckets are still in the SENT state and
     -- we can't take a ref.
-    res = g.router:exec(function(bid1)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid1}, 'do_map', {42}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[3]})
+    res = router_do_map(cg.router, {bids1[3]}, {42}, {timeout = 0.1})
     t.assert_equals(res.val, nil)
     t.assert(res.err)
-    t.assert_equals(res.err_uuid, init.rs1_uuid)
+    t.assert_equals(res.err_uuid, cg.rs1_uuid)
     -- The moved buckets still exist on rs1 with a non-active status.
     -- Let's remove them and re-enable gc on rs1.
-    g.replica_1_a:exec(function()
-        ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = false
+    cg.replica_1_a:exec(function()
+        _G.bucket_gc_continue()
         _G.bucket_gc_wait()
     end)
     -- Now move the buckets back to rs1 and pause gc on rs2.
     -- The buckets will be ACTIVE on rs1 and SENT on rs2,
     -- so the partial map should succeed.
-    res = g.replica_2_a:exec(function(bid1, bid2, to)
-        ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = true
-        ivshard.storage.bucket_send(bid1, to)
-        local res, err = ivshard.storage.bucket_send(bid2, to)
-        return {
-            res = res,
-            err = err,
-        }
-    end, {init.bids1[1], init.bids1[2], init.rs1_uuid})
-    t.assert_equals(res.err, nil)
-
-    res = g.router:exec(function(bid1, bid2)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[1], init.bids1[2]})
+    cg.replica_2_a:exec(function(bid1, bid2, to)
+        _G.bucket_gc_pause()
+        _G.bucket_send(bid1, to)
+        _G.bucket_send(bid2, to)
+    end, {bids1[1], bids1[2], cg.rs1_uuid})
+
+    res = router_do_map(cg.router, {bids1[1], bids1[2]},
+                        {42}, {timeout = iwait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
-    expected = {
-        [init.rs1_uuid] = {{1, 42}},
-    }
-    t.assert_equals(res.val, expected)
+    t.assert_equals(res.val, {
+        [cg.rs1_uuid] = {{cg.rs1_uuid, 42}},
+    })
     -- Re-enable gc on rs2.
-    g.replica_2_a:exec(function()
-        ivshard.storage.internal.errinj.ERRINJ_BUCKET_GC_PAUSE = false
+    cg.replica_2_a:exec(function()
+        _G.bucket_gc_continue()
         _G.bucket_gc_wait()
     end)
 end
 
-g.test_map_part_double_ref = function(g)
-    local expected, res
-    local init = map_part_init()
-
+g.test_map_part_double_ref = function(cg)
+    local bid1 = vtest.storage_first_bucket(cg.replica_1_a)
+    local bid2 = vtest.storage_first_bucket(cg.replica_2_a)
    -- First, disable discovery on the router to disable route cache update.
-    g.router:exec(function()
+    cg.router:exec(function(bid, uuid)
         ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true
-    end)
+        -- Make sure the location of the bucket is known.
+        local rs, err = ivshard.router.bucket_discovery(bid)
+        ilt.assert_equals(err, nil)
+        ilt.assert_equals(rs.uuid, uuid)
+    end, {bid1, cg.rs1_uuid})
     -- Then, move the bucket from rs1 to rs2. Now the router has an outdated
     -- route cache.
-    g.replica_1_a:exec(function(bid, to)
-        ivshard.storage.bucket_send(bid, to)
-    end, {init.bids1[4], init.rs2_uuid})
-    -- Call a partial map for the moved bucket and some bucket from rs2. The ref
-    -- stage should be done in two steps:
+    cg.replica_1_a:exec(function(bid, to)
+        _G.bucket_send(bid, to)
+        _G.bucket_gc_wait()
+    end, {bid1, cg.rs2_uuid})
+    -- Call a partial map for the moved bucket and some bucket
+    -- from rs2. The ref stage should be done in two steps:
     -- 1. ref rs2, which returns the moved bucket;
     -- 2. discover the moved bucket on rs2 and avoid a double reference;
-    res = g.router:exec(function(bid1, bid2)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[4], init.bids2[1]})
-    t.assert_equals(res.err, nil)
-    t.assert_equals(res.err_uuid, nil)
-    expected = {
-        [init.rs2_uuid] = {{2, 42}},
-    }
-    t.assert_equals(res.val, expected)
-    -- Call a partial map one more time to make sure there are no references
-    -- left.
-    res = g.router:exec(function(bid1, bid2)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid1, bid2}, 'do_map', {42}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[4], init.bids2[1]})
+    local res = router_do_map(cg.router, {bid1, bid2},
+                              {42}, {timeout = iwait_timeout})
     t.assert_equals(res.err, nil)
     t.assert_equals(res.err_uuid, nil)
-    t.assert_equals(res.val, expected)
+    t.assert_equals(res.val, {
+        [cg.rs2_uuid] = {{cg.rs2_uuid, 42}},
+    })
+    -- Make sure there are no references left.
+    vtest.cluster_exec_each(cg, function()
+        ilt.assert_equals(require('vshard.storage.ref').count, 0)
+    end)
     -- Return the bucket back and re-enable discovery on the router.
-    g.replica_2_a:exec(function(bid, to)
-        ivshard.storage.bucket_send(bid, to)
-    end, {init.bids1[4], init.rs1_uuid})
-    g.router:exec(function()
+    cg.replica_2_a:exec(function(bid, to)
+        _G.bucket_send(bid, to)
+        _G.bucket_gc_wait()
+    end, {bid1, cg.rs1_uuid})
+    cg.router:exec(function()
         ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false
     end)
 end
 
-g.test_map_part_map = function(g)
-    local res
-    local init = map_part_init()
-
-    g.replica_2_a:exec(function()
+g.test_map_part_map = function(cg)
+    local bid1 = vtest.storage_first_bucket(cg.replica_1_a)
+    local bid2 = vtest.storage_first_bucket(cg.replica_2_a)
+    cg.replica_2_a:exec(function()
+        rawset(_G, 'old_do_map', _G.do_map)
         _G.do_map = function()
             return box.error(box.error.PROC_LUA, "map_err")
         end
     end)
-    res = g.router:exec(function(bid1, bid2)
-        local val, err, err_uuid = ivshard.router.map_part_callrw(
-            {bid2, bid1}, 'do_map', {3}, {timeout = iwait_timeout})
-        return {
-            val = val,
-            err = err,
-            err_uuid = err_uuid,
-        }
-    end, {init.bids1[1], init.bids2[1]})
+    local res = router_do_map(cg.router, {bid1, bid2},
+                              {3}, {timeout = iwait_timeout})
     t.assert_equals(res.val, nil)
     t.assert_covers(res.err, {
         code = box.error.PROC_LUA,
         type = 'ClientError',
         message = 'map_err'
     })
-    t.assert_equals(res.err_uuid, init.rs2_uuid)
+    t.assert_equals(res.err_uuid, cg.rs2_uuid)
     -- Check that there are no dangling references after the error.
- init = map_part_init() - res = g.router:exec(function(bid1, bid2) - local val, err, err_uuid = ivshard.router.map_part_callrw( - {bid1, bid2}, 'do_map', {3}, {timeout = iwait_timeout}) - return { - val = val, - err = err, - err_uuid = err_uuid, - } - end, {init.bids1[1], init.bids2[1]}) - t.assert_equals(res.err, nil) + vtest.cluster_exec_each(cg, function() + ilt.assert_equals(require('vshard.storage.ref').count, 0) + end) + cg.replica_2_a:exec(function() + _G.do_map = _G.old_do_map + _G.old_do_map = nil + end) + res = router_do_map(cg.router, {bid1, bid2}, {3}, {timeout = iwait_timeout}) + t.assert_equals(res.err, nil, res.err) t.assert_equals(res.err_uuid, nil) t.assert_equals(res.val, { - [init.rs1_uuid] = {{1, 3}}, - [init.rs2_uuid] = {{2, 3}}, + [cg.rs1_uuid] = {{cg.rs1_uuid, 3}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, 3}}, }) end diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua index f9cd6920..96ebf0f2 100644 --- a/test/router-luatest/router_test.lua +++ b/test/router-luatest/router_test.lua @@ -288,21 +288,16 @@ end g.test_group_buckets = function(g) local bids = vtest.storage_get_n_buckets(g.replica_1_a, 2) - - local res = g.router:exec(function(bid1, bid2) - local val, err = ivshard.router._buckets_group({bid2, bid1, bid1}) - return { - val = val, - err = err, - } + local val, err = g.router:exec(function(bid1, bid2) + return ivshard.router._buckets_group({bid2, bid1, bid1, bid2}, + ivtest.wait_timeout) end, {bids[1], bids[2]}) - assert(res.err == nil) + assert(err == nil) local rs1_uuid = g.replica_1_a:replicaset_uuid() local expected = { [rs1_uuid] = {bids[1], bids[2]}, } - table.sort(expected[rs1_uuid]) - t.assert_equals(res.val, expected) + t.assert_equals(val, expected) end g.test_uri_compare_and_reuse = function(g) diff --git a/test/router/router.result b/test/router/router.result index 8f678564..9ad9bce1 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1503,6 +1503,7 @@ error_messages - Use replicaset:update_master(...) instead of replicaset.update_master(...) - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...) - Use replicaset:wait_connected_all(...) instead of replicaset.wait_connected_all(...) + - Use replicaset:wait_master(...) instead of replicaset.wait_master(...) ... _, replica = next(replicaset.replicas) --- diff --git a/test/storage-luatest/storage_1_test.lua b/test/storage-luatest/storage_1_test.lua index 9e760ebe..aed35a06 100644 --- a/test/storage-luatest/storage_1_test.lua +++ b/test/storage-luatest/storage_1_test.lua @@ -228,31 +228,32 @@ end test_group.test_ref_with_lookup = function(g) g.replica_1_a:exec(function() + local lref = require('vshard.storage.ref') local res, err, _ - local timeout = 0.1 local rid = 42 local bids = _G.get_n_buckets(2) - local bid_extra = 3001 + local bucket_count = ivshard.storage.internal.total_bucket_count - -- Check for a single bucket. + -- No buckets. res, err = ivshard.storage._call( - 'storage_ref_with_lookup', - rid, - timeout, - {bids[1]} - ) + 'storage_ref_with_lookup', rid, iwait_timeout, {}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {moved = {}}) + ilt.assert_equals(lref.count, 0) + + -- Check for a single ok bucket. 
+        res, err = ivshard.storage._call(
+            'storage_ref_with_lookup', rid, iwait_timeout, {bids[1]})
         ilt.assert_equals(err, nil)
         ilt.assert_equals(res, {rid = rid, moved = {}})
+        ilt.assert_equals(lref.count, 1)
         _, err = ivshard.storage._call('storage_unref', rid)
         ilt.assert_equals(err, nil)
+        ilt.assert_equals(lref.count, 0)
 
-        -- Check for multiple buckets.
+        -- Check for multiple ok buckets.
         res, err = ivshard.storage._call(
-            'storage_ref_with_lookup',
-            rid,
-            timeout,
-            {bids[1], bids[2]}
-        )
+            'storage_ref_with_lookup', rid, iwait_timeout, {bids[1], bids[2]})
         ilt.assert_equals(err, nil)
         ilt.assert_equals(res, {rid = rid, moved = {}})
         _, err = ivshard.storage._call('storage_unref', rid)
@@ -260,83 +261,67 @@
 
         -- Check for double referencing.
         res, err = ivshard.storage._call(
-            'storage_ref_with_lookup',
-            rid,
-            timeout,
-            {bids[1], bids[1]}
-        )
+            'storage_ref_with_lookup', rid, iwait_timeout, {bids[1], bids[1]})
         ilt.assert_equals(err, nil)
         ilt.assert_equals(res, {rid = rid, moved = {}})
+        ilt.assert_equals(lref.count, 1)
         _, err = ivshard.storage._call('storage_unref', rid)
         ilt.assert_equals(err, nil)
-        _, err = ivshard.storage._call('storage_unref', rid)
-        t.assert_str_contains(err.message,
-                              'Can not delete a storage ref: no ref')
+        ilt.assert_equals(lref.count, 0)
 
-        -- Check for an absent bucket.
+        -- A mix of present and moved buckets.
         res, err = ivshard.storage._call(
-            'storage_ref_with_lookup',
-            rid,
-            timeout,
-            {bids[1], bids[2], bid_extra}
-        )
+            'storage_ref_with_lookup', rid, iwait_timeout,
+            {bucket_count + 1, bids[1], bucket_count + 2, bids[2],
+             bucket_count + 3})
         ilt.assert_equals(err, nil)
-        ilt.assert_equals(res, {rid = rid, moved = {bid_extra}})
+        ilt.assert_equals(res, {
+            rid = rid,
+            moved = {bucket_count + 1, bucket_count + 2, bucket_count + 3}
+        })
         _, err = ivshard.storage._call('storage_unref', rid)
         ilt.assert_equals(err, nil)
 
-        -- Check that we do not create a reference if there are no buckets.
+        -- No ref when all buckets are missing.
         res, err = ivshard.storage._call(
             'storage_ref_with_lookup',
             rid,
-            timeout,
-            {bid_extra}
+            iwait_timeout,
+            {bucket_count + 1, bucket_count + 2}
         )
         ilt.assert_equals(err, nil)
-        ilt.assert_equals(res, {rid = nil, moved = {bid_extra}})
-        res, err = ivshard.storage._call('storage_unref', rid)
-        t.assert_str_contains(err.message,
-                              'Can not delete a storage ref: no ref')
-        ilt.assert_equals(res, nil)
+        ilt.assert_equals(res, {moved = {bucket_count + 1, bucket_count + 2}})
+        ilt.assert_equals(lref.count, 0)
 
-        -- Check for a timeout.
-        -- Emulate a case when all buckets are not writable.
-        local func = ivshard.storage.internal.bucket_are_all_rw
-        ivshard.storage.internal.bucket_are_all_rw = function() return false end
+        -- Timeout when some buckets aren't writable. The non-writable buckets
+        -- don't have to be the same ones as are being referenced.
+        box.space._bucket:update(
+            {bids[1]}, {{'=', 2, ivconst.BUCKET.SENDING}})
         res, err = ivshard.storage._call(
-            'storage_ref_with_lookup',
-            rid,
-            timeout,
-            {bids[1]}
-        )
-        ivshard.storage.internal.bucket_are_all_rw = func
+            'storage_ref_with_lookup', rid, 0.01, {bids[2]})
+        box.space._bucket:update(
+            {bids[1]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
         t.assert_str_contains(err.message, 'Timeout exceeded')
         ilt.assert_equals(res, nil)
 
-        -- Check that the reference was not created.
- res, err = ivshard.storage._call('storage_unref', rid) - t.assert_str_contains(err.message, - 'Can not delete a storage ref: no ref') - ilt.assert_equals(res, nil) + ilt.assert_equals(lref.count, 0) end) end -test_group.test_absent_buckets = function(g) +test_group.test_moved_buckets = function(g) g.replica_1_a:exec(function() local res, err = ivshard.storage._call( - 'storage_absent_buckets', + 'storage_moved_buckets', {_G.get_first_bucket()} ) ilt.assert_equals(err, nil) ilt.assert_equals(res, {moved = {}}) - end) - g.replica_1_a:exec(function() - local bid_extra = 3001 - local res, err = ivshard.storage._call( - 'storage_absent_buckets', - {_G.get_first_bucket(), bid_extra} + local bucket_count = ivshard.storage.internal.total_bucket_count + res, err = ivshard.storage._call( + 'storage_moved_buckets', + {_G.get_first_bucket(), bucket_count + 1, bucket_count + 2} ) ilt.assert_equals(err, nil) - ilt.assert_equals(res, {moved = {bid_extra}}) + ilt.assert_equals(res, {moved = {bucket_count + 1, bucket_count + 2}}) end) end diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result index c5b99ba0..7ca95377 100644 --- a/test/upgrade/upgrade.result +++ b/test/upgrade/upgrade.result @@ -177,8 +177,8 @@ vshard.storage._call('test_api', 1, 2, 3) | - rebalancer_apply_routes | - rebalancer_request_state | - recovery_bucket_stat - | - storage_absent_buckets | - storage_map + | - storage_moved_buckets | - storage_ref | - storage_ref_with_lookup | - storage_unref diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 0add102e..edead7c6 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -225,7 +225,6 @@ end -- Group bucket ids by replicasets according to the router cache. local function buckets_group(router, bucket_ids, timeout) - timeout = timeout or consts.CALL_TIMEOUT_MIN local deadline = fiber_clock() + timeout local replicaset, err local replicaset_buckets = {} @@ -238,19 +237,13 @@ local function buckets_group(router, bucket_ids, timeout) if bucket_id == prev_id then goto continue end - if fiber_clock() > deadline then return nil, lerror.timeout() end - replicaset, err = bucket_resolve(router, bucket_id) if err ~= nil then return nil, err end - if replicaset == nil then - return nil, lerror.vshard(lerror.code.NO_ROUTE_TO_BUCKET, bucket_id) - end - id = replicaset.id buckets = replicaset_buckets[id] if buckets then @@ -805,22 +798,19 @@ local function router_call(router, bucket_id, opts, ...) end local function wait_connected_to_masters(replicasets, timeout) - local master, id local err, err_id - -- Start connecting to all masters in parallel. local deadline = fiber_clock() + timeout for _, replicaset in pairs(replicasets) do + local master master, err = replicaset:wait_master(timeout) - id = replicaset.id if not master then - err_id = id + err_id = replicaset.id goto fail end replicaset:connect_replica(master) timeout = deadline - fiber_clock() end - -- Wait until all connections are established. for _, replicaset in pairs(replicasets) do timeout, err = replicaset:wait_connected(timeout) @@ -830,7 +820,6 @@ local function wait_connected_to_masters(replicasets, timeout) end end do return timeout end - ::fail:: return nil, err, err_id end @@ -901,11 +890,7 @@ local function router_map_callrw(router, func, args, opts) -- -- Netbox async requests work only with active connections. Need to wait -- for the connection explicitly. 
- local rs_list = table_new(0, #replicasets) - for _, rs in pairs(replicasets) do - table.insert(rs_list, rs) - end - timeout, err, err_id = wait_connected_to_masters(rs_list, timeout) + timeout, err, err_id = wait_connected_to_masters(replicasets, timeout) if not timeout then goto fail end @@ -1068,18 +1053,26 @@ end -- though all replicasets were scanned. -- local function router_map_part_callrw(router, bucket_ids, func, args, opts) - local replicaset local grouped_buckets + local group_count local err, err_id, res, ok, map - local call_args local rs_list - local replicasets = {} - local preallocated = false + local rs_map = {} local futures = {} - local call_opts = {is_async = true} local rs_count = 0 - local timeout = opts and opts.timeout or consts.CALL_TIMEOUT_MIN + local do_return_raw + local timeout + if opts then + timeout = opts.timeout or consts.CALL_TIMEOUT_MIN + do_return_raw = opts.return_raw + else + timeout = consts.CALL_TIMEOUT_MIN + end + local args_ref, args_map + local opts_ref = {is_async = true} + local opts_map = {is_async = true, return_raw = do_return_raw} local deadline = fiber_clock() + timeout + local replicasets = router.replicasets local rid = M.ref_id M.ref_id = rid + 1 @@ -1097,10 +1090,11 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) -- Netbox async requests work only with active connections. -- So, first need to wait for the master connection explicitly. - rs_list = table_new(0, #grouped_buckets) + rs_list = {} + group_count = 0 for uuid, _ in pairs(grouped_buckets) do - replicaset = router.replicasets[uuid] - table.insert(rs_list, replicaset) + group_count = group_count + 1 + table.insert(rs_list, replicasets[uuid]) end timeout, err, err_id = wait_connected_to_masters(rs_list, timeout) if not timeout then @@ -1108,25 +1102,21 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) end -- Send ref requests with timeouts to the replicasets. - futures = table_new(0, #grouped_buckets) + futures = table_new(0, group_count) for id, buckets in pairs(grouped_buckets) do - replicaset = router.replicasets[id] - -- Netbox async requests work only with active connections. Need to wait - -- for the connection explicitly. - timeout, err = replicaset:wait_connected(timeout) if timeout == nil then err_id = id goto fail end - if replicasets[id] then - -- Replicaset is already referred on the previous iteration. + if rs_map[id] then + -- Replicaset is already referenced on a previous iteration. -- Simply get the moved buckets without double referencing. - call_args = {'storage_absent_buckets', buckets} + args_ref = {'storage_moved_buckets', buckets} else - call_args = {'storage_ref_with_lookup', rid, timeout, buckets} - rs_count = rs_count + 1 + args_ref = {'storage_ref_with_lookup', rid, timeout, buckets} end - res, err = replicaset:callrw('vshard.storage._call', call_args, call_opts) + res, err = replicasets[id]:callrw('vshard.storage._call', + args_ref, opts_ref) if res == nil then err_id = id goto fail @@ -1134,17 +1124,10 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) futures[id] = res end - if not preallocated then - -- Current preallocation works only for the first iteration - -- (i.e. router cache is not outdated). - replicasets = table_new(0, rs_count) - preallocated = true - end - -- Wait for the refs to be done and collect moved buckets. 
bucket_ids = {} - for id, future in pairs(futures) do - res, err = future_wait(future, timeout) + for id, f in pairs(futures) do + res, err = future_wait(f, timeout) -- Handle netbox error first. if res == nil then err_id = id @@ -1161,32 +1144,35 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) table.insert(bucket_ids, bucket_id) end if res.rid then - -- If there are no buckets on the replicaset, - -- it would not be referred. - replicasets[id] = router.replicasets[id] + assert(not rs_map[id]) + rs_count = rs_count + 1 + -- If there are no buckets on the replicaset, it would not be + -- referenced. + rs_map[id] = replicasets[id] end timeout = deadline - fiber_clock() end end - - -- Map stage. + -- + -- Map stage: send. + -- map = table_new(0, rs_count) futures = table_new(0, rs_count) - args = {'storage_map', rid, func, args} + args_map = {'storage_map', rid, func, args} -- Send map requests. - for id, rs in pairs(replicasets) do - res, err = rs:callrw('vshard.storage._call', args, call_opts) + for id, rs in pairs(rs_map) do + res, err = rs:callrw('vshard.storage._call', args_map, opts_map) if res == nil then err_id = id goto fail end futures[id] = res end - - -- Reduce stage. - -- Collect map responses (refs were deleted by the storages for non-error results). - for id, future in pairs(futures) do - res, err = future_wait(future, timeout) + -- + -- Map stage: collect. + -- + for id, f in pairs(futures) do + res, err = future_wait(f, timeout) if res == nil then err_id = id goto fail @@ -1208,20 +1194,16 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts) do return map end ::fail:: - local f - for id, future in pairs(futures) do - future:discard() + for id, f in pairs(futures) do + f:discard() -- Best effort to remove the created refs before exiting. Can help if -- the timeout was big and the error happened early. - call_args = {'storage_unref', rid} - replicaset = router.replicasets[id] - if replicaset then - f = replicaset:callrw('vshard.storage._call', call_args, call_opts) - if f ~= nil then - -- Don't care waiting for a result - no time for this. But it won't - -- affect the request sending if the connection is still alive. - f:discard() - end + f = replicasets[id]:callrw('vshard.storage._call', + {'storage_unref', rid}, opts_ref) + if f ~= nil then + -- Don't care waiting for a result - no time for this. But it won't + -- affect the request sending if the connection is still alive. + f:discard() end end err = lerror.make(err) diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 2fbfba27..ad24eba3 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3187,7 +3187,7 @@ end -- @return A dictionary with a list of bucket identifiers which are -- not present on the current instance. -- -local function storage_absent_buckets(bucket_ids) +local function storage_moved_buckets(bucket_ids) local status = consts.BUCKET local moved_buckets = {} for _, bucket_id in pairs(bucket_ids) do @@ -3213,30 +3213,12 @@ end -- with the list of absent buckets is returned. -- local function storage_ref_with_lookup(rid, timeout, bucket_ids) - local moved = storage_absent_buckets(bucket_ids).moved + local moved = storage_moved_buckets(bucket_ids).moved if #moved == #bucket_ids then - -- Take an advantage that moved buckets are returned in the same - -- order as in the input list. 
- local do_match = true - local next_moved = next(moved) - local next_passed = next(bucket_ids) - ::continue:: - if next_moved then - if next_moved == next_passed then - next_moved = next(moved, next_moved) - next_passed = next(bucket_ids, next_passed) - goto continue - else - do_match = false - end - end - if do_match then - -- If all the passed buckets are absent, there is no need - -- to create a ref. - return {rid = nil, moved = moved} - end + -- If all the passed buckets are absent, there is no need to create a + -- ref. + return {rid = nil, moved = moved} end - local ok, err = storage_ref(rid, timeout) if not ok then return nil, err @@ -3298,7 +3280,7 @@ service_call_api = setmetatable({ rebalancer_apply_routes = rebalancer_apply_routes, rebalancer_request_state = rebalancer_request_state, recovery_bucket_stat = recovery_bucket_stat, - storage_absent_buckets = storage_absent_buckets, + storage_moved_buckets = storage_moved_buckets, storage_ref = storage_ref, storage_ref_with_lookup = storage_ref_with_lookup, storage_unref = storage_unref,
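
For reference, a minimal usage sketch of the partial-map API this patch renames and tests. It assumes a bootstrapped cluster where every storage registers a global 'do_map', the same way the test fixtures above do; the function name, arguments, and the uuid field used below are illustrative, while vshard.router.map_part_callrw() and its val/err/err_uuid return convention come from the patch itself:

    -- Storage side: the function to be mapped, registered globally.
    -- box.info.cluster.uuid is used only to show which replicaset
    -- produced each result.
    rawset(_G, 'do_map', function(arg)
        return {box.info.cluster.uuid, arg}
    end)

    -- Router side: only the replicasets owning the listed buckets are
    -- visited, and each of them is referenced once for the whole call.
    local vshard = require('vshard')
    local val, err, err_uuid = vshard.router.map_part_callrw(
        {1, 2, 3}, 'do_map', {42}, {timeout = 10})
    if val == nil then
        -- err_uuid, when set, names the replicaset the error came from.
        error(('partial map failed on %s: %s'):format(err_uuid, err))
    end
    -- val maps a replicaset UUID to the list of 'do_map' results from it.
    for uuid, results in pairs(val) do
        print(uuid, require('json').encode(results))
    end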