Skip to content

svedentsov/automation-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Автоматизация с Kafka + Java

Содержание


Добро пожаловать в репозиторий проекта автоматизированного тестирования с Kafka

  • Этот репозиторий создан для помощи в автоматизации тестирования потоковой передачи событий с Kafka.
  • Проект включает в себя настройку Kafka, ZooKeeper, Schema Registry, а также примеры Producers и Consumers на Java с использованием Avro.
  • В проекте реализованы интеграционные тесты для проверки функциональности Kafka.
  • Многие тексты основаны на официальной документации и переведены для удобства.
  • Дополнительно сюда включена «Анатомия Kafka» — расширенный материал о том, как устроена Kafka и её основные компоненты.

1. Что такое Apache Kafka?

В этом разделе мы рассмотрим, что такое Kafka, почему она стала так популярна и какие ключевые концепции лежат в её основе.

Немного истории:
Платформа Kafka была создана в LinkedIn в 2010 году, когда компания столкнулась с проблемой обработки огромного количества данных в реальном времени. Kafka позволяет различным системам быстро и надёжно обмениваться сообщениями (данными). Впоследствии проект был передан в фонд Apache и стал одним из самых популярных решений для потоковой передачи данных.

1.1. Потоковая передача данных

Стриминг событий — это цифровой эквивалент центральной нервной системы человеческого тела. Технически это практика:

  • Захвата данных в реальном времени из источников событий (базы данных, датчики, мобильные устройства, облачные сервисы).
  • Надёжного долговременного хранения этих потоков.
  • Обработки и анализа событий как в реальном времени, так и ретроспективно.
  • Маршрутизации потоков к нужным системам.

Стриминг позволяет нужной информации оказаться в нужном месте в нужное время, обеспечивая гибкость и оперативность.

1.2. Применение потоковой передачи событий

Стриминг событий применяется в самых разных сферах:

  • Финансовые транзакции (банки, фондовые биржи).
  • Логистика: мониторинг и отслеживание транспорта.
  • Аналитика данных от IoT-устройств (заводы, умные города).
  • Системы электронной коммерции (заказы, платежи, уведомления).
  • Автоматизация микросервисов и корпоративных платформ данных.

1.3. Платформа потоковой передачи событий

Kafka сочетает в себе ключевые возможности:

  1. Публикация и подписка на потоки событий (импорт/экспорт данных).
  2. Надёжное долговременное хранение потоков.
  3. Обработка потоков событий в реальном времени или ретроспективно.

Она развёртывается как распределённая, масштабируемая и отказоустойчивая система, которую можно использовать как локально, так и в облачных средах.

1.4. Как работает Kafka

Kafka — это распределённая система, состоящая из серверов (брокеров) и клиентов:

  • Серверы:
    Запускаются как кластер, который может включать несколько брокеров Kafka (слой хранения), а также сервисы Kafka Connect для интеграции с другими системами. Кластер высокомасштабируем и отказоустойчив: при сбое одного сервера его работу берут на себя другие.

  • Клиенты:
    Под клиентами понимаются приложения, которые читают, записывают и обрабатывают потоки событий. Kafka предоставляет богатый набор клиентов для разных языков программирования (Java, Scala, Go, Python и др.). Также существует библиотека Kafka Streams для более сложной обработки потока в реальном времени.

Вся коммуникация идёт по высокопроизводительному протоколу TCP.

1.5. Из чего состоит сообщение в Kafka

При работе с Kafka сообщения (также называемые событиями или записями) — это основная сущность для обмена данными между системами. Обычно сообщения передаются в формате JSON или Avro, но могут быть и другие форматы. Вот основные компоненты сообщения:

  1. Ключ (key):
    Необязательный, но полезный элемент, чтобы гарантировать, что связанные сообщения попадут в одну и ту же партицию.

  2. Значение (value):
    Основные данные сообщения (JSON, Avro и т.д.). В них передаётся реальная информация — например, структура заказа, данные о пользователе, лог-событие и т.п.

  3. Временная метка (timestamp):
    Указывает, когда сообщение было создано. Важна для определения порядка событий и аналитики во временном разрезе.

  4. Тип сжатия (опционально):
    Позволяет экономить место, используя gzip, snappy и другие алгоритмы сжатия.

  5. Заголовки (headers) (опционально):
    Набор метаданных (ключ-значение), которые можно передать вместе с сообщением без изменения его основного содержимого.

  6. ID партиции и смещение:
    После записи сообщения в Kafka ему присваиваются партиция и смещение (подробнее о партициях и смещениях — в разделе 3.1 и 3.2).

