Skip to content

Commit

Permalink
Merge pull request #811 from redis/stream-improvements
Browse files Browse the repository at this point in the history
Stream feature polish
  • Loading branch information
byroot authored Dec 13, 2018
2 parents dde1af7 + 60079e4 commit 7ff90f1
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 132 deletions.
16 changes: 6 additions & 10 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ before_install:
script: make

rvm:
- 2.2.2
- 2.3.3
- 2.4.1
- 2.5.0
- 2.3.8
- 2.4.5
- 2.5.3
- jruby-9.1.17.0

gemfile: ".travis/Gemfile"
Expand All @@ -30,11 +29,10 @@ env:
matrix:
- DRIVER=ruby REDIS_BRANCH=3.0
- DRIVER=ruby REDIS_BRANCH=3.2
- DRIVER=hiredis REDIS_BRANCH=3.0
- DRIVER=hiredis REDIS_BRANCH=3.2
- DRIVER=synchrony REDIS_BRANCH=3.0
- DRIVER=synchrony REDIS_BRANCH=3.2
- DRIVER=ruby REDIS_BRANCH=4.0
- DRIVER=ruby REDIS_BRANCH=5.0
- DRIVER=hiredis REDIS_BRANCH=5.0
- DRIVER=synchrony REDIS_BRANCH=5.0

branches:
only:
Expand All @@ -52,8 +50,6 @@ matrix:
env: DRIVER=ruby REDIS_BRANCH=3.2 LOW_TIMEOUT=0.3
- rvm: jruby-9.1.17.0
env: DRIVER=ruby REDIS_BRANCH=4.0 LOW_TIMEOUT=0.3
- rvm: 2.5.3
env: DRIVER=ruby REDIS_BRANCH=5.0

notifications:
irc:
Expand Down
7 changes: 7 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env ruby

$LOAD_PATH.unshift(File.expand_path('../../lib', __FILE__))
require 'redis'

require 'irb'
IRB.start
97 changes: 61 additions & 36 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2990,13 +2990,31 @@ def xdel(key, *ids)
# redis.xrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param first [String] first entry id of range, default value is `-`
# @param last [String] last entry id of range, default value is `+`
# @param start [String] first entry id of range, default value is `+`
# @param end [String] last entry id of range, default value is `-`
# @param count [Integer] the number of entries as limit
#
# @return [Hash{String => Hash}] the entries
def xrange(key, first: '-', last: '+', count: nil)
args = [:xrange, key, first, last]

# Fetches entries of the stream in ascending order.
#
# @example Without options
# redis.xrange('mystream')
# @example With a specific start
# redis.xrange('mystream', '0-1')
# @example With a specific start and end
# redis.xrange('mystream', '0-1', '0-3')
# @example With count options
# redis.xrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param start [String] first entry id of range, default value is `-`
# @param end [String] last entry id of range, default value is `+`
# @param count [Integer] the number of entries as limit
#
# @return [Array<Array<String, Hash>>] the ids and entries pairs
def xrange(key, start = '-', _end = '+', count: nil)
args = [:xrange, key, start, _end]
args.concat(['COUNT', count]) if count
synchronize { |client| client.call(args, &HashifyStreamEntries) }
end
Expand All @@ -3005,21 +3023,21 @@ def xrange(key, first: '-', last: '+', count: nil)
#
# @example Without options
# redis.xrevrange('mystream')
# @example With first entry id option
# redis.xrevrange('mystream', first: '0-1')
# @example With first and last entry id options
# redis.xrevrange('mystream', first: '0-1', last: '0-3')
# @example With a specific end
# redis.xrevrange('mystream', '0-3')
# @example With a specific end and start
# redis.xrevrange('mystream', '0-3', '0-1')
# @example With count options
# redis.xrevrange('mystream', count: 10)
#
# @param key [String] the stream key
# @param first [String] first entry id of range, default value is `-`
# @param last [String] last entry id of range, default value is `+`
# @param count [Integer] the number of entries as limit
# @param key [String] the stream key
# @param end [String] first entry id of range, default value is `+`
# @param start [String] last entry id of range, default value is `-`
# @params count [Integer] the number of entries as limit
#
# @return [Hash{String => Hash}] the entries
def xrevrange(key, first: '-', last: '+', count: nil)
args = [:xrevrange, key, last, first]
# @return [Array<Array<String, Hash>>] the ids and entries pairs
def xrevrange(key, _end = '+', start = '-', count: nil)
args = [:xrevrange, key, _end, start]
args.concat(['COUNT', count]) if count
synchronize { |client| client.call(args, &HashifyStreamEntries) }
end
Expand Down Expand Up @@ -3055,8 +3073,8 @@ def xlen(key)
# @return [Hash{String => Hash{String => Hash}}] the entries
def xread(keys, ids, count: nil, block: nil)
args = [:xread]
args.concat(['COUNT', count]) if count
args.concat(['BLOCK', block.to_i]) if block
args << 'COUNT' << count if count
args << 'BLOCK' << block.to_i if block
_xread(args, keys, ids, block)
end

