diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index 1ceaa588430b0..e30ded9dfd1b1 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -163,14 +163,14 @@ def exec_insert(sql, name = nil, binds = [], pk = nil, sequence_name = nil, retu # +binds+ as the bind substitutes. +name+ is logged along with # the executed +sql+ statement. def exec_delete(sql, name = nil, binds = []) - internal_exec_query(sql, name, binds) + affected_rows(internal_execute(sql, name, binds)) end # Executes update +sql+ statement in the context of this connection using # +binds+ as the bind substitutes. +name+ is logged along with # the executed +sql+ statement. def exec_update(sql, name = nil, binds = []) - internal_exec_query(sql, name, binds) + affected_rows(internal_execute(sql, name, binds)) end def exec_insert_all(sql, name) # :nodoc: @@ -532,30 +532,79 @@ def high_precision_current_timestamp HIGH_PRECISION_CURRENT_TIMESTAMP end - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - raise NotImplementedError + # Same as raw_execute but returns an ActiveRecord::Result object. + def raw_exec_query(...) # :nodoc: + cast_result(raw_execute(...)) + end + + # Execute a query and returns an ActiveRecord::Result + def internal_exec_query(...) # :nodoc: + cast_result(internal_execute(...)) end private - def internal_execute(sql, name = "SCHEMA", allow_retry: false, materialize_transactions: true) - sql = transform_query(sql) - check_if_write_query(sql) + # Lowest level way to execute a query. Doesn't check for illegal writes, doesn't annotate queries, yields a native result object. + def raw_execute(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) + type_casted_binds = type_casted_binds(binds) + notification_payload = { + sql: sql, + name: name, + binds: binds, + type_casted_binds: type_casted_binds, + async: async, + connection: self, + transaction: current_transaction.user_transaction.presence, + statement_name: nil, + row_count: 0, + } + @instrumenter.instrument("sql.active_record", notification_payload) do + with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| + perform_query(conn, sql, binds, type_casted_binds, prepare: prepare, notification_payload: notification_payload) + end + rescue ActiveRecord::StatementInvalid => ex + raise ex.set_query(sql, binds) + end + end + + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + raise NotImplementedError + end + + # Receive a native adapter result object and returns an ActiveRecord::Result object. + def cast_result(raw_result) + raise NotImplementedError + end + def affected_rows(raw_result) + raise NotImplementedError + end + + def preprocess_query(sql) + check_if_write_query(sql) mark_transaction_written_if_write(sql) - raw_execute(sql, name, allow_retry: allow_retry, materialize_transactions: materialize_transactions) + # We call tranformers after the write checks so we don't add extra parsing work. + # This means we assume no transformer whille change a read for a write + # but it would be insane to do such a thing. + ActiveRecord.query_transformers.each do |transformer| + sql = transformer.call(sql, self) + end + + sql + end + + # Same as #internal_exec_query, but yields a native adapter result + def internal_execute(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true, &block) + sql = preprocess_query(sql) + raw_execute(sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions, &block) end def execute_batch(statements, name = nil) statements.each do |statement| - internal_execute(statement, name) + raw_execute(statement, name) end end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) - raise NotImplementedError - end - DEFAULT_INSERT_VALUE = Arel.sql("DEFAULT").freeze private_constant :DEFAULT_INSERT_VALUE @@ -637,6 +686,8 @@ def select(sql, name = nil, binds = [], prepare: false, async: false, allow_retr raise AsynchronousQueryInsideTransactionError, "Asynchronous queries are not allowed inside transactions" end + # We make sure to run query transformers on the orignal thread + sql = preprocess_query(sql) future_result = async.new( pool, sql, @@ -649,14 +700,14 @@ def select(sql, name = nil, binds = [], prepare: false, async: false, allow_retr else future_result.execute!(self) end - return future_result - end - - result = internal_exec_query(sql, name, binds, prepare: prepare, allow_retry: allow_retry) - if async - FutureResult.wrap(result) + future_result else - result + result = internal_exec_query(sql, name, binds, prepare: prepare, allow_retry: allow_retry) + if async + FutureResult.wrap(result) + else + result + end end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb b/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb index a80b56d221d72..5d6c02ce3ca00 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/quoting.rb @@ -222,7 +222,7 @@ def sanitize_as_sql_comment(value) # :nodoc: private def type_casted_binds(binds) - binds.map do |value| + binds&.map do |value| if ActiveModel::Attribute === value type_cast(value.value_for_database) else diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index a4b6793999d51..bcd5570a63807 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -1106,24 +1106,25 @@ def type_map end end - def translate_exception_class(e, sql, binds) - message = "#{e.class.name}: #{e.message}" + def translate_exception_class(native_error, sql, binds) + return native_error if native_error.is_a?(ActiveRecordError) - exception = translate_exception( - e, message: message, sql: sql, binds: binds + message = "#{native_error.class.name}: #{native_error.message}" + + active_record_error = translate_exception( + native_error, message: message, sql: sql, binds: binds ) - exception.set_backtrace e.backtrace - exception + active_record_error.set_backtrace(native_error.backtrace) + active_record_error end - def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = nil, async: false, &block) # :doc: + def log(sql, name = "SQL", binds = [], type_casted_binds = [], async: false, &block) # :doc: @instrumenter.instrument( "sql.active_record", sql: sql, name: name, binds: binds, type_casted_binds: type_casted_binds, - statement_name: statement_name, async: async, connection: self, transaction: current_transaction.user_transaction.presence, @@ -1134,13 +1135,6 @@ def log(sql, name = "SQL", binds = [], type_casted_binds = [], statement_name = raise ex.set_query(sql, binds) end - def transform_query(sql) - ActiveRecord.query_transformers.each do |transformer| - sql = transformer.call(sql, self) - end - sql - end - def translate_exception(exception, message:, sql:, binds:) # override in derived class case exception @@ -1152,7 +1146,7 @@ def translate_exception(exception, message:, sql:, binds:) end def without_prepared_statement?(binds) - !prepared_statements || binds.empty? + !prepared_statements || binds.nil? || binds.empty? end def column_for(table_name, column_name) diff --git a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb index df1884a3a479f..e6af17084ba8a 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_mysql_adapter.rb @@ -197,12 +197,6 @@ def index_algorithms # HELPER METHODS =========================================== - # The two drivers have slightly different ways of yielding hashes of results, so - # this method must be implemented to provide a uniform interface. - def each_hash(result) # :nodoc: - raise NotImplementedError - end - # Must return the MySQL error number from the exception, if the exception has an # error number. def error_number(exception) # :nodoc: @@ -226,17 +220,6 @@ def disable_referential_integrity # :nodoc: # DATABASE STATEMENTS ====================================== #++ - # Mysql2Adapter doesn't have to free a result after using it, but we use this method - # to write stuff in an abstract way without concerning ourselves about whether it - # needs to be explicitly freed or not. - def execute_and_free(sql, name = nil, async: false, allow_retry: false) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - - mark_transaction_written_if_write(sql) - yield raw_execute(sql, name, async: async, allow_retry: allow_retry) - end - def begin_db_transaction # :nodoc: internal_execute("BEGIN", "TRANSACTION", allow_retry: true, materialize_transactions: false) end @@ -787,11 +770,6 @@ def warning_ignored?(warning) warning.level == "Note" || super end - # Make sure we carry over any changes to ActiveRecord.default_timezone that have been - # made since we established the connection - def sync_timezone_changes(raw_connection) - end - # See https://dev.mysql.com/doc/mysql-errors/en/server-error-reference.html ER_DB_CREATE_EXISTS = 1007 ER_FILSORT_ABORT = 1028 @@ -961,13 +939,11 @@ def configure_connection end.join(", ") # ...and send them all in one query - internal_execute("SET #{encoding} #{sql_mode_assignment} #{variable_assignments}") + raw_execute("SET #{encoding} #{sql_mode_assignment} #{variable_assignments}", "SCHEMA") end def column_definitions(table_name) # :nodoc: - execute_and_free("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA") do |result| - each_hash(result) - end + internal_exec_query("SHOW FULL FIELDS FROM #{quote_table_name(table_name)}", "SCHEMA") end def create_table_info(table_name) # :nodoc: diff --git a/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb index 40deb2bf198b3..3a18161b6f549 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql/schema_statements.rb @@ -8,45 +8,43 @@ module SchemaStatements # :nodoc: def indexes(table_name) indexes = [] current_index = nil - execute_and_free("SHOW KEYS FROM #{quote_table_name(table_name)}", "SCHEMA") do |result| - each_hash(result) do |row| - if current_index != row[:Key_name] - next if row[:Key_name] == "PRIMARY" # skip the primary key - current_index = row[:Key_name] - - mysql_index_type = row[:Index_type].downcase.to_sym - case mysql_index_type - when :fulltext, :spatial - index_type = mysql_index_type - when :btree, :hash - index_using = mysql_index_type - end - - indexes << [ - row[:Table], - row[:Key_name], - row[:Non_unique].to_i == 0, - [], - lengths: {}, - orders: {}, - type: index_type, - using: index_using, - comment: row[:Index_comment].presence - ] + internal_exec_query("SHOW KEYS FROM #{quote_table_name(table_name)}", "SCHEMA").each do |row| + if current_index != row["Key_name"] + next if row["Key_name"] == "PRIMARY" # skip the primary key + current_index = row["Key_name"] + + mysql_index_type = row["Index_type"].downcase.to_sym + case mysql_index_type + when :fulltext, :spatial + index_type = mysql_index_type + when :btree, :hash + index_using = mysql_index_type end - if row[:Expression] - expression = row[:Expression].gsub("\\'", "'") - expression = +"(#{expression})" unless expression.start_with?("(") - indexes.last[-2] << expression - indexes.last[-1][:expressions] ||= {} - indexes.last[-1][:expressions][expression] = expression - indexes.last[-1][:orders][expression] = :desc if row[:Collation] == "D" - else - indexes.last[-2] << row[:Column_name] - indexes.last[-1][:lengths][row[:Column_name]] = row[:Sub_part].to_i if row[:Sub_part] - indexes.last[-1][:orders][row[:Column_name]] = :desc if row[:Collation] == "D" - end + indexes << [ + row["Table"], + row["Key_name"], + row["Non_unique"].to_i == 0, + [], + lengths: {}, + orders: {}, + type: index_type, + using: index_using, + comment: row["Index_comment"].presence + ] + end + + if expression = row["Expression"] + expression = expression.gsub("\\'", "'") + expression = +"(#{expression})" unless expression.start_with?("(") + indexes.last[-2] << expression + indexes.last[-1][:expressions] ||= {} + indexes.last[-1][:expressions][expression] = expression + indexes.last[-1][:orders][expression] = :desc if row["Collation"] == "D" + else + indexes.last[-2] << row["Column_name"] + indexes.last[-1][:lengths][row["Column_name"]] = row["Sub_part"].to_i if row["Sub_part"] + indexes.last[-1][:orders][row["Column_name"]] = :desc if row["Collation"] == "D" end end @@ -182,12 +180,12 @@ def default_type(table_name, field_name) end def new_column_from_field(table_name, field, _definitions) - field_name = field.fetch(:Field) - type_metadata = fetch_type_metadata(field[:Type], field[:Extra]) - default, default_function = field[:Default], nil + field_name = field.fetch("Field") + type_metadata = fetch_type_metadata(field["Type"], field["Extra"]) + default, default_function = field["Default"], nil if type_metadata.type == :datetime && /\ACURRENT_TIMESTAMP(?:\([0-6]?\))?\z/i.match?(default) - default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field[:Extra]) + default = "#{default} ON UPDATE #{default}" if /on update CURRENT_TIMESTAMP/i.match?(field["Extra"]) default, default_function = nil, default elsif type_metadata.extra == "DEFAULT_GENERATED" default = +"(#{default})" unless default.start_with?("(") @@ -203,13 +201,13 @@ def new_column_from_field(table_name, field, _definitions) end MySQL::Column.new( - field[:Field], + field["Field"], default, type_metadata, - field[:Null] == "YES", + field["Null"] == "YES", default_function, - collation: field[:Collation], - comment: field[:Comment].presence + collation: field["Collation"], + comment: field["Comment"].presence ) end diff --git a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb index b81117928556f..47ff5051bb331 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql2/database_statements.rb @@ -13,45 +13,8 @@ def select_all(*, **) # :nodoc: end end - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - if without_prepared_statement?(binds) - execute_and_free(sql, name, async: async, allow_retry: allow_retry) do |result| - if result - build_result(columns: result.fields, rows: result.to_a) - else - build_result(columns: [], rows: []) - end - end - else - exec_stmt_and_free(sql, name, binds, cache_stmt: prepare, async: async) do |_, result| - if result - build_result(columns: result.fields, rows: result.to_a) - else - build_result(columns: [], rows: []) - end - end - end - end - - def exec_delete(sql, name = nil, binds = []) # :nodoc: - if without_prepared_statement?(binds) - with_raw_connection do |conn| - @affected_rows_before_warnings = nil - execute_and_free(sql, name) { @affected_rows_before_warnings || conn.affected_rows } - end - else - exec_stmt_and_free(sql, name, binds) { |stmt| stmt.affected_rows } - end - end - alias :exec_update :exec_delete - private - def sync_timezone_changes(raw_connection) - raw_connection.query_options[:database_timezone] = default_timezone - end - def execute_batch(statements, name = nil) - statements = statements.map { |sql| transform_query(sql) } combine_multi_statements(statements).each do |statement| with_raw_connection do |conn| raw_execute(statement, name) @@ -91,61 +54,49 @@ def with_multi_statements end end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - sync_timezone_changes(conn) - result = conn.query(sql) - conn.abandon_results! - verified! - handle_warnings(sql) - notification_payload[:row_count] = result&.size || 0 - result - end - end - end + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + # Make sure we carry over any changes to ActiveRecord.default_timezone that have been + # made since we established the connection + raw_connection.query_options[:database_timezone] = default_timezone - def exec_stmt_and_free(sql, name, binds, cache_stmt: false, async: false) - sql = transform_query(sql) - check_if_write_query(sql) + result = if prepare + stmt = @statements[sql] ||= raw_connection.prepare(sql) - mark_transaction_written_if_write(sql) + begin + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + stmt.execute(*type_casted_binds) + end + rescue ::Mysql2::Error + @statements.delete(sql) + stmt.close + raise + end + verified! + else + raw_connection.query(sql) + end - type_casted_binds = type_casted_binds(binds) + notification_payload[:row_count] = result&.size || 0 - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection do |conn| - sync_timezone_changes(conn) + @affected_rows_before_warnings = raw_connection.affected_rows + raw_connection.abandon_results! - if cache_stmt - stmt = @statements[sql] ||= conn.prepare(sql) - else - stmt = conn.prepare(sql) - end - - begin - result = ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - stmt.execute(*type_casted_binds) - end - verified! - result - rescue ::Mysql2::Error => e - if cache_stmt - @statements.delete(sql) - else - stmt.close - end - raise e - end + verified! + handle_warnings(sql) + result + end - ret = yield stmt, result - notification_payload[:row_count] = result&.size || 0 - result.free if result - stmt.close unless cache_stmt - ret - end + def cast_result(result) + if result.nil? || result.size.zero? + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(result.fields, result.to_a) end end + + def affected_rows(result) + @affected_rows_before_warnings + end end end end diff --git a/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb b/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb index 2a85a24864958..59c4b1a78e6d5 100644 --- a/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/mysql2_adapter.rb @@ -55,6 +55,7 @@ def initialize_type_map(m) def initialize(...) super + @affected_rows_before_warnings = nil @config[:flags] ||= 0 if @config[:flags].kind_of? Array @@ -92,14 +93,6 @@ def supports_lazy_transactions? # HELPER METHODS =========================================== - def each_hash(result, &block) # :nodoc: - if block_given? - result.each(as: :hash, symbolize_keys: true, &block) - else - to_enum(:each_hash, result) - end - end - def error_number(exception) exception.error_number if exception.respond_to?(:error_number) end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb index 33f1b277477d1..7a1c3a9b918db 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/database_statements.rb @@ -12,16 +12,8 @@ def explain(arel, binds = [], options = []) # Queries the database and returns the results in an Array-like object def query(sql, name = nil) # :nodoc: - mark_transaction_written_if_write(sql) - - log(sql, name) do |notification_payload| - with_raw_connection do |conn| - result = conn.async_exec(sql).map_types!(@type_map_for_results).values - verified! - notification_payload[:row_count] = result.count - result - end - end + result = internal_execute(sql, name) + result.map_types!(@type_map_for_results).values end READ_QUERY = ActiveRecord::ConnectionAdapters::AbstractAdapter.build_read_query_regexp( @@ -50,36 +42,6 @@ def execute(...) # :nodoc: @notice_receiver_sql_warnings = [] end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = conn.async_exec(sql) - verified! - handle_warnings(result) - notification_payload[:row_count] = result.count - result - end - end - end - - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false, materialize_transactions: true) # :nodoc: - execute_and_clear(sql, name, binds, prepare: prepare, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |result| - types = {} - fields = result.fields - fields.each_with_index do |fname, i| - ftype = result.ftype i - fmod = result.fmod i - types[fname] = types[i] = get_oid_type(ftype, fmod, fname) - end - build_result(columns: fields, rows: result.values, column_types: types.freeze) - end - end - - def exec_delete(sql, name = nil, binds = []) # :nodoc: - execute_and_clear(sql, name, binds) { |result| result.cmd_tuples } - end - alias :exec_update :exec_delete - def exec_insert(sql, name = nil, binds = [], pk = nil, sequence_name = nil, returning: nil) # :nodoc: if use_insert_returning? || pk == false super @@ -170,8 +132,61 @@ def cancel_any_running_query rescue PG::Error end + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + update_typemap_for_default_timezone + result = if prepare + begin + stmt_key = prepare_statement(sql, binds, raw_connection) + notification_payload[:statement_name] = stmt_key + raw_connection.exec_prepared(stmt_key, type_casted_binds) + rescue PG::FeatureNotSupported => error + if is_cached_plan_failure?(error) + # Nothing we can do if we are in a transaction because all commands + # will raise InFailedSQLTransaction + if in_transaction? + raise PreparedStatementCacheExpired.new(error.message, connection_pool: @pool) + else + @lock.synchronize do + # outside of transactions we can simply flush this query and retry + @statements.delete sql_key(sql) + end + retry + end + end + + raise + end + elsif without_prepared_statement?(binds) + raw_connection.async_exec(sql) + else + raw_connection.exec_params(sql, type_casted_binds) + end + + verified! + handle_warnings(result) + notification_payload[:row_count] = result.count + result + end + + def cast_result(result) + types = {} + fields = result.fields + fields.each_with_index do |fname, i| + ftype = result.ftype i + fmod = result.fmod i + types[fname] = types[i] = get_oid_type(ftype, fmod, fname) + end + ar_result = ActiveRecord::Result.new(fields, result.values, types.freeze) + result.clear + ar_result + end + + def affected_rows(result) + result.cmd_tuples + end + def execute_batch(statements, name = nil) - execute(combine_multi_statements(statements)) + raw_execute(combine_multi_statements(statements), name) end def build_truncate_statements(table_names) diff --git a/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb b/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb index 7928ee58a0032..49891552dc6dd 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql/schema_statements.rb @@ -250,7 +250,7 @@ def client_min_messages # Set the client message level. def client_min_messages=(level) - internal_execute("SET client_min_messages TO '#{level}'") + internal_execute("SET client_min_messages TO '#{level}'", "SCHEMA") end # Returns the sequence name for a table's primary key or some other specified key. diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 197cd416014d6..241db5c62286e 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -405,7 +405,7 @@ def self.native_database_types # :nodoc: end def set_standard_conforming_strings - internal_execute("SET standard_conforming_strings = on") + internal_execute("SET standard_conforming_strings = on", "SCHEMA") end def supports_ddl_transactions? @@ -841,9 +841,8 @@ def get_oid_type(oid, fmod, column_name, sql_type = "") def load_additional_types(oids = nil) initializer = OID::TypeMapInitializer.new(type_map) load_types_queries(initializer, oids) do |query| - execute_and_clear(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) do |records| - initializer.run(records) - end + records = internal_execute(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) + initializer.run(records) end end @@ -864,73 +863,6 @@ def load_types_queries(initializer, oids) FEATURE_NOT_SUPPORTED = "0A000" # :nodoc: - def execute_and_clear(sql, name, binds, prepare: false, async: false, allow_retry: false, materialize_transactions: true) - sql = transform_query(sql) - check_if_write_query(sql) - - if !prepare || without_prepared_statement?(binds) - result = exec_no_cache(sql, name, binds, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) - else - result = exec_cache(sql, name, binds, async: async, allow_retry: allow_retry, materialize_transactions: materialize_transactions) - end - begin - ret = yield result - ensure - result.clear - end - ret - end - - def exec_no_cache(sql, name, binds, async:, allow_retry:, materialize_transactions:) - mark_transaction_written_if_write(sql) - - # make sure we carry over any changes to ActiveRecord.default_timezone that have been - # made since we established the connection - update_typemap_for_default_timezone - - type_casted_binds = type_casted_binds(binds) - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = conn.exec_params(sql, type_casted_binds) - verified! - notification_payload[:row_count] = result.count - result - end - end - end - - def exec_cache(sql, name, binds, async:, allow_retry:, materialize_transactions:) - mark_transaction_written_if_write(sql) - - update_typemap_for_default_timezone - - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - stmt_key = prepare_statement(sql, binds, conn) - type_casted_binds = type_casted_binds(binds) - - log(sql, name, binds, type_casted_binds, stmt_key, async: async) do |notification_payload| - result = conn.exec_prepared(stmt_key, type_casted_binds) - verified! - notification_payload[:row_count] = result.count - result - end - end - rescue ActiveRecord::StatementInvalid => e - raise unless is_cached_plan_failure?(e) - - # Nothing we can do if we are in a transaction because all commands - # will raise InFailedSQLTransaction - if in_transaction? - raise ActiveRecord::PreparedStatementCacheExpired.new(e.cause.message, connection_pool: @pool) - else - @lock.synchronize do - # outside of transactions we can simply flush this query and retry - @statements.delete sql_key(sql) - end - retry - end - end - # Annoyingly, the code for prepared statements whose return value may # have changed is FEATURE_NOT_SUPPORTED. # @@ -940,8 +872,7 @@ def exec_cache(sql, name, binds, async:, allow_retry:, materialize_transactions: # # Check here for more details: # https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/utils/cache/plancache.c#l573 - def is_cached_plan_failure?(e) - pgerror = e.cause + def is_cached_plan_failure?(pgerror) pgerror.result.result_error_field(PG::PG_DIAG_SQLSTATE) == FEATURE_NOT_SUPPORTED && pgerror.result.result_error_field(PG::PG_DIAG_SOURCE_FUNCTION) == "RevalidateCachedQuery" rescue @@ -1020,16 +951,16 @@ def configure_connection variables = @config.fetch(:variables, {}).stringify_keys # Set interval output format to ISO 8601 for ease of parsing by ActiveSupport::Duration.parse - internal_execute("SET intervalstyle = iso_8601") + internal_execute("SET intervalstyle = iso_8601", "SCHEMA") # SET statements from :variables config hash # https://www.postgresql.org/docs/current/static/sql-set.html variables.map do |k, v| if v == ":default" || v == :default # Sets the value to the global or compile default - internal_execute("SET SESSION #{k} TO DEFAULT") + internal_execute("SET SESSION #{k} TO DEFAULT", "SCHEMA") elsif !v.nil? - internal_execute("SET SESSION #{k} TO #{quote(v)}") + internal_execute("SET SESSION #{k} TO #{quote(v)}", "SCHEMA") end end @@ -1050,9 +981,9 @@ def reconfigure_connection_timezone # If using Active Record's time zone support configure the connection # to return TIMESTAMP WITH ZONE types in UTC. if default_timezone == :utc - internal_execute("SET SESSION timezone TO 'UTC'") + raw_execute("SET SESSION timezone TO 'UTC'", "SCHEMA") else - internal_execute("SET SESSION timezone TO DEFAULT") + raw_execute("SET SESSION timezone TO DEFAULT", "SCHEMA") end end @@ -1119,9 +1050,8 @@ def can_perform_case_insensitive_comparison_for?(column) AND castsource = #{quote column.sql_type}::regtype ) SQL - execute_and_clear(sql, "SCHEMA", [], allow_retry: true, materialize_transactions: false) do |result| - result.getvalue(0, 0) - end + result = internal_execute(sql, "SCHEMA", [], allow_retry: true, materialize_transactions: false) + result.getvalue(0, 0) end end end @@ -1177,9 +1107,8 @@ def add_pg_decoders FROM pg_type as t WHERE t.typname IN (%s) SQL - coders = execute_and_clear(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) do |result| - result.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) } - end + result = internal_execute(query, "SCHEMA", [], allow_retry: true, materialize_transactions: false) + coders = result.filter_map { |row| construct_coder(row, coders_by_name[row["typname"]]) } map = PG::TypeMapByOid.new coders.each { |coder| map.add_coder(coder) } diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb b/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb index c4cae471bbe0b..49a741c07819b 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3/database_statements.rb @@ -21,50 +21,6 @@ def explain(arel, binds = [], _options = []) SQLite3::ExplainPrettyPrinter.new.pp(result) end - def internal_exec_query(sql, name = nil, binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - - mark_transaction_written_if_write(sql) - - type_casted_binds = type_casted_binds(binds) - - log(sql, name, binds, type_casted_binds, async: async) do |notification_payload| - with_raw_connection do |conn| - if prepare - stmt = @statements[sql] ||= conn.prepare(sql) - cols = stmt.columns - stmt.reset! - stmt.bind_params(type_casted_binds) - records = stmt.to_a - else - # Don't cache statements if they are not prepared. - stmt = conn.prepare(sql) - begin - cols = stmt.columns - unless without_prepared_statement?(binds) - stmt.bind_params(type_casted_binds) - end - records = stmt.to_a - ensure - stmt.close - end - end - verified! - - result = build_result(columns: cols, rows: records) - notification_payload[:row_count] = result.length - result - end - end - end - - def exec_delete(sql, name = "SQL", binds = []) # :nodoc: - internal_exec_query(sql, name, binds) - @raw_connection.changes - end - alias :exec_update :exec_delete - def begin_deferred_transaction(isolation = nil) # :nodoc: internal_begin_transaction(:deferred, isolation) end @@ -104,6 +60,12 @@ def high_precision_current_timestamp HIGH_PRECISION_CURRENT_TIMESTAMP end + def execute(...) # :nodoc: + # SQLite3Adapter was refactored to use ActiveRecord::Result internally + # but for backward compatibility we have to keep returning arrays of hashes here + super&.to_a + end + private def internal_begin_transaction(mode, isolation) if isolation @@ -124,15 +86,50 @@ def internal_begin_transaction(mode, isolation) end end - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: false) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - result = conn.execute(sql) - verified! - notification_payload[:row_count] = result.length - result + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + if prepare + stmt = @statements[sql] ||= raw_connection.prepare(sql) + stmt.reset! + stmt.bind_params(type_casted_binds) + + result = if stmt.column_count.zero? # No return + stmt.step + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(stmt.columns, stmt.to_a) + end + else + # Don't cache statements if they are not prepared. + stmt = raw_connection.prepare(sql) + begin + unless without_prepared_statement?(binds) + stmt.bind_params(type_casted_binds) + end + result = if stmt.column_count.zero? # No return + stmt.step + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(stmt.columns, stmt.to_a) + end + ensure + stmt.close end end + @last_affected_rows = raw_connection.changes + verified! + + notification_payload[:row_count] = result.length + result + end + + def cast_result(result) + # Given that SQLite3 doesn't really a Result type, raw_execute already return an ActiveRecord::Result + # and we have nothing to cast here. + result + end + + def affected_rows(result) + @last_affected_rows end def reset_read_uncommitted @@ -143,18 +140,12 @@ def reset_read_uncommitted end def execute_batch(statements, name = nil) - statements = statements.map { |sql| transform_query(sql) } sql = combine_multi_statements(statements) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - log(sql, name) do |notification_payload| with_raw_connection do |conn| - result = conn.execute_batch2(sql) + conn.execute_batch2(sql) verified! - notification_payload[:row_count] = result.length - result end end end diff --git a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb index 98c81d77a1b82..d40a781cb4d7e 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite3_adapter.rb @@ -119,6 +119,7 @@ def initialize(...) end end + @last_affected_rows = nil @config[:strict] = ConnectionAdapters::SQLite3Adapter.strict_strings_by_default unless @config.key?(:strict) @connection_parameters = @config.merge( database: @config[:database].to_s, diff --git a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb index 2f2b48a8bb294..3c8841de8ef76 100644 --- a/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/trilogy/database_statements.rb @@ -4,52 +4,43 @@ module ActiveRecord module ConnectionAdapters module Trilogy module DatabaseStatements - def internal_exec_query(sql, name = "SQL", binds = [], prepare: false, async: false, allow_retry: false) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - - result = raw_execute(sql, name, async: async, allow_retry: allow_retry) - ActiveRecord::Result.new(result.fields, result.to_a) - end - def exec_insert(sql, name, binds, pk = nil, sequence_name = nil, returning: nil) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - sql, _binds = sql_for_insert(sql, pk, binds, returning) - raw_execute(sql, name) + internal_execute(sql, name) end - def exec_delete(sql, name = nil, binds = []) # :nodoc: - sql = transform_query(sql) - check_if_write_query(sql) - mark_transaction_written_if_write(sql) - - result = raw_execute(to_sql(sql, binds), name) - result.affected_rows - end + private + def perform_query(raw_connection, sql, binds, type_casted_binds, prepare:, notification_payload:) + # Make sure we carry over any changes to ActiveRecord.default_timezone that have been + # made since we established the connection + if default_timezone == :local + raw_connection.query_flags |= ::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE + else + raw_connection.query_flags &= ~::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE + end - alias :exec_update :exec_delete # :nodoc: + result = raw_connection.query(sql) + while raw_connection.more_results_exist? + raw_connection.next_result + end + verified! + handle_warnings(sql) + notification_payload[:row_count] = result.count + result + end - private - def raw_execute(sql, name, async: false, allow_retry: false, materialize_transactions: true) - log(sql, name, async: async) do |notification_payload| - with_raw_connection(allow_retry: allow_retry, materialize_transactions: materialize_transactions) do |conn| - sync_timezone_changes(conn) - result = conn.query(sql) - while conn.more_results_exist? - conn.next_result - end - verified! - handle_warnings(sql) - notification_payload[:row_count] = result.count - result - end + def cast_result(result) + if result.count.zero? + ActiveRecord::Result.empty + else + ActiveRecord::Result.new(result.fields, result.rows) end end + def affected_rows(result) + result.affected_rows + end + def last_inserted_id(result) if supports_insert_returning? super @@ -58,17 +49,7 @@ def last_inserted_id(result) end end - def sync_timezone_changes(conn) - # Sync any changes since connection last established. - if default_timezone == :local - conn.query_flags |= ::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE - else - conn.query_flags &= ~::Trilogy::QUERY_FLAGS_LOCAL_TIMEZONE - end - end - def execute_batch(statements, name = nil) - statements = statements.map { |sql| transform_query(sql) } combine_multi_statements(statements).each do |statement| with_raw_connection do |conn| raw_execute(statement, name) diff --git a/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb b/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb index 60083681575d3..f55c8fcfddf49 100644 --- a/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/trilogy_adapter.rb @@ -149,23 +149,6 @@ def text_type?(type) TYPE_MAP.lookup(type).is_a?(Type::String) || TYPE_MAP.lookup(type).is_a?(Type::Text) end - def each_hash(result) - return to_enum(:each_hash, result) unless block_given? - - keys = result.fields.map(&:to_sym) - result.rows.each do |row| - hash = {} - idx = 0 - row.each do |value| - hash[keys[idx]] = value - idx += 1 - end - yield hash - end - - nil - end - def error_number(exception) exception.error_code if exception.respond_to?(:error_code) end diff --git a/activerecord/lib/active_record/future_result.rb b/activerecord/lib/active_record/future_result.rb index f55473ff2cd89..289fc6f761dcc 100644 --- a/activerecord/lib/active_record/future_result.rb +++ b/activerecord/lib/active_record/future_result.rb @@ -163,7 +163,7 @@ def execute_query(connection, async: false) end def exec_query(connection, *args, **kwargs) - connection.internal_exec_query(*args, **kwargs) + connection.raw_exec_query(*args, **kwargs) end class SelectAll < FutureResult # :nodoc: diff --git a/activerecord/test/cases/adapters/postgresql/connection_test.rb b/activerecord/test/cases/adapters/postgresql/connection_test.rb index 7692aa3ea0ac0..8a44912cb6d42 100644 --- a/activerecord/test/cases/adapters/postgresql/connection_test.rb +++ b/activerecord/test/cases/adapters/postgresql/connection_test.rb @@ -130,8 +130,11 @@ def test_schema_names_logs_name def test_statement_key_is_logged bind = Relation::QueryAttribute.new(nil, 1, Type::Value.new) @connection.exec_query("SELECT $1::integer", "SQL", [bind], prepare: true) - name = @subscriber.payloads.last[:statement_name] - assert name + + payload = @subscriber.payloads.find { |p| p[:sql] == "SELECT $1::integer" } + name = payload[:statement_name] + assert_not_nil name + res = @connection.exec_query("EXPLAIN (FORMAT JSON) EXECUTE #{name}(1)") plan = res.column_types["QUERY PLAN"].deserialize res.rows.first.first assert_operator plan.length, :>, 0 diff --git a/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb b/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb index 7e123935a0f01..ac55259ab76fb 100644 --- a/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb +++ b/activerecord/test/cases/adapters/sqlite3/sqlite3_adapter_test.rb @@ -926,14 +926,12 @@ def test_statement_closed statement = ::SQLite3::Statement.new(db, "CREATE TABLE statement_test (number integer not null)") statement.stub(:step, -> { raise ::SQLite3::BusyException.new("busy") }) do - assert_called(statement, :columns, returns: []) do - assert_called(statement, :close) do - ::SQLite3::Statement.stub(:new, statement) do - error = assert_raises ActiveRecord::StatementInvalid do - @conn.exec_query "select * from statement_test" - end - assert_equal @conn.pool, error.connection_pool + assert_called(statement, :close) do + ::SQLite3::Statement.stub(:new, statement) do + error = assert_raises ActiveRecord::StatementInvalid do + @conn.exec_query "select * from statement_test" end + assert_equal @conn.pool, error.connection_pool end end end diff --git a/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb b/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb index c7c728965e9b2..3813f69f5f64a 100644 --- a/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb +++ b/activerecord/test/cases/adapters/trilogy/trilogy_adapter_test.rb @@ -222,8 +222,6 @@ class TrilogyAdapterTest < ActiveRecord::TrilogyTestCase assert_includes payload, :type_casted_binds assert_equal [], payload[:type_casted_binds] - # :stament_name is always nil and never set 🤷‍♂️ - assert_includes payload, :statement_name assert_nil payload[:statement_name] assert_not_includes payload, :cached diff --git a/activerecord/test/cases/calculations_test.rb b/activerecord/test/cases/calculations_test.rb index 8a4061fe32665..9c5e4245cc272 100644 --- a/activerecord/test/cases/calculations_test.rb +++ b/activerecord/test/cases/calculations_test.rb @@ -303,12 +303,12 @@ def test_no_limit_no_offset end def test_count_on_invalid_columns_raises - e = assert_raises(ActiveRecord::StatementInvalid) { + error = assert_raises(ActiveRecord::StatementInvalid) do Account.select("credit_limit, firm_name").count - } + end - assert_match %r{accounts}i, e.sql - assert_match "credit_limit, firm_name", e.sql + assert_match %r{accounts}i, error.sql + assert_match "credit_limit, firm_name", error.sql end def test_apply_distinct_in_count diff --git a/activerecord/test/cases/query_cache_test.rb b/activerecord/test/cases/query_cache_test.rb index 3b636bf7b2d05..556b3e620bb1a 100644 --- a/activerecord/test/cases/query_cache_test.rb +++ b/activerecord/test/cases/query_cache_test.rb @@ -493,7 +493,8 @@ def test_cache_is_available_when_using_a_not_connected_connection assert_not_predicate Task, :connected? Task.cache do - assert_queries_count(1) { Task.find(1); Task.find(1) } + assert_queries_count(1) { Task.find(1) } + assert_no_queries { Task.find(1) } ensure ActiveRecord::Base.establish_connection(original_connection) end