1.6. Пример JSON-сообщения

Ниже приведён пример сообщения в формате JSON (поля могут различаться в зависимости от бизнес-логики):

{
  "key": "order_98765",
  "value": {
    "orderId": "98765",
    "customer": {
      "customerId": "c12345",
      "name": "John Doe",
      "email": "[email protected]"
    },
    "orderDetails": {
      "items": [
        {
          "itemId": "p101",
          "name": "Wireless Mouse",
          "quantity": 2,
          "price": 19.99
        },
        {
          "itemId": "p202",
          "name": "Mechanical Keyboard",
          "quantity": 1,
          "price": 79.99
        }
      ],
      "totalPrice": 119.97,
      "currency": "USD"
    },
    "orderStatus": "processing",
    "createdAt": "2025-01-27T14:35:00Z",
    "updatedAt": "2025-01-27T15:10:00Z"
  },
  "timestamp": 1723472100000,
  "headers": {
    "source": "ecommerce-platform",
    "transaction-id": "b342d7a9-9874-4c2d-b30d-543c8a7f9841",
    "event-type": "orderUpdate",
    "content-type": "application/json",
    "priority": "high"
  }
}

1.7. Продюсеры и консьюмеры (Асинхронность)

Главный принцип Kafka — асинхронная коммуникация. Продюсеры отправляют данные в топики, не ожидая, когда консьюмеры их прочитают. Консьюмеры могут обрабатывать сообщения в своём темпе, даже если они временно недоступны.

  • Продюсеры (Producers): отправляют сообщения в Kafka (в конкретные топики).
  • Консьюмеры (Consumers): подписываются на топики и асинхронно считывают сообщения.

Такой подход обеспечивает высокую производительность и независимость систем. Если консьюмер отвалился на время, данные не потеряются: Kafka сохранит их, а вернувшийся консьюмер продолжит чтение с нужного места.

1.8. Роль брокеров

Внутри кластера Kafka каждый сервер называется брокером. Это «распределительные центры», через которые проходят сообщения:

  • Брокеры принимают сообщения от продюсеров.
  • Хранят их в партициях (на диске) до тех пор, пока консьюмеры их не прочитают.
  • Обеспечивают горизонтальную масштабируемость. Чем больше брокеров, тем выше пропускная способность.

Если один брокер выходит из строя, Kafka автоматически перераспределяет нагрузку с помощью механизма репликации (копирования) и системы координации (ZooKeeper, либо встроенный KRaft в новых версиях).

1.9. Consumer Groups и Exactly-once

Чтобы обеспечить более удобную параллельную обработку, консьюмеры объединяются в consumer groups:

  • Consumer Group имеет общий group.id.
  • Kafka распределяет партиции топика между участниками этой группы так, чтобы каждая партиция доставалась ровно одному консьюмеру внутри группы.
  • Это гарантирует параллелизм: количество одновременно работающих консьюмеров в группе может быть равно числу партиций или меньше.

Exactly-once или «ровно один раз» — это режим обработки, позволяющий избежать дублирования при сбоях и повторных считываниях. В Kafka он достигается комбинацией:

  • Идемпотентных продюсеров (idempotent producers).
  • Транзакций на стороне брокера (transactional.id + enable.idempotence=true).

Для большинства сценариев достаточно «at-least-once» обработки (данные не пропадут, но при сбоях возможно дублирование). Однако при использовании транзакций Kafka поддерживает и «exactly once» при должной настройке.


2. Что такое ZooKeeper?

Apache ZooKeeper — это централизованный сервис для хранения конфигурации и координации в распределённых системах.

Раньше (до Kafka 0.9) ZooKeeper занимался хранением смещений (offsets) консьюмеров. В более новых версиях Kafka (0.9+), смещения хранятся в специальном системном топике __consumer_offsets, но ZooKeeper всё ещё важен для управления состоянием кластера, выбора лидера партиций, координации брокеров и других задач.

2.1. Управление брокерами

Kafka — это распределённая система, поэтому в кластере обычно несколько брокеров. ZooKeeper отслеживает, какие брокеры активны, а если какой-то выходит из строя, помогает перераспределить нагрузку.

2.2. Координация смещений

  • До версии Kafka 0.9: смещения консьюмеров хранились напрямую в ZooKeeper, что приводило к лишней нагрузке.
  • C версии Kafka 0.9 и выше: смещения перешли в специальный топик __consumer_offsets, и управление стало более эффективным.

