diff --git a/test/instances/storage.lua b/test/instances/storage.lua index 4bbff541..964da042 100755 --- a/test/instances/storage.lua +++ b/test/instances/storage.lua @@ -69,6 +69,21 @@ local function get_first_bucket() return res ~= nil and res.id or nil end +local function get_n_buckets(n) + if n <= 0 then + error('Invalid number of buckets') + end + local index = box.space._bucket.index.status + local ids = table.new(0, n) + for _, tuple in index:pairs(vconst.BUCKET.ACTIVE) do + table.insert(ids, tuple.id) + if #ids == n then + return ids + end + end + error('Not enough buckets') +end + local function session_set(key, value) box.session.storage[key] = value return true @@ -167,6 +182,7 @@ _G.box_error = box_error _G.echo = echo _G.get_uuid = get_uuid _G.get_first_bucket = get_first_bucket +_G.get_n_buckets = get_n_buckets _G.session_set = session_set _G.session_get = session_get _G.bucket_gc_wait = bucket_gc_wait diff --git a/test/lua_libs/util.lua b/test/lua_libs/util.lua index 3f435ccd..fd9d947e 100644 --- a/test/lua_libs/util.lua +++ b/test/lua_libs/util.lua @@ -181,8 +181,10 @@ if not BUILDDIR then BUILDDIR = SOURCEDIR end +local VARDIR = fio.abspath(os.getenv('VARDIR') or './') + local function git_checkout(dst_dir, version) - local vshard_copy_path = BUILDDIR..'/test/var/'..dst_dir + local vshard_copy_path = VARDIR..'/'..dst_dir -- Cleanup the directory after a previous build. os.execute('rm -rf ' .. vshard_copy_path) -- `git worktree` cannot be used because PACKPACK mounts diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua index a4fc2091..5350140d 100644 --- a/test/luatest_helpers/vtest.lua +++ b/test/luatest_helpers/vtest.lua @@ -8,6 +8,9 @@ local yaml = require('yaml') local vcfg = require('vshard.cfg') local vrepset = require('vshard.replicaset') local log = require('log') +-- Otherwise in non-Debug builds apparently the unknown variables are treated as +-- nil-global-variables. +require('strict').on() local wait_timeout = 50 -- Use it in busy-loops like `while !cond do fiber.sleep(busy_step) end`. @@ -463,6 +466,15 @@ local function storage_first_bucket(storage) end) end +-- +-- Get n active buckets from the storage. +-- +local function storage_get_n_buckets(storage, n) + return storage:exec(function(n) + return _G.get_n_buckets(n) + end, {n}) +end + -- -- Disable rebalancer on all storages. -- @@ -851,6 +863,7 @@ return { cluster_wait_fullsync = cluster_wait_fullsync, cluster_rebalancer_find = cluster_rebalancer_find, storage_first_bucket = storage_first_bucket, + storage_get_n_buckets = storage_get_n_buckets, storage_stop = storage_stop, storage_start = storage_start, router_new = router_new, diff --git a/test/reload_evolution/storage.result b/test/reload_evolution/storage.result index 4a1b3a09..74cf355a 100644 --- a/test/reload_evolution/storage.result +++ b/test/reload_evolution/storage.result @@ -24,10 +24,10 @@ REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } --- ... -test_run:create_cluster(REPLICASET_1, 'reload_evolution') +test_run:create_cluster(REPLICASET_1, 'reload_evolution', {args = vshard_copy_path}) --- ... -test_run:create_cluster(REPLICASET_2, 'reload_evolution') +test_run:create_cluster(REPLICASET_2, 'reload_evolution', {args = vshard_copy_path}) --- ... 
util = require('util') diff --git a/test/reload_evolution/storage.test.lua b/test/reload_evolution/storage.test.lua index 3800309a..02a9cca6 100644 --- a/test/reload_evolution/storage.test.lua +++ b/test/reload_evolution/storage.test.lua @@ -12,8 +12,8 @@ vshard_copy_path = util.git_checkout('vshard_git_tree_copy', REPLICASET_1 = { 'storage_1_a', 'storage_1_b' } REPLICASET_2 = { 'storage_2_a', 'storage_2_b' } -test_run:create_cluster(REPLICASET_1, 'reload_evolution') -test_run:create_cluster(REPLICASET_2, 'reload_evolution') +test_run:create_cluster(REPLICASET_1, 'reload_evolution', {args = vshard_copy_path}) +test_run:create_cluster(REPLICASET_2, 'reload_evolution', {args = vshard_copy_path}) util = require('util') util.wait_master(test_run, REPLICASET_1, 'storage_1_a') util.wait_master(test_run, REPLICASET_2, 'storage_2_a') diff --git a/test/reload_evolution/storage_1_a.lua b/test/reload_evolution/storage_1_a.lua index eb189a0e..f4b19a0e 100755 --- a/test/reload_evolution/storage_1_a.lua +++ b/test/reload_evolution/storage_1_a.lua @@ -1,16 +1,10 @@ #!/usr/bin/env tarantool local util = require('util') NAME = require('fio').basename(arg[0], '.lua') - --- Run one storage on a different vshard version. --- To do that, place vshard src to --- BUILDDIR/test/var/vshard_git_tree_copy/. +local source_path = arg[1] original_package_path = package.path if NAME == 'storage_2_a' then - vshard_copy = util.BUILDDIR .. '/test/var/vshard_git_tree_copy' - package.path = string.format( - '%s/?.lua;%s/?/init.lua;%s', - vshard_copy, vshard_copy, original_package_path - ) + package.path = string.format('%s/?.lua;%s/?/init.lua;%s', source_path, + source_path, package.path) end require('storage_template') diff --git a/test/router-luatest/map_callrw_test.lua b/test/router-luatest/map_callrw_test.lua new file mode 100644 index 00000000..2de0291c --- /dev/null +++ b/test/router-luatest/map_callrw_test.lua @@ -0,0 +1,527 @@ +local fiber = require('fiber') +local t = require('luatest') +local vtest = require('test.luatest_helpers.vtest') +local vutil = require('vshard.util') + +local g = t.group('router') +local cfg_template = { + sharding = { + { + replicas = { + replica_1_a = { + master = true, + }, + replica_1_b = {}, + }, + }, + { + replicas = { + replica_2_a = { + master = true, + }, + replica_2_b = {}, + }, + }, + { + replicas = { + replica_3_a = { + master = true, + }, + replica_3_b = {}, + }, + }, + }, + bucket_count = 30, + test_user_grant_range = 'super', +} +local global_cfg = vtest.config_new(cfg_template) + +g.before_all(function(cg) + vtest.cluster_new(cg, global_cfg) + t.assert_equals(cg.replica_1_a:exec(function() + return #ivshard.storage.info().alerts + end), 0, 'no alerts after boot') + local _ + local router = vtest.router_new(cg, 'router', global_cfg) + cg.router = router + local res, err = router:exec(function() + local res, err = ivshard.router.bootstrap({timeout = iwait_timeout}) + rawset(_G, 'do_map', function(args, opts) + local old_opts = table.copy(opts) + local val, err, err_uuid = ivshard.router.map_callrw( + 'do_map', args, opts) + -- Make sure the options aren't changed by vshard. + ilt.assert_equals(old_opts, opts) + local val_type + if opts.return_raw and val ~= nil then + -- Src+value. The src is plain Lua data. The value is raw. 
+ local _, one_map = next(val) + val_type = type(one_map) + else + val_type = type(val) + end + return { + val = val, + val_type = val_type, + err = err, + err_uuid = err_uuid, + } + end) + return res, err + end) + t.assert(res and not err, 'bootstrap buckets') + _, err = vtest.cluster_exec_each(cg, function() + rawset(_G, 'do_map', function(res) + ilt.assert_gt(require('vshard.storage.ref').count, 0) + return {ivutil.replicaset_uuid(), res} + end) + rawset(_G, 'bucket_send', function(bid, dst) + local _, err = ivshard.storage.bucket_send( + bid, dst, {timeout = iwait_timeout}) + ilt.assert_equals(err, nil) + end) + end) + t.assert_equals(err, nil) + cg.rs1_uuid = cg.replica_1_a:replicaset_uuid() + cg.rs2_uuid = cg.replica_2_a:replicaset_uuid() + cg.rs3_uuid = cg.replica_3_a:replicaset_uuid() +end) + +g.after_all(function(cg) + cg.cluster:drop() +end) + +local function router_do_map(router, args, opts) + return router:exec(function(args, opts) + return _G.do_map(args, opts) + end, {args, opts}) +end + +g.test_map_part_single_rs = function(cg) + local bids = vtest.storage_get_n_buckets(cg.replica_1_a, 4) + local res = router_do_map(cg.router, {123}, { + timeout = vtest.wait_timeout, + bucket_ids = {bids[3], bids[2]}, + }) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 123}}, + }) +end + +g.test_map_part_multi_rs = function(cg) + local bid1 = vtest.storage_first_bucket(cg.replica_1_a) + local bid2 = vtest.storage_first_bucket(cg.replica_2_a) + local res = router_do_map(cg.router, {123}, { + timeout = vtest.wait_timeout, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 123}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, 123}}, + }) +end + +g.test_map_part_all_rs = function(cg) + local bid1 = vtest.storage_first_bucket(cg.replica_1_a) + local bid2 = vtest.storage_first_bucket(cg.replica_2_a) + local bid3 = vtest.storage_first_bucket(cg.replica_3_a) + local res = router_do_map(cg.router, {123}, { + timeout = vtest.wait_timeout, + bucket_ids = {bid1, bid2, bid3}, + }) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 123}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, 123}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, 123}}, + }) +end + +g.test_map_part_ref = function(cg) + -- First move some buckets from rs1 to rs2 and then pause gc on rs1. + -- As a result, the buckets will be in the SENT state on rs1 and + -- in the ACTIVE state on rs2. + local bids1 = vtest.storage_get_n_buckets(cg.replica_1_a, 3) + cg.replica_1_a:exec(function(bid1, bid2, to) + _G.bucket_gc_pause() + _G.bucket_send(bid1, to) + _G.bucket_send(bid2, to) + end, {bids1[1], bids1[2], cg.rs2_uuid}) + -- The buckets are ACTIVE on rs2, so the partial map should succeed. + local res = router_do_map(cg.router, {42}, { + timeout = vtest.wait_timeout, + bucket_ids = {bids1[1], bids1[2]}, + }) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs2_uuid] = {{cg.rs2_uuid, 42}}, + }) + -- But if we use some active bucket from rs1, the partial map should fail. + -- The reason is that the moved buckets are still in the SENT state and + -- we can't take a ref. 
+ res = router_do_map(cg.router, {42}, { + timeout = 0.1, + bucket_ids = {bids1[3]}, + }) + t.assert_equals(res.val, nil) + t.assert(res.err) + t.assert_equals(res.err_uuid, cg.rs1_uuid) + -- The moved buckets still exist on rs1 with a non-active status. + -- Let's remove them and re-enable gc on rs1. + cg.replica_1_a:exec(function() + _G.bucket_gc_continue() + _G.bucket_gc_wait() + end) + -- Now move the buckets back to rs1 and pause gc on rs2. + -- The buckets will be ACTIVE on rs1 and SENT on rs2, + -- so the partial map should succeed. + cg.replica_2_a:exec(function(bid1, bid2, to) + _G.bucket_gc_pause() + _G.bucket_send(bid1, to) + _G.bucket_send(bid2, to) + end, {bids1[1], bids1[2], cg.rs1_uuid}) + + res = router_do_map(cg.router, {42}, { + timeout = vtest.wait_timeout, + bucket_ids = {bids1[1], bids1[2]}, + }) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 42}}, + }) + -- Re-enable gc on rs2. + cg.replica_2_a:exec(function() + _G.bucket_gc_continue() + _G.bucket_gc_wait() + end) +end + +g.test_map_part_double_ref = function(cg) + local bid1 = vtest.storage_first_bucket(cg.replica_1_a) + local bid2 = vtest.storage_first_bucket(cg.replica_2_a) + -- First, disable discovery on the router to disable route cache update. + cg.router:exec(function(bid, uuid) + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true + -- Make sure the location of the bucket is known. + local rs, err = ivshard.router.bucket_discovery(bid) + ilt.assert_equals(err, nil) + ilt.assert_equals(rs.uuid, uuid) + end, {bid1, cg.rs1_uuid}) + -- Then, move the bucket from rs1 to rs2. Now the router has an outdated + -- route cache. + cg.replica_1_a:exec(function(bid, to) + _G.bucket_send(bid, to) + _G.bucket_gc_wait() + end, {bid1, cg.rs2_uuid}) + -- Call a partial map for the moved bucket and some bucket + -- from rs2. The ref stage should be done in two steps: + -- 1. ref rs2, which returns the moved bucket; + -- 2. discover the moved bucket on rs2 and avoid a double reference. + local res = router_do_map(cg.router, {42}, { + timeout = vtest.wait_timeout, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.err, nil) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs2_uuid] = {{cg.rs2_uuid, 42}}, + }) + -- Make sure there are no references left. + local _, err = vtest.cluster_exec_each(cg, function() + ilt.assert_equals(require('vshard.storage.ref').count, 0) + end) + t.assert_equals(err, nil) + -- Return the bucket back and re-enable discovery on the router. + cg.replica_2_a:exec(function(bid, to) + _G.bucket_send(bid, to) + _G.bucket_gc_wait() + end, {bid1, cg.rs1_uuid}) + cg.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false + end) +end + +g.test_map_part_ref_timeout = function(cg) + local bids = vtest.storage_get_n_buckets(cg.replica_1_a, 2) + local bid1 = bids[1] + local bid2 = bids[2] + + bids = vtest.storage_get_n_buckets(cg.replica_2_a, 2) + local bid3 = bids[1] + local bid4 = bids[2] + + -- First, disable discovery on the router to disable route cache update. + cg.router:exec(function(bids) + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = true + -- Make sure the location of the bucket is known. + for _, bid in pairs(bids) do + local _, err = ivshard.router.bucket_discovery(bid) + ilt.assert_equals(err, nil) + end + end, {{bid1, bid2, bid3, bid4}}) + + -- Count the map calls. The loss of the ref must be detected before the + -- map stage.
+ local _, err = vtest.cluster_exec_each_master(cg, function() + rawset(_G, 'old_do_map', _G.do_map) + rawset(_G, 'map_count', 0) + _G.do_map = function(...) + _G.map_count = _G.map_count + 1 + return _G.old_do_map(...) + end + end) + t.assert_equals(err, nil) + + -- Send a bucket so that the router thinks: + -- rs1: {b1, b2}, rs2: {b3, b4} + -- and actually the state is: + -- rs1: {b1}, rs2: {b2, b3, b4} + cg.replica_1_a:exec(function(bid, to) + _G.bucket_send(bid, to) + _G.bucket_gc_wait() + end, {bid2, cg.rs2_uuid}) + + -- The partial map goes to the storages with the outdated mapping and + -- successfully refs rs1. Then it gets a bit stuck on rs2. Meanwhile, the + -- rs1 ref is lost - due to a restart, a timeout, or whatever. + cg.replica_2_a:exec(function() + local lref = require('vshard.storage.ref') + rawset(_G, 'old_ref_add', lref.add) + lref.add = function(rid, sid, ...) + ilt.assert_equals(rawget(_G, 'test_ref'), nil) + rawset(_G, 'test_ref', {rid = rid, sid = sid}) + local ok, err = _G.old_ref_add(rid, sid, ...) + ilt.helpers.retrying({timeout = iwait_timeout}, function() + if rawget(_G, 'test_ref') then + error('Test refs is not picked up') + end + end) + return ok, err + end + end) + local f = fiber.new(function() + return router_do_map(cg.router, {42}, { + timeout = vtest.wait_timeout, + bucket_ids = {bid1, bid2, bid3, bid4}, + }) + end) + f:set_joinable(true) + cg.replica_2_a:exec(function() + ilt.helpers.retrying({timeout = iwait_timeout}, function() + if not rawget(_G, 'test_ref') then + error('Test refs is not set') + end + end) + local lref = require('vshard.storage.ref') + local _, err = lref.del(_G.test_ref.rid, _G.test_ref.sid) + ilt.assert_equals(err, nil) + -- Cleanup. + lref.add = _G.old_ref_add + _G.old_ref_add = nil + _G.test_ref = nil + end) + + -- The whole request must fail now. + local ok, res = f:join() + t.assert(ok) + t.assert(res) + t.assert_not_equals(res.err, nil) + t.assert_equals(res.err_uuid, cg.rs2_uuid) + + -- Make sure there are no references left. + _, err = vtest.cluster_exec_each(cg, function() + ilt.assert_equals(require('vshard.storage.ref').count, 0) + end) + t.assert_equals(err, nil) + + -- No maps had a chance to get executed. + _, err = vtest.cluster_exec_each_master(cg, function() + ilt.assert_equals(_G.map_count, 0) + _G.do_map = _G.old_do_map + _G.old_do_map = nil + _G.map_count = nil + end) + t.assert_equals(err, nil) + + -- Return the bucket back and re-enable discovery on the router. + cg.replica_2_a:exec(function(bid, to) + _G.bucket_send(bid, to) + _G.bucket_gc_wait() + end, {bid2, cg.rs1_uuid}) + cg.router:exec(function() + ivshard.router.internal.errinj.ERRINJ_LONG_DISCOVERY = false + end) +end + +g.test_map_part_map = function(cg) + local bid1 = vtest.storage_first_bucket(cg.replica_1_a) + local bid2 = vtest.storage_first_bucket(cg.replica_2_a) + cg.replica_2_a:exec(function() + rawset(_G, 'old_do_map', _G.do_map) + _G.do_map = function() + return box.error(box.error.PROC_LUA, "map_err") + end + end) + local res = router_do_map(cg.router, {3}, { + timeout = vtest.wait_timeout, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.val, nil) + t.assert_covers(res.err, { + code = box.error.PROC_LUA, + type = 'ClientError', + message = 'map_err' + }) + t.assert_equals(res.err_uuid, cg.rs2_uuid) + -- Check that there are no dangling references after the error.
+ local _, err = vtest.cluster_exec_each(cg, function() + ilt.assert_equals(require('vshard.storage.ref').count, 0) + end) + t.assert_equals(err, nil) + cg.replica_2_a:exec(function() + _G.do_map = _G.old_do_map + _G.old_do_map = nil + end) + res = router_do_map(cg.router, {3}, { + timeout = vtest.wait_timeout, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.err, nil, res.err) + t.assert_equals(res.err_uuid, nil) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 3}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, 3}}, + }) +end + +g.test_map_part_callrw_raw = function(cg) + t.run_only_if(vutil.feature.netbox_return_raw) + -- + -- Successful map. + -- + local bid1 = vtest.storage_first_bucket(cg.replica_1_a) + local bid2 = vtest.storage_first_bucket(cg.replica_2_a) + local res = router_do_map(cg.router, {3}, { + timeout = vtest.wait_timeout, + return_raw = true, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 3}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, 3}}, + }) + t.assert_equals(res.val_type, 'userdata') + t.assert(not res.err) + -- + -- Successful map, but one of the storages returns nothing. + -- + cg.replica_2_a:exec(function() + rawset(_G, 'old_do_map', _G.do_map) + _G.do_map = function() + return + end + end) + res = router_do_map(cg.router, {}, { + timeout = vtest.wait_timeout, + return_raw = true, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid}}, + }) + -- + -- Error at map stage. + -- + cg.replica_2_a:exec(function() + _G.do_map = function() + return box.error(box.error.PROC_LUA, "map_err") + end + end) + res = router_do_map(cg.router, {}, { + timeout = vtest.wait_timeout, + return_raw = true, + bucket_ids = {bid1, bid2}, + }) + t.assert_equals(res.val, nil) + t.assert_covers(res.err, { + code = box.error.PROC_LUA, + type = 'ClientError', + message = 'map_err' + }, 'error object') + t.assert_equals(res.err_uuid, cg.rs2_uuid, 'error uuid') + -- + -- Cleanup. + -- + cg.replica_2_a:exec(function() + _G.do_map = _G.old_do_map + _G.old_do_map = nil + end) +end + +g.test_map_all_callrw_raw = function(cg) + t.run_only_if(vutil.feature.netbox_return_raw) + -- + -- Successful map. + -- + local res = router_do_map(cg.router, {3}, { + timeout = vtest.wait_timeout, + return_raw = true, + }) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid, 3}}, + [cg.rs2_uuid] = {{cg.rs2_uuid, 3}}, + [cg.rs3_uuid] = {{cg.rs3_uuid, 3}}, + }) + t.assert_equals(res.val_type, 'userdata') + t.assert(not res.err) + -- + -- Successful map, but one of the storages returns nothing. + -- + cg.replica_2_a:exec(function() + rawset(_G, 'old_do_map', _G.do_map) + _G.do_map = function() + return + end + end) + res = router_do_map(cg.router, {}, { + timeout = vtest.wait_timeout, + return_raw = true, + }) + t.assert_equals(res.val, { + [cg.rs1_uuid] = {{cg.rs1_uuid}}, + [cg.rs3_uuid] = {{cg.rs3_uuid}}, + }) + -- + -- Error at map stage. + -- + cg.replica_2_a:exec(function() + _G.do_map = function() + return box.error(box.error.PROC_LUA, "map_err") + end + end) + res = router_do_map(cg.router, {}, { + timeout = vtest.wait_timeout, + return_raw = true, + }) + t.assert_equals(res.val, nil) + t.assert_covers(res.err, { + code = box.error.PROC_LUA, + type = 'ClientError', + message = 'map_err' + }, 'error object') + t.assert_equals(res.err_uuid, cg.rs2_uuid, 'error uuid') + -- + -- Cleanup. 
+ -- + cg.replica_2_a:exec(function() + _G.do_map = _G.old_do_map + _G.old_do_map = nil + end) +end diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua index b2e12951..cb4456fa 100644 --- a/test/router-luatest/router_test.lua +++ b/test/router-luatest/router_test.lua @@ -206,84 +206,18 @@ g.test_return_raw = function(g) end) end -g.test_map_callrw_raw = function(g) - t.run_only_if(vutil.feature.netbox_return_raw) - - local create_map_func_f = function(res1) - rawset(_G, 'do_map', function(res2) - return {res1, res2} - end) - end - g.replica_1_a:exec(create_map_func_f, {1}) - g.replica_2_a:exec(create_map_func_f, {2}) - -- - -- Successful map. - -- - local res = g.router:exec(function() - local val, err = ivshard.router.map_callrw( - 'do_map', imsgpack.object({3}), {timeout = iwait_timeout, - return_raw = true}) - local _, one_map = next(val) - return { - val = val, - map_type = type(one_map), - err = err, - } - end) +g.test_group_buckets = function(g) + local bids = vtest.storage_get_n_buckets(g.replica_1_a, 2) + local val, err = g.router:exec(function(bid1, bid2) + return ivshard.router._buckets_group({bid2, bid1, bid1, bid2}, + ivtest.wait_timeout) + end, {bids[1], bids[2]}) + assert(err == nil) local rs1_uuid = g.replica_1_a:replicaset_uuid() - local rs2_uuid = g.replica_2_a:replicaset_uuid() local expected = { - [rs1_uuid] = {{1, 3}}, - [rs2_uuid] = {{2, 3}}, + [rs1_uuid] = {bids[2], bids[1]}, } - t.assert_equals(res.val, expected, 'map callrw success') - t.assert_equals(res.map_type, 'userdata', 'values are msgpacks') - t.assert(not res.err, 'no error') - -- - -- Successful map, but one of the storages returns nothing. - -- - g.replica_2_a:exec(function() - _G.do_map = function() - return - end - end) - res = g.router:exec(function() - return ivshard.router.map_callrw('do_map', {}, {timeout = iwait_timeout, - return_raw = true}) - end) - expected = { - [rs1_uuid] = {{1}}, - } - t.assert_equals(res, expected, 'map callrw without one value success') - -- - -- Error at map stage. - -- - g.replica_2_a:exec(function() - _G.do_map = function() - return box.error(box.error.PROC_LUA, "map_err") - end - end) - local err, err_uuid - res, err, err_uuid = g.router:exec(function() - return ivshard.router.map_callrw('do_map', {}, {timeout = iwait_timeout, - return_raw = true}) - end) - t.assert(res == nil, 'no result') - t.assert_covers(err, { - code = box.error.PROC_LUA, - type = 'ClientError', - message = 'map_err' - }, 'error object') - t.assert_equals(err_uuid, rs2_uuid, 'error uuid') - -- - -- Cleanup. - -- - g.replica_1_a:exec(function() - _G.do_map = nil - end) - g.replica_2_a:exec(function() - _G.do_map = nil - end) + t.assert_equals(val, expected) end g.test_uri_compare_and_reuse = function(g) diff --git a/test/router/router.result b/test/router/router.result index 8f678564..9ad9bce1 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1503,6 +1503,7 @@ error_messages - Use replicaset:update_master(...) instead of replicaset.update_master(...) - Use replicaset:wait_connected(...) instead of replicaset.wait_connected(...) - Use replicaset:wait_connected_all(...) instead of replicaset.wait_connected_all(...) + - Use replicaset:wait_master(...) instead of replicaset.wait_master(...) ... 
_, replica = next(replicaset.replicas) --- diff --git a/test/storage-luatest/storage_1_test.lua b/test/storage-luatest/storage_1_test.lua index 0d22df20..1e1600b3 100644 --- a/test/storage-luatest/storage_1_test.lua +++ b/test/storage-luatest/storage_1_test.lua @@ -225,3 +225,365 @@ test_group.test_bucket_skip_validate = function(g) internal.errinj.ERRINJ_SKIP_BUCKET_STATUS_VALIDATE = false end) end + +test_group.test_ref_with_buckets_basic = function(g) + g.replica_1_a:exec(function() + local lref = require('vshard.storage.ref') + local res, err, _ + local rid = 42 + local bids = _G.get_n_buckets(2) + local bucket_count = ivshard.storage.internal.total_bucket_count + + -- No buckets. + res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, iwait_timeout, {}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {moved = {}}) + ilt.assert_equals(lref.count, 0) + + -- Check for a single ok bucket. + res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, iwait_timeout, {bids[1]}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {is_done = true, moved = {}}) + ilt.assert_equals(lref.count, 1) + _, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + ilt.assert_equals(lref.count, 0) + + -- Check for multiple ok buckets. + res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, iwait_timeout, + {bids[1], bids[2]}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {is_done = true, moved = {}}) + _, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + + -- Check for double referencing. + res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, iwait_timeout, + {bids[1], bids[1]}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {is_done = true, moved = {}}) + ilt.assert_equals(lref.count, 1) + _, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + ilt.assert_equals(lref.count, 0) + + -- Bucket mix. + res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, iwait_timeout, + {bucket_count + 1, bids[1], bucket_count + 2, bids[2], + bucket_count + 3}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, { + is_done = true, + moved = { + {id = bucket_count + 1}, + {id = bucket_count + 2}, + {id = bucket_count + 3}, + } + }) + _, err = ivshard.storage._call('storage_unref', rid) + ilt.assert_equals(err, nil) + + -- No ref when all buckets are missing. + res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', + rid, + iwait_timeout, + {bucket_count + 1, bucket_count + 2} + ) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {moved = { + {id = bucket_count + 1}, + {id = bucket_count + 2}, + }}) + ilt.assert_equals(lref.count, 0) + end) +end + +test_group.test_ref_with_buckets_timeout = function(g) + g.replica_1_a:exec(function() + local lref = require('vshard.storage.ref') + local rid = 42 + local bids = _G.get_n_buckets(2) + -- + -- Timeout when some buckets aren't writable. Doesn't have to be the + -- same buckets as for moving. 
+ -- + _G.bucket_recovery_pause() + box.space._bucket:update( + {bids[1]}, {{'=', 2, ivconst.BUCKET.SENDING}}) + local res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, 0.01, {bids[2]}) + box.space._bucket:update( + {bids[1]}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + t.assert_str_contains(err.message, 'Timeout exceeded') + ilt.assert_equals(res, nil) + ilt.assert_equals(lref.count, 0) + _G.bucket_recovery_continue() + end) +end + +test_group.test_ref_with_buckets_return_last_known_dst = function(g) + g.replica_1_a:exec(function() + local lref = require('vshard.storage.ref') + local rid = 42 + local bid = _G.get_first_bucket() + local luuid = require('uuid') + local id = luuid.str() + -- Make the bucket follow the correct state sequence. Another way is + -- validated and not allowed. + _G.bucket_recovery_pause() + _G.bucket_gc_pause() + box.space._bucket:update( + {bid}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}}) + box.space._bucket:update( + {bid}, {{'=', 2, ivconst.BUCKET.SENT}}) + local res, err = ivshard.storage._call( + 'storage_ref_make_with_buckets', rid, iwait_timeout, {bid}) + ilt.assert_equals(err, nil) + ilt.assert_equals(res, {moved = {{ + id = bid, + dst = id, + status = ivconst.BUCKET.SENT, + }}}) + ilt.assert_equals(lref.count, 0) + -- Cleanup. + _G.bucket_gc_continue() + _G.bucket_gc_wait() + box.space._bucket:insert({bid, ivconst.BUCKET.RECEIVING}) + box.space._bucket:update({bid}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + _G.bucket_recovery_continue() + end) +end + +test_group.test_ref_with_buckets_move_part_while_referencing = function(g) + g.replica_1_a:exec(function() + local lref = require('vshard.storage.ref') + local _ + local rid = 42 + local bids = _G.get_n_buckets(3) + local luuid = require('uuid') + local id = luuid.str() + -- + -- Was not moved until ref, but moved while ref was waiting. + -- + _G.bucket_recovery_pause() + _G.bucket_gc_pause() + -- Block the refs for a while. + box.space._bucket:update( + {bids[3]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}}) + -- Start referencing. + local session_id + local f = ifiber.new(function() + session_id = box.session.id() + return ivshard.storage._call('storage_ref_make_with_buckets', rid, + iwait_timeout, {bids[1], bids[2]}) + end) + f:set_joinable(true) + -- While waiting, one of the target buckets starts moving. + box.space._bucket:update( + {bids[2]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}}) + -- Now they are moved. + box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.SENT}}) + box.space._bucket:update({bids[3]}, {{'=', 2, ivconst.BUCKET.SENT}}) + _G.bucket_gc_continue() + _G.bucket_gc_wait() + local ok, res, err = f:join() + t.assert(ok) + t.assert_equals(err, nil) + ilt.assert_equals(res, { + moved = {{id = bids[2], dst = id}}, + is_done = true, + }) + -- Ref was done, because at least one bucket was ok. + ilt.assert_equals(lref.count, 1) + -- Cleanup. 
+ _, err = lref.del(rid, session_id) + ilt.assert_equals(err, nil) + ilt.assert_equals(lref.count, 0) + box.space._bucket:insert({bids[2], ivconst.BUCKET.RECEIVING}) + box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + box.space._bucket:insert({bids[3], ivconst.BUCKET.RECEIVING}) + box.space._bucket:update({bids[3]}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + _G.bucket_recovery_continue() + end) +end + +test_group.test_ref_with_buckets_move_all_while_referencing = function(g) + g.replica_1_a:exec(function() + local lref = require('vshard.storage.ref') + local rid = 42 + local bids = _G.get_n_buckets(3) + local luuid = require('uuid') + local id = luuid.str() + -- + -- Now same, but all buckets were moved. No ref should be left then. + -- + _G.bucket_recovery_pause() + _G.bucket_gc_pause() + -- Block the refs for a while. + box.space._bucket:update( + {bids[3]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}}) + -- Start referencing. + local f = ifiber.new(function() + return ivshard.storage._call('storage_ref_make_with_buckets', rid, + iwait_timeout, {bids[1], bids[2]}) + end) + f:set_joinable(true) + -- While waiting, all the target buckets start moving. + box.space._bucket:update( + {bids[1]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}}) + box.space._bucket:update( + {bids[2]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}}) + -- Now they are moved. + box.space._bucket:update({bids[1]}, {{'=', 2, ivconst.BUCKET.SENT}}) + box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.SENT}}) + -- And the other bucket doesn't matter. Can revert it back. + box.space._bucket:update({bids[3]}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + _G.bucket_gc_continue() + _G.bucket_gc_wait() + local ok, res, err = f:join() + t.assert(ok) + t.assert_equals(err, nil) + ilt.assert_equals(res, { + moved = { + {id = bids[1], dst = id}, + {id = bids[2], dst = id}, + } + }) + -- Ref was not done, because all the buckets moved out. + ilt.assert_equals(lref.count, 0) + -- Cleanup. + box.space._bucket:insert({bids[1], ivconst.BUCKET.RECEIVING}) + box.space._bucket:update({bids[1]}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + box.space._bucket:insert({bids[2], ivconst.BUCKET.RECEIVING}) + box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.ACTIVE}}) + _G.bucket_recovery_continue() + end) +end + +test_group.test_moved_buckets_various_statuses = function(g) + g.replica_1_a:exec(function() + local _bucket = box.space._bucket + -- Make sure that if any bucket status is added/deleted/changed, the + -- test gets an update. + ilt.assert_equals(ivconst.BUCKET, { + ACTIVE = 'active', + PINNED = 'pinned', + SENDING = 'sending', + SENT = 'sent', + RECEIVING = 'receiving', + GARBAGE = 'garbage', + }) + _G.bucket_recovery_pause() + _G.bucket_gc_pause() + local luuid = require('uuid') + -- +1 to delete and make it a 404 bucket. + local bids = _G.get_n_buckets(7) + + -- ACTIVE = bids[1]. + -- + -- PINNED = bids[2]. + local bid_pinned = bids[2] + _bucket:update({bid_pinned}, + {{'=', 2, ivconst.BUCKET.PINNED}}) + + -- SENDING = bids[3]. + local bid_sending = bids[3] + local id_sending = luuid.str() + _bucket:update({bid_sending}, + {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id_sending}}) + + -- SENT = bids[4]. + local bid_sent = bids[4] + local id_sent = luuid.str() + _bucket:update({bid_sent}, + {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id_sent}}) + _bucket:update({bid_sent}, + {{'=', 2, ivconst.BUCKET.SENT}}) + + -- RECEIVING = bids[5]. 
+ local bid_receiving = bids[5] + local id_receiving = luuid.str() + _bucket:update({bid_receiving}, + {{'=', 2, ivconst.BUCKET.SENDING}}) + _bucket:update({bid_receiving}, + {{'=', 2, ivconst.BUCKET.SENT}}) + _bucket:update({bid_receiving}, + {{'=', 2, ivconst.BUCKET.GARBAGE}}) + _bucket:delete({bid_receiving}) + _bucket:insert({bid_receiving, ivconst.BUCKET.RECEIVING, id_receiving}) + + -- GARBAGE = bids[6]. + local bid_garbage = bids[6] + local id_garbage = luuid.str() + _bucket:update({bid_garbage}, + {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id_garbage}}) + _bucket:update({bid_garbage}, + {{'=', 2, ivconst.BUCKET.SENT}}) + _bucket:update({bid_garbage}, + {{'=', 2, ivconst.BUCKET.GARBAGE}}) + + -- NOT EXISTING = bids[7]. + local bid_404 = bids[7] + _bucket:update({bid_404}, + {{'=', 2, ivconst.BUCKET.SENDING}}) + _bucket:update({bid_404}, + {{'=', 2, ivconst.BUCKET.SENT}}) + _bucket:update({bid_404}, + {{'=', 2, ivconst.BUCKET.GARBAGE}}) + _bucket:delete({bid_404}) + + local moved = ivshard.storage.internal.bucket_get_moved(bids) + ilt.assert_items_equals(moved, { + { + id = bid_sent, + dst = id_sent, + status = ivconst.BUCKET.SENT, + }, + { + id = bid_garbage, + dst = id_garbage, + status = ivconst.BUCKET.GARBAGE, + }, + { + id = bid_404, + } + }) + -- + -- Cleanup. + -- + -- NOT EXISTING. + _bucket:insert({bid_404, ivconst.BUCKET.RECEIVING}) + _bucket:update({bid_404}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) + -- GARBAGE. + _bucket:delete({bid_garbage}) + _bucket:insert({bid_garbage, ivconst.BUCKET.RECEIVING}) + _bucket:update({bid_garbage}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) + -- RECEIVING. + _bucket:update({bid_receiving}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) + -- SENT. + _bucket:update({bid_sent}, + {{'=', 2, ivconst.BUCKET.GARBAGE}}) + _bucket:delete({bid_sent}) + _bucket:insert({bid_sent, ivconst.BUCKET.RECEIVING}) + _bucket:update({bid_sent}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) + -- SENDING. + _bucket:update({bid_sending}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) + -- PINNED. + _bucket:update({bid_pinned}, + {{'=', 2, ivconst.BUCKET.ACTIVE}}) + + _G.bucket_recovery_continue() + _G.bucket_gc_continue() + end) +end diff --git a/test/storage/ref.result b/test/storage/ref.result index a81ab5d8..d53176bd 100644 --- a/test/storage/ref.result +++ b/test/storage/ref.result @@ -249,6 +249,11 @@ function make_ref(rid, timeout) end | --- | ... +function check_ref(rid) \ + return lref.check(rid, box.session.id()) \ +end + | --- + | ... function use_ref(rid) \ return lref.use(rid, box.session.id()) \ end @@ -289,6 +294,43 @@ _ = test_run:switch('storage_1_a') | --- | ... +-- Check works. +ok, err = c:call('check_ref', {1}) + | --- + | ... +assert(ok and not err) + | --- + | - true + | ... +_ = test_run:switch('storage_2_a') + | --- + | ... +assert(lref.count == 1) + | --- + | - true + | ... +_ = test_run:switch('storage_1_a') + | --- + | ... + +ok, err = c:call('check_ref', {2}) + | --- + | ... +assert(ok == nil and err.message) + | --- + | - 'Can not use a storage ref: no ref' + | ... +_ = test_run:switch('storage_2_a') + | --- + | ... +assert(lref.count == 1) + | --- + | - true + | ... +_ = test_run:switch('storage_1_a') + | --- + | ... + -- Use works. 
c:call('use_ref', {1}) | --- diff --git a/test/storage/ref.test.lua b/test/storage/ref.test.lua index 17264579..76c67a33 100644 --- a/test/storage/ref.test.lua +++ b/test/storage/ref.test.lua @@ -106,6 +106,9 @@ small_timeout = 0.001 function make_ref(rid, timeout) \ return lref.add(rid, box.session.id(), timeout) \ end +function check_ref(rid) \ + return lref.check(rid, box.session.id()) \ +end function use_ref(rid) \ return lref.use(rid, box.session.id()) \ end @@ -124,6 +127,19 @@ _ = test_run:switch('storage_2_a') assert(lref.count == 1) _ = test_run:switch('storage_1_a') +-- Check works. +ok, err = c:call('check_ref', {1}) +assert(ok and not err) +_ = test_run:switch('storage_2_a') +assert(lref.count == 1) +_ = test_run:switch('storage_1_a') + +ok, err = c:call('check_ref', {2}) +assert(ok == nil and err.message) +_ = test_run:switch('storage_2_a') +assert(lref.count == 1) +_ = test_run:switch('storage_1_a') + -- Use works. c:call('use_ref', {1}) _ = test_run:switch('storage_2_a') diff --git a/test/upgrade/upgrade.result b/test/upgrade/upgrade.result index 5f4f0582..e0c9469b 100644 --- a/test/upgrade/upgrade.result +++ b/test/upgrade/upgrade.result @@ -179,6 +179,8 @@ vshard.storage._call('test_api', 1, 2, 3) | - recovery_bucket_stat | - storage_map | - storage_ref + | - storage_ref_check_with_buckets + | - storage_ref_make_with_buckets | - storage_unref | - test_api | - 1 diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index abdffe67..4f93bdee 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -1225,6 +1225,7 @@ local replicaset_mt = { up_replica_priority = replicaset_up_replica_priority; wait_connected = replicaset_wait_connected, wait_connected_all = replicaset_wait_connected_all, + wait_master = replicaset_wait_master, call = replicaset_master_call; callrw = replicaset_master_call; callro = replicaset_template_multicallro(false, false); @@ -1513,15 +1514,31 @@ local function buildall(sharding_cfg) return new_replicasets end --- --- Wait for masters connection during RECONNECT_TIMEOUT seconds. --- -local function wait_masters_connect(replicasets) - for _, rs in pairs(replicasets) do - if rs.master then - rs.master.conn:wait_connected(consts.RECONNECT_TIMEOUT) +local function wait_masters_connect(replicasets, timeout) + local err, err_id + -- Start connecting to all masters in parallel. + local deadline = fiber_clock() + timeout + for _, replicaset in pairs(replicasets) do + local master + master, err = replicaset:wait_master(timeout) + if not master then + err_id = replicaset.id + goto fail + end + replicaset:connect_replica(master) + timeout = deadline - fiber_clock() + end + -- Wait until all connections are established. + for _, replicaset in pairs(replicasets) do + timeout, err = replicaset:wait_connected(timeout) + if not timeout then + err_id = replicaset.id + goto fail end end + do return timeout end + ::fail:: + return nil, err, err_id end return { diff --git a/vshard/router/init.lua b/vshard/router/init.lua index cbd978ef..6c2dc14a 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -223,6 +223,39 @@ local function bucket_resolve(router, bucket_id) return replicaset end +-- Group bucket ids by replicasets according to the router cache. 
+local function buckets_group(router, bucket_ids, timeout) + local deadline = fiber_clock() + timeout + local replicaset, err + local replicaset_buckets = {} + local buckets, id + local bucket_map = {} + for _, bucket_id in pairs(bucket_ids) do + if bucket_map[bucket_id] then + goto continue + end + -- Avoid duplicates. + bucket_map[bucket_id] = true + if fiber_clock() > deadline then + return nil, lerror.timeout() + end + replicaset, err = bucket_resolve(router, bucket_id) + if err ~= nil then + return nil, err + end + id = replicaset.id + buckets = replicaset_buckets[id] + if buckets then + table.insert(buckets, bucket_id) + else + replicaset_buckets[id] = {bucket_id} + end + ::continue:: + end + + return replicaset_buckets +end + -- -- Arrange downloaded buckets to the route map so as they -- reference a given replicaset. @@ -762,60 +795,15 @@ local function router_call(router, bucket_id, opts, ...) end -- --- Consistent Map-Reduce. The given function is called on all masters in the --- cluster with a guarantee that in case of success it was executed with all --- buckets being accessible for reads and writes. +-- Perform Ref stage of the Ref-Map-Reduce process on all the known replicasets. -- --- Consistency in scope of map-reduce means all the data was accessible, and --- didn't move during map requests execution. To preserve the consistency there --- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce. --- --- Refs are broadcast before Map stage to pin the buckets to their storages, and --- ensure they won't move until maps are done. --- --- Map requests are broadcast in case all refs are done successfully. They --- execute the user function + delete the refs to enable rebalancing again. --- --- On the storages there are additional means to ensure map-reduces don't block --- rebalancing forever and vice versa. --- --- The function is not as slow as it may seem - it uses netbox's feature --- is_async to send refs and maps in parallel. So cost of the function is about --- 2 network exchanges to the most far storage in terms of time. --- --- @param router Router instance to use. --- @param func Name of the function to call. --- @param args Function arguments passed in netbox style (as an array). --- @param opts Can only contain 'timeout' as a number of seconds. Note that the --- refs may end up being kept on the storages during this entire timeout if --- something goes wrong. For instance, network issues appear. This means --- better not use a value bigger than necessary. A stuck infinite ref can --- only be dropped by this router restart/reconnect or the storage restart. --- --- @return In case of success - a map with replicaset ID (UUID or name) keys and --- values being what the function returned from the replicaset. --- --- @return In case of an error - nil, error object, optional UUID or name of the --- replicaset where the error happened. UUID or name may be not present if --- it wasn't about concrete replicaset. For example, not all buckets were --- found even though all replicasets were scanned. 
--- -local function router_map_callrw(router, func, args, opts) +local function router_ref_storage_all(router, timeout) local replicasets = router.replicasets - local timeout - local do_return_raw - if opts then - timeout = opts.timeout or consts.CALL_TIMEOUT_MIN - do_return_raw = opts.return_raw - else - timeout = consts.CALL_TIMEOUT_MIN - end local deadline = fiber_clock() + timeout - local err, err_id, res, ok, map + local err, err_id, res local futures = {} local bucket_count = 0 - local opts_ref = {is_async = true} - local opts_map = {is_async = true, return_raw = do_return_raw} + local opts_async = {is_async = true} local rs_count = 0 local rid = M.ref_id M.ref_id = rid + 1 @@ -825,16 +813,16 @@ local function router_map_callrw(router, func, args, opts) -- -- Ref stage: send. -- + -- Netbox async requests work only with active connections. Need to wait + -- for the connection explicitly. + timeout, err, err_id = lreplicaset.wait_masters_connect( + replicasets, timeout) + if not timeout then + goto fail + end for id, rs in pairs(replicasets) do - -- Netbox async requests work only with active connections. Need to wait - -- for the connection explicitly. - timeout, err = rs:wait_connected(timeout) - if timeout == nil then - err_id = id - goto fail - end res, err = rs:callrw('vshard.storage._call', - {'storage_ref', rid, timeout}, opts_ref) + {'storage_ref', rid, timeout}, opts_async) if res == nil then err_id = id goto fail @@ -842,7 +830,6 @@ local function router_map_callrw(router, func, args, opts) futures[id] = res rs_count = rs_count + 1 end - map = table_new(0, rs_count) -- -- Ref stage: collect. -- @@ -873,36 +860,160 @@ local function router_map_callrw(router, func, args, opts) router.total_bucket_count - bucket_count) goto fail end + do return timeout, nil, nil, rid, futures, replicasets end + + ::fail:: + return nil, err, err_id, rid, futures +end + +-- +-- Perform Ref stage of the Ref-Map-Reduce process on a subset of all the +-- replicasets, which contains all the listed bucket IDs. +-- +local function router_ref_storage_by_buckets(router, bucket_ids, timeout) + local grouped_buckets + local group_count + local err, err_id, res + local replicasets_all = router.replicasets + local replicasets_to_map = {} + local futures = {} + local opts_async = {is_async = true} + local deadline = fiber_clock() + timeout + local rid = M.ref_id + M.ref_id = rid + 1 + + -- Nil checks are done explicitly here (== nil instead of 'not'), because + -- netbox requests return box.NULL instead of nils. + + -- Ref stage. + while next(bucket_ids) do + -- Group the buckets by replicasets according to the router cache. + grouped_buckets, err = buckets_group(router, bucket_ids, timeout) + if grouped_buckets == nil then + goto fail + end + timeout = deadline - fiber_clock() + + -- Netbox async requests work only with active connections. + -- So, first need to wait for the master connection explicitly. + local replicasets_to_check = {} + group_count = 0 + for uuid, _ in pairs(grouped_buckets) do + group_count = group_count + 1 + table.insert(replicasets_to_check, replicasets_all[uuid]) + end + timeout, err, err_id = lreplicaset.wait_masters_connect( + replicasets_to_check, timeout) + if not timeout then + goto fail + end + + -- Send ref requests with timeouts to the replicasets. 
+ futures = table_new(0, group_count) + for id, buckets in pairs(grouped_buckets) do + if timeout == nil then + err_id = id + goto fail + end + local args_ref + if replicasets_to_map[id] then + -- Replicaset is already referenced on a previous iteration. + -- Simply get the moved buckets without double referencing. + args_ref = { + 'storage_ref_check_with_buckets', rid, buckets} + else + args_ref = { + 'storage_ref_make_with_buckets', rid, timeout, buckets} + end + res, err = replicasets_all[id]:callrw('vshard.storage._call', + args_ref, opts_async) + if res == nil then + err_id = id + goto fail + end + futures[id] = res + end + + -- Wait for the refs to be done and collect moved buckets. + bucket_ids = {} + for id, f in pairs(futures) do + res, err = future_wait(f, timeout) + -- Handle netbox error first. + if res == nil then + err_id = id + goto fail + end + -- Ref returns nil,err or {is_done, moved}. + res, err = res[1], res[2] + if res == nil then + err_id = id + goto fail + end + for _, bucket in pairs(res.moved) do + local bid = bucket.id + local dst = bucket.dst + -- 'Reset' regardless of 'set'. So as not to + -- bother with 'set' errors. If it fails, then + -- won't matter. It is a best-effort thing. + bucket_reset(router, bid) + if dst ~= nil then + bucket_set(router, bid, dst) + end + table.insert(bucket_ids, bid) + end + if res.is_done then + assert(not replicasets_to_map[id]) + -- If there are no buckets on the replicaset, it would not be + -- referenced. + replicasets_to_map[id] = replicasets_all[id] + end + timeout = deadline - fiber_clock() + end + end + do return timeout, nil, nil, rid, futures, replicasets_to_map end + + ::fail:: + return nil, err, err_id, rid, futures +end + +-- +-- Perform map-reduce stages on the given set of replicasets. The map expects a +-- valid ref ID which must be done beforehand. +-- +local function replicasets_map_reduce(replicasets, rid, func, args, opts) + assert(opts) + local timeout = opts.timeout or consts.CALL_TIMEOUT_MIN + local do_return_raw = opts.return_raw + local deadline = fiber_clock() + timeout + local opts_map = {is_async = true, return_raw = do_return_raw} + local futures = {} + local map = {} -- -- Map stage: send. -- args = {'storage_map', rid, func, args} - for id, rs in pairs(replicasets) do - res, err = rs:callrw('vshard.storage._call', args, opts_map) + for _, rs in pairs(replicasets) do + local res, err = rs:callrw('vshard.storage._call', args, opts_map) if res == nil then - err_id = id - goto fail + return nil, err, rs.id end - futures[id] = res + futures[rs.id] = res end -- - -- Ref stage: collect. + -- Map stage: collect. -- if do_return_raw then for id, f in pairs(futures) do - res, err = future_wait(f, timeout) + local res, err = future_wait(f, timeout) if res == nil then - err_id = id - goto fail + return nil, err, id end -- Map returns true,res or nil,err. res = res:iterator() local count = res:decode_array_header() - ok = res:decode() + local ok = res:decode() if ok == nil then - err = res:decode() - err_id = id - goto fail + return nil, res:decode(), id end if count > 1 then map[id] = res:take_array(count - 1) @@ -911,17 +1022,15 @@ local function router_map_callrw(router, func, args, opts) end else for id, f in pairs(futures) do - res, err = future_wait(f, timeout) + local res, err = future_wait(f, timeout) if res == nil then - err_id = id - goto fail + return nil, err, id end + local ok -- Map returns true,res or nil,err. 
ok, res = res[1], res[2] if ok == nil then - err = res - err_id = id - goto fail + return nil, res, id end if res ~= nil then -- Store as a table so in future it could be extended for @@ -931,21 +1040,109 @@ local function router_map_callrw(router, func, args, opts) timeout = deadline - fiber_clock() end end - do return map end + return map +end -::fail:: +-- +-- Cancel whatever pending requests are still waiting for a response and free +-- the given ref ID on all the affected storages. This is helpful when +-- map-reduce breaks in the middle. Makes sense to let the refs go to unblock +-- the rebalancer. +-- +local function replicasets_map_cancel_refs(replicasets, futures, rid) + local opts_async = {is_async = true} for id, f in pairs(futures) do f:discard() -- Best effort to remove the created refs before exiting. Can help if -- the timeout was big and the error happened early. f = replicasets[id]:callrw('vshard.storage._call', - {'storage_unref', rid}, opts_ref) + {'storage_unref', rid}, opts_async) if f ~= nil then -- Don't care waiting for a result - no time for this. But it won't -- affect the request sending if the connection is still alive. f:discard() end end +end + +-- +-- Consistent Map-Reduce. The given function is called on masters in the cluster +-- with a guarantee that in case of success it was executed with all buckets +-- being accessible for reads and writes. +-- +-- The selection of masters depends on bucket_ids option. When specified, the +-- Map-Reduce is performed only on masters having at least one of these buckets. +-- Otherwise it is executed on all the masters in the cluster. +-- +-- Consistency in scope of map-reduce means all the data was accessible, and +-- didn't move during map requests execution. To preserve the consistency there +-- is a third stage - Ref. So the algorithm is actually Ref-Map-Reduce. +-- +-- Refs are broadcast before Map stage to pin the buckets to their storages, and +-- ensure they won't move until maps are done. +-- +-- Map requests are broadcast in case all refs are done successfully. They +-- execute the user function + delete the refs to enable rebalancing again. +-- +-- On the storages there are additional means to ensure map-reduces don't block +-- rebalancing forever and vice versa. +-- +-- The function is not as slow as it may seem - it uses netbox's feature +-- is_async to send refs and maps in parallel. So cost of the function is about +-- 2 network exchanges to the most far storage in terms of time. +-- +-- @param router Router instance to use. +-- @param func Name of the function to call. +-- @param args Function arguments passed in netbox style (as an array). +-- @param opts Options. See below: +-- - timeout - a number of seconds. Note that the refs may end up being kept +-- on the storages during this entire timeout if something goes wrong. +-- For instance, network issues appear. This means better not use a +-- value bigger than necessary. A stuck infinite ref can only be dropped +-- by this router restart/reconnect or the storage restart. +-- - return_raw - true/false. When specified, the returned values are not +-- decoded into Lua native objects and stay packed as a msgpack object +-- (see 'msgpack' module). By default all is decoded. That might be +-- undesirable when the returned values are going to be forwarded back +-- into the network anyway. +-- - bucket_ids - an array of bucket IDs which have to be covered by +-- Map-Reduce. By default the whole cluster is covered. 
+-- +-- @return In case of success - a map with replicaset ID (UUID or name) keys and +-- values being what the function returned from the replicaset. +-- +-- @return In case of an error - nil, error object, optional UUID or name of the +-- replicaset where the error happened. UUID or name may be not present if +-- it wasn't about concrete replicaset. For example, not all buckets were +-- found even though all replicasets were scanned. +-- +local function router_map_callrw(router, func, args, opts) + local replicasets_to_map, err, err_id, map, futures, rid + local timeout, do_return_raw, bucket_ids + if opts then + timeout = opts.timeout or consts.CALL_TIMEOUT_MIN + do_return_raw = opts.return_raw + bucket_ids = opts.bucket_ids + else + timeout = consts.CALL_TIMEOUT_MIN + end + if bucket_ids then + timeout, err, err_id, rid, futures, replicasets_to_map = + router_ref_storage_by_buckets(router, bucket_ids, timeout) + else + timeout, err, err_id, rid, futures, replicasets_to_map = + router_ref_storage_all(router, timeout) + end + if timeout then + map, err, err_id = replicasets_map_reduce(replicasets_to_map, rid, func, + args, { + timeout = timeout, return_raw = do_return_raw + }) + if map then + return map + end + end + replicasets_map_cancel_refs(router.replicasets, futures, rid) err = lerror.make(err) return nil, err, err_id end @@ -1349,7 +1546,7 @@ local function router_cfg(router, cfg, is_reload) for _, replicaset in pairs(new_replicasets) do replicaset:connect_all() end - lreplicaset.wait_masters_connect(new_replicasets) + lreplicaset.wait_masters_connect(new_replicasets, consts.RECONNECT_TIMEOUT) lreplicaset.outdate_replicasets(router.replicasets, vshard_cfg.connection_outdate_delay) router.connection_outdate_delay = vshard_cfg.connection_outdate_delay @@ -1809,6 +2006,7 @@ local router_mt = { discovery_set = router_make_api(discovery_set), _route_map_clear = router_make_api(route_map_clear), _bucket_reset = router_make_api(bucket_reset), + _buckets_group = router_make_api(buckets_group), disable = router_disable, enable = router_enable, } diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index f6809a0d..1c174663 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3180,6 +3180,33 @@ local function storage_ref(rid, timeout) return bucket_count() end +-- +-- Lookup buckets which are definitely not going to recover into ACTIVE state +-- under any circumstances. +-- +local function bucket_get_moved(bucket_ids) + local allstatus = consts.BUCKET + local res = {} + for _, bucket_id in pairs(bucket_ids) do + local bucket = box.space._bucket:get{bucket_id} + local is_moved + if not bucket then + is_moved = true + else + local status = bucket.status + is_moved = status == allstatus.GARBAGE or status == allstatus.SENT + end + if is_moved then + table.insert(res, { + id = bucket_id, + dst = bucket and bucket.destination or M.route_map[bucket_id], + status = bucket and bucket.status, + }) + end + end + return res +end + -- -- Drop a storage ref from the current box session. Is used as a part of -- Map-Reduce API. @@ -3188,6 +3215,61 @@ local function storage_unref(rid) return lref.del(rid, box.session.id()) end +-- +-- Bind a new storage ref to the current box session and check +-- that all the buckets are present. Is used as a part of the +-- partial Map-Reduce API. +-- +-- @param rid Unique reference ID. +-- @param timeout Seconds. +-- @param bucket_ids List of bucket identifiers. 
+-- @return A table with a 'moved' list of the buckets which are not present +-- on the current instance (each entry holds the bucket id and, when known, +-- its destination and status), and 'is_done' set to true when the reference +-- was created. If all the passed buckets are absent, the reference is not +-- created and only the 'moved' list is returned. +-- +local function storage_ref_make_with_buckets(rid, timeout, bucket_ids) + local moved = bucket_get_moved(bucket_ids) + if #moved == #bucket_ids then + -- If all the passed buckets are absent, there is no need to create a + -- ref. + return {moved = moved} + end + local bucket_generation = M.bucket_generation + local ok, err = storage_ref(rid, timeout) + if not ok then + return nil, err + end + if M.bucket_generation ~= bucket_generation then + -- Need to redo it. Otherwise there is a risk that some buckets were + -- moved while waiting for the ref. + moved = bucket_get_moved(bucket_ids) + if #moved == #bucket_ids then + storage_unref(rid) + return {moved = moved} + end + end + return {is_done = true, moved = moved} +end + +-- +-- Check which buckets from the given list are moved out of this storage, while +-- also making sure that the storage-ref is still in place. +-- +-- Partial Map-Reduce makes a ref on the storages having any of the needed +-- buckets, but the router can come back to an already visited storage when +-- other storages report some of the needed buckets as moved there. The check +-- only makes sense if this storage still holds the ref. Otherwise the +-- guarantees given during the ref creation are all gone. +-- +local function storage_ref_check_with_buckets(rid, bucket_ids) + local ok, err = lref.check(rid, box.session.id()) + if not ok then + return nil, err + end + return {moved = bucket_get_moved(bucket_ids)} +end + -- -- Execute a user's function under an infinite storage ref protecting from -- bucket moves. The ref should exist before, and is deleted after, regardless @@ -3235,6 +3317,8 @@ service_call_api = setmetatable({ rebalancer_request_state = rebalancer_request_state, recovery_bucket_stat = recovery_bucket_stat, storage_ref = storage_ref, + storage_ref_make_with_buckets = storage_ref_make_with_buckets, + storage_ref_check_with_buckets = storage_ref_check_with_buckets, storage_unref = storage_unref, storage_map = storage_map, info = storage_service_info, @@ -4116,6 +4200,7 @@ M.bucket_state_edges = bucket_state_edges M.bucket_are_all_rw = bucket_are_all_rw_public M.bucket_generation_wait = bucket_generation_wait +M.bucket_get_moved = bucket_get_moved lregistry.storage = M -- diff --git a/vshard/storage/ref.lua b/vshard/storage/ref.lua index df7af092..0b517819 100644 --- a/vshard/storage/ref.lua +++ b/vshard/storage/ref.lua @@ -247,6 +247,14 @@ local function ref_session_new(sid) return true end + local function ref_session_check(self, rid) + local ref = ref_map[rid] + if not ref then + return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no ref') + end + return true + end + -- -- Ref use means it can't be expired until deleted explicitly. Should be -- done when the request affecting the whole storage starts.
After use it is @@ -301,6 +309,7 @@ local function ref_session_new(sid) del = ref_session_del, gc = ref_session_gc, add = ref_session_add, + check = ref_session_check, use = ref_session_use, kill = ref_session_kill, } @@ -364,6 +373,14 @@ local function ref_add(rid, sid, timeout) return nil, err end +local function ref_check(rid, sid) + local session = M.session_map[sid] + if not session then + return nil, lerror.vshard(lerror.code.STORAGE_REF_USE, 'no session') + end + return session:check(rid) +end + local function ref_use(rid, sid) local session = M.session_map[sid] if not session then @@ -407,6 +424,7 @@ end M.del = ref_del M.gc = ref_gc M.add = ref_add +M.check = ref_check M.use = ref_use M.cfg = ref_cfg M.kill = ref_kill_session
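A minimal usage sketch of the partial map-reduce added by this patch, assuming an already configured router and bootstrapped storages; the 'do_work' function name, its argument, and the bucket ids are illustrative, not part of the patch:

-- (1) On every storage master: the function executed under the bucket refs.
--     It has to be a global so that 'storage_map' can call it by name.
rawset(_G, 'do_work', function(arg)
    -- Every bucket listed in the request is guaranteed to stay on this
    -- storage, readable and writable, while the function runs.
    return {box.info.uuid, arg}
end)

-- (2) On the router: run the map only on the replicasets owning the listed
--     buckets instead of the whole cluster.
local vshard = require('vshard')
local log = require('log')
local json = require('json')

local res, err, err_uuid = vshard.router.map_callrw('do_work', {42}, {
    timeout = 10,
    -- The new option: restrict the Ref-Map-Reduce to the replicasets that
    -- own at least one of these buckets.
    bucket_ids = {1, 2, 3},
})
if res == nil then
    -- err_uuid (or name) is set when the error is about a concrete replicaset.
    return nil, err, err_uuid
end
-- res maps a replicaset ID to an array wrapping what 'do_work' returned there.
for rs_id, values in pairs(res) do
    log.info('%s -> %s', rs_id, json.encode(values[1]))
end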
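The new check() primitive of vshard.storage.ref can also be exercised directly in a storage console. A small sketch, assuming a running storage instance; the rid value and the timeout are arbitrary:

local lref = require('vshard.storage.ref')
local rid = 42
local sid = box.session.id()

-- Take a ref bound to this session. While it exists, buckets cannot be moved
-- off this storage.
local ok, err = lref.add(rid, sid, 3)
assert(ok, err)

-- check() only verifies that the ref is still alive. Unlike use(), it does not
-- change the ref's state; this is what storage_ref_check_with_buckets() relies
-- on when the router comes back to an already referenced storage.
ok, err = lref.check(rid, sid)
assert(ok, err)

-- Drop the ref explicitly when done.
ok, err = lref.del(rid, sid)
assert(ok, err)
assert(lref.count == 0)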