From dc3007f566fdee35b8cc2dd7f88d2e9a0cf50122 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Mon, 6 Jan 2025 15:35:23 +0100 Subject: [PATCH] fix: SASL/PLAIN The previous fix did not work. This is now definitely verified to be working and also makes coherent sense. --- .circleci/config.yml | 23 +++++++++++++++++------ docker-compose-kafka.yml | 15 ++++++++++++--- src/messenger.rs | 4 +++- tests/client.rs | 4 ++++ 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 35523f3..59a936d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -226,7 +226,7 @@ jobs: name: zookeeper environment: - ALLOW_ANONYMOUS_LOGIN=yes - - image: docker.io/bitnami/kafka:3 + - image: docker.io/bitnami/kafka:3.9.0 name: kafka-0 environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 @@ -237,18 +237,25 @@ jobs: - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://kafka-0:9093 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false - - image: docker.io/bitnami/kafka:3 + - KAFKA_CLIENT_USERS=admin + - KAFKA_CLIENT_PASSWORDS=admin-secret + - KAFKA_CLIENT_LISTENER_NAME=SECURE + - image: docker.io/bitnami/kafka:3.9.0 name: kafka-1 environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_BROKER_ID=1 - ALLOW_PLAINTEXT_LISTENER=yes - - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT - - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 - - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://kafka-1:9093 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,SASL_PLAIN:SASL_PLAINTEXT + - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093,SASL_PLAIN://:9094 + - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://kafka-1:9093,SASL_PLAIN://kafka-1:9094 + - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false - - image: docker.io/bitnami/kafka:3 + - KAFKA_CLIENT_USERS=admin + - KAFKA_CLIENT_PASSWORDS=admin-secret + - KAFKA_CLIENT_LISTENER_NAME=SECURE + - image: docker.io/bitnami/kafka:3.9.0 name: kafka-2 environment: - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 @@ -259,6 +266,9 @@ jobs: - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://kafka-2:9093 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + - KAFKA_CLIENT_USERS=admin + - KAFKA_CLIENT_PASSWORDS=admin-secret + - KAFKA_CLIENT_LISTENER_NAME=SECURE - image: serjs/go-socks5-proxy name: proxy resource_class: xlarge # use of a smaller executor tends crashes on link @@ -279,6 +289,7 @@ jobs: # Don't use the first node here since this is likely the controller and we want to ensure that we automatically # pick the controller for certain actions (e.g. topic creation) and don't just get lucky. KAFKA_CONNECT: "invalid:9093,kafka-1:9093" + KAFKA_SASL_CONNECT: kafka-1:9094 SOCKS_PROXY: "proxy:1080" steps: - checkout diff --git a/docker-compose-kafka.yml b/docker-compose-kafka.yml index 472ce7d..c0561ca 100644 --- a/docker-compose-kafka.yml +++ b/docker-compose-kafka.yml @@ -11,7 +11,7 @@ services: volumes: - zookeeper_data:/bitnami/zookeeper kafka-0: - image: docker.io/bitnami/kafka:3 + image: docker.io/bitnami/kafka:3.9.0 ports: - "9010:9010" - "9096:9096" @@ -24,13 +24,16 @@ services: - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020,SECURE://localhost:9096 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + - KAFKA_CLIENT_USERS=admin + - KAFKA_CLIENT_PASSWORDS=admin-secret + - KAFKA_CLIENT_LISTENER_NAME=SECURE volumes: - kafka_0_data:/bitnami/kafka - ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf depends_on: - zookeeper kafka-1: - image: docker.io/bitnami/kafka:3 + image: docker.io/bitnami/kafka:3.9.0 ports: - "9011:9011" - "9097:9097" @@ -43,13 +46,16 @@ services: - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021,SECURE://localhost:9097 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + - KAFKA_CLIENT_USERS=admin + - KAFKA_CLIENT_PASSWORDS=admin-secret + - KAFKA_CLIENT_LISTENER_NAME=SECURE volumes: - kafka_1_data:/bitnami/kafka - ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf depends_on: - zookeeper kafka-2: - image: docker.io/bitnami/kafka:3 + image: docker.io/bitnami/kafka:3.9.0 ports: - "9012:9012" - "9098:9098" @@ -62,6 +68,9 @@ services: - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022,SECURE://localhost:9098 - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false + - KAFKA_CLIENT_USERS=admin + - KAFKA_CLIENT_PASSWORDS=admin-secret + - KAFKA_CLIENT_LISTENER_NAME=SECURE volumes: - kafka_2_data:/bitnami/kafka - ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf diff --git a/src/messenger.rs b/src/messenger.rs index 34fea00..06a8d60 100644 --- a/src/messenger.rs +++ b/src/messenger.rs @@ -610,7 +610,9 @@ where let authentication_response = self.sasl_authentication(to_sent.into_inner()).await?; data_received = Some(authentication_response.auth_bytes.0); - } else { + } + + if state.is_finished() { break; } } diff --git a/tests/client.rs b/tests/client.rs index 024dc7b..b87d4bb 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -44,6 +44,10 @@ async fn test_sasl() { .sasl_config(rskafka::client::SaslConfig::Plain( rskafka::client::Credentials::new("admin".to_string(), "admin-secret".to_string()), )) + .backoff_config(BackoffConfig { + deadline: Some(Duration::from_secs(1)), + ..Default::default() + }) .build() .await .unwrap();