Тем не менее, ZooKeeper остаётся ключевым элементом, чтобы знать, какие брокеры живы, координировать консьюмеры (через группы) и поддерживать целостность кластера.

2.3. Выборы лидера партиций

Каждая партиция в Kafka имеет ведущую (лидирующую) копию. Если брокер-лидер выходит из строя, ZooKeeper помогает автоматически выбрать нового лидера среди реплик. Благодаря этому Kafka обеспечивает высокую доступность и устойчивость к сбоям.

2.4. Мониторинг кластера

ZooKeeper фактически «следит» за тем, чтобы все брокеры, партиции и консьюмеры были в актуальном состоянии. При сбоях — активирует переизбрание лидера, перераспределение партиций и т.д., что делает Kafka надёжной, «самовосстанавливающейся» системой.

2.5. Будущее ZooKeeper в Kafka (KIP-500)

Начиная с Kafka 2.8, была представлена возможность режима KRaft, где ZooKeeper больше не требуется (KIP-500). При включённом KRaft кластер Kafka сам управляет метаданными. Однако многие продакшн-системы пока продолжают работать с ZooKeeper. В перспективе (начиная с версий Kafka 3.*) Kafka может полностью перейти на KRaft и отказаться от ZooKeeper.


3. Что такое Топик

Топики в Kafka — это логические «контейнеры» для сообщений. Если представить Kafka как «почтовую службу», то топики — это разные «ящики» или «папки», куда складываются сообщения одного типа (например, «orders», «payments», «notifications»).

Kafka гарантирует, что сообщения внутри топика упорядочены (в пределах партиции) и могут быть прочитаны многократно — сколько консьюмеров ни было бы.

3.1. Партиции

Чтобы масштабировать и ускорять работу, каждый топик разбивается на партиции — логические «части» топика:

  • Параллельная обработка: несколько консьюмеров могут обрабатывать разные партиции одного топика одновременно.
  • Распределение нагрузки: партиции могут храниться и обслуживаться разными брокерами, повышая пропускную способность.

Партиции позволяют сохранять порядок сообщений внутри каждой партиции, но не гарантируют порядок между разными партициями.

3.2. Смещения

Смещение (offset) — это порядковый номер сообщения внутри партиции (начинается с 0 и увеличивается). Оно служит «указателем», по которому консьюмер понимает, где он остановился:

  • Когда консьюмер прочитал сообщение до смещения n, он знает, что следующее будет n+1.
  • Если консьюмер перезапустится, он может начать чтение с того же смещения n+1, не теряя и не дублируя сообщения.

Смещения не хранят само сообщение, а только указывают на его позицию в партиции.

3.3. Почему используются смещения и партиции

  • Масштабируемость: позволяет увеличивать число брокеров и партиций при росте нагрузки, балансируя обработку сообщений между несколькими машинами.
  • Параллелизм: разные консьюмеры (или их группы) могут обрабатывать сообщения одновременно, не мешая друг другу.
  • Надёжность: смещения обеспечивают, что мы не потеряем место, на котором остановились, и можем продолжить чтение при сбоях.

3.4. Политика хранения (Retention Policy)

Kafka хранит сообщения в топиках в течение заданного периода (по умолчанию 7 дней) или до достижения заданного объёма. Это настраиваемая Retention Policy:

  • Количество дней: например, 7 или 14 дней хранения.
  • Максимальный объём: например, 10 ГБ на партицию.

По истечении этого времени (или при превышении объёма) старые сообщения удаляются или могут быть сжаты (в зависимости от стратегии).


4. Настройка окружения

Далее описаны шаги по установке Kafka и ZooKeeper, а также запуск через Docker Compose.

4.1. Установка Kafka

Скачать Kafka можно с официального сайта.

После скачивания распакуйте архив в удобную папку.

4.2. Установка ZooKeeper

Ссылка для скачивания Zookeeper

После скачивания распакуйте архив в удобную папку (например, C:\zookeeper).

4.3. Запуск Kafka и ZooKeeper

Здесь вы можете выбрать, запускать ли Kafka и Zookeeper на вашей машине или использовать docker-compose.

4.3.1. Локальный запуск через терминал

Запуск Kafka на Mac/Linux

  1. Перейдите в папку kafka/bin/.
  2. Запустите ZooKeeper:
    zookeeper-server-start.sh config/zookeeper.properties
  3. В новом терминале запустите Kafka:
    kafka-server-start.sh config/server.properties

