OffBroadwayKafka

This library integrates the Broadway data processing pipeline library with Kafka.

It communicates with Kafka using the Elsa Elixir library, which itself uses the Brod Erlang library.

It can dynamically create Broadway concurrency stages per topic or per partition for the Kafka topics it consumes.

Installation

Add off_broadway_kafka to the list of dependencies in mix.exs:

def deps do
  [
    {:off_broadway_kafka, "~> 1.0"}
  ]
end

Docs can be found on HexDocs or generated with ExDoc.

Examples

ClassicHandler

This example uses Broadway in the traditional way.

Elsa.Supervisor.start_link/1 is called with the specified kafka_config to set up a Kafka consumer. Incoming Kafka messages are added as events to OffBroadway.Kafka.Producer, a Broadway producer, and are then handled by the Broadway pipeline defined in the module.

defmodule ClassicBroadway do
  use Broadway

  def start_link(opts) do
    kafka_config = [
      connection: :client1,
      endpoints: [localhost: 9092],
      group_consumer: [
        group: "classic",
        topics: ["topic1"],
        config: [
          begin_offset: :earliest
        ]
      ]
    ]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {OffBroadway.Kafka.Producer, kafka_config},
        concurrency: 1
      ],
      processors: [
        default: [
          concurrency: 1
        ]
      ],
      context: %{pid: Keyword.get(opts, :pid)}
    )
  end

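  # The processor name is unused here, so it is prefixed with an underscore
  # to avoid a compiler warning.
  def handle_message(_processor, message, context) do
    # Forward each message to the pid supplied in the start options.
    send(context.pid, {:message, message})
    message
  end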
end
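
As a rough sketch of how the module above might be run, the helper below starts ClassicBroadway under a supervisor and waits for one forwarded message. ClassicBroadwayExample and its functions are hypothetical names introduced for illustration and are not part of off_broadway_kafka. Actually starting the pipeline assumes the ClassicBroadway module is compiled and a Kafka broker is reachable at localhost:9092.

```elixir
defmodule ClassicBroadwayExample do
  # Hypothetical helper module for illustration only.

  # Child spec list for a supervision tree. `use Broadway` gives
  # ClassicBroadway a child_spec/1 that calls start_link/1 with these opts.
  def children(pid), do: [{ClassicBroadway, [pid: pid]}]

  # Start the pipeline under a one_for_one supervisor. The given pid ends up
  # in the Broadway context and receives every {:message, message} tuple.
  def start(pid \\ self()) do
    Supervisor.start_link(children(pid), strategy: :one_for_one)
  end

  # Wait for a single message forwarded by handle_message/3.
  # Returns {:ok, message}, or :timeout if nothing arrives in time.
  def await_message(timeout \\ 5_000) do
    receive do
      {:message, message} -> {:ok, message}
    after
      timeout -> :timeout
    end
  end
end
```

Calling ClassicBroadwayExample.start() followed by ClassicBroadwayExample.await_message() from the same process should return {:ok, message} once a record is published to topic1, where message is a Broadway.Message struct whose data field carries the payload.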

ShowtimeHandler

This example uses the OffBroadway.Kafka macro.

It starts a separate Broadway pipeline for each topic and partition, which increases concurrency when processing events.

defmodule ShowtimeBroadway do
  use OffBroadway.Kafka

  def kafka_config(_opts) do
    [
      connection: :per_partition,
      endpoints: [localhost: 9092],
      group_consumer: [
        group: "per_partition",
        topics: ["topic1"],
        config: [
          prefetch_count: 5,
          prefetch_bytes: 0,
          begin_offset: :earliest
        ]
      ]
    ]
  end

  def broadway_config(opts, topic, partition) do
    [
      name: :"broadway_per_partition_#{topic}_#{partition}",
      processors: [
        default: [
          concurrency: 5
        ]
      ],
      context: %{
        pid: Keyword.get(opts, :pid)
      }
    ]
  end

  # The processor name is unused, hence the underscore prefix.
  def handle_message(_processor, message, context) do
    send(context.pid, {:message, message})
    message
  end
end
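
Because one pipeline is started per topic and partition, each pipeline needs its own name, which is why broadway_config/3 interpolates both values into the :name atom. The sketch below isolates that naming step in plain Elixir. PartitionNaming and pipeline_name/2 are hypothetical names used only for illustration.

```elixir
defmodule PartitionNaming do
  # Hypothetical helper mirroring the :name expression in broadway_config/3.
  # Interpolating topic and partition into a quoted atom yields a distinct
  # name for every topic/partition pair.
  def pipeline_name(topic, partition) do
    :"broadway_per_partition_#{topic}_#{partition}"
  end
end

PartitionNaming.pipeline_name("topic1", 0)
# => :broadway_per_partition_topic1_0
```

Atoms created this way are never garbage collected, so this pattern is reasonable only while the set of topics and partitions is small and bounded, as it normally is for a consumer group.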