Skip to content

Commit

Permalink
perf: reduce memory allocations (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Sep 19, 2022
1 parent dab3fd6 commit 461d027
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 64 deletions.
27 changes: 9 additions & 18 deletions lib/redis_client/cluster/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/normalized_cmd_name'

class RedisClient
class Cluster
Expand Down Expand Up @@ -31,14 +32,16 @@ def load(nodes) # rubocop:disable Metrics/MethodLength

def parse_command_details(rows)
rows&.reject { |row| row[0].nil? }.to_h do |row|
[row[0].downcase, { arity: row[1], flags: row[2], first: row[3], last: row[4], step: row[5] }]
[
::RedisClient::Cluster::NormalizedCmdName.instance.get_by_name(row[0]),
{ arity: row[1], flags: row[2], first: row[3], last: row[4], step: row[5] }
]
end
end
end

def initialize(details)
@details = pick_details(details)
@normalized_cmd_name_cache = {}
end

def extract_first_key(command)
Expand All @@ -59,7 +62,8 @@ def should_send_to_replica?(command)
end

def exists?(name)
@details.key?(name.to_s.downcase)
key = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_name(name)
@details.key?(key)
end

private
Expand All @@ -75,14 +79,14 @@ def pick_details(details)
end

def dig_details(command, key)
name = normalize_cmd_name(command)
name = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
return if name.empty? || !@details.key?(name)

@details.fetch(name).fetch(key)
end

def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/MethodLength
case normalize_cmd_name(command)
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3
when 'object' then 2
when 'memory'
Expand Down Expand Up @@ -111,19 +115,6 @@ def extract_hash_tag(key)

key[s + 1..e - 1]
end

def normalize_cmd_name(command)
return EMPTY_STRING unless command.is_a?(Array)

name = case e = command.first
when String then e
when Array then e.first
end
return EMPTY_STRING if name.nil? || name.empty?

@normalized_cmd_name_cache[name] = name.downcase unless @normalized_cmd_name_cache.key?(name)
@normalized_cmd_name_cache[name]
end
end
end
end
67 changes: 67 additions & 0 deletions lib/redis_client/cluster/normalized_cmd_name.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# frozen_string_literal: true

require 'singleton'

class RedisClient
class Cluster
class NormalizedCmdName
include Singleton

EMPTY_STRING = ''

def initialize
@cache = {}
@mutex = Mutex.new
end

def get_by_command(command)
get(command, index: 0)
end

def get_by_subcommand(command)
get(command, index: 1)
end

def get_by_name(name)
get(name, index: 0)
end

def clear
@mutex.synchronize { @cache.clear }
end

private

def get(command, index:)
name = extract_name(command, index: index)
return EMPTY_STRING if name.nil? || name.empty?

normalize(name)
end

def extract_name(command, index:)
case command
when String, Symbol then index.zero? ? command : nil
when Array then extract_name_from_array(command, index: index)
end
end

def extract_name_from_array(command, index:)
return if command.size - 1 < index

case e = command[index]
when String, Symbol then e
when Array then e[index]
end
end

def normalize(name)
return @cache[name] if @cache.key?(name)
return name.to_s.downcase if @mutex.locked?

@mutex.synchronize { @cache[name] = name.to_s.downcase }
@cache[name]
end
end
end
end
26 changes: 15 additions & 11 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'redis_client/cluster/key_slot_converter'
require 'redis_client/cluster/node'
require 'redis_client/cluster/node_key'
require 'redis_client/cluster/normalized_cmd_name'

class RedisClient
class Cluster
Expand All @@ -25,7 +26,7 @@ def initialize(config, pool: nil, **kwargs)
end

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength, Metrics/PerceivedComplexity
cmd = command.first.to_s.downcase
cmd = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_command(command)
case cmd
when 'acl', 'auth', 'bgrewriteaof', 'bgsave', 'quit', 'save'
@node.call_all(method, command, args, &block).first
Expand Down Expand Up @@ -65,7 +66,12 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
# @see https://redis.io/topics/cluster-spec#redirection-and-resharding
# Redirection and resharding
def try_send(node, method, command, args, retry_count: 3, &block) # rubocop:disable Metrics/MethodLength, Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
node.send(method, *args, command, &block)
if args.empty?
# prevent memory allocation for variable-length args
node.send(method, command, &block)
else
node.send(method, *args, command, &block)
end
rescue ::RedisClient::CommandError => e
raise if retry_count <= 0

Expand Down Expand Up @@ -193,34 +199,32 @@ def send_wait_command(method, command, args, retry_count: 3, &block)
end

def send_config_command(method, command, args, &block)
case command[1].to_s.downcase
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'resetstat', 'rewrite', 'set'
@node.call_all(method, command, args, &block).first
else assign_node(command).send(method, *args, command, &block)
end
end

def send_memory_command(method, command, args, &block)
case command[1].to_s.downcase
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'stats' then @node.call_all(method, command, args, &block)
when 'purge' then @node.call_all(method, command, args, &block).first
else assign_node(command).send(method, *args, command, &block)
end
end

def send_client_command(method, command, args, &block)
case command[1].to_s.downcase
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'list' then @node.call_all(method, command, args, &block).flatten
when 'pause', 'reply', 'setname'
@node.call_all(method, command, args, &block).first
else assign_node(command).send(method, *args, command, &block)
end
end

def send_cluster_command(method, command, args, &block) # rubocop:disable Metrics/MethodLength
subcommand = command[1].to_s.downcase

case subcommand
def send_cluster_command(method, command, args, &block)
case subcommand = ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'addslots', 'delslots', 'failover', 'forget', 'meet', 'replicate',
'reset', 'set-config-epoch', 'setslot'
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported, ['cluster', subcommand]
Expand All @@ -234,7 +238,7 @@ def send_cluster_command(method, command, args, &block) # rubocop:disable Metric
end

def send_script_command(method, command, args, &block)
case command[1].to_s.downcase
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'debug', 'kill'
@node.call_all(method, command, args, &block).first
when 'flush', 'load'
Expand All @@ -244,7 +248,7 @@ def send_script_command(method, command, args, &block)
end

def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
case command[1].to_s.downcase
case ::RedisClient::Cluster::NormalizedCmdName.instance.get_by_subcommand(command)
when 'channels' then @node.call_all(method, command, args, &block).flatten.uniq.sort_by(&:to_s)
when 'numsub'
@node.call_all(method, command, args, &block).reject(&:empty?).map { |e| Hash[*e] }
Expand Down
20 changes: 8 additions & 12 deletions test/bench_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ class PrimaryOnly < BenchmarkWrapper
private

def new_test_client
config = ::RedisClient::ClusterConfig.new(
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
).new_client
end
end

Expand All @@ -25,14 +24,13 @@ class ScaleReadRandom < BenchmarkWrapper
private

def new_test_client
config = ::RedisClient::ClusterConfig.new(
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :random,
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
).new_client
end
end

Expand All @@ -42,14 +40,13 @@ class ScaleReadLatency < BenchmarkWrapper
private

def new_test_client
config = ::RedisClient::ClusterConfig.new(
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
replica: true,
replica_affinity: :latency,
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config)
).new_client
end
end