Запуск Kafka на Windows

  1. Перейдите в папку kafka/bin/windows.
  2. Запустите ZooKeeper:
    zookeeper-server-start.bat C:\kafka\config\zookeeper.properties
  3. В новом терминале запустите Kafka:
    kafka-server-start.bat C:\kafka\config\server.properties
4.3.1.1. Создание топика

На Windows:

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_user

На Mac/Linux:

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic_user
4.3.1.2. Список топиков
kafka-topics.sh --list --bootstrap-server localhost:9092

(На Windows — kafka-topics.bat)

4.3.1.3. Отправка сообщения в топик
kafka-console-producer.sh --broker-list localhost:9092 --topic topic_user

(На Windows — kafka-console-producer.bat)

4.3.1.4. Потребление топика
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_user --from-beginning

(На Windows — kafka-console-consumer.bat)

4.3.2. Запуск через Docker Compose

В проекте уже имеется docker-compose.yml, который запускает Kafka, ZooKeeper, Schema Registry и Control Center.

  1. Убедитесь, что Docker установлен на вашем компьютере. Инструкции по установке здесь.

  2. Запустите сервисы:

    docker-compose up -d
  3. Остановите сервисы:

    docker-compose down
  4. Создание топика (если он не создан автоматически):

    docker-compose exec kafka kafka-topics --create --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 --topic topic_user

5. Структура проекта

├── docker-compose.yml                           # Файл для запуска сервисов Kafka, Zookeeper и Schema Registry с помощью Docker
├── pom.xml                                      # Основной файл конфигурации Maven с зависимостями и плагинами
├── src/                                         # Исходный код проекта
│   ├── main/
│   │   ├── java/                                # Основной исходный код Java
│   │   │   └── com/kafka/
│   │   │       ├── consumer/                    # Пакет с классами для потребления сообщений из Kafka
│   │   │       │   ├── UserConsumer.java        # Потребитель сообщений с типом String
│   │   │       │   └── UserAvroConsumer.java    # Потребитель сообщений с типом Avro
│   │   │       ├── model/                       # Пакет с моделями данных
│   │   │       │   └── User.java                # Модель для пользователя
│   │   │       ├── producer/                    # Пакет с классами для отправки сообщений в Kafka
│   │   │       │   ├── UserProducer.java        # Продюсер для сообщений с типом String
│   │   │       │   └── UserAvroProducer.java    # Продюсер для сообщений с типом Avro
│   │   │       └── utils/                       # Пакет утилит для работы с Kafka и другими компонентами
│   │   │           ├── DefaultProperties.java   # Класс для получения стандартных свойств Kafka
│   │   │           └── ReadYml.java             # Утилита для чтения YAML-файлов
│   ├── resources/                               # Ресурсы проекта (например, конфигурационные файлы)
│   │   ├── data/
│   │   │   └── data.yml                         # Пример YAML-файла с конфигурацией данных
│   │   └── avro/                                # Папка с Avro-схемами
│   │       └── user.avsc                        # Avro-схема для сообщений о пользователе
│   └── test/                                    # Тесты проекта
│       └── java/
│           └── com/kafka/tests/                 # Пакет с тестами для проекта
│               └── KafkaTests.java              # Интеграционные тесты для проверки взаимодействия с Kafka
└── README.md                                    # Файл документации проекта

5.1. Основные пакеты

  • com.kafka.consumer:

    • UserConsumer: потребляет стандартные JSON-сообщения из топика.
    • UserAvroConsumer: потребляет Avro-сообщения из топика.
  • com.kafka.producer:

    • UserProducer: отправляет стандартные JSON-сообщения в топик.
    • UserAvroProducer: отправляет Avro-сообщения в топик.
  • com.kafka.model:

    • User: модель пользователя с полями name, email, age.
  • com.kafka.utils:

    • DefaultProperties: содержит методы для получения конфигураций Kafka Producer и Consumer.
    • ReadYml: утилита для чтения YAML-файлов и преобразования их в Properties.
  • com.kafka.tests:

    • KafkaTests: интеграционные тесты для проверки работы Producers и Consumers.

5.2. Конфигурация Maven

Файл pom.xml содержит все необходимые зависимости и плагины для работы проекта, включая Kafka, Avro, Lombok, JUnit и другие.

5.3. Docker Compose

