wikimedia/KafkaSSE

KafkaSSE

Kafka Consumer to HTTP SSE/EventSource

Uses the node-rdkafka KafkaConsumer to stream JSON messages to clients over HTTP in SSE/EventSource format.

The Last-Event-ID header and the EventSource id field are used to handle auto-resume when clients disconnect. By using EventSource to connect to a KafkaSSE endpoint, your client will automatically resume from where it left off if it is disconnected from the server. Every message sent to the client has an id field containing a JSON array of objects, one per topic and partition, each giving the latest offset (or timestamp) seen by this client. On reconnect, this array is sent back as the Last-Event-ID header, and KafkaSSE uses it to assign a KafkaConsumer that starts from those positions.
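As a rough illustration of the id field described above, the sketch below tracks the latest offset seen per topic and partition and serializes it as a JSON array. `updateEventId` and the `Map`-based bookkeeping are hypothetical; this is not KafkaSSE's internal code, and it simply records the offset of the last message seen, as the paragraph describes.

```javascript
'use strict';

/**
 * Hypothetical helper (not part of kafka-sse): record the latest offset seen
 * for a message's topic and partition, then return the JSON string that
 * would be sent as the EventSource id field.
 */
function updateEventId(latest, kafkaMessage) {
    // Key each entry by topic and partition so newer messages overwrite older ones.
    latest.set(`${kafkaMessage.topic}/${kafkaMessage.partition}`, {
        topic: kafkaMessage.topic,
        partition: kafkaMessage.partition,
        offset: kafkaMessage.offset,
    });
    // Serialize every tracked topic/partition as one JSON array.
    return JSON.stringify(Array.from(latest.values()));
}

// Usage: three messages from two topic/partitions.
const latest = new Map();
updateEventId(latest, { topic: 'topicA', partition: 0, offset: 10 });
updateEventId(latest, { topic: 'topicB', partition: 1, offset: 7 });
const id = updateEventId(latest, { topic: 'topicA', partition: 0, offset: 11 });
console.log(id);
// [{"topic":"topicA","partition":0,"offset":11},{"topic":"topicB","partition":1,"offset":7}]
```

Because a `Map` keeps its original insertion order when an existing key is overwritten, the array order stays stable as new messages arrive.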

If the underlying Kafka cluster supports timestamp-based consumption, the Last-Event-ID header may initially provide assignments with timestamp entries instead of offsets. If timestamps are provided, Kafka is queried for the offsets that most closely correspond to those timestamps. This allows historical consumption from Kafka without having to know the Kafka partition offsets for particular points in the past.
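A client that wants historical data could build such a timestamp-based Last-Event-ID value itself. This is a sketch under two assumptions: Kafka message timestamps are milliseconds since the Unix epoch, and the client already knows which partitions each topic has. The helper `timestampAssignments` is hypothetical, and the `timestamp` key simply takes the place of `offset` in each assignment.

```javascript
'use strict';

/**
 * Hypothetical helper: build a Last-Event-ID value that asks KafkaSSE to
 * start each listed topic/partition at the given point in time.
 * `partitions` maps topic name -> array of partition numbers (assumed known).
 */
function timestampAssignments(partitions, since) {
    // Kafka timestamps are milliseconds since the Unix epoch.
    const timestamp = since.getTime();
    const assignments = [];
    for (const [topic, partitionList] of Object.entries(partitions)) {
        for (const partition of partitionList) {
            assignments.push({ topic, partition, timestamp });
        }
    }
    return JSON.stringify(assignments);
}

// Usage: start both partitions of topicB from one hour ago.
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
const lastEventId = timestampAssignments({ topicB: [0, 1] }, oneHourAgo);
console.log(lastEventId);
```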

KafkaSSE can be configured to always use timestamps instead of offsets in the EventSource event id field via the useTimestampForId option. If this option is true, each EventSource event id (which is automatically used for the Last-Event-ID header) is set with the Kafka message timestamp instead of the offset. This is less precise than using offsets, but is better if you need to hide the underlying Kafka cluster's message offsets to support multi-DC, since the same message generally has different offsets in each datacenter's Kafka cluster. Users who want offsets can manually modify the Last-Event-ID header before reconnecting, supplying the latest message offsets instead of timestamps. If an assignment in the Last-Event-ID contains an offset, the offset takes precedence over the timestamp.
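The precedence rule can be expressed as a small decision function. `startPosition` is a hypothetical illustration of the rule stated above, not code taken from KafkaSSE.

```javascript
'use strict';

/**
 * Hypothetical illustration of the precedence rule: an explicit offset wins
 * over a timestamp for the same assignment. Returns where consumption
 * would start, or throws if the assignment has neither field.
 */
function startPosition(assignment) {
    if (assignment.offset !== undefined) {
        return { kind: 'offset', value: assignment.offset };
    }
    if (assignment.timestamp !== undefined) {
        // Kafka would be queried for the offset matching this timestamp.
        return { kind: 'timestamp', value: assignment.timestamp };
    }
    throw new Error(
        `Assignment for ${assignment.topic}/${assignment.partition} has neither offset nor timestamp`
    );
}

// Usage: both fields are present, so the offset is used.
console.log(startPosition({ topic: 'topicA', partition: 0, offset: 12345, timestamp: 1500000000000 }));
// { kind: 'offset', value: 12345 }
```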

See client.js for a couple of examples of Last-Event-ID offset and timestamp based assignment.

See also Kasocki.

Usage

HTTP server KafkaSSE set up

The kafka-sse module exports a function that wraps up handling an HTTP SSE request for Kafka topics.

'use strict';
const kafkaSse = require('kafka-sse');
const server = require('http').createServer();

const options = {
    kafkaConfig: {'metadata.broker.list': 'mybroker:9092'}
};

server.on('request', (req, res) => {
    const topics = req.url.replace('/', '').split(',');
    console.log(`Handling SSE request for topics ${topics}`);
    kafkaSse(req, res, topics, options)
        // This won't happen unless the client disconnects or kafkaSse encounters an error.
        .then(() => {
            console.log('Finished handling SSE request.');
        })
        // Log a rejected promise instead of leaving it unhandled.
        .catch((err) => {
            console.error('Error while handling SSE request:', err);
        });
});

server.listen(6917);
console.log('Listening for SSE connections at http://localhost:6917/:topics');
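The `req.url.replace('/', '').split(',')` expression in the server example is kept short for illustration: a request for `/` yields `['']`, and a query string such as `/topicA?x=1` ends up inside the last topic name. A slightly more defensive parser, sketched here with the WHATWG `URL` class built into Node.js, might look like this. The function name `parseTopics` is hypothetical and not part of kafka-sse.

```javascript
'use strict';

/**
 * Hypothetical helper: extract a comma-separated topic list from a request
 * path such as "/topicA,topicB?x=1", ignoring the query string and
 * dropping empty names.
 */
function parseTopics(reqUrl) {
    // A base URL is required to parse a bare path; its host is irrelevant here.
    const { pathname } = new URL(reqUrl, 'http://localhost');
    return pathname
        .slice(1) // drop the leading "/"
        .split(',')
        .filter((topic) => topic.length > 0);
}

console.log(parseTopics('/topicA,topicB?x=1')); // [ 'topicA', 'topicB' ]
console.log(parseTopics('/'));                  // []
```

In the server example, `parseTopics(req.url)` would replace the inline `replace`/`split` expression, and an empty result could be answered with an HTTP error before calling `kafkaSse`.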

Custom deserializer

The default deserializer used for messages returned from node-rdkafka assumes that kafkaMessage.value is a UTF-8 byte buffer containing a JSON string. It parses kafkaMessage.value into an object and sets it as kafkaMessage.message, which is what is sent to the connected SSE client as an event.
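Going only by that description, the default behaves roughly like the sketch below. `defaultLikeDeserializer` is a hypothetical name, this is not the module's source, and a Node.js `Buffer` stands in for the value node-rdkafka returns.

```javascript
'use strict';

/**
 * Sketch of the described default: decode kafkaMessage.value as UTF-8 JSON
 * and store the parsed object on kafkaMessage.message.
 */
function defaultLikeDeserializer(kafkaMessage) {
    kafkaMessage.message = JSON.parse(kafkaMessage.value.toString('utf-8'));
    return kafkaMessage;
}

// Usage with a hand-built message shaped like a node-rdkafka result.
const kafkaMessage = defaultLikeDeserializer({
    topic: 'topicA',
    partition: 0,
    offset: 42,
    value: Buffer.from('{"price": 12.5}', 'utf-8'),
});
console.log(kafkaMessage.message); // { price: 12.5 }
```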

You may override this default deserializer. The deserializer is given the kafkaMessage as returned by node-rdkafka's consume. You must make sure to set the message field on this object, and not modify the other top-level fields such as topic, offset and partition. These are used to build the EventSource id field, which the client sends back as the Last-Event-ID header.

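function customDeserializer(kafkaMessage) {
    // Decode the UTF-8 JSON payload into the object sent to the client.
    kafkaMessage.message = JSON.parse(kafkaMessage.value.toString());
    kafkaMessage.message.extraInfo = 'I was deserialized by a custom deserializer';
    // topic, partition and offset are left untouched.
    return kafkaMessage;
}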

//...

kafkaSse(req, res, topics, {
    deserializer: customDeserializer,
});

// ...

Server Side filtering

By default, all consumed messages are sent to the client. However, you may provide a custom filter function as the filterer option. This function will be given the kafkaMessage as returned by the deserializer function. The message will be kept and sent to the client if your filter function returns true, otherwise it will be skipped.

/**
 * Only send events to SSE clients that have a `price` field greater than or equal to `10.0`.
 */
function filterFunction(kafkaMessage) {
    return kafkaMessage.message.price >= 10.0;
}

//...

kafkaSse(req, res, topics, {
    filterer: filterFunction,
});

// ...
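Only one `filterer` function is passed in the example above. If several conditions apply, they can be combined into a single predicate. `allOf` below is a hypothetical helper, and the sample messages (including the `currency` field) are made up to show which ones would be kept.

```javascript
'use strict';

/**
 * Hypothetical helper: combine several filter predicates into one function
 * suitable for the `filterer` option. A message is kept only if every
 * predicate returns true.
 */
function allOf(...predicates) {
    return (kafkaMessage) => predicates.every((predicate) => predicate(kafkaMessage));
}

const filterer = allOf(
    (m) => m.message.price >= 10.0,
    (m) => m.message.currency === 'USD'
);

// Simulate which deserialized messages would be sent to the client.
const messages = [
    { message: { price: 12.0, currency: 'USD' } },
    { message: { price: 8.0, currency: 'USD' } },
    { message: { price: 15.0, currency: 'EUR' } },
];
const kept = messages.filter(filterer);
console.log(kept.length); // 1
```

The combined function would then be passed as `kafkaSse(req, res, topics, { filterer })`.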

NodeJS EventSource usage

'use strict';
const EventSource = require('eventsource');
const topics = process.argv[2];
const port = 6917;

const url = `http://localhost:${port}/${topics}`;
console.log(`Connecting to Kafka SSE server at ${url}`);
const eventSource = new EventSource(url);

eventSource.onopen = function(event) {
    console.log('--- Opened SSE connection.');
};

eventSource.onerror = function(event) {
    console.log('--- Got SSE error', event);
};

eventSource.onmessage = function(event) {
    // event.data will be a JSON string containing the message event.
    console.log(JSON.parse(event.data));
};
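Each message event also carries the id described at the top of this README. Browsers expose it as `event.lastEventId` on the MessageEvent, and the `eventsource` package is expected to follow the same interface. The hypothetical helper below parses that value into per-partition positions, which a client could log or persist; the sample event is hand-built rather than received from a server.

```javascript
'use strict';

/**
 * Hypothetical helper: turn an EventSource event's lastEventId (a JSON
 * array of assignments) into a "topic/partition -> position" object.
 */
function positionsFromEvent(event) {
    if (!event.lastEventId) {
        return {};
    }
    const positions = {};
    for (const a of JSON.parse(event.lastEventId)) {
        // Prefer the offset when present, mirroring the documented precedence.
        positions[`${a.topic}/${a.partition}`] =
            a.offset !== undefined ? { offset: a.offset } : { timestamp: a.timestamp };
    }
    return positions;
}

// Usage with a hand-built event object.
const event = {
    data: '{"price": 12.5}',
    lastEventId: '[{"topic":"topicA","partition":0,"offset":12345}]',
};
console.log(positionsFromEvent(event)); // { 'topicA/0': { offset: 12345 } }
```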

Errors

If an error is encountered while the SSE client is connecting, a normal HTTP error response is returned, with JSON information about the error in the response body. However, once the SSE connection has started, the HTTP response status has already been sent as 200. From that point on, errors are delivered to the client as EventSource error events, so you must register an onerror handler on your EventSource object to receive them.

Notes on Kafka consumer state

In normal use cases, Kafka (and previously ZooKeeper) handles consumer state. Kafka keeps track of multiple consumer processes in named consumer groups, and handles rebalancing of those processes as they come and go. Kafka also handles offset commits, keeping track of the offset each consumer group has committed in each topic and partition.

KafkaSSE is intended to be exposed to the public internet, letting web-based consumers use HTTP to consume from Kafka. Since the internet at large cannot be trusted, we would prefer not to allow it to make any state changes to our Kafka clusters. KafkaSSE therefore pushes as much consumer state management to the connected clients as it can.

Offset commits are not supported. Instead, the latest subscription state is sent as the EventSource id field with each event. During connection initialization, this information can be passed in the Last-Event-ID header to specify the positions from which KafkaSSE should start consuming from Kafka. Last-Event-ID should be a JSON assignments array of the form:

[
    {"topic": "topicA", "partition": 0, "offset": 12345},
    {"topic": "topicB", "partition": 0, "offset": 46666},
    {"topic": "topicB", "partition": 1, "offset": 45555}
]
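Since the header is client-supplied JSON, anything that reads it should check its shape before use. The sketch below is a hypothetical validator for the form above (it is not KafkaSSE's own validation code); it accepts either an integer `offset` or an integer `timestamp` in each entry.

```javascript
'use strict';

/**
 * Hypothetical check: parse a Last-Event-ID header value and verify it has
 * the assignments shape. Returns the parsed array or throws an Error
 * describing the first problem found.
 */
function parseAssignments(headerValue) {
    const assignments = JSON.parse(headerValue);
    if (!Array.isArray(assignments)) {
        throw new Error('Last-Event-ID must be a JSON array');
    }
    for (const a of assignments) {
        if (a === null || typeof a !== 'object' ||
            typeof a.topic !== 'string' || !Number.isInteger(a.partition)) {
            throw new Error(`Invalid assignment: ${JSON.stringify(a)}`);
        }
        if (!Number.isInteger(a.offset) && !Number.isInteger(a.timestamp)) {
            throw new Error(`Assignment needs an integer offset or timestamp: ${JSON.stringify(a)}`);
        }
    }
    return assignments;
}

const parsed = parseAssignments(
    '[{"topic":"topicA","partition":0,"offset":12345},' +
    '{"topic":"topicB","partition":1,"timestamp":1500000000000}]'
);
console.log(parsed.length); // 2
```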

Consumer group management is also not supported. Each new SSE client corresponds to a new consumer group, so there is no way to parallelize consumption from Kafka for a single connected client. Ideally, we would not register a consumer group with Kafka at all, but as of this writing librdkafka and blizzard/node-rdkafka do not support this. Consumer groups registered with Kafka are named after the x-request-id header, or a UUID if that header is not set, e.g. KafkaSSE-2a360ded-1da0-4258-bad5-90ce954b7c52.
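The naming scheme can be sketched in a few lines. `consumerGroupName` is a hypothetical helper, not KafkaSSE's code, and it assumes Node.js 14.17 or later for `crypto.randomUUID()`. Node.js lowercases incoming header names, so the header is read as `x-request-id`.

```javascript
'use strict';
const crypto = require('crypto');

/**
 * Sketch of the naming scheme described above: use the x-request-id header
 * when present, otherwise a random UUID, prefixed with "KafkaSSE-".
 */
function consumerGroupName(headers) {
    const requestId = headers['x-request-id'] || crypto.randomUUID();
    return `KafkaSSE-${requestId}`;
}

console.log(consumerGroupName({ 'x-request-id': 'abc123' })); // KafkaSSE-abc123
// Without the header, prints "KafkaSSE-" followed by a random UUID.
console.log(consumerGroupName({}));
```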

node-rdkafka consume modes

The node-rdkafka client that KafkaSSE uses has several consume APIs. KafkaSSE uses the standard non-flowing API.
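In node-rdkafka's non-flowing API, the caller requests messages with `consume(number, callback)` rather than receiving a continuous stream of `data` events. The sketch below wraps one such request in a Promise. `consumeBatch` and the fake consumer are hypothetical; only the `consume(number, callback(err, messages))` shape is taken from node-rdkafka, so the example runs without a broker.

```javascript
'use strict';

/**
 * Sketch of non-flowing consumption: request a batch only when ready for
 * more. `consumer` is assumed to expose node-rdkafka's non-flowing
 * signature, consume(number, callback(err, messages)).
 */
function consumeBatch(consumer, count) {
    return new Promise((resolve, reject) => {
        consumer.consume(count, (err, messages) => {
            if (err) {
                reject(err);
            } else {
                resolve(messages);
            }
        });
    });
}

// A fake consumer standing in for node-rdkafka's KafkaConsumer.
const fakeConsumer = {
    queue: [{ offset: 0 }, { offset: 1 }, { offset: 2 }],
    consume(count, callback) {
        // Deliver asynchronously, as a real consumer would.
        setImmediate(() => callback(null, this.queue.splice(0, count)));
    },
};

consumeBatch(fakeConsumer, 2).then((messages) => {
    console.log(messages.map((m) => m.offset)); // [ 0, 1 ]
});
```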

Testing

On host

Mocha tests require a running 0.11+ Kafka broker at localhost:9092 with delete.topic.enable=true. test/utils/kafka_fixture.sh will prepare topics in Kafka for tests. npm test will download, install, and run a Kafka broker. If you already have one running locally, then npm run test-local will be easier to run. You may set the KAFKA_TOPICS_CMD and the KAFKA_CONSOLE_PRODUCER_CMD environment variables if you would like to override the commands used in kafka_fixture.sh.

With Docker

Testing with Docker requires docker-engine >= 1.12.4 and docker-compose >= 1.9.0. All you have to do is run ./test/docker-tests.sh. You shouldn't even need Node.js installed for this to run.

To Do

  • tests for kafkaEventHandlers