diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp index 73b0e09215..8a6339234a 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.cpp +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.cpp @@ -178,7 +178,6 @@ void KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void * void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) { assert(format_executor); - assert(convert_non_virtual_to_physical_action); ReadBufferFromMemory buffer(static_cast(kmessage->payload), kmessage->len); auto new_rows = format_executor->execute(buffer); @@ -196,9 +195,7 @@ void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage) if (!new_rows) return; - auto result_block = non_virtual_header.cloneWithColumns(format_executor->getResultColumns()); - convert_non_virtual_to_physical_action->execute(result_block); - + auto result_block = physical_header.cloneWithColumns(format_executor->getResultColumns()); MutableColumns new_data(result_block.mutateColumns()); if (!request_virtual_columns) @@ -271,20 +268,13 @@ void KafkaSource::initFormatExecutor() kafka.getFormatSettings(query_context)); format_executor = std::make_unique( - non_virtual_header, + physical_header, std::move(input_format), [this](const MutableColumns &, Exception & ex) -> size_t { format_error = ex.what(); return 0; }); - - auto converting_dag = ActionsDAG::makeConvertingActions( - non_virtual_header.cloneEmpty().getColumnsWithTypeAndName(), - physical_header.cloneEmpty().getColumnsWithTypeAndName(), - ActionsDAG::MatchColumnsMode::Name); - - convert_non_virtual_to_physical_action = std::make_shared(std::move(converting_dag)); } void KafkaSource::calculateColumnPositions() diff --git a/src/Storages/ExternalStream/Kafka/KafkaSource.h b/src/Storages/ExternalStream/Kafka/KafkaSource.h index 83cd1d3ab9..1971a28b28 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSource.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSource.h @@ -66,8 +66,6 @@ class KafkaSource final : public Streaming::ISource Block physical_header; Chunk header_chunk; - std::shared_ptr convert_non_virtual_to_physical_action = nullptr; - std::unique_ptr format_executor; ReadBufferFromMemory read_buffer;