Skip to content

Commit

Permalink
Merge pull request #1034 from fatkodima/lmove-blmove
Browse files Browse the repository at this point in the history
[Redis 6.2] Add LMOVE/BLMOVE commands
  • Loading branch information
byroot authored Oct 8, 2021
2 parents 8328eff + f375fa8 commit eaf446c
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 0 deletions.
68 changes: 68 additions & 0 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,59 @@ def llen(key)
end
end

# Remove the first/last element in a list, append/prepend it to another list and return it.
#
# @param [String] source source key
# @param [String] destination destination key
# @param [String, Symbol] where_source from where to remove the element from the source list
# e.g. 'LEFT' - from head, 'RIGHT' - from tail
# @param [String, Symbol] where_destination where to push the element to the source list
# e.g. 'LEFT' - to head, 'RIGHT' - to tail
#
# @return [nil, String] the element, or nil when the source key does not exist
#
# @note This command comes in place of the now deprecated RPOPLPUSH.
# Doing LMOVE RIGHT LEFT is equivalent.
def lmove(source, destination, where_source, where_destination)
where_source, where_destination = _normalize_move_wheres(where_source, where_destination)

synchronize do |client|
client.call([:lmove, source, destination, where_source, where_destination])
end
end

# Remove the first/last element in a list and append/prepend it
# to another list and return it, or block until one is available.
#
# @example With timeout
# element = redis.blmove("foo", "bar", "LEFT", "RIGHT", timeout: 5)
# # => nil on timeout
# # => "element" on success
# @example Without timeout
# element = redis.blmove("foo", "bar", "LEFT", "RIGHT")
# # => "element"
#
# @param [String] source source key
# @param [String] destination destination key
# @param [String, Symbol] where_source from where to remove the element from the source list
# e.g. 'LEFT' - from head, 'RIGHT' - from tail
# @param [String, Symbol] where_destination where to push the element to the source list
# e.g. 'LEFT' - to head, 'RIGHT' - to tail
# @param [Hash] options
# - `:timeout => Numeric`: timeout in seconds, defaults to no timeout
#
# @return [nil, String] the element, or nil when the source key does not exist or the timeout expired
#
def blmove(source, destination, where_source, where_destination, timeout: 0)
where_source, where_destination = _normalize_move_wheres(where_source, where_destination)

synchronize do |client|
command = [:blmove, source, destination, where_source, where_destination, timeout]
timeout += client.timeout if timeout > 0
client.call_with_timeout(command, timeout)
end
end

# Prepend one or more values to a list, creating the list if it doesn't exist
#
# @param [String] key
Expand Down Expand Up @@ -3716,6 +3769,21 @@ def _xread(args, keys, ids, blocking_timeout_msec)
end
end
end

def _normalize_move_wheres(where_source, where_destination)
where_source = where_source.to_s.upcase
where_destination = where_destination.to_s.upcase

if where_source != "LEFT" && where_source != "RIGHT"
raise ArgumentError, "where_source must be 'LEFT' or 'RIGHT'"
end

if where_destination != "LEFT" && where_destination != "RIGHT"
raise ArgumentError, "where_destination must be 'LEFT' or 'RIGHT'"
end

[where_source, where_destination]
end
end

require_relative "redis/version"
Expand Down
15 changes: 15 additions & 0 deletions lib/redis/distributed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,21 @@ def llen(key)
node_for(key).llen(key)
end

# Remove the first/last element in a list, append/prepend it to another list and return it.
def lmove(source, destination, where_source, where_destination)
ensure_same_node(:lmove, [source, destination]) do |node|
node.lmove(source, destination, where_source, where_destination)
end
end

# Remove the first/last element in a list and append/prepend it
# to another list and return it, or block until one is available.
def blmove(source, destination, where_source, where_destination, timeout: 0)
ensure_same_node(:lmove, [source, destination]) do |node|
node.blmove(source, destination, where_source, where_destination, timeout: timeout)
end
end

# Prepend one or more values to a list.
def lpush(key, value)
node_for(key).lpush(key, value)
Expand Down
8 changes: 8 additions & 0 deletions test/blocking_commands_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ def assert_takes_longer_than_client_timeout
end
end

def test_blmove_disable_client_timeout
target_version "6.2" do
assert_takes_longer_than_client_timeout do |r|
assert_equal '0', r.blmove('foo', 'bar', 'LEFT', 'RIGHT')
end
end
end

def test_blpop_disable_client_timeout
assert_takes_longer_than_client_timeout do |r|
assert_equal %w[foo 0], r.blpop('foo')
Expand Down
6 changes: 6 additions & 0 deletions test/cluster_commands_on_lists_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ class TestClusterCommandsOnLists < Minitest::Test
include Helper::Cluster
include Lint::Lists

def test_lmove
target_version "6.2" do
assert_raises(Redis::CommandError) { super }
end
end

def test_rpoplpush
assert_raises(Redis::CommandError) { super }
end
Expand Down
8 changes: 8 additions & 0 deletions test/distributed_blocking_commands_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ class TestDistributedBlockingCommands < Minitest::Test
include Helper::Distributed
include Lint::BlockingCommands

