-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathindex.js
More file actions
39 lines (31 loc) · 839 Bytes
/
index.js
File metadata and controls
39 lines (31 loc) · 839 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
})
const producerkafka = async () => {
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
})
await producer.disconnect()
}
//
const consumerkaf = async () => {
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}
producerkafka()
consumerkaf()