Skip to content

Commit

Permalink
Implement #watch and #multi specially for cluster-client
Browse files Browse the repository at this point in the history
This PR makes watch & multi work more or less the same way for
clustering as they do for normal redis.

Since it's supposed to be valid to perform your multi call on the
original redis object, like this:

```
redis.watch('key') do
  redis.multi do |tx|
    # tx is performed on the same connection as the watch
  end
end
```

we need to keeps some state in an ivar @active_watcher so we know to
call MULTI on the same actual connection as WATCH (and appropriately
fail if the keys got redirected or the node went down). This is
technically threadsafe, because the watch/multi implementation is
wrapped in the `synchronize` monitor; however, for good performance in
multithreaded environments, you will most likely want to use a
connection pool of Redis::Cluster instances.
  • Loading branch information
KJTsanaktsidis committed Feb 21, 2024
1 parent 7cc45e5 commit e2e2872
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 4 deletions.
56 changes: 56 additions & 0 deletions cluster/lib/redis/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,62 @@ def cluster(subcommand, *args)
send_command([:cluster, subcommand] + args, &block)
end

# Transactions need different implementations in cluster mode, using purpose-build
# primitive available in redis-cluster-client. These methods (watch and multii
# implement the same interface as the methods in ::Redis::Commands::Transactions.

def watch(*keys)
synchronize do |client|
# client is a ::Redis::Cluster::Client instance, which is a subclass of
# ::RedisClient::Cluster

if @active_watcher
# We're already within a #watch block, just add keys to the existing watch
@active_watcher.watch(keys)
else
unless block_given?
raise ArgumentError, "#{self.class.name} requires that the initial #watch call of a transaction " \
"passes a block"
end

client.watch(keys) do |watcher|
@active_watcher = watcher
yield self
ensure
@active_watcher = nil
end

end
end
end

def multi
synchronize do |client|
if @active_watcher
# If we're inside a #watch block, use that to execute the transaction
@active_watcher.multi do |tx|
yield MultiConnection.new(tx)
end
else
# Make a new transaction from whole cloth.
client.multi do |tx|
yield MultiConnection.new(tx)
end
end
end
end

def unwatch
synchronize do
if @active_watcher
@active_watcher.unwatch
else
# This will raise an AmbiguiousNodeError
super
end
end
end

private

def initialize_client(options)
Expand Down
4 changes: 4 additions & 0 deletions cluster/lib/redis/cluster/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ def multi(watch: nil, &block)
handle_errors { super(watch: watch, &block) }
end

def watch(keys, &block)
handle_errors { super(keys, &block) }
end

private

def handle_errors
Expand Down
69 changes: 69 additions & 0 deletions cluster/test/client_transactions_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,73 @@ def test_cluster_client_does_not_support_transaction_by_multiple_keys
assert_nil(redis.get("key#{i}"))
end
end

def test_cluster_client_does_support_transaction_with_optimistic_locking
redis.mset('{key}1', '1', '{key}2', '2')

another = Fiber.new do
cli = build_another_client
cli.mset('{key}1', '3', '{key}2', '4')
cli.close
end

redis.watch('{key}1', '{key}2') do
another.resume
v1 = redis.get('{key}1')
v2 = redis.get('{key}2')
redis.multi do |tx|
tx.set('{key}1', v2)
tx.set('{key}2', v1)
end
end

assert_equal %w[3 4], redis.mget('{key}1', '{key}2')
end

def test_cluster_client_can_unwatch_transaction
redis.set('key1', 'initial_value')

another = Fiber.new do
cli = build_another_client
cli.set('key1', 'another_value')
end

redis.watch('key1') do
another.resume
redis.unwatch
end
# After calling unwatch, the same connection can be used to open a transaction which
# isn't conditional and so will commit
got = redis.multi do |tx|
tx.set('key1', 'final_value')
end

assert_equal ['OK'], got
assert_equal 'final_value', redis.get('key1')
end

def test_cluster_client_unwatches_on_exception
redis.set('key1', 'initial_value')

another = Fiber.new do
cli = build_another_client
cli.set('key1', 'another_value')
end

assert_raises(RuntimeError) do
redis.watch('key1') do
another.resume
raise 'bang'
end
end
# After catching the exception, the same connection can be used to open a transaction which
# isn't conditional and so will commit
# n.b. the actual behaviour which ensures this is actually in redis-cluster-client
got = redis.multi do |tx|
tx.set('key1', 'final_value')
end

assert_equal ['OK'], got
assert_equal 'final_value', redis.get('key1')
end
end
6 changes: 2 additions & 4 deletions cluster/test/commands_on_transactions_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ def test_unwatch
end

def test_watch
assert_raises(Redis::CommandError, "CROSSSLOT Keys in request don't hash to the same slot") do
redis.watch('key1', 'key2')
assert_raises(Redis::Cluster::TransactionConsistencyError) do
redis.watch('key1', 'key2') {}
end

assert_equal 'OK', redis.watch('{key}1', '{key}2')
end
end

0 comments on commit e2e2872

Please sign in to comment.