def test_blmove_raises
target_version "6.2" do
assert_raises(Redis::Distributed::CannotDistribute) do
r.blmove('foo', 'bar', 'LEFT', 'RIGHT')
end
end
end

def test_blpop_raises
assert_raises(Redis::Distributed::CannotDistribute) do
r.blpop(%w[foo bar])
Expand Down
8 changes: 8 additions & 0 deletions test/distributed_commands_on_lists_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ class TestDistributedCommandsOnLists < Minitest::Test
include Helper::Distributed
include Lint::Lists

def test_lmove
target_version "6.2" do
assert_raises Redis::Distributed::CannotDistribute do
r.lmove('foo', 'bar', 'LEFT', 'RIGHT')
end
end
end

def test_rpoplpush
assert_raises Redis::Distributed::CannotDistribute do
r.rpoplpush('foo', 'bar')
Expand Down
13 changes: 13 additions & 0 deletions test/distributed_commands_requiring_clustering_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ def test_renamenx
assert_equal "s2", r.get("{qux}bar")
end

def test_lmove
target_version "6.2" do
r.rpush("{qux}foo", "s1")
r.rpush("{qux}foo", "s2")
r.rpush("{qux}bar", "s3")
r.rpush("{qux}bar", "s4")

assert_equal "s1", r.lmove("{qux}foo", "{qux}bar", "LEFT", "RIGHT")
assert_equal ["s2"], r.lrange("{qux}foo", 0, -1)
assert_equal ["s3", "s4", "s1"], r.lrange("{qux}bar", 0, -1)
end
end

def test_brpoplpush
r.rpush "{qux}foo", "s1"
r.rpush "{qux}foo", "s2"
Expand Down
31 changes: 31 additions & 0 deletions test/lint/blocking_commands.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def mock(options = {}, &blk)

def build_mock_commands(options = {})
{
blmove: lambda do |*args|
sleep options[:delay] if options.key?(:delay)
to_protocol(args.last)
end,
blpop: lambda do |*args|
sleep options[:delay] if options.key?(:delay)
to_protocol([args.first, args.last])
Expand All @@ -55,6 +59,23 @@ def build_mock_commands(options = {})
}
end

def test_blmove
target_version "6.2" do
assert_equal 's1', r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT')
assert_equal ['s2'], r.lrange('{zap}foo', 0, -1)
assert_equal ['s1', 's2', 's1'], r.lrange('{zap}bar', 0, -1)
end
end

def test_blmove_timeout
target_version "6.2" do
mock do |r|
assert_equal '0', r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT')
assert_equal LOW_TIMEOUT.to_s, r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT', timeout: LOW_TIMEOUT)
end
end
end

def test_blpop
assert_equal ['{zap}foo', 's1'], r.blpop('{zap}foo')
assert_equal ['{zap}foo', 's2'], r.blpop(['{zap}foo'])
Expand Down Expand Up @@ -166,6 +187,16 @@ def test_bzpopmax
end

driver(:ruby, :hiredis) do
def test_blmove_socket_timeout
target_version "6.2" do
mock(delay: LOW_TIMEOUT * 5) do |r|
assert_raises(Redis::TimeoutError) do
r.blmove('{zap}foo', '{zap}bar', 'LEFT', 'RIGHT', timeout: LOW_TIMEOUT)
end
end
end
end

def test_blpop_socket_timeout
mock(delay: LOW_TIMEOUT * 5) do |r|
assert_raises(Redis::TimeoutError) do
Expand Down
27 changes: 27 additions & 0 deletions test/lint/lists.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,33 @@

module Lint
module Lists
def test_lmove
target_version "6.2" do
r.lpush("foo", "s1")
r.lpush("foo", "s2") # foo = [s2, s1]
r.lpush("bar", "s3")
r.lpush("bar", "s4") # bar = [s4, s3]

assert_nil r.lmove("nonexistent", "foo", "LEFT", "LEFT")

assert_equal "s2", r.lmove("foo", "foo", "LEFT", "RIGHT") # foo = [s1, s2]
assert_equal "s1", r.lmove("foo", "foo", "LEFT", "LEFT") # foo = [s1, s2]

assert_equal "s1", r.lmove("foo", "bar", "LEFT", "RIGHT") # foo = [s2], bar = [s4, s3, s1]
assert_equal ["s2"], r.lrange("foo", 0, -1)
assert_equal ["s4", "s3", "s1"], r.lrange("bar", 0, -1)

assert_equal "s2", r.lmove("foo", "bar", "LEFT", "LEFT") # foo = [], bar = [s2, s4, s3, s1]
assert_nil r.lmove("foo", "bar", "LEFT", "LEFT") # foo = [], bar = [s2, s4, s3, s1]
assert_equal ["s2", "s4", "s3", "s1"], r.lrange("bar", 0, -1)

error = assert_raises(ArgumentError) do
r.lmove("foo", "bar", "LEFT", "MIDDLE")
end
assert_equal "where_destination must be 'LEFT' or 'RIGHT'", error.message
end
end

def test_lpush
r.lpush "foo", "s1"
r.lpush "foo", "s2"
Expand Down

0 comments on commit eaf446c

Please sign in to comment.