Файл docker-compose.yml настроен для запуска следующих сервисов:

  • ZooKeeper
  • Kafka
  • Schema Registry
  • Confluent Control Center

6. Работа с Avro

Avro — популярный формат сериализации данных, хорошо подходящий для использования в Kafka (особенно с Confluent Schema Registry).

6.1. Что такое Avro?

Apache Avro — это открытая система сериализации данных. Avro определяет бинарный формат для данных и позволяет хранить схему для точного понимания структуры сообщений при передаче между сервисами или при их эволюции.

6.2. Преимущества использования Avro с Kafka

  1. Компактность: формат Avro меньше по размеру, чем JSON, так как не дублирует названия полей в каждом сообщении.
  2. Скорость сериализации/десериализации.
  3. Широкая поддержка языков программирования.
  4. Богатое описание схем (в JSON), что упрощает эволюцию данных.
  5. Управление метаданными и совместимость схем.

6.3. Схема Avro

Пример схемы user.avsc:

{
  "type": "record",
  "name": "UserAvro",
  "namespace": "modelAvro.user",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "age",
      "type": "string"
    }
  ]
}

6.4. Настройка Avro в проекте

6.4.1. Установка плагинов Apache Avro IDL

  1. В IntelliJ IDEA:
    • Перейдите в File > Settings > Plugins.
    • Найдите и установите "Apache Avro IDL Schema Support".

6.4.2. Установка библиотек и плагинов Avro

В pom.xml уже добавлены зависимости для Avro и соответствующие плагины:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${avro.serializer.version}</version>
</dependency>
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.8.2</version>
    <executions>
        <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
                <goal>protocol</goal>
                <goal>idl-protocol</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>

6.4.3. Генерация класса на основе Avro

При запуске:

mvn clean install

Avro Maven Plugin сгенерирует Java-классы для описанных в *.avsc схем.

6.4.4. Добавление значений в объект Avro

Пример создания объекта Avro:

UserAvro avroMessage = UserAvro.newBuilder()
        .setName("ivan")
        .setEmail("[email protected]")
        .setAge("31")
        .build();

6.4.5. Создание Producer с Avro

package com.kafka.producer;

import com.kafka.utils.DefaultProperties;
import modelAvro.user.UserAvro;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserAvroProducer {
    public static void sendTopicMessage(String topic, UserAvro message) {
        String generatedKey = String.valueOf(Math.random());
        Producer<String, UserAvro> producer = new KafkaProducer<>(DefaultProperties.getProducerAvroProperties());

        try {
            producer.send(new ProducerRecord<>(topic, generatedKey, message));
            System.out.println("Avro-сообщение отправлено в топик: " + message.toString());
        } finally {
            producer.close();
        }
    }
}

6.4.6. Создание Consumer с Avro

package com.kafka.consumer;

import com.kafka.utils.DefaultProperties;
import modelAvro.user.UserAvro;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UserAvroConsumer {
    public static List<String> getTopicMessages(String topic) {
        List<String> messages = new ArrayList<>();
        Consumer<String, UserAvro> consumer = new KafkaConsumer<>(DefaultProperties.getConsumerAvroProperties(topic));
        consumer.subscribe(Collections.singletonList(topic));

        try {
            ConsumerRecords<String, UserAvro> records = consumer.poll(Duration.ofSeconds(10));
            records.forEach(record -> {
                String messageValue = record.value().toString();
                messages.add(messageValue);
                System.out.println("Получено Avro-сообщение из топика: " + messageValue);
            });
            consumer.commitAsync();
        } catch (Exception e) {
            System.err.println("Ошибка при потреблении сообщений из топика: " + e.getMessage());
        } finally {
            consumer.close();
        }

        return messages;
    }
}

7. Тестирование

7.1. Интеграционные тесты

В пакете com.kafka.tests находится класс KafkaTests, содержащий интеграционные тесты для проверки работы Producers и Consumers.

7.2. Запуск тестов

mvn clean test

Тесты проверяют отправку и получение сообщений как в стандартном формате JSON, так и в формате Avro.


8. Confluent Control Center

Confluent Control Center предоставляет визуальный интерфейс для мониторинга и управления кластером Kafka, включая топики, схемы и сообщения.

8.1. Настройка Control Center

В docker-compose.yml уже настроен сервис control-center. Для доступа откройте браузер и перейдите по адресу:

http://localhost:9021

9. Ссылки

Kafka

ZooKeeper

Docker

Lombok

Java

Maven

Avro

Confluent Control Center

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages