Node Kafka Producer And Consumer Example
To set up a Node.js project in which producer.js sends messages to Kafka on a cron schedule and consumer.js consumes those messages, follow these steps:
1. Initialize the Node.js Project
First, create a new directory for your project and initialize it with npm:
mkdir node-kafka-example
cd node-kafka-example
npm init -y
2. Install Required Dependencies
You will need kafkajs for interacting with Kafka and node-cron for scheduling the cron job:
npm install kafkajs node-cron
3. Create the Producer Script (producer.js)
This script uses node-cron to schedule a job that sends a message to Kafka at regular intervals:
// producer.js
const { Kafka } = require('kafkajs');
const cron = require('node-cron');

// Initialize Kafka
const kafka = new Kafka({
  clientId: 'my-producer',
  brokers: ['localhost:9092'] // Replace with your Kafka broker addresses
});

// Create a producer instance
const producer = kafka.producer();

// Function to send a message to Kafka
const sendMessage = async () => {
  try {
    await producer.connect();
    await producer.send({
      topic: 'my-topic', // Replace with your Kafka topic
      messages: [{ value: `Message sent at ${new Date().toISOString()}` }],
    });
    console.log('Message sent successfully');
    await producer.disconnect();
  } catch (err) {
    console.error('Error sending message', err);
  }
};

// Schedule a cron job to send a message every 3s
cron.schedule('*/3 * * * * *', () => {
  console.log('Cron job running...');
  sendMessage();
});

console.log('Producer running...');
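The producer above opens and closes a connection on every tick, which is fine for a demo but adds overhead at a 3-second interval. As a minimal sketch of an alternative, the helper below connects once, skips a tick if the previous send is still in flight, and disconnects on shutdown. The names createSender and main are hypothetical and not part of kafkajs; the broker address and topic are the same placeholders used above.

```javascript
// Sketch: reuse one producer connection across cron ticks.
// `createSender` is a hypothetical helper, not a kafkajs API.
function createSender(producer, topic) {
  let connecting = null; // shared promise so connect() runs only once
  let inFlight = false;  // guards against overlapping ticks

  const send = async (value) => {
    if (inFlight) return false; // previous send still running, skip this tick
    inFlight = true;
    try {
      if (!connecting) {
        // Forget the promise if connecting fails so the next tick can retry.
        connecting = producer.connect().catch((err) => {
          connecting = null;
          throw err;
        });
      }
      await connecting;
      await producer.send({ topic, messages: [{ value }] });
      return true;
    } finally {
      inFlight = false;
    }
  };

  const close = async () => {
    if (!connecting) return; // never connected, nothing to close
    await connecting.catch(() => {});
    connecting = null;
    await producer.disconnect();
  };

  return { send, close };
}

// Wiring with kafkajs and node-cron (assumed placeholders: broker, topic).
function main() {
  const { Kafka } = require('kafkajs');
  const cron = require('node-cron');
  const kafka = new Kafka({ clientId: 'my-producer', brokers: ['localhost:9092'] });
  const sender = createSender(kafka.producer(), 'my-topic');

  const task = cron.schedule('*/3 * * * * *', () => {
    sender
      .send(`Message sent at ${new Date().toISOString()}`)
      .catch((err) => console.error('Error sending message', err));
  });

  // Stop the schedule and disconnect once on Ctrl+C.
  process.once('SIGINT', async () => {
    task.stop();
    await sender.close();
    process.exit(0);
  });
}
```

To try it, call main() at the bottom of the file. Because the helper takes the producer as an argument, it can also be exercised with a stub object in place of a real broker connection.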
4. Create the Consumer Script (consumer.js)
This script will consume messages from the Kafka topic:
// consumer.js
const { Kafka } = require('kafkajs');

// Initialize Kafka
const kafka = new Kafka({
  clientId: 'my-consumer',
  brokers: ['localhost:9092'] // Replace with your Kafka broker addresses
});

// Create a consumer instance
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
  // Connect the consumer
  await consumer.connect();

  // Subscribe to the topic
  await consumer.subscribe({ topic: 'my-topic', fromBeginning: true });

  // Consume messages
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

run().catch(console.error);

console.log('Consumer running...');
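One detail worth guarding against: in kafkajs, message.value and message.key are Buffers but may be null (for example, a tombstone record has a null value), in which case message.value.toString() would throw. The sketch below shows one way to decode defensively; decodeMessage is a hypothetical helper name, not part of kafkajs.

```javascript
// Sketch: decode a kafkajs message without assuming key/value are present.
// `decodeMessage` is a hypothetical helper, not a kafkajs API.
function decodeMessage(partition, message) {
  return {
    partition,
    offset: message.offset,
    // key and value arrive as Buffers, or null when absent
    key: message.key == null ? null : message.key.toString(),
    value: message.value == null ? null : message.value.toString(),
  };
}

// Handler with the same shape as the eachMessage callback used above.
const eachMessage = async ({ partition, message }) => {
  console.log(decodeMessage(partition, message));
};
```

The handler can be passed as consumer.run({ eachMessage }) in place of the inline callback.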
5. Running the Producer and Consumer
You can run these scripts independently in separate terminal sessions:
node producer.js
node consumer.js
6. Kafka Setup
Ensure that Kafka is running and accessible. If you’re running Kafka locally, you need at least one broker, and the topic (my-topic) should already exist.
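If the topic does not exist yet, one option is to create it from Node.js with the kafkajs admin client. The following is a minimal sketch, assuming a single local broker (hence a replication factor of 1); ensureTopic and main are hypothetical names, and the client ID my-admin is a placeholder.

```javascript
// Sketch: create the topic through the kafkajs admin client.
// `ensureTopic` is a hypothetical helper, not a kafkajs API.
async function ensureTopic(admin, topic, numPartitions = 1) {
  await admin.connect();
  try {
    // createTopics resolves to false when the topic already exists.
    return await admin.createTopics({
      topics: [{ topic, numPartitions, replicationFactor: 1 }], // assumes one broker
    });
  } finally {
    await admin.disconnect(); // always release the connection
  }
}

// Wiring with kafkajs (assumed placeholders: client ID, broker, topic).
async function main() {
  const { Kafka } = require('kafkajs');
  const kafka = new Kafka({ clientId: 'my-admin', brokers: ['localhost:9092'] });
  const created = await ensureTopic(kafka.admin(), 'my-topic');
  console.log(created ? 'Topic created' : 'Topic already exists');
}
```

Calling main().catch(console.error) once before starting the producer and consumer would be enough for a local setup. Many broker configurations also auto-create topics on first use, so this step may be unnecessary in your environment.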
7. Explanation
- producer.js: This script uses node-cron to schedule a job that sends a message to the Kafka topic every 3 seconds. The sendMessage function connects to Kafka, sends the message, and then disconnects.
- consumer.js: This script connects to Kafka as a consumer, subscribes to the specified topic, and logs any messages it receives.
This setup allows you to independently run the producer and consumer, with the producer automatically sending messages at intervals defined by the cron job.
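The interval comes from the cron expression passed to cron.schedule. node-cron accepts an optional leading seconds field, so a six-field expression such as '*/3 * * * * *' fires every 3 seconds, whereas the five-field '* * * * *' fires once a minute. As a small illustration, the hypothetical helper below (everySeconds is not part of node-cron) builds such an expression:

```javascript
// Sketch: build a six-field node-cron expression that fires every n seconds.
// `everySeconds` is a hypothetical helper, not a node-cron API.
function everySeconds(n) {
  if (!Number.isInteger(n) || n < 1 || n > 59) {
    throw new RangeError('n must be an integer from 1 to 59');
  }
  // Fields: second minute hour day-of-month month day-of-week
  // Note: the step restarts each minute, so spacing is only even when n divides 60.
  return `*/${n} * * * * *`;
}

console.log(everySeconds(3)); // prints */3 * * * * *
```

The result can be passed directly as the first argument to cron.schedule.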
Code
Please check the code in GitHub.