Skip to content

Commit

Permalink
[RUBY-3496] Fix legacy read pool retry error (#2878)
Browse files Browse the repository at this point in the history
Co-authored-by: Dmitry Rybakov <[email protected]>
Co-authored-by: Dmitry Rybakov <[email protected]>
  • Loading branch information
3 people authored Jul 4, 2024
1 parent de60b9e commit 3c5dc93
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 38 deletions.
31 changes: 28 additions & 3 deletions lib/mongo/retryable/base_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def initialize(retryable)

private

# Indicate which exception classes that are generally retryable.
# Indicate which exception classes that are generally retryable
# when using modern retries mechanism.
#
# @return [ Array<Mongo:Error> ] Array of exception classes that are
# considered retryable.
Expand All @@ -58,18 +59,42 @@ def retryable_exceptions
Error::ConnectionPerished,
Error::ServerNotUsable,
Error::SocketError,
Error::SocketTimeoutError
Error::SocketTimeoutError,
].freeze
end

# Indicate which exception classes that are generally retryable
# when using legacy retries mechanism.
#
# @return [ Array<Mongo:Error> ] Array of exception classes that are
# considered retryable.
def legacy_retryable_exceptions
[
Error::ConnectionPerished,
Error::ServerNotUsable,
Error::SocketError,
Error::SocketTimeoutError,
Error::PoolClearedError,
Error::PoolPausedError,
].freeze
end


# Tests to see if the given exception instance is of a type that can
# be retried.
# be retried with modern retry mechanism.
#
# @return [ true | false ] true if the exception is retryable.
def is_retryable_exception?(e)
retryable_exceptions.any? { |klass| klass === e }
end

# Tests to see if the given exception instance is of a type that can
# be retried with legacy retry mechanism.
#
# @return [ true | false ] true if the exception is retryable.
def is_legacy_retryable_exception?(e)
legacy_retryable_exceptions.any? { |klass| klass === e }
end
# Logs the given deprecation warning the first time it is called for a
# given key; after that, it does nothing when given the same key.
def deprecation_warning(key, warning)
Expand Down
17 changes: 9 additions & 8 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def modern_read_with_retry(session, server_selector, &block)
raise e if !is_retryable_exception?(e) && !e.write_retryable?
retry_read(e, session, server_selector, failed_server: server, &block)
end

# Attempts to do a "legacy" read with retry. The operation will be
# attempted multiple times, up to the client's `max_read_retries`
# setting.
Expand All @@ -213,17 +213,18 @@ def modern_read_with_retry(session, server_selector, &block)
def legacy_read_with_retry(session, server_selector, &block)
attempt = attempt ? attempt + 1 : 1
yield select_server(cluster, server_selector, session)
rescue *retryable_exceptions, Error::OperationFailure, Error::PoolError => e
rescue *legacy_retryable_exceptions, Error::OperationFailure => e
e.add_notes('legacy retry', "attempt #{attempt}")

if is_retryable_exception?(e)

if is_legacy_retryable_exception?(e)

raise e if attempt > client.max_read_retries || session&.in_transaction?
elsif e.retryable? && !session&.in_transaction?
raise e if attempt > client.max_read_retries
else
raise e
end

