From fd24e5bfc9540fc00764a59ddf39a993bbd63ba2 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Fri, 26 Jul 2024 12:04:34 +0200 Subject: [PATCH 1/4] Refactor Active Record adapters to have a similar internal interface Adapters have very inconsistent internal APIs to perform queries. This refactoring tries to improve consistency with a common provite API for all of them. Abstract methods: - `raw_execute`: the only method where an adapter should perform a query. It returns a native, adapter specific result object. Does not apply query transformations. Does not check for writes. - `cast_result`: receives the native result object and returns a generic `ActiveRecord::Result`. - `affected_rows`: receives the native result object and returns the number of affected rows. By just implementing these 3 methods all adapters automatically get: - `raw_exec_query`: same as `raw_execute` but returns an `ActiveRecord::Result`. - `internal_exec_query`: same as `raw_exec_query` but check for writes and apply query transformations. - `internal_execute`: same as `internal_exec_query` but retuns the native, adapter specific, result object. With this increased conisistency, we can now reduce the ammount of duplicated code in every adapter. There's some room for futher improvments but I tried to not go too far all at once. Also previously some adapters had a block based query interface that allowed to eagerly clear the native result object. It may make sense to bring that capability back in a consistent way, but short term I opted for consistency. --- .../abstract/database_statements.rb | 69 +++++++---- .../connection_adapters/abstract/quoting.rb | 2 +- .../connection_adapters/abstract_adapter.rb | 23 ++-- .../abstract_mysql_adapter.rb | 23 +--- .../mysql/schema_statements.rb | 88 +++++++------- .../mysql2/database_statements.rb | 102 +++++----------- .../connection_adapters/mysql2_adapter.rb | 9 +- .../postgresql/database_statements.rb | 113 +++++++++++------- .../postgresql/schema_statements.rb | 2 +- .../connection_adapters/postgresql_adapter.rb | 94 ++------------- .../sqlite3/database_statements.rb | 107 ++++++++--------- .../connection_adapters/sqlite3_adapter.rb | 1 + .../trilogy/database_statements.rb | 41 +++---- .../connection_adapters/trilogy_adapter.rb | 17 --- .../lib/active_record/future_result.rb | 2 +- .../adapters/sqlite3/sqlite3_adapter_test.rb | 12 +- activerecord/test/cases/calculations_test.rb | 8 +- activerecord/test/cases/query_cache_test.rb | 3 +- 18 files changed, 298 insertions(+), 418 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 1ceaa588430b0..dcc486b2c2b57 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -163,14 +163,14 @@ def exec_insert(sql, name = nil, binds = [], pk = nil, sequence_name = nil, retu # +binds+ as the bind substitutes. +name+ is logged along with # the executed +sql+ statement. def exec_delete(sql, name = nil, binds = []) - internal_exec_query(sql, name, binds) + affected_rows(internal_execute(sql, name, binds)) end # Executes update +sql+ statement in the context of this connection using # +binds+ as the bind substitutes. +name+ is logged along with # the executed +sql+ statement. def exec_update(sql, name = nil, binds = []) - internal_exec_query(sql, name, binds) + affected_rows(internal_execute(sql, name, binds)) end def exec_insert_all(sql, name) # :nodoc: @@ -532,30 +532,57 @@ def high_precision_current_timestamp HIGH_PRECISION_CURRENT_TIMESTAMP end - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - raise NotImplementedError + # Same as raw_execute but returns an ActiveRecord::Result object. + def raw_exec_query(...) # :nodoc: + cast_result(raw_execute(...)) + end + + # Execute a query and returns an ActiveRecord::Result + def internal_exec_query(...) # :nodoc: + cast_result(internal_execute(...)) end private - def internal_execute(sql, name = "SCHEMA", allow_retry: false, materialize_transactions: true) - sql = transform_query(sql) - check_if_write_query(sql) + # Lowest level way to execute a query. Doesn't check for illegal writes, doesn't annotate queries, yields a native result object. + def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) + raise NotImplementedError + end + + # Receive a native adapter result object and returns an ActiveRecord::Result object. + def cast_result(raw_result) + raise NotImplementedError + end + + def affected_rows(raw_result) + raise NotImplementedError + end + def preprocess_query(sql) + check_if_write_query(sql) mark_transaction_written_if_write(sql) - raw_execute(sql, name, allow_retry: allow_retry, materialize_transactions: materialize_transactions) + # We call tranformers after the write checks so we don't add extra parsing work. + # This means we assume no transformer whille change a read for a write + # but it would be insane to do such a thing. + ActiveRecord.query_transformers.each do |transformer| + sql = transformer.call(sql, self) + end + + sql + end + + # Same as #internal_exec_query, but yields a native adapter result + def internal_execute(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true, &block) + sql = preprocess_query(sql) + raw_execute(sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions, &block) end def execute_batch(statements, name = nil) statements.each do |statement| - internal_execute(statement, name) + raw_execute(statement, name) end end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) - raise NotImplementedError - end - DEFAULT_INSERT_VALUE = Arel.sql("DEFAULT").freeze private_constant :DEFAULT_INSERT_VALUE @@ -637,6 +664,8 @@ def select(sql, name = nil, binds = [], prepare: false, async: false, allow_retr raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions" end + # We make sure to run query transformers on the orignal thread + sql = preprocess_query(sql) future_result = async.new( pool, sql, @@ -649,14 +678,14 @@ def select(sql, name = nil, binds = [], prepare: false, async: false, allow_retr else future_result.execute!(self) end - return future_result - end - - result = internal_exec_query(sql, name, binds, prepare: prepare, allow_retry: allow_retry) - if async - FutureResult.wrap(result) + future_result else - result + result = internal_exec_query(sql, name, binds, prepare: prepare, allow_retry: allow_retry) + if async + FutureResult.wrap(result) + else + result + end end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb b/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb index a80b56d221d72..5d6c02ce3ca00 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb @@ -222,7 +222,7 @@ def sanitize_as_sql_comment(value) # :nodoc: private def type_casted_binds(binds) - binds.map do |value| + binds&.map do |value| if ActiveModel::Attribute === value type_cast(value.value_for_database) else diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index a4b6793999d51..de4c5add79334 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -1106,14 +1106,16 @@ def type_map end end - def translate_exception_class(e, sql, binds) - message = "#{e.class.name}: #{e.message}" + def translate_exception_class(native_error, sql, binds) + return native_error if native_error.is_a?(ActiveRecordError) - exception = translate_exception( - e, message: message, sql: sql, binds: binds + message = "#{native_error.class.name}: #{native_error.message}" + + active_record_error = translate_exception( + native_error, message: message, sql: sql, binds: binds ) - exception.set_backtrace e.backtrace - exception + active_record_error.set_backtrace(native_error.backtrace) + active_record_error end def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil, async: false, &block) # :doc: @@ -1134,13 +1136,6 @@ def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = raise ex.set_query(sql, binds) end - def transform_query(sql) - ActiveRecord.query_transformers.each do |transformer| - sql = transformer.call(sql, self) - end - sql - end - def translate_exception(exception, message:, sql:, binds:) # override in derived class case exception @@ -1152,7 +1147,7 @@ def translate_exception(exception, message:, sql:, binds:) end def without_prepared_statement?(binds) - !prepared_statements || binds.empty? + !prepared_statements || binds.nil? || binds.empty? end def column_for(table_name, column_name) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index df1884a3a479f..8d65732b5e823 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -197,12 +197,6 @@ def index_algorithms # HELPER METHODS =========================================== - # The two drivers have slightly different ways of yielding hashes of results, so - # this method must be implemented to provide a uniform interface. - def each_hash(result) # :nodoc: - raise NotImplementedError - end - # Must return the MySQL error number from the exception, if the exception has an # error number. def error_number(exception) # :nodoc: @@ -226,17 +220,6 @@ def disable_referential_integrity # :nodoc: # DATABASE STATEMENTS ====================================== #++ - # Mysql2Adapter doesn't have to free a result after using it, but we use this method - # to write stuff in an abstract way without concerning ourselves about whether it - # needs to be explicitly freed or not. - def execute_and_free(sql, name = nil, async: false, allow_retry: false) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - - mark_transaction_written_if_write(sql) - yield raw_execute(sql, name, async: async, allow_retry: allow_retry) - end - def begin_db_transaction # :nodoc: internal_execute("BEGIN", "TRANSACTION", allow_retry: true, materialize_transactions: false) end @@ -961,13 +944,11 @@ def configure_connection end.join(", ") # ...and send them all in one query - internal_execute("SET #{encoding} #{sql_mode_assignment} #{variable_assignments}") + raw_execute("SET #{encoding} #{sql_mode_assignment} #{variable_assignments}", "SCHEMA") end def column_definitions(table_name) # :nodoc: - execute_and_free("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA") do |result| - each_hash(result) - end + internal_exec_query("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA") end def create_table_info(table_name) # :nodoc: diff --git a/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb index 40deb2bf198b3..3a18161b6f549 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb @@ -8,45 +8,43 @@ module SchemaStatements # :nodoc: def indexes(table_name) indexes = [] current_index = nil - execute_and_free("SHOW KEYS FROM #{quote_table_name(table_name)}", "SCHEMA") do |result| - each_hash(result) do |row| - if current_index != row[:Key_name] - next if row[:Key_name] == "PRIMARY" # skip the primary key - current_index = row[:Key_name] - - mysql_index_type = row[:Index_type].downcase.to_sym - case mysql_index_type - when :fulltext, :spatial - index_type = mysql_index_type - when :btree, :hash - index_using = mysql_index_type - end - - indexes << [ - row[:Table], - row[:Key_name], - row[:Non_unique].to_i == 0, - [], - lengths: {}, - orders: {}, - type: index_type, - using: index_using, - comment: row[:Index_comment].presence - ] + internal_exec_query("SHOW KEYS FROM #{quote_table_name(table_name)}", "SCHEMA").each do |row| + if current_index != row["Key_name"] + next if row["Key_name"] == "PRIMARY" # skip the primary key + current_index = row["Key_name"] + + mysql_index_type = row["Index_type"].downcase.to_sym + case mysql_index_type + when :fulltext, :spatial + index_type = mysql_index_type + when :btree, :hash + index_using = mysql_index_type end - if row[:Expression] - expression = row[:Expression].gsub("\\'", "'") - expression = +"(#{expression})" unless expression.start_with?("(") - indexes.last[-2] << expression - indexes.last[-1][:expressions] ||= {} - indexes.last[-1][:expressions][expression] = expression - indexes.last[-1][:orders][expression] = :desc if row[:Collation] == "D" - else - indexes.last[-2] << row[:Column_name] - indexes.last[-1][:lengths][row[:Column_name]] = row[:Sub_part].to_i if row[:Sub_part] - indexes.last[-1][:orders][row[:Column_name]] = :desc if row[:Collation] == "D" - end + indexes << [ + row["Table"], + row["Key_name"], + row["Non_unique"].to_i == 0, + [], + lengths: {}, + orders: {}, + type: index_type, + using: index_using, + comment: row["Index_comment"].presence + ] + end + + if expression = row["Expression"] + expression = expression.gsub("\\'", "'") + expression = +"(#{expression})" unless expression.start_with?("(") + indexes.last[-2] << expression + indexes.last[-1][:expressions] ||= {} + indexes.last[-1][:expressions][expression] = expression + indexes.last[-1][:orders][expression] = :desc if row["Collation"] == "D" + else + indexes.last[-2] << row["Column_name"] + indexes.last[-1][:lengths][row["Column_name"]] = row["Sub_part"].to_i if row["Sub_part"] + indexes.last[-1][:orders][row["Column_name"]] = :desc if row["Collation"] == "D" end end @@ -182,12 +180,12 @@ def default_type(table_name, field_name) end def new_column_from_field(table_name, field, _definitions) - field_name = field.fetch(:Field) - type_metadata = fetch_type_metadata(field[:Type], field[:Extra]) - default, default_function = field[:Default], nil + field_name = field.fetch("Field") + type_metadata = fetch_type_metadata(field["Type"], field["Extra"]) + default, default_function = field["Default"], nil if type_metadata.type == :datetime && /\ACURRENT_TIMESTAMP(?:\([0-6]?\))?\z/i.match?(default) - default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field[:Extra]) + default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field["Extra"]) default, default_function = nil, default elsif type_metadata.extra == "DEFAULT_GENERATED" default = +"(#{default})" unless default.start_with?("(") @@ -203,13 +201,13 @@ def new_column_from_field(table_name, field, _definitions) end MySQL::Column.new( - field[:Field], + field["Field"], default, type_metadata, - field[:Null] == "YES", + field["Null"] == "YES", default_function, - collation: field[:Collation], - comment: field[:Comment].presence + collation: field["Collation"], + comment: field["Comment"].presence ) end diff --git a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb index b81117928556f..bd1026d98ffd0 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb @@ -13,45 +13,12 @@ def select_all(*, **) # :nodoc: end end - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - if without_prepared_statement?(binds) - execute_and_free(sql, name, async: async, allow_retry: allow_retry) do |result| - if result - build_result(columns: result.fields, rows: result.to_a) - else - build_result(columns: [], rows: []) - end - end - else - exec_stmt_and_free(sql, name, binds, cache_stmt: prepare, async: async) do |_, result| - if result - build_result(columns: result.fields, rows: result.to_a) - else - build_result(columns: [], rows: []) - end - end - end - end - - def exec_delete(sql, name = nil, binds = []) # :nodoc: - if without_prepared_statement?(binds) - with_raw_connection do |conn| - @affected_rows_before_warnings = nil - execute_and_free(sql, name) { @affected_rows_before_warnings || conn.affected_rows } - end - else - exec_stmt_and_free(sql, name, binds) { |stmt| stmt.affected_rows } - end - end - alias :exec_update :exec_delete - private def sync_timezone_changes(raw_connection) raw_connection.query_options[:database_timezone] = default_timezone end def execute_batch(statements, name = nil) - statements = statements.map { |sql| transform_query(sql) } combine_multi_statements(statements).each do |statement| with_raw_connection do |conn| raw_execute(statement, name) @@ -91,61 +58,50 @@ def with_multi_statements end end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) + def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true) log(sql, name, async: async) do |notification_payload| with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| sync_timezone_changes(conn) - result = conn.query(sql) - conn.abandon_results! - verified! - handle_warnings(sql) - notification_payload[:row_count] = result&.size || 0 - result - end - end - end - - def exec_stmt_and_free(sql, name, binds, cache_stmt: false, async: false) - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - - type_casted_binds = type_casted_binds(binds) - - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection do |conn| - sync_timezone_changes(conn) - - if cache_stmt + result = if prepare stmt = @statements[sql] ||= conn.prepare(sql) - else - stmt = conn.prepare(sql) - end - begin - result = ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - stmt.execute(*type_casted_binds) - end - verified! - result - rescue ::Mysql2::Error => e - if cache_stmt + begin + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + stmt.execute(*type_casted_binds) + end + rescue ::Mysql2::Error @statements.delete(sql) - else stmt.close + raise end - raise e + verified! + else + conn.query(sql) end - ret = yield stmt, result + @affected_rows_before_warnings = conn.affected_rows + conn.abandon_results! + + verified! + handle_warnings(sql) notification_payload[:row_count] = result&.size || 0 - result.free if result - stmt.close unless cache_stmt - ret + result end end end + + def cast_result(result) + if result.nil? || result.size.zero? + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(result.fields, result.to_a) + end + end + + def affected_rows(result) + @affected_rows_before_warnings + end end end end diff --git a/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb b/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb index 2a85a24864958..59c4b1a78e6d5 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb @@ -55,6 +55,7 @@ def initialize_type_map(m) def initialize(...) super + @affected_rows_before_warnings = nil @config[:flags] ||= 0 if @config[:flags].kind_of? Array @@ -92,14 +93,6 @@ def supports_lazy_transactions? # HELPER METHODS =========================================== - def each_hash(result, &block) # :nodoc: - if block_given? - result.each(as: :hash, symbolize_keys: true, &block) - else - to_enum(:each_hash, result) - end - end - def error_number(exception) exception.error_number if exception.respond_to?(:error_number) end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb index 33f1b277477d1..0c6adb1144466 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb @@ -12,16 +12,8 @@ def explain(arel, binds = [], options = []) # Queries the database and returns the results in an Array-like object def query(sql, name = nil) # :nodoc: - mark_transaction_written_if_write(sql) - - log(sql, name) do |notification_payload| - with_raw_connection do |conn| - result = conn.async_exec(sql).map_types!(@type_map_for_results).values - verified! - notification_payload[:row_count] = result.count - result - end - end + result = internal_execute(sql, name) + result.map_types!(@type_map_for_results).values end READ_QUERY = ActiveRecord::ConnectionAdapters::AbstractAdapter.build_read_query_regexp( @@ -50,36 +42,6 @@ def execute(...) # :nodoc: @notice_receiver_sql_warnings = [] end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = conn.async_exec(sql) - verified! - handle_warnings(result) - notification_payload[:row_count] = result.count - result - end - end - end - - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) # :nodoc: - execute_and_clear(sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |result| - types = {} - fields = result.fields - fields.each_with_index do |fname, i| - ftype = result.ftype i - fmod = result.fmod i - types[fname] = types[i] = get_oid_type(ftype, fmod, fname) - end - build_result(columns: fields, rows: result.values, column_types: types.freeze) - end - end - - def exec_delete(sql, name = nil, binds = []) # :nodoc: - execute_and_clear(sql, name, binds) { |result| result.cmd_tuples } - end - alias :exec_update :exec_delete - def exec_insert(sql, name = nil, binds = [], pk = nil, sequence_name = nil, returning: nil) # :nodoc: if use_insert_returning? || pk == false super @@ -170,8 +132,77 @@ def cancel_any_running_query rescue PG::Error end + def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true) + update_typemap_for_default_timezone + + type_casted_binds = type_casted_binds(binds) + + begin + with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| + stmt_key = prepare_statement(sql, binds, conn) if prepare + + log(sql, name, binds, type_casted_binds, stmt_key, async: async) do |notification_payload| + begin + result = if prepare + conn.exec_prepared(stmt_key, type_casted_binds) + elsif without_prepared_statement?(binds) + conn.async_exec(sql) + else + conn.exec_params(sql, type_casted_binds) + end + rescue => original_exception + # Contrary to all other adapters we have to enter `with_raw_connection` before `log` + # so that we can prepare the statement and pass the key to `log`. + # So we need to translate exceptions ourselves. + raise translate_exception_class(original_exception, sql, binds) + end + + verified! + handle_warnings(result) + notification_payload[:row_count] = result.count + result + end + end + rescue ActiveRecord::StatementInvalid => error + if prepare + raise unless is_cached_plan_failure?(error) + + # Nothing we can do if we are in a transaction because all commands + # will raise InFailedSQLTransaction + if in_transaction? + raise ActiveRecord::PreparedStatementCacheExpired.new(error.cause.message, connection_pool: @pool) + else + @lock.synchronize do + # outside of transactions we can simply flush this query and retry + @statements.delete sql_key(sql) + end + retry + end + else + raise + end + end + end + + def cast_result(result) + types = {} + fields = result.fields + fields.each_with_index do |fname, i| + ftype = result.ftype i + fmod = result.fmod i + types[fname] = types[i] = get_oid_type(ftype, fmod, fname) + end + ar_result = ActiveRecord::Result.new(fields, result.values, types.freeze) + result.clear + ar_result + end + + def affected_rows(result) + result.cmd_tuples + end + def execute_batch(statements, name = nil) - execute(combine_multi_statements(statements)) + raw_execute(combine_multi_statements(statements), name) end def build_truncate_statements(table_names) diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb index 7928ee58a0032..49891552dc6dd 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb @@ -250,7 +250,7 @@ def client_min_messages # Set the client message level. def client_min_messages=(level) - internal_execute("SET client_min_messages TO '#{level}'") + internal_execute("SET client_min_messages TO '#{level}'", "SCHEMA") end # Returns the sequence name for a table's primary key or some other specified key. diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 197cd416014d6..1f7494e19f46c 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -405,7 +405,7 @@ def self.native_database_types # :nodoc: end def set_standard_conforming_strings - internal_execute("SET standard_conforming_strings = on") + internal_execute("SET standard_conforming_strings = on", "SCHEMA") end def supports_ddl_transactions? @@ -841,9 +841,8 @@ def get_oid_type(oid, fmod, column_name, sql_type = "") def load_additional_types(oids = nil) initializer = OID::TypeMapInitializer.new(type_map) load_types_queries(initializer, oids) do |query| - execute_and_clear(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) do |records| - initializer.run(records) - end + records = internal_execute(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) + initializer.run(records) end end @@ -864,73 +863,6 @@ def load_types_queries(initializer, oids) FEATURE_NOT_SUPPORTED = "0A000" # :nodoc: - def execute_and_clear(sql, name, binds, prepare: false, async: false, allow_retry: false, materialize_transactions: true) - sql = transform_query(sql) - check_if_write_query(sql) - - if !prepare || without_prepared_statement?(binds) - result = exec_no_cache(sql, name, binds, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) - else - result = exec_cache(sql, name, binds, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) - end - begin - ret = yield result - ensure - result.clear - end - ret - end - - def exec_no_cache(sql, name, binds, async:, allow_retry:, materialize_transactions:) - mark_transaction_written_if_write(sql) - - # make sure we carry over any changes to ActiveRecord.default_timezone that have been - # made since we established the connection - update_typemap_for_default_timezone - - type_casted_binds = type_casted_binds(binds) - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = conn.exec_params(sql, type_casted_binds) - verified! - notification_payload[:row_count] = result.count - result - end - end - end - - def exec_cache(sql, name, binds, async:, allow_retry:, materialize_transactions:) - mark_transaction_written_if_write(sql) - - update_typemap_for_default_timezone - - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - stmt_key = prepare_statement(sql, binds, conn) - type_casted_binds = type_casted_binds(binds) - - log(sql, name, binds, type_casted_binds, stmt_key, async: async) do |notification_payload| - result = conn.exec_prepared(stmt_key, type_casted_binds) - verified! - notification_payload[:row_count] = result.count - result - end - end - rescue ActiveRecord::StatementInvalid => e - raise unless is_cached_plan_failure?(e) - - # Nothing we can do if we are in a transaction because all commands - # will raise InFailedSQLTransaction - if in_transaction? - raise ActiveRecord::PreparedStatementCacheExpired.new(e.cause.message, connection_pool: @pool) - else - @lock.synchronize do - # outside of transactions we can simply flush this query and retry - @statements.delete sql_key(sql) - end - retry - end - end - # Annoyingly, the code for prepared statements whose return value may # have changed is FEATURE_NOT_SUPPORTED. # @@ -1020,16 +952,16 @@ def configure_connection variables = @config.fetch(:variables, {}).stringify_keys # Set interval output format to ISO 8601 for ease of parsing by ActiveSupport::Duration.parse - internal_execute("SET intervalstyle = iso_8601") + internal_execute("SET intervalstyle = iso_8601", "SCHEMA") # SET statements from :variables config hash # https://www.postgresql.org/docs/current/static/sql-set.html variables.map do |k, v| if v == ":default" || v == :default # Sets the value to the global or compile default - internal_execute("SET SESSION #{k} TO DEFAULT") + internal_execute("SET SESSION #{k} TO DEFAULT", "SCHEMA") elsif !v.nil? - internal_execute("SET SESSION #{k} TO #{quote(v)}") + internal_execute("SET SESSION #{k} TO #{quote(v)}", "SCHEMA") end end @@ -1050,9 +982,9 @@ def reconfigure_connection_timezone # If using Active Record's time zone support configure the connection # to return TIMESTAMP WITH ZONE types in UTC. if default_timezone == :utc - internal_execute("SET SESSION timezone TO 'UTC'") + internal_execute("SET SESSION timezone TO 'UTC'", "SCHEMA") else - internal_execute("SET SESSION timezone TO DEFAULT") + internal_execute("SET SESSION timezone TO DEFAULT", "SCHEMA") end end @@ -1119,9 +1051,8 @@ def can_perform_case_insensitive_comparison_for?(column) AND castsource = #{quote column.sql_type}::regtype ) SQL - execute_and_clear(sql, "SCHEMA", [], allow_retry: true, materialize_transactions: false) do |result| - result.getvalue(0, 0) - end + result = internal_execute(sql, "SCHEMA", [], allow_retry: true, materialize_transactions: false) + result.getvalue(0, 0) end end end @@ -1177,9 +1108,8 @@ def add_pg_decoders FROM pg_type as t WHERE t.typname IN (%s) SQL - coders = execute_and_clear(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) do |result| - result.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) } - end + result = internal_execute(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) + coders = result.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) } map = PG::TypeMapByOid.new coders.each { |coder| map.add_coder(coder) } diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb b/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb index c4cae471bbe0b..4ad812001bd57 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb @@ -21,50 +21,6 @@ def explain(arel, binds = [], _options = []) SQLite3::ExplainPrettyPrinter.new.pp(result) end - def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - - mark_transaction_written_if_write(sql) - - type_casted_binds = type_casted_binds(binds) - - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection do |conn| - if prepare - stmt = @statements[sql] ||= conn.prepare(sql) - cols = stmt.columns - stmt.reset! - stmt.bind_params(type_casted_binds) - records = stmt.to_a - else - # Don't cache statements if they are not prepared. - stmt = conn.prepare(sql) - begin - cols = stmt.columns - unless without_prepared_statement?(binds) - stmt.bind_params(type_casted_binds) - end - records = stmt.to_a - ensure - stmt.close - end - end - verified! - - result = build_result(columns: cols, rows: records) - notification_payload[:row_count] = result.length - result - end - end - end - - def exec_delete(sql, name = "SQL", binds = []) # :nodoc: - internal_exec_query(sql, name, binds) - @raw_connection.changes - end - alias :exec_update :exec_delete - def begin_deferred_transaction(isolation = nil) # :nodoc: internal_begin_transaction(:deferred, isolation) end @@ -104,6 +60,12 @@ def high_precision_current_timestamp HIGH_PRECISION_CURRENT_TIMESTAMP end + def execute(...) # :nodoc: + # SQLite3Adapter was refactored to use ActiveRecord::Result internally + # but for backward compatibility we have to keep returning arrays of hashes here + super&.to_a + end + private def internal_begin_transaction(mode, isolation) if isolation @@ -124,17 +86,58 @@ def internal_begin_transaction(mode, isolation) end end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: false) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = conn.execute(sql) + def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) # :nodoc: + type_casted_binds = type_casted_binds(binds) + + result = log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| + with_raw_connection(materialize_transactions: materialize_transactions) do |conn| + if prepare + stmt = @statements[sql] ||= conn.prepare(sql) + stmt.reset! + stmt.bind_params(type_casted_binds) + + result = if stmt.column_count.zero? # No return + stmt.step + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(stmt.columns, stmt.to_a) + end + else + # Don't cache statements if they are not prepared. + stmt = conn.prepare(sql) + begin + unless without_prepared_statement?(binds) + stmt.bind_params(type_casted_binds) + end + result = if stmt.column_count.zero? # No return + stmt.step + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(stmt.columns, stmt.to_a) + end + ensure + stmt.close + end + end + @last_affected_rows = @raw_connection.changes verified! + notification_payload[:row_count] = result.length result end end end + def cast_result(result) + # Given that SQLite3 doesn't really a Result type, raw_execute already return an ActiveRecord::Result + # and we have nothing to cast here. + result + end + + def affected_rows(result) + @last_affected_rows + end + def reset_read_uncommitted read_uncommitted = ActiveSupport::IsolatedExecutionState[:active_record_read_uncommitted] return unless read_uncommitted @@ -143,18 +146,12 @@ def reset_read_uncommitted end def execute_batch(statements, name = nil) - statements = statements.map { |sql| transform_query(sql) } sql = combine_multi_statements(statements) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - log(sql, name) do |notification_payload| with_raw_connection do |conn| - result = conn.execute_batch2(sql) + conn.execute_batch2(sql) verified! - notification_payload[:row_count] = result.length - result end end end diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb index 98c81d77a1b82..d40a781cb4d7e 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb @@ -119,6 +119,7 @@ def initialize(...) end end + @last_affected_rows = nil @config[:strict] = ConnectionAdapters::SQLite3Adapter.strict_strings_by_default unless @config.key?(:strict) @connection_parameters = @config.merge( database: @config[:database].to_s, diff --git a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb index 2f2b48a8bb294..4c7b1b78ba818 100644 --- a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb @@ -4,37 +4,13 @@ module ActiveRecord module ConnectionAdapters module Trilogy module DatabaseStatements - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - - result = raw_execute(sql, name, async: async, allow_retry: allow_retry) - ActiveRecord::Result.new(result.fields, result.to_a) - end - def exec_insert(sql, name, binds, pk = nil, sequence_name = nil, returning: nil) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - sql, _binds = sql_for_insert(sql, pk, binds, returning) - raw_execute(sql, name) - end - - def exec_delete(sql, name = nil, binds = []) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - - result = raw_execute(to_sql(sql, binds), name) - result.affected_rows + internal_execute(sql, name) end - alias :exec_update :exec_delete # :nodoc: - private - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) + def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true) log(sql, name, async: async) do |notification_payload| with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| sync_timezone_changes(conn) @@ -50,6 +26,18 @@ def raw_execute(sql, name, async: false, allow_retry: false, materialize_transac end end + def cast_result(result) + if result.count.zero? + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(result.fields, result.rows) + end + end + + def affected_rows(result) + result.affected_rows + end + def last_inserted_id(result) if supports_insert_returning? super @@ -68,7 +56,6 @@ def sync_timezone_changes(conn) end def execute_batch(statements, name = nil) - statements = statements.map { |sql| transform_query(sql) } combine_multi_statements(statements).each do |statement| with_raw_connection do |conn| raw_execute(statement, name) diff --git a/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb b/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb index 60083681575d3..f55c8fcfddf49 100644 --- a/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb @@ -149,23 +149,6 @@ def text_type?(type) TYPE_MAP.lookup(type).is_a?(Type::String) || TYPE_MAP.lookup(type).is_a?(Type::Text) end - def each_hash(result) - return to_enum(:each_hash, result) unless block_given? - - keys = result.fields.map(&:to_sym) - result.rows.each do |row| - hash = {} - idx = 0 - row.each do |value| - hash[keys[idx]] = value - idx += 1 - end - yield hash - end - - nil - end - def error_number(exception) exception.error_code if exception.respond_to?(:error_code) end diff --git a/activerecord/lib/active_record/future_result.rb b/activerecord/lib/active_record/future_result.rb index f55473ff2cd89..289fc6f761dcc 100644 --- a/activerecord/lib/active_record/future_result.rb +++ b/activerecord/lib/active_record/future_result.rb @@ -163,7 +163,7 @@ def execute_query(connection, async: false) end def exec_query(connection, *args, **kwargs) - connection.internal_exec_query(*args, **kwargs) + connection.raw_exec_query(*args, **kwargs) end class SelectAll < FutureResult # :nodoc: diff --git a/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb b/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb index 7e123935a0f01..ac55259ab76fb 100644 --- a/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb +++ b/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb @@ -926,14 +926,12 @@ def test_statement_closed statement = ::SQLite3::Statement.new(db, "CREATE TABLE statement_test (number integer not null)") statement.stub(:step, -> { raise ::SQLite3::BusyException.new("busy") }) do - assert_called(statement, :columns, returns: []) do - assert_called(statement, :close) do - ::SQLite3::Statement.stub(:new, statement) do - error = assert_raises ActiveRecord::StatementInvalid do - @conn.exec_query "select * from statement_test" - end - assert_equal @conn.pool, error.connection_pool + assert_called(statement, :close) do + ::SQLite3::Statement.stub(:new, statement) do + error = assert_raises ActiveRecord::StatementInvalid do + @conn.exec_query "select * from statement_test" end + assert_equal @conn.pool, error.connection_pool end end end diff --git a/activerecord/test/cases/calculations_test.rb b/activerecord/test/cases/calculations_test.rb index 8a4061fe32665..9c5e4245cc272 100644 --- a/activerecord/test/cases/calculations_test.rb +++ b/activerecord/test/cases/calculations_test.rb @@ -303,12 +303,12 @@ def test_no_limit_no_offset end def test_count_on_invalid_columns_raises - e = assert_raises(ActiveRecord::StatementInvalid) { + error = assert_raises(ActiveRecord::StatementInvalid) do Account.select("credit_limit, firm_name").count - } + end - assert_match %r{accounts}i, e.sql - assert_match "credit_limit, firm_name", e.sql + assert_match %r{accounts}i, error.sql + assert_match "credit_limit, firm_name", error.sql end def test_apply_distinct_in_count diff --git a/activerecord/test/cases/query_cache_test.rb b/activerecord/test/cases/query_cache_test.rb index 3b636bf7b2d05..556b3e620bb1a 100644 --- a/activerecord/test/cases/query_cache_test.rb +++ b/activerecord/test/cases/query_cache_test.rb @@ -493,7 +493,8 @@ def test_cache_is_available_when_using_a_not_connected_connection assert_not_predicate Task, :connected? Task.cache do - assert_queries_count(1) { Task.find(1); Task.find(1) } + assert_queries_count(1) { Task.find(1) } + assert_no_queries { Task.find(1) } ensure ActiveRecord::Base.establish_connection(original_connection) end From f9f7debc2678dd17d7ba1f246f07480dd4281416 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 29 Jul 2024 12:54:22 +0200 Subject: [PATCH 2/4] Refactor `PostgresqlAdapter#raw_execute` to be less unique The postgres adapter used to be more complex than the others to be able to pass the prepared statement key to the `log` method. If we instead add it to the payload later, we can simplify the method further and it opens the door to refactor `log` and `with_raw_connection` out of `raw_execute`. --- .../connection_adapters/abstract_adapter.rb | 3 +- .../postgresql/database_statements.rb | 65 ++++++++----------- .../connection_adapters/postgresql_adapter.rb | 3 +- .../adapters/postgresql/connection_test.rb | 7 +- .../adapters/trilogy/trilogy_adapter_test.rb | 2 - 5 files changed, 35 insertions(+), 45 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index de4c5add79334..bcd5570a63807 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -1118,14 +1118,13 @@ def translate_exception_class(native_error, sql, binds) active_record_error end - def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil, async: false, &block) # :doc: + def log(sql, name = "SQL", binds = [], type_casted_binds = [], async: false, &block) # :doc: @instrumenter.instrument( "sql.active_record", sql: sql, name: name, binds: binds, type_casted_binds: type_casted_binds, - statement_name: statement_name, async: async, connection: self, transaction: current_transaction.user_transaction.presence, diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb index 0c6adb1144466..a463088734a9a 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb @@ -137,49 +137,40 @@ def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retr type_casted_binds = type_casted_binds(binds) - begin + log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - stmt_key = prepare_statement(sql, binds, conn) if prepare - - log(sql, name, binds, type_casted_binds, stmt_key, async: async) do |notification_payload| + result = if prepare begin - result = if prepare - conn.exec_prepared(stmt_key, type_casted_binds) - elsif without_prepared_statement?(binds) - conn.async_exec(sql) - else - conn.exec_params(sql, type_casted_binds) + stmt_key = prepare_statement(sql, binds, conn) + notification_payload[:statement_name] = stmt_key + conn.exec_prepared(stmt_key, type_casted_binds) + rescue PG::FeatureNotSupported => error + if is_cached_plan_failure?(error) + # Nothing we can do if we are in a transaction because all commands + # will raise InFailedSQLTransaction + if in_transaction? + raise PreparedStatementCacheExpired.new(error.message, connection_pool: @pool) + else + @lock.synchronize do + # outside of transactions we can simply flush this query and retry + @statements.delete sql_key(sql) + end + retry + end end - rescue => original_exception - # Contrary to all other adapters we have to enter `with_raw_connection` before `log` - # so that we can prepare the statement and pass the key to `log`. - # So we need to translate exceptions ourselves. - raise translate_exception_class(original_exception, sql, binds) - end - verified! - handle_warnings(result) - notification_payload[:row_count] = result.count - result - end - end - rescue ActiveRecord::StatementInvalid => error - if prepare - raise unless is_cached_plan_failure?(error) - - # Nothing we can do if we are in a transaction because all commands - # will raise InFailedSQLTransaction - if in_transaction? - raise ActiveRecord::PreparedStatementCacheExpired.new(error.cause.message, connection_pool: @pool) - else - @lock.synchronize do - # outside of transactions we can simply flush this query and retry - @statements.delete sql_key(sql) + raise end - retry + elsif without_prepared_statement?(binds) + conn.async_exec(sql) + else + conn.exec_params(sql, type_casted_binds) end - else - raise + + verified! + handle_warnings(result) + notification_payload[:row_count] = result.count + result end end end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 1f7494e19f46c..e4e1f07158afa 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -872,8 +872,7 @@ def load_types_queries(initializer, oids) # # Check here for more details: # https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/utils/cache/plancache.c#l573 - def is_cached_plan_failure?(e) - pgerror = e.cause + def is_cached_plan_failure?(pgerror) pgerror.result.result_error_field(PG::PG_DIAG_SQLSTATE) == FEATURE_NOT_SUPPORTED && pgerror.result.result_error_field(PG::PG_DIAG_SOURCE_FUNCTION) == "RevalidateCachedQuery" rescue diff --git a/activerecord/test/cases/adapters/postgresql/connection_test.rb b/activerecord/test/cases/adapters/postgresql/connection_test.rb index 7692aa3ea0ac0..8a44912cb6d42 100644 --- a/activerecord/test/cases/adapters/postgresql/connection_test.rb +++ b/activerecord/test/cases/adapters/postgresql/connection_test.rb @@ -130,8 +130,11 @@ def test_schema_names_logs_name def test_statement_key_is_logged bind = Relation::QueryAttribute.new(nil, 1, Type::Value.new) @connection.exec_query("SELECT $1::integer", "SQL", [bind], prepare: true) - name = @subscriber.payloads.last[:statement_name] - assert name + + payload = @subscriber.payloads.find { |p| p[:sql] == "SELECT $1::integer" } + name = payload[:statement_name] + assert_not_nil name + res = @connection.exec_query("EXPLAIN (FORMAT JSON) EXECUTE #{name}(1)") plan = res.column_types["QUERY PLAN"].deserialize res.rows.first.first assert_operator plan.length, :>, 0 diff --git a/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb b/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb index c7c728965e9b2..3813f69f5f64a 100644 --- a/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb +++ b/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb @@ -222,8 +222,6 @@ class TrilogyAdapterTest < ActiveRecord::TrilogyTestCase assert_includes payload, :type_casted_binds assert_equal [], payload[:type_casted_binds] - # :stament_name is always nil and never set 🤷‍♂️ - assert_includes payload, :statement_name assert_nil payload[:statement_name] assert_not_includes payload, :cached From b57dcec6fb9145ad70ca209dec36b744350c905a Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 29 Jul 2024 15:03:17 +0200 Subject: [PATCH 3/4] Refactor Active Record connection adapters further A `raw_execute` implementation is now provided, instead adapters have to implement `perform_query`. It's a much simpler method that no longer need to concern itself with Active Support notifications nor calling `with_raw_connection`. --- .../abstract/database_statements.rb | 22 +++++++ .../abstract_mysql_adapter.rb | 1 + .../mysql2/database_statements.rb | 51 +++++++-------- .../postgresql/database_statements.rb | 61 ++++++++---------- .../sqlite3/database_statements.rb | 64 +++++++++---------- .../trilogy/database_statements.rb | 22 +++---- 6 files changed, 112 insertions(+), 109 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index dcc486b2c2b57..e30ded9dfd1b1 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -545,6 +545,28 @@ def internal_exec_query(...) # :nodoc: private # Lowest level way to execute a query. Doesn't check for illegal writes, doesn't annotate queries, yields a native result object. def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) + type_casted_binds = type_casted_binds(binds) + notification_payload = { + sql: sql, + name: name, + binds: binds, + type_casted_binds: type_casted_binds, + async: async, + connection: self, + transaction: current_transaction.user_transaction.presence, + statement_name: nil, + row_count: 0, + } + @instrumenter.instrument("sql.active_record", notification_payload) do + with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| + perform_query(conn, sql, binds, type_casted_binds, prepare: prepare, notification_payload: notification_payload) + end + rescue ActiveRecord::StatementInvalid => ex + raise ex.set_query(sql, binds) + end + end + + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) raise NotImplementedError end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index 8d65732b5e823..7866909e5a179 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -773,6 +773,7 @@ def warning_ignored?(warning) # Make sure we carry over any changes to ActiveRecord.default_timezone that have been # made since we established the connection def sync_timezone_changes(raw_connection) + raise NotImplementedError end # See https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html diff --git a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb index bd1026d98ffd0..8d051d9ab3e83 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb @@ -58,37 +58,34 @@ def with_multi_statements end end - def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - sync_timezone_changes(conn) - - result = if prepare - stmt = @statements[sql] ||= conn.prepare(sql) - - begin - ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - stmt.execute(*type_casted_binds) - end - rescue ::Mysql2::Error - @statements.delete(sql) - stmt.close - raise - end - verified! - else - conn.query(sql) - end + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + sync_timezone_changes(raw_connection) - @affected_rows_before_warnings = conn.affected_rows - conn.abandon_results! + result = if prepare + stmt = @statements[sql] ||= raw_connection.prepare(sql) - verified! - handle_warnings(sql) - notification_payload[:row_count] = result&.size || 0 - result + begin + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + stmt.execute(*type_casted_binds) + end + rescue ::Mysql2::Error + @statements.delete(sql) + stmt.close + raise end + verified! + else + raw_connection.query(sql) end + + notification_payload[:row_count] = result&.size || 0 + + @affected_rows_before_warnings = raw_connection.affected_rows + raw_connection.abandon_results! + + verified! + handle_warnings(sql) + result end def cast_result(result) diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb index a463088734a9a..7a1c3a9b918db 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb @@ -132,47 +132,40 @@ def cancel_any_running_query rescue PG::Error end - def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true) + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) update_typemap_for_default_timezone - - type_casted_binds = type_casted_binds(binds) - - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = if prepare - begin - stmt_key = prepare_statement(sql, binds, conn) - notification_payload[:statement_name] = stmt_key - conn.exec_prepared(stmt_key, type_casted_binds) - rescue PG::FeatureNotSupported => error - if is_cached_plan_failure?(error) - # Nothing we can do if we are in a transaction because all commands - # will raise InFailedSQLTransaction - if in_transaction? - raise PreparedStatementCacheExpired.new(error.message, connection_pool: @pool) - else - @lock.synchronize do - # outside of transactions we can simply flush this query and retry - @statements.delete sql_key(sql) - end - retry - end + result = if prepare + begin + stmt_key = prepare_statement(sql, binds, raw_connection) + notification_payload[:statement_name] = stmt_key + raw_connection.exec_prepared(stmt_key, type_casted_binds) + rescue PG::FeatureNotSupported => error + if is_cached_plan_failure?(error) + # Nothing we can do if we are in a transaction because all commands + # will raise InFailedSQLTransaction + if in_transaction? + raise PreparedStatementCacheExpired.new(error.message, connection_pool: @pool) + else + @lock.synchronize do + # outside of transactions we can simply flush this query and retry + @statements.delete sql_key(sql) end - - raise + retry end - elsif without_prepared_statement?(binds) - conn.async_exec(sql) - else - conn.exec_params(sql, type_casted_binds) end - verified! - handle_warnings(result) - notification_payload[:row_count] = result.count - result + raise end + elsif without_prepared_statement?(binds) + raw_connection.async_exec(sql) + else + raw_connection.exec_params(sql, type_casted_binds) end + + verified! + handle_warnings(result) + notification_payload[:row_count] = result.count + result end def cast_result(result) diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb b/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb index 4ad812001bd57..49a741c07819b 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb @@ -86,46 +86,40 @@ def internal_begin_transaction(mode, isolation) end end - def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) # :nodoc: - type_casted_binds = type_casted_binds(binds) - - result = log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection(materialize_transactions: materialize_transactions) do |conn| - if prepare - stmt = @statements[sql] ||= conn.prepare(sql) - stmt.reset! + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + if prepare + stmt = @statements[sql] ||= raw_connection.prepare(sql) + stmt.reset! + stmt.bind_params(type_casted_binds) + + result = if stmt.column_count.zero? # No return + stmt.step + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(stmt.columns, stmt.to_a) + end + else + # Don't cache statements if they are not prepared. + stmt = raw_connection.prepare(sql) + begin + unless without_prepared_statement?(binds) stmt.bind_params(type_casted_binds) - - result = if stmt.column_count.zero? # No return - stmt.step - ActiveRecord::Result.empty - else - ActiveRecord::Result.new(stmt.columns, stmt.to_a) - end + end + result = if stmt.column_count.zero? # No return + stmt.step + ActiveRecord::Result.empty else - # Don't cache statements if they are not prepared. - stmt = conn.prepare(sql) - begin - unless without_prepared_statement?(binds) - stmt.bind_params(type_casted_binds) - end - result = if stmt.column_count.zero? # No return - stmt.step - ActiveRecord::Result.empty - else - ActiveRecord::Result.new(stmt.columns, stmt.to_a) - end - ensure - stmt.close - end + ActiveRecord::Result.new(stmt.columns, stmt.to_a) end - @last_affected_rows = @raw_connection.changes - verified! - - notification_payload[:row_count] = result.length - result + ensure + stmt.close end end + @last_affected_rows = raw_connection.changes + verified! + + notification_payload[:row_count] = result.length + result end def cast_result(result) diff --git a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb index 4c7b1b78ba818..94b17813a4b71 100644 --- a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb @@ -10,20 +10,16 @@ def exec_insert(sql, name, binds, pk = nil, sequence_name = nil, returning: nil) end private - def raw_execute(sql, name, binds = nil, prepare: false, async: false, allow_retry: false, materialize_transactions: true) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - sync_timezone_changes(conn) - result = conn.query(sql) - while conn.more_results_exist? - conn.next_result - end - verified! - handle_warnings(sql) - notification_payload[:row_count] = result.count - result - end + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + sync_timezone_changes(raw_connection) + result = raw_connection.query(sql) + while raw_connection.more_results_exist? + raw_connection.next_result end + verified! + handle_warnings(sql) + notification_payload[:row_count] = result.count + result end def cast_result(result) From 8078ebc26dbdbcebbe1a8d41d28b6d4e97308be8 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 29 Jul 2024 15:22:24 +0200 Subject: [PATCH 4/4] Simplify some `perform_query` implementation Inline simple operations like reseting the timezone. --- .../abstract_mysql_adapter.rb | 6 ------ .../mysql2/database_statements.rb | 8 +++----- .../connection_adapters/postgresql_adapter.rb | 4 ++-- .../trilogy/database_statements.rb | 18 ++++++++---------- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index 7866909e5a179..e6af17084ba8a 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -770,12 +770,6 @@ def warning_ignored?(warning) warning.level == "Note" || super end - # Make sure we carry over any changes to ActiveRecord.default_timezone that have been - # made since we established the connection - def sync_timezone_changes(raw_connection) - raise NotImplementedError - end - # See https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html ER_DB_CREATE_EXISTS = 1007 ER_FILSORT_ABORT = 1028 diff --git a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb index 8d051d9ab3e83..47ff5051bb331 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb @@ -14,10 +14,6 @@ def select_all(*, **) # :nodoc: end private - def sync_timezone_changes(raw_connection) - raw_connection.query_options[:database_timezone] = default_timezone - end - def execute_batch(statements, name = nil) combine_multi_statements(statements).each do |statement| with_raw_connection do |conn| @@ -59,7 +55,9 @@ def with_multi_statements end def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) - sync_timezone_changes(raw_connection) + # Make sure we carry over any changes to ActiveRecord.default_timezone that have been + # made since we established the connection + raw_connection.query_options[:database_timezone] = default_timezone result = if prepare stmt = @statements[sql] ||= raw_connection.prepare(sql) diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index e4e1f07158afa..241db5c62286e 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -981,9 +981,9 @@ def reconfigure_connection_timezone # If using Active Record's time zone support configure the connection # to return TIMESTAMP WITH ZONE types in UTC. if default_timezone == :utc - internal_execute("SET SESSION timezone TO 'UTC'", "SCHEMA") + raw_execute("SET SESSION timezone TO 'UTC'", "SCHEMA") else - internal_execute("SET SESSION timezone TO DEFAULT", "SCHEMA") + raw_execute("SET SESSION timezone TO DEFAULT", "SCHEMA") end end diff --git a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb index 94b17813a4b71..3c8841de8ef76 100644 --- a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb @@ -11,7 +11,14 @@ def exec_insert(sql, name, binds, pk = nil, sequence_name = nil, returning: nil) private def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) - sync_timezone_changes(raw_connection) + # Make sure we carry over any changes to ActiveRecord.default_timezone that have been + # made since we established the connection + if default_timezone == :local + raw_connection.query_flags |= ::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE + else + raw_connection.query_flags &= ~::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE + end + result = raw_connection.query(sql) while raw_connection.more_results_exist? raw_connection.next_result @@ -42,15 +49,6 @@ def last_inserted_id(result) end end - def sync_timezone_changes(conn) - # Sync any changes since connection last established. - if default_timezone == :local - conn.query_flags |= ::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE - else - conn.query_flags &= ~::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE - end - end - def execute_batch(statements, name = nil) combine_multi_statements(statements).each do |statement| with_raw_connection do |conn|