diff --git a/src/Storages/ExternalStream/ExternalStreamCounter.h b/src/Storages/ExternalStream/ExternalStreamCounter.h
index c8d8a941df..b46cfddbe2 100644
--- a/src/Storages/ExternalStream/ExternalStreamCounter.h
+++ b/src/Storages/ExternalStream/ExternalStreamCounter.h
@@ -16,6 +16,7 @@ class ExternalStreamCounter
     inline uint64_t getMessageSizeLimit() const { return messages_by_size.load(); }
     inline uint64_t getMessageRowsLimit() const { return messages_by_row.load(); }
     inline uint64_t getWriteFailed() const { return write_failed.load(); }
+    inline uint64_t getOversizedMessageCount() const { return oversized_message_count.load(); }
 
     inline void addToReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes); }
     inline void addToReadCounts(uint64_t counts) { read_counts.fetch_add(counts); }
@@ -25,6 +26,7 @@ class ExternalStreamCounter
     inline void addToWriteFailed(uint64_t amount) { write_failed.fetch_add(amount); }
     inline void addToMessagesBySize(uint64_t counts) { messages_by_size.fetch_add(counts); }
     inline void addToMessagesByRow(uint64_t counts) { messages_by_row.fetch_add(counts); }
+    inline void addToOversizedMessageCount(uint64_t counts) { oversized_message_count.fetch_add(counts); }
 
     std::map<String, uint64_t> getCounters() const
     {
@@ -37,6 +39,7 @@ class ExternalStreamCounter
             {"WriteFailed", write_failed.load()},
             {"MessagesBySize", messages_by_size.load()},
             {"MessagesByRow", messages_by_row.load()},
+            {"OversizedMessageCount", oversized_message_count.load()},
         };
     }
@@ -51,6 +54,8 @@ class ExternalStreamCounter
     std::atomic<uint64_t> messages_by_size;
     /// Number of Kafka messages generated by reaching the `kafka_max_message_rows` limit.
     std::atomic<uint64_t> messages_by_row;
+    /// Number of messages whose size exceeds the `kafka_max_message_size` limit.
+    std::atomic<uint64_t> oversized_message_count;
 };
 
 using ExternalStreamCounterPtr = std::shared_ptr<ExternalStreamCounter>;
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp
index 7f8420e1b0..f7cfa6bbf7 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSink.cpp
+++ b/src/Storages/ExternalStream/Kafka/KafkaSink.cpp
@@ -140,8 +140,8 @@ KafkaSink::KafkaSink(
 {
     /// If the buffer_size (kafka_max_message_size) is reached, the buffer will be forced to flush.
     wb = std::make_unique(
-        [this](char * pos, size_t len, size_t total_len) { addMessageToBatch(pos, len, total_len); },
-        [this]() { tryCarryOverPendingData(); },
+        /*on_next=*/ [this](char * pos, size_t len, size_t total_len) { addMessageToBatch(pos, len, total_len); },
+        /*after_next=*/ [this]() { tryCarryOverPendingData(); },
         /*buffer_size=*/ context->getSettingsRef().kafka_max_message_size.value);
 
     const auto & data_format = kafka.dataFormat();
@@ -235,20 +235,29 @@ KafkaSink::KafkaSink(
 /// However, it's still possible that, one single row is still too big and it exceeds that limit. There is nothing we can do about it for now.
 void KafkaSink::addMessageToBatch(char * pos, size_t len, size_t total_len)
 {
-    auto pending_size = pending_data.size();
+    auto pending_size = pending_data.offset();
     /// There are complete data to consume.
     if (len > 0)
     {
         StringRef key = message_key_expr ? keys_for_current_batch[current_batch_row++] : "";
 
+        nlog::ByteVector payload;
+        /// Data at pos (which is in the WriteBuffer) will be overwritten, thus it must be kept somewhere else (in `batch_payload`).
         auto msg_size = pending_size + len;
-        nlog::ByteVector payload{msg_size};
+        if (!oversized_payload.empty()) [[unlikely]]
+        {
+            msg_size += oversized_payload.size();
+            payload.swap(oversized_payload);
+            external_stream_counter->addToOversizedMessageCount(1);
+        }
         payload.resize(msg_size); /// set the size to the right value
+
         if (pending_size)
-            memcpy(payload.data(), pending_data.data(), pending_size);
-        memcpy(payload.data() + pending_size, pos, len);
+            memcpy(payload.data() + (msg_size - len - pending_size), pending_data.buffer().begin(), pending_size);
+
+        memcpy(payload.data() + (msg_size - len), pos, len);
 
         current_batch.push_back(rd_kafka_message_t{
             .partition = next_partition,
@@ -262,7 +271,10 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len, size_t total_len)
         batch_payload.push_back(std::move(payload));
         ++state.outstandings;
 
-        pending_data.resize(0);
+        if (len < total_len)
+            external_stream_counter->addToMessagesBySize(1);
+
+        pending_data.next(); /// reset pending_data
         pending_size = 0;
         rows_in_current_message = 0;
     }
@@ -271,24 +283,40 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len, size_t total_len)
         /// Nothing left
         return;
 
-    /// There are some remaining incomplete data, copy them to pending_data.
+    /// There are some remaining incomplete data; a few scenarios need to be handled.
     auto remaining = total_len - len;
-    pending_data.resize(pending_size + remaining);
-    memcpy(pending_data.data() + pending_size, pos + len, remaining);
+    auto * remaining_pos = pos + len;
+
+    /// Scenario 1 - There is already buffered oversized data; append the remaining data to it.
+    if (!oversized_payload.empty()) [[unlikely]]
+    {
+        auto n = oversized_payload.size();
+        oversized_payload.resize(n + remaining);
+        memcpy(oversized_payload.data() + n, remaining_pos, remaining);
+        return;
+    }
+
+    /// Scenario 2 - No buffered oversized data, but pending_data would be full after appending the remaining data.
+    /// In this case, move all the data to oversized_payload so that pending_data never needs to be resized.
+    /// This is important because keeping pending_data the same size as `wb` allows us to swap them
+    /// instead of copying data from pending_data back into `wb`.
+    if (pending_size + remaining >= pending_data.available()) [[unlikely]]
+    {
+        oversized_payload.resize(pending_size + remaining);
+        memcpy(oversized_payload.data(), pending_data.buffer().begin(), pending_size);
+        memcpy(oversized_payload.data() + pending_size, remaining_pos, remaining);
+        pending_data.next(); /// reset pending_data
+        return;
+    }
 
-    external_stream_counter->addToMessagesBySize(1);
+    /// Scenario 3 - No buffered oversized data, and the remaining data fits into pending_data.
+    pending_data.write(remaining_pos, remaining);
 }
 
 void KafkaSink::tryCarryOverPendingData()
 {
-    /// If there are pending data and it can be fit into the buffer, then write the data back to the buffer,
-    /// so that we can use the buffer to limit the message size.
-    /// If the pending data are too big, that means we get a over-size row.
-    if (!pending_data.empty() && pending_data.size() < wb->available())
-    {
-        wb->write(pending_data.data(), pending_data.size());
-        pending_data.resize(0);
-    }
+    if (pending_data.offset())
+        wb->swap(pending_data);
 }
 
 void KafkaSink::consume(Chunk chunk)
diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.h b/src/Storages/ExternalStream/Kafka/KafkaSink.h
index 927d053950..c17478070f 100644
--- a/src/Storages/ExternalStream/Kafka/KafkaSink.h
+++ b/src/Storages/ExternalStream/Kafka/KafkaSink.h
@@ -7,6 +7,7 @@
 #include 
 #include 
 #include 
+#include 
 
 namespace DB
 {
@@ -114,7 +115,8 @@ class KafkaSink final : public SinkToStorage
     /// For constructing the message batch
     UInt64 rows_in_current_message{0};
-    nlog::ByteVector pending_data;
+    NullWriteBuffer pending_data;
+    nlog::ByteVector oversized_payload;
     std::vector<rd_kafka_message_t> current_batch;
     std::vector<nlog::ByteVector> batch_payload;
     std::vector<StringRef> keys_for_current_batch;
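
For readers of the patch, the tail handling in addMessageToBatch can be summarized outside the codebase. Below is a minimal, self-contained C++ sketch of the three scenarios (complete rows are emitted as one message, an already-oversized row keeps accumulating, and a small tail stays in a fixed-size pending buffer). Batcher, onNext and its members are illustrative stand-ins under assumed semantics, not the proton classes used in the patch (NullWriteBuffer, nlog::ByteVector, etc.).

// Illustrative sketch only -- it mirrors the control flow of
// KafkaSink::addMessageToBatch, with hypothetical names.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

class Batcher
{
public:
    explicit Batcher(size_t max_message_size) : limit(max_message_size) { }

    /// Called on every buffer flush: [pos, pos + len) holds complete rows,
    /// [pos + len, pos + total_len) holds the incomplete tail of the next row.
    void onNext(const char * pos, size_t len, size_t total_len)
    {
        if (len > 0)
        {
            /// Emit one message: oversized data (if any) + pending tail + complete rows.
            std::string payload;
            payload.reserve(oversized.size() + pending.size() + len);
            payload.append(oversized).append(pending).append(pos, len);
            messages.push_back(std::move(payload));
            oversized.clear();
            pending.clear();
        }

        size_t remaining = total_len - len;
        if (remaining == 0)
            return;
        const char * remaining_pos = pos + len;

        /// Scenario 1: an oversized row is already being accumulated; keep appending to it.
        if (!oversized.empty())
        {
            oversized.append(remaining_pos, remaining);
            return;
        }

        /// Scenario 2: the tail no longer fits into the fixed-size pending buffer;
        /// move everything into the growable oversized buffer instead of resizing pending.
        if (pending.size() + remaining >= limit)
        {
            oversized = pending;
            oversized.append(remaining_pos, remaining);
            pending.clear();
            return;
        }

        /// Scenario 3: the tail still fits; keep it pending until the next flush.
        pending.append(remaining_pos, remaining);
    }

    std::vector<std::string> messages;

private:
    size_t limit;          /// stands in for kafka_max_message_size
    std::string pending;   /// tail kept below the limit (the NullWriteBuffer role)
    std::string oversized; /// grows only while a single row exceeds the limit
};

int main()
{
    Batcher batcher(/*max_message_size=*/ 16);
    batcher.onNext("0123456789abcd", 10, 14); /// 10 complete bytes + 4-byte tail
    batcher.onNext("efghijkl", 8, 8);         /// tail + 8 more complete bytes
    for (const auto & m : batcher.messages)
        std::cout << m << '\n';               /// prints "0123456789" then "abcdefghijkl"
}

The sketch copies the pending tail back in front of the next message; the real patch avoids that copy in tryCarryOverPendingData by keeping pending_data the same size as wb and simply swapping the two buffers after a flush, so only genuinely oversized rows pay for a growing allocation.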