Expand All @@ -59,12 +56,11 @@ class Pooled < BenchmarkWrapper
private

def new_test_client
config = ::RedisClient::ClusterConfig.new(
::RedisClient.cluster(
nodes: TEST_NODE_URIS,
fixed_hostname: TEST_FIXED_HOSTNAME,
**TEST_GENERIC_OPTIONS
)
::RedisClient::Cluster.new(config, pool: { timeout: TEST_TIMEOUT_SEC, size: 2 })
).new_pool(timeout: TEST_TIMEOUT_SEC, size: 2)
end
end

Expand Down
26 changes: 21 additions & 5 deletions test/prof_mem.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,36 @@ module ProfMem

def run
%w[primary_only scale_read_random scale_read_latency pooled].each do |cli_type|
print "################################################################################\n"
print "# #{cli_type}\n"
print "################################################################################\n"
print "\n"

prepare
print_letter(cli_type, 'w/ pipelining')
profile do
send("new_#{cli_type}_client".to_sym).pipelined do |pi|
ATTEMPT_COUNT.times { |i| pi.call('SET', i, i) }
ATTEMPT_COUNT.times { |i| pi.call('GET', i) }
end
end

prepare
print_letter(cli_type, 'w/o pipelining')
profile do
cli = send("new_#{cli_type}_client".to_sym)
ATTEMPT_COUNT.times { |i| cli.call('SET', i, i) }
ATTEMPT_COUNT.times { |i| cli.call('GET', i) }
end
end
end

def prepare
::RedisClient::Cluster::NormalizedCmdName.instance.clear
end

def print_letter(title, sub_titile)
print "################################################################################\n"
print "# #{title}: #{sub_titile}\n"
print "################################################################################\n"
print "\n"
end

def profile(&block)
# https://github.com/SamSaffron/memory_profiler
report = ::MemoryProfiler.report(top: 10, &block)
Expand Down
Loading

0 comments on commit 461d027

Please sign in to comment.