Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: implement partial map-reduce #442

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions test/instances/storage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ _G.ivconst = require('vshard.consts')
_G.ivutil = require('vshard.util')
_G.iverror = require('vshard.error')
_G.ivtest = require('test.luatest_helpers.vtest')
_G.itable_new = require('table.new')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need it, table.new() is already available in the global namespace.


_G.iwait_timeout = _G.ivtest.wait_timeout

Expand Down Expand Up @@ -69,6 +70,20 @@ local function get_first_bucket()
return res ~= nil and res.id or nil
end

--
-- Collect identifiers of the first `n` buckets found in the ACTIVE
-- status on this instance.
--
-- @param n Number of bucket identifiers to return, must be positive.
-- @return An array with exactly `n` bucket identifiers.
--
-- Raises an error if `n` is not positive or if fewer than `n` active
-- buckets are found.
--
local function get_n_buckets(n)
    if n <= 0 then
        error('Invalid number of buckets')
    end
    -- table.new(narr, nhash): the result is used as an array, so it is
    -- the array part that has to be preallocated, not the hash part.
    local ids = _G.itable_new(n, 0)
    local count = 0
    for _, tuple in box.space._bucket.index.status:pairs(vconst.BUCKET.ACTIVE) do
        count = count + 1
        ids[count] = tuple.id
        if count == n then
            return ids
        end
    end
    error('Not enough buckets')
end

local function session_set(key, value)
box.session.storage[key] = value
return true
Expand Down Expand Up @@ -167,6 +182,7 @@ _G.box_error = box_error
_G.echo = echo
_G.get_uuid = get_uuid
_G.get_first_bucket = get_first_bucket
_G.get_n_buckets = get_n_buckets
_G.session_set = session_set
_G.session_get = session_get
_G.bucket_gc_wait = bucket_gc_wait
Expand Down
111 changes: 111 additions & 0 deletions test/storage-luatest/storage_1_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,114 @@ test_group.test_named_hot_reload = function(g)
_G.vshard.storage = storage
end)
end

--
-- Exercise the 'storage_ref_with_lookup' service call on a single
-- storage: present buckets, duplicated buckets, partially and fully
-- absent buckets, and a ref timeout.
--
test_group.test_ref_with_lookup = function(g)
    g.replica_1_a:exec(function()
        -- Only the instance-side globals (ilt, ivshard) are used inside
        -- this callback, the same way as in the rest of its body.
        local res, err
        local timeout = 0.1
        local rid = 42
        local bids = _G.get_n_buckets(2)
        -- NOTE(review): presumably an id which is not stored on this
        -- instance - confirm against the cluster config of the test.
        local bid_extra = 3001
        local no_ref_msg = 'Can not delete a storage ref: no ref'

        -- Check for a single bucket.
        res, err = ivshard.storage._call(
            'storage_ref_with_lookup',
            rid,
            timeout,
            {bids[1]}
        )
        ilt.assert_equals(err, nil)
        ilt.assert_equals(res, {rid = rid, moved = {}})
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_equals(err, nil)

        -- Check for multiple buckets.
        res, err = ivshard.storage._call(
            'storage_ref_with_lookup',
            rid,
            timeout,
            {bids[1], bids[2]}
        )
        ilt.assert_equals(err, nil)
        ilt.assert_equals(res, {rid = rid, moved = {}})
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_equals(err, nil)

        -- Check for double referencing: the same bucket passed twice
        -- must produce exactly one ref, so the second unref fails.
        res, err = ivshard.storage._call(
            'storage_ref_with_lookup',
            rid,
            timeout,
            {bids[1], bids[1]}
        )
        ilt.assert_equals(err, nil)
        ilt.assert_equals(res, {rid = rid, moved = {}})
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_equals(err, nil)
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_str_contains(err.message, no_ref_msg)

        -- Check for an absent bucket.
        res, err = ivshard.storage._call(
            'storage_ref_with_lookup',
            rid,
            timeout,
            {bids[1], bids[2], bid_extra}
        )
        ilt.assert_equals(err, nil)
        ilt.assert_equals(res, {rid = rid, moved = {bid_extra}})
        -- The ref was created for the present buckets, so it must be
        -- possible to drop it.
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_equals(err, nil)

        -- Check that we do not create a reference if there are no buckets.
        res, err = ivshard.storage._call(
            'storage_ref_with_lookup',
            rid,
            timeout,
            {bid_extra}
        )
        ilt.assert_equals(err, nil)
        ilt.assert_equals(res, {rid = nil, moved = {bid_extra}})
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_str_contains(err.message, no_ref_msg)
        ilt.assert_equals(res, nil)

        -- Check for a timeout.
        -- Emulate a case when all buckets are not writable.
        local func = ivshard.storage.internal.bucket_are_all_rw
        ivshard.storage.internal.bucket_are_all_rw = function() return false end
        res, err = ivshard.storage._call(
            'storage_ref_with_lookup',
            rid,
            timeout,
            {bids[1]}
        )
        -- Restore the original function before asserting, so a failed
        -- assertion does not leave the instance patched.
        ivshard.storage.internal.bucket_are_all_rw = func
        ilt.assert_str_contains(err.message, 'Timeout exceeded')
        ilt.assert_equals(res, nil)
        -- Check that the reference was not created.
        res, err = ivshard.storage._call('storage_unref', rid)
        ilt.assert_str_contains(err.message, no_ref_msg)
        ilt.assert_equals(res, nil)
    end)
