-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafkajsConsumer.ts
34 lines (31 loc) · 1.02 KB
/
kafkajsConsumer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Layer } from "effect";
import { ConfluentKafkaJSInstance, Consumer, ConsumerRecord, MessageRouter } from "../src";
const ConsumerLive = MessageRouter.empty.pipe(
MessageRouter.subscribe(
"test-topic",
Effect.flatMap(ConsumerRecord.ConsumerRecord, ({ topic, partition, ...message }) =>
Console.log({
topic,
partition,
offset: message.offset,
value: message.value?.toString(),
}),
),
),
MessageRouter.subscribe(
"customers",
Effect.flatMap(ConsumerRecord.ConsumerRecord, ({ topic, partition, ...message }) =>
Console.log({
topic,
partition,
offset: message.offset,
value: message.value?.toString(),
}),
),
),
Consumer.serve({ groupId: "group" }),
);
const KafkaLive = ConfluentKafkaJSInstance.layer({ brokers: ["localhost:19092"] });
const MainLive = ConsumerLive.pipe(Layer.provide(KafkaLive));
NodeRuntime.runMain(Layer.launch(MainLive));