Expand Down Expand Up @@ -3113,9 +3131,9 @@ def xgroup(subcommand, key, group, id_or_consumer = nil, mkstream: false)
# @return [Hash{String => Hash{String => Hash}}] the entries
def xreadgroup(group, consumer, keys, ids, opts = {})
args = [:xreadgroup, 'GROUP', group, consumer]
args.concat(['COUNT', opts[:count]]) if opts[:count]
args.concat(['BLOCK', opts[:block].to_i]) if opts[:block]
args << 'NOACK' if opts[:noack]
args << 'COUNT' << opts[:count] if opts[:count]
args << 'BLOCK' << opts[:block].to_i if opts[:block]
args << 'NOACK' if opts[:noack]
_xread(args, keys, ids, opts[:block])
end

Expand Down Expand Up @@ -3186,26 +3204,31 @@ def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
# @example With key and group
# redis.xpending('mystream', 'mygroup')
# @example With range options
# redis.xpending('mystream', 'mygroup', first: '-', last: '+', count: 10)
# redis.xpending('mystream', 'mygroup', '-', '+', 10)
# @example With range and consumer options
# redis.xpending('mystream', 'mygroup', 'consumer1', first: '-', last: '+', count: 10)
# redis.xpending('mystream', 'mygroup', '-', '+', 10, 'consumer1')
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param opts [Hash] several options for `XPENDING` command
#
# @option opts [String] :first first entry id of range
# @option opts [String] :last last entry id of range
# @option opts [Integer] :count the number of entries as limit
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param start [String] start first entry id of range
# @param end [String] end last entry id of range
# @param count [Integer] count the number of entries as limit
# @param consumer [String] the consumer name
#
# @return [Hash] the summary of pending entries
# @return [Array<Hash>] the pending entries details if options were specified
def xpending(key, group, consumer = nil, **opts)
args = [:xpending, key, group, opts[:first], opts[:last], opts[:count], consumer].compact
summary_needed = consumer.nil? && opts.empty?
def xpending(key, group, *args)
command_args = [:xpending, key, group]
case args.size
when 0, 3, 4
command_args.concat(args)
else
raise ArgumentError, "wrong number of arguments (given #{args.size + 2}, expected 2, 5 or 6)"
end

summary_needed = args.empty?
blk = summary_needed ? HashifyStreamPendings : HashifyStreamPendingDetails
synchronize { |client| client.call(args, &blk) }
synchronize { |client| client.call(command_args, &blk) }
end

# Interact with the sentinel command (masters, master, slaves, failover)
Expand Down Expand Up @@ -3365,7 +3388,7 @@ def method_missing(command, *args)
lambda { |reply|
reply.map do |entry_id, values|
[entry_id, values.each_slice(2).to_h]
end.to_h
end
}

HashifyStreamPendings =
Expand Down Expand Up @@ -3459,7 +3482,9 @@ def _subscription(method, timeout, channels, block)
def _xread(args, keys, ids, blocking_timeout_msec)
keys = keys.is_a?(Array) ? keys : [keys]
ids = ids.is_a?(Array) ? ids : [ids]
args.concat(['STREAMS'], keys, ids)
args << 'STREAMS'
args.concat(keys)
args.concat(ids)

synchronize do |client|
if blocking_timeout_msec.nil?
Expand Down
22 changes: 14 additions & 8 deletions test/cluster_commands_on_streams_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ def test_xread_with_multiple_keys_and_hash_tags
redis.xadd('{s}1', { f: 'v02' }, id: '0-2')
redis.xadd('{s}2', { f: 'v11' }, id: '1-1')
redis.xadd('{s}2', { f: 'v12' }, id: '1-2')

actual = redis.xread(%w[{s}1 {s}2], %w[0-1 1-1])
assert_equal 1, actual['{s}1'].size
assert_equal 1, actual['{s}2'].size
assert_equal 'v02', actual['{s}1']['0-2']['f']
assert_equal 'v12', actual['{s}2']['1-2']['f']

assert_equal %w(0-2), actual['{s}1'].map(&:first)
assert_equal %w(v02), actual['{s}1'].map { |i| i.last['f'] }

assert_equal %w(1-2), actual['{s}2'].map(&:first)
assert_equal %w(v12), actual['{s}2'].map { |i| i.last['f'] }
end

def test_xreadgroup_with_multiple_keys
Expand All @@ -38,10 +41,13 @@ def test_xreadgroup_with_multiple_keys_and_hash_tags
redis.xgroup(:create, '{s}2', 'g1', '$')
redis.xadd('{s}1', { f: 'v02' }, id: '0-2')
redis.xadd('{s}2', { f: 'v12' }, id: '1-2')

actual = redis.xreadgroup('g1', 'c1', %w[{s}1 {s}2], %w[> >])
assert_equal 1, actual['{s}1'].size
assert_equal 1, actual['{s}2'].size
assert_equal 'v02', actual['{s}1']['0-2']['f']
assert_equal 'v12', actual['{s}2']['1-2']['f']

assert_equal %w(0-2), actual['{s}1'].map(&:first)
assert_equal %w(v02), actual['{s}1'].map { |i| i.last['f'] }

assert_equal %w(1-2), actual['{s}2'].map(&:first)
assert_equal %w(v12), actual['{s}2'].map { |i| i.last['f'] }
end
end
Loading

0 comments on commit 7ff90f1

Please sign in to comment.