-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
81 lines (67 loc) · 2 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
const fs = require("fs");
const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;
function readConfig(fileName) {
const data = fs.readFileSync(fileName, "utf8").toString().split("\n");
return data.reduce((config, line) => {
const [key, value] = line.split("=");
if (key && value) {
config[key] = value;
}
return config;
}, {});
}
async function produce(topic, config) {
const key = "nome";
const value = "sandro";
// create a new producer instance
const producer = new Kafka().producer(config);
// connect the producer to the broker
await producer.connect();
// send a single message
const produceRecord = await producer.send({
topic,
messages: [{ key, value }],
});
console.log(
`\n\n Produced message to topic ${topic}: key = ${key}, value = ${value}, ${JSON.stringify(
produceRecord,
null,
2
)} \n\n`
);
// disconnect the producer
await producer.disconnect();
}
async function consume(topic, config) {
// setup graceful shutdown
const disconnect = () => {
consumer.commitOffsets().finally(() => {
consumer.disconnect();
});
};
process.on("SIGTERM", disconnect);
process.on("SIGINT", disconnect);
// set the consumer's group ID, offset and initialize it
config["group.id"] = "nodejs-group-1";
config["auto.offset.reset"] = "earliest";
const consumer = new Kafka().consumer(config);
// connect the consumer to the broker
await consumer.connect();
// subscribe to the topic
await consumer.subscribe({ topics: [topic] });
// consume messages from the topic
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(
`Consumed message from topic ${topic}, partition ${partition}: key = ${message.key.toString()}, value = ${message.value.toString()}`
);
},
});
}
async function main() {
const config = readConfig("client.properties");
const topic = "topic_0";
await produce(topic, config);
//await consume(topic, config);
}
main();