end

--
-- Check the 'storage_absent_buckets' service call: it returns a
-- dictionary whose 'moved' field lists the requested buckets which
-- are not active on the instance.
--
test_group.test_absent_buckets = function(g)
    -- A bucket which is present must not be reported.
    g.replica_1_a:exec(function()
        local res, err = ivshard.storage._call(
            'storage_absent_buckets',
            {_G.get_first_bucket()}
        )
        ilt.assert_equals(err, nil)
        -- The call returns {moved = <list>}, not a bare list.
        ilt.assert_equals(res, {moved = {}})
    end)

    -- Only the missing bucket must be reported.
    g.replica_1_a:exec(function()
        -- NOTE(review): presumably an id which is not stored on this
        -- instance - confirm against the cluster config of the test.
        local bid_extra = 3001
        local res, err = ivshard.storage._call(
            'storage_absent_buckets',
            {_G.get_first_bucket(), bid_extra}
        )
        ilt.assert_equals(err, nil)
        ilt.assert_equals(res, {moved = {bid_extra}})
    end)
end
67 changes: 67 additions & 0 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3142,6 +3142,71 @@ local function storage_ref(rid, timeout)
return bucket_count()
end

--
-- Find which of the given buckets are not active on the current
-- instance.
--
-- A bucket is reported when it has no record in _bucket or when its
-- status is anything but ACTIVE.
-- NOTE(review): this means a bucket in any other status which still
-- keeps its data here is reported as absent too - confirm that it is
-- intended.
--
-- @param bucket_ids List of bucket identifiers.
-- @return A dictionary {moved = <list>}, where the list holds the
--     reported bucket identifiers in the same order as they appear in
--     `bucket_ids`. Duplicates in the input are reported as many times
--     as they are passed.
--
local function storage_absent_buckets(bucket_ids)
    local active = consts.BUCKET.ACTIVE
    local moved_buckets = {}
    -- ipairs() guarantees the 1..n traversal order, which the result
    -- order depends on; pairs() gives no such guarantee.
    for _, bucket_id in ipairs(bucket_ids) do
        local bucket = box.space._bucket:get{bucket_id}
        if not bucket or bucket.status ~= active then
            table.insert(moved_buckets, bucket_id)
        end
    end
    return { moved = moved_buckets }
end

--
-- Bind a new storage ref to the current box session unless all the
-- given buckets are absent on this instance. Is used as a part of the
-- partial Map-Reduce API.
--
-- @param rid Unique reference ID.
-- @param timeout Timeout in seconds.
-- @param bucket_ids List of bucket identifiers.
-- @return A dictionary {rid = <rid>, moved = <list>}, where the list
--     holds the bucket identifiers which are not present on the
--     current instance. If all the buckets are absent (or the list is
--     empty), the reference is not created and `rid` is nil.
-- @return On a failure to create the ref: nil and the error returned
--     by storage_ref().
--
local function storage_ref_with_lookup(rid, timeout, bucket_ids)
    local moved = storage_absent_buckets(bucket_ids).moved
    -- Each entry of `moved` comes from an entry of `bucket_ids`, so
    -- equal lengths already mean that every passed bucket is absent.
    -- No element-wise comparison is needed.
    if #moved == #bucket_ids then
        -- If all the passed buckets are absent, there is no need
        -- to create a ref.
        return {rid = nil, moved = moved}
    end

    -- NOTE(review): the lookup is done before the ref is taken. If
    -- storage_ref() can wait up to `timeout`, buckets may presumably
    -- change their state in between - confirm whether `moved` has to
    -- be re-checked after the ref is created.
    local ok, err = storage_ref(rid, timeout)
    if not ok then
        return nil, err
    end
    return {rid = rid, moved = moved}
end

--
-- Drop a storage ref from the current box session. Is used as a part of
-- Map-Reduce API.
Expand Down Expand Up @@ -3196,7 +3261,9 @@ service_call_api = setmetatable({
rebalancer_apply_routes = rebalancer_apply_routes,
rebalancer_request_state = rebalancer_request_state,
recovery_bucket_stat = recovery_bucket_stat,
storage_absent_buckets = storage_absent_buckets,
Gerold103 marked this conversation as resolved.
Show resolved Hide resolved
storage_ref = storage_ref,
storage_ref_with_lookup = storage_ref_with_lookup,
storage_unref = storage_unref,
storage_map = storage_map,
info = storage_service_info,
Expand Down