log_retry(e, message: 'Legacy read retry')
sleep(client.read_retry_interval) unless is_retryable_exception?(e)
retry
Expand Down Expand Up @@ -261,7 +262,7 @@ def read_without_retry(session, server_selector, &block)
# @param [ Mongo::Server ] failed_server The server on which the original
# operation failed.
# @param [ Proc ] block The block to execute.
#
#
# @return [ Result ] The result of the operation.
def retry_read(original_error, session, server_selector, failed_server: nil, &block)
begin
Expand All @@ -270,9 +271,9 @@ def retry_read(original_error, session, server_selector, failed_server: nil, &bl
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
end

log_retry(original_error, message: 'Read retry')

begin
yield server, true
rescue *retryable_exceptions => e
Expand Down
8 changes: 4 additions & 4 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def nro_write_with_retry(write_concern, context:, &block)
session = context.session
server = select_server(cluster, ServerSelector.primary, session)
options = session&.client&.options || {}

if options[:retry_writes]
begin
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
Expand Down Expand Up @@ -219,7 +219,7 @@ def legacy_write_with_retry(server = nil, context:)
def modern_write_with_retry(session, server, context, &block)
txn_num = nil
connection_succeeded = false

server.with_connection(connection_global_id: context.connection_global_id) do |connection|
connection_succeeded = true

Expand Down Expand Up @@ -264,7 +264,7 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
server = select_server(cluster, ServerSelector.primary, session, failed_server)

unless server.retry_writes?
# Do not need to add "modern retry" here, it should already be on
# the first exception.
Expand All @@ -278,7 +278,7 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
# special marker class to bypass the ordinarily applicable rescues.
raise Error::RaiseOriginalError
end

log_retry(original_error, message: 'Write retry')
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield(connection, txn_num, context)
Expand Down
57 changes: 34 additions & 23 deletions spec/integration/retryable_reads_errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,42 @@
client.subscribe(Mongo::Monitoring::CONNECTION_POOL, subscriber)
end

it "retries on PoolClearedError" do
# After the first find fails, the pool is paused and retry is triggered.
# Now, a race is started between the second find acquiring a connection,
# and the first retrying the read. Now, retry reads cause the cluster to
# be rescanned and the pool to be unpaused, allowing the second checkout
# to succeed (when it should fail). Therefore we want the second find's
# check out to win the race. This gives the check out a little head start.
allow_any_instance_of(Mongo::Server::ConnectionPool).to receive(:ready).and_wrap_original do |m, *args, &block|
::Utils.wait_for_condition(5) do
# check_out_results should contain:
# - find1 connection check out successful
# - pool cleared
# - find2 connection check out failed
# We wait here for the third event to happen before we ready the pool.
cmap_events.select do |e|
event_types.include?(e.class)
end.length >= 3
shared_examples_for 'retries on PoolClearedError' do
it "retries on PoolClearedError" do
# After the first find fails, the pool is paused and retry is triggered.
# Now, a race is started between the second find acquiring a connection,
# and the first retrying the read. Now, retry reads cause the cluster to
# be rescanned and the pool to be unpaused, allowing the second checkout
# to succeed (when it should fail). Therefore we want the second find's
# check out to win the race. This gives the check out a little head start.
allow_any_instance_of(Mongo::Server::ConnectionPool).to receive(:ready).and_wrap_original do |m, *args, &block|
::Utils.wait_for_condition(5) do
# check_out_results should contain:
# - find1 connection check out successful
# - pool cleared
# - find2 connection check out failed
# We wait here for the third event to happen before we ready the pool.
cmap_events.select do |e|
event_types.include?(e.class)
end.length >= 3
end
m.call(*args, &block)
end
m.call(*args, &block)
threads.map(&:join)
expect(check_out_results[0]).to be_a(Mongo::Monitoring::Event::Cmap::ConnectionCheckedOut)
expect(check_out_results[1]).to be_a(Mongo::Monitoring::Event::Cmap::PoolCleared)
expect(check_out_results[2]).to be_a(Mongo::Monitoring::Event::Cmap::ConnectionCheckOutFailed)
expect(find_events.length).to eq(3)
end
threads.map(&:join)
expect(check_out_results[0]).to be_a(Mongo::Monitoring::Event::Cmap::ConnectionCheckedOut)
expect(check_out_results[1]).to be_a(Mongo::Monitoring::Event::Cmap::PoolCleared)
expect(check_out_results[2]).to be_a(Mongo::Monitoring::Event::Cmap::ConnectionCheckOutFailed)
expect(find_events.length).to eq(3)
end

it_behaves_like 'retries on PoolClearedError'

context 'legacy read retries' do

let(:client) { authorized_client.with(options.merge(retry_reads: false, max_read_retries: 1)) }

it_behaves_like 'retries on PoolClearedError'
end

after do
Expand Down

0 comments on commit 3c5dc93

Please sign in to comment.