From 71c6a9e9d80efda50d935186f15cd08f5939deee Mon Sep 17 00:00:00 2001 From: Kenneth Jia <48558845+kenneth-jia@users.noreply.github.com> Date: Tue, 7 Mar 2023 14:22:46 +0800 Subject: [PATCH] Add interceptor for broker's state & update to librdkafka v2.0.2 --- .github/workflows/kafka_api_bazel_build.yml | 2 +- .github/workflows/kafka_api_ci_tests.yml | 2 +- .../workflows/kafka_api_demo_conan_build.yml | 28 +--- README.md | 2 +- .../conan_build/CMakeLists.txt | 23 +--- .../conan_build/conanfile.txt | 2 +- include/kafka/Interceptors.h | 35 +++-- include/kafka/KafkaClient.h | 25 +++- tests/integration/TestKafkaConsumer.cc | 128 +++++++++++------- 9 files changed, 144 insertions(+), 103 deletions(-) diff --git a/.github/workflows/kafka_api_bazel_build.yml b/.github/workflows/kafka_api_bazel_build.yml index 7f70e1d7e..f32ec5959 100644 --- a/.github/workflows/kafka_api_bazel_build.yml +++ b/.github/workflows/kafka_api_bazel_build.yml @@ -9,7 +9,7 @@ on: env: KAFKA_SRC_LINK: https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz CPU_CORE_NUM: 2 - LIBRDKAFKA_TAG: v1.9.2 + LIBRDKAFKA_TAG: v2.0.2 jobs: kafka-api-bazel-build: diff --git a/.github/workflows/kafka_api_ci_tests.yml b/.github/workflows/kafka_api_ci_tests.yml index 9d528b629..1ca2a5e40 100644 --- a/.github/workflows/kafka_api_ci_tests.yml +++ b/.github/workflows/kafka_api_ci_tests.yml @@ -9,7 +9,7 @@ on: env: KAFKA_SRC_LINK: https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz CPU_CORE_NUM: 2 - LIBRDKAFKA_TAG: v1.9.2 + LIBRDKAFKA_TAG: v2.0.2 BUILD_SUB_DIR: builds/sub-build jobs: diff --git a/.github/workflows/kafka_api_demo_conan_build.yml b/.github/workflows/kafka_api_demo_conan_build.yml index 7192d3c8c..8d13a0cd4 100644 --- a/.github/workflows/kafka_api_demo_conan_build.yml +++ b/.github/workflows/kafka_api_demo_conan_build.yml @@ -26,19 +26,9 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Prepare (non-windows) - if: ${{!contains(matrix.os, 'windows')}} - run: | - if [[ ${OS_VERSION} == 'macos'* ]]; then - brew install conan - else - pip3 install conan - fi - - - name: Prepare (windows) - if: ${{contains(matrix.os, 'windows')}} + - name: Prepare run: | - pip3 install conan + pip3 install conan==1.59.0 - name: Build (non-windows) if: ${{!contains(matrix.os, 'windows')}} @@ -52,11 +42,8 @@ jobs: cmake .. -G "Unix Makefiles" cmake --build . - bin/kafka_sync_producer - bin/kafka_async_producer_copy_payload - bin/kafka_async_producer_not_copy_payload - bin/kafka_auto_commit_consumer - bin/kafka_manual_commit_consumer + bin/kafka_producer + bin/kafka_consumer - name: Build (windows) if: contains(matrix.os, 'windows') @@ -70,9 +57,6 @@ jobs: cmake .. cmake --build . - bin/kafka_sync_producer.exe - bin/kafka_async_producer_copy_payload.exe - bin/kafka_async_producer_not_copy_payload.exe - bin/kafka_auto_commit_consumer.exe - bin/kafka_manual_commit_consumer.exe + bin/kafka_producer.exe + bin/kafka_consumer.exe diff --git a/README.md b/README.md index f6467859f..dd0f6a7bf 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ About the *Modern C++ Kafka API* The [modern-cpp-kafka API](http://opensource.morganstanley.com/modern-cpp-kafka/doxygen/annotated.html) is a layer of ***C++*** wrapper based on [librdkafka](https://github.com/confluentinc/librdkafka) (the ***C*** part only), with high quality, but more friendly to users. -- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v1.9.2](https://github.com/confluentinc/librdkafka/releases/tag/v1.9.2). +- By now, [modern-cpp-kafka](https://github.com/morganstanley/modern-cpp-kafka) is compatible with [librdkafka v2.0.2](https://github.com/confluentinc/librdkafka/releases/tag/v2.0.2). ``` diff --git a/demo_projects_for_build/conan_build/CMakeLists.txt b/demo_projects_for_build/conan_build/CMakeLists.txt index 6fab20db0..ee9af3cd5 100644 --- a/demo_projects_for_build/conan_build/CMakeLists.txt +++ b/demo_projects_for_build/conan_build/CMakeLists.txt @@ -7,22 +7,11 @@ set(CMAKE_CXX_STANDARD_REQUIRED True) include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake) conan_basic_setup() -# Target: kafka_sync_producer -add_executable("kafka_sync_producer" "../../examples/kafka_sync_producer.cc") -target_link_libraries("kafka_sync_producer" ${CONAN_LIBS}) +# Target: kafka_producer +add_executable("kafka_producer" "../../examples/kafka_async_producer_not_copy_payload.cc") +target_link_libraries("kafka_producer" ${CONAN_LIBS}) -# Target: kafka_async_producer_copy_payload -add_executable("kafka_async_producer_copy_payload" "../../examples/kafka_async_producer_copy_payload.cc") -target_link_libraries("kafka_async_producer_copy_payload" ${CONAN_LIBS}) +# Target: kafka_consumer +add_executable("kafka_consumer" "../../examples/kafka_auto_commit_consumer.cc") +target_link_libraries("kafka_consumer" ${CONAN_LIBS}) -# Target: kafka_async_producer_not_copy_payload -add_executable("kafka_async_producer_not_copy_payload" "../../examples/kafka_async_producer_not_copy_payload.cc") -target_link_libraries("kafka_async_producer_not_copy_payload" ${CONAN_LIBS}) - -# Target: kafka_auto_commit_consumer -add_executable("kafka_auto_commit_consumer" "../../examples/kafka_auto_commit_consumer.cc") -target_link_libraries("kafka_auto_commit_consumer" ${CONAN_LIBS}) - -# Target: kafka_manual_commit_consumer -add_executable("kafka_manual_commit_consumer" "../../examples/kafka_manual_commit_consumer.cc") -target_link_libraries("kafka_manual_commit_consumer" ${CONAN_LIBS}) diff --git a/demo_projects_for_build/conan_build/conanfile.txt b/demo_projects_for_build/conan_build/conanfile.txt index 857cf1ea5..9f2f83037 100644 --- a/demo_projects_for_build/conan_build/conanfile.txt +++ b/demo_projects_for_build/conan_build/conanfile.txt @@ -1,5 +1,5 @@ [requires] -modern-cpp-kafka/2022.06.15 +modern-cpp-kafka/2023.01.05 [generators] cmake diff --git a/include/kafka/Interceptors.h b/include/kafka/Interceptors.h index 7897afb3f..fbf4b60bd 100644 --- a/include/kafka/Interceptors.h +++ b/include/kafka/Interceptors.h @@ -16,32 +16,47 @@ class Interceptors /** * Callback type for thread-start interceptor. */ - using ThreadStartCallback = std::function; + using ThreadStartCb = std::function; /** * Callback type for thread-exit interceptor. */ - using ThreadExitCallback = std::function; + using ThreadExitCb = std::function; + + /** + * Callback type for broker-state-change interceptor. + */ + using BrokerStateChangeCb = std::function; /** * Set interceptor for thread start. */ - Interceptors& onThreadStart(ThreadStartCallback cb) { _valid = true; _threadStartCb = std::move(cb); return *this; } + Interceptors& onThreadStart(ThreadStartCb cb) { _valid = true; _threadStartCb = std::move(cb); return *this; } /** * Set interceptor for thread exit. */ - Interceptors& onThreadExit(ThreadExitCallback cb) { _valid = true; _threadExitCb = std::move(cb); return *this; } + Interceptors& onThreadExit(ThreadExitCb cb) { _valid = true; _threadExitCb = std::move(cb); return *this; } + + /** + * Set interceptor for broker state change. + */ + Interceptors& onBrokerStateChange(BrokerStateChangeCb cb) { _valid = true; _brokerStateChangeCb = std::move(cb); return *this; } /** * Get interceptor for thread start. */ - ThreadStartCallback onThreadStart() const { return _threadStartCb; } + ThreadStartCb onThreadStart() const { return _threadStartCb; } /** * Get interceptor for thread exit. */ - ThreadExitCallback onThreadExit() const { return _threadExitCb; } + ThreadExitCb onThreadExit() const { return _threadExitCb; } + + /** + * Get interceptor for broker state change. + */ + BrokerStateChangeCb onBrokerStateChange() const { return _brokerStateChangeCb; } /** * Check if there's no interceptor. @@ -49,9 +64,11 @@ class Interceptors bool empty() const { return !_valid; } private: - ThreadStartCallback _threadStartCb; - ThreadExitCallback _threadExitCb; - bool _valid = false; + ThreadStartCb _threadStartCb; + ThreadExitCb _threadExitCb; + BrokerStateChangeCb _brokerStateChangeCb; + + bool _valid = false; }; } } // end of KAFKA_API::clients diff --git a/include/kafka/KafkaClient.h b/include/kafka/KafkaClient.h index 4d1a79338..36d9a368c 100644 --- a/include/kafka/KafkaClient.h +++ b/include/kafka/KafkaClient.h @@ -189,6 +189,7 @@ class KafkaClient static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* conf, void* opaque, char* errStr, std::size_t maxErrStrSize); static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque); static rd_kafka_resp_err_t interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque); + static rd_kafka_resp_err_t interceptorOnBrokerStateChange(rd_kafka_t* rk, int id, const char* secproto, const char* host, int port, const char* state, void* opaque); // Log callback (for class instance) void onLog(int level, const char* fac, const char* buf) const; @@ -205,6 +206,7 @@ class KafkaClient // Interceptor callback (for class instance) void interceptThreadStart(const std::string& threadName, const std::string& threadType); void interceptThreadExit(const std::string& threadName, const std::string& threadType); + void interceptBrokerStateChange(int id, const std::string& secproto, const std::string& host, int port, const std::string& state); protected: struct Pollable @@ -608,6 +610,12 @@ KafkaClient::interceptThreadExit(const std::string& threadName, const std::strin if (const auto& cb = _interceptors.onThreadExit()) cb(threadName, threadType); } +inline void +KafkaClient::interceptBrokerStateChange(int id, const std::string& secproto, const std::string& host, int port, const std::string& state) +{ + if (const auto& cb = _interceptors.onBrokerStateChange()) cb(id, secproto, host, port, state); +} + inline rd_kafka_resp_err_t KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*conf*/, void* opaque, char* /*errStr*/, std::size_t /*maxErrStrSize*/) { @@ -621,11 +629,16 @@ KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*con return result; } + if (auto result = rd_kafka_interceptor_add_on_broker_state_change(rk, "on_broker_state_change", KafkaClient::interceptorOnBrokerStateChange, opaque)) + { + return result; + } + return RD_KAFKA_RESP_ERR_NO_ERROR; } inline rd_kafka_resp_err_t -KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/) +KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /* opaque */) { kafkaClient(rk).interceptThreadStart(threadName, toString(threadType)); @@ -633,13 +646,21 @@ KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t thr } inline rd_kafka_resp_err_t -KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/) +KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /* opaque */) { kafkaClient(rk).interceptThreadExit(threadName, toString(threadType)); return RD_KAFKA_RESP_ERR_NO_ERROR; } +inline rd_kafka_resp_err_t +KafkaClient::interceptorOnBrokerStateChange(rd_kafka_t* rk, int id, const char* secproto, const char* host, int port, const char* state, void* /* opaque */) +{ + kafkaClient(rk).interceptBrokerStateChange(id, secproto, host, port, state); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + inline Optional KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging) { diff --git a/tests/integration/TestKafkaConsumer.cc b/tests/integration/TestKafkaConsumer.cc index ba7e8069f..516454200 100644 --- a/tests/integration/TestKafkaConsumer.cc +++ b/tests/integration/TestKafkaConsumer.cc @@ -23,67 +23,97 @@ TEST(KafkaConsumer, BasicPoll) KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); - // The auto-commit consumer - kafka::clients::consumer::KafkaConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig()); - std::cout << "[" << kafka::utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl; + std::map brokersState; + + kafka::clients::Interceptors interceptors; + interceptors.onBrokerStateChange([&brokersState](int id, const std::string& proto, const std::string& name, int port, const std::string& state) { + const std::string brokerDescription = (std::to_string(id) + " - " + proto + "://" + name + ":" + std::to_string(port)); + std::cout << "Broker[" << brokerDescription << "] ==> " << state << std::endl; + if (!name.empty() && name != "GroupCoordinator") + { + brokersState[name + ":" + std::to_string(port)] = state; + } + }); - // Subscribe topics - consumer.subscribe({topic}, - [](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps) { - if (et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned) { - // assignment finished - std::cout << "[" << kafka::utility::getCurrentTime() << "] assigned partitions: " << kafka::toString(tps) << std::endl; - } - }); - EXPECT_FALSE(consumer.subscription().empty()); + { + // Config the consumer with interceptors + kafka::clients::consumer::KafkaConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig() + .put(kafka::clients::Config::INTERCEPTORS, interceptors)); - // No message yet - auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1)); - EXPECT_EQ(0, records.size()); + std::cout << "[" << kafka::utility::getCurrentTime() << "] " << consumer.name() << " started" << std::endl; - // Try to get the beginning offsets - const kafka::TopicPartition tp{topic, partition}; - std::cout << "[" << kafka::utility::getCurrentTime() << "] Consumer get the beginningOffset[" << consumer.beginningOffsets({tp})[tp] << "]" << std::endl;; + // Subscribe topics + consumer.subscribe({topic}, + [](kafka::clients::consumer::RebalanceEventType et, const kafka::TopicPartitions& tps) { + if (et == kafka::clients::consumer::RebalanceEventType::PartitionsAssigned) { + // assignment finished + std::cout << "[" << kafka::utility::getCurrentTime() << "] assigned partitions: " << kafka::toString(tps) << std::endl; + } + }); + EXPECT_FALSE(consumer.subscription().empty()); - // Prepare some messages to send - const std::vector> messages = { - {kafka::Headers{}, "key1", "value1"}, - {kafka::Headers{}, "key2", "value2"}, - {kafka::Headers{}, "key3", "value3"}, - }; + // No message yet + auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer, std::chrono::seconds(1)); + EXPECT_EQ(0, records.size()); - // Send the messages - KafkaTestUtility::ProduceMessages(topic, partition, messages); + // Should be able to get all brokers' state + EXPECT_EQ(KafkaTestUtility::GetNumberOfKafkaBrokers(), brokersState.size()); + // All brokers' state should be "UP" + for (const auto& brokerState: brokersState) + { + EXPECT_EQ("UP", brokerState.second); + } - // Poll these messages - records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer); - EXPECT_EQ(messages.size(), records.size()); + // Try to get the beginning offsets + const kafka::TopicPartition tp{topic, partition}; + std::cout << "[" << kafka::utility::getCurrentTime() << "] Consumer get the beginningOffset[" << consumer.beginningOffsets({tp})[tp] << "]" << std::endl;; - // Copyable ConsumerRecord - { - auto recordsCopy = records; - recordsCopy.clear(); - } + // Prepare some messages to send + const std::vector> messages = { + {kafka::Headers{}, "key1", "value1"}, + {kafka::Headers{}, "key2", "value2"}, + {kafka::Headers{}, "key3", "value3"}, + }; - // Check messages - std::size_t rcvMsgCount = 0; - for (auto& record: records) - { - ASSERT_TRUE(rcvMsgCount < messages.size()); + // Send the messages + KafkaTestUtility::ProduceMessages(topic, partition, messages); - EXPECT_EQ(topic, record.topic()); - EXPECT_EQ(partition, record.partition()); - EXPECT_EQ(0, record.headers().size()); - EXPECT_EQ(std::get<1>(messages[rcvMsgCount]).size(), record.key().size()); - EXPECT_EQ(0, std::memcmp(std::get<1>(messages[rcvMsgCount]).c_str(), record.key().data(), record.key().size())); - EXPECT_EQ(std::get<2>(messages[rcvMsgCount]).size(), record.value().size()); - EXPECT_EQ(0, std::memcmp(std::get<2>(messages[rcvMsgCount]).c_str(), record.value().data(), record.value().size())); + // Poll these messages + records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer); + EXPECT_EQ(messages.size(), records.size()); - ++rcvMsgCount; + // Copyable ConsumerRecord + { + auto recordsCopy = records; + recordsCopy.clear(); + } + + // Check messages + std::size_t rcvMsgCount = 0; + for (auto& record: records) + { + ASSERT_TRUE(rcvMsgCount < messages.size()); + + EXPECT_EQ(topic, record.topic()); + EXPECT_EQ(partition, record.partition()); + EXPECT_EQ(0, record.headers().size()); + EXPECT_EQ(std::get<1>(messages[rcvMsgCount]).size(), record.key().size()); + EXPECT_EQ(0, std::memcmp(std::get<1>(messages[rcvMsgCount]).c_str(), record.key().data(), record.key().size())); + EXPECT_EQ(std::get<2>(messages[rcvMsgCount]).size(), record.value().size()); + EXPECT_EQ(0, std::memcmp(std::get<2>(messages[rcvMsgCount]).c_str(), record.value().data(), record.value().size())); + + ++rcvMsgCount; + } + + // Close the consumer + consumer.close(); } - // Close the consumer - consumer.close(); + // All brokers' state should be "DOWN" + for (const auto& brokerState: brokersState) + { + EXPECT_EQ("DOWN", brokerState.second); + } } TEST(KafkaConsumer, PollWithHeaders)