-
Notifications
You must be signed in to change notification settings - Fork 0
Home
A Node.js native binding for Apache Kafka using Rust, providing high performance and type safety.
npm install kafka-crab-js
import { KafkaConsumer, KafkaClientConfig } from 'kafka-crab-js';
const config: KafkaClientConfig = {
brokers: ['localhost:9092'],
groupId: 'my-group',
};
async function consumeMessages() {
const consumer = new KafkaConsumer(config);
// Subscribe to events before consuming
consumer.on_events((error, event) => {
if (error) {
console.error('Event error:', error);
return;
}
switch (event.name) {
case 'PreRebalance':
console.log('Starting rebalance');
break;
case 'PostRebalance':
console.log('Finished rebalance');
break;
case 'CommitCallback':
console.log('Offset committed');
break;
}
});
try {
await consumer.subscribe('my-topic');
while (true) {
const message = await consumer.recv();
if (message === null) {
console.log('Consumer disconnected');
break;
}
console.log('Received:', message);
}
} finally {
await consumer.disconnect();
}
}
import { KafkaConsumer, KafkaClientConfig } from 'kafka-crab-js';
const config: KafkaClientConfig = {
brokers: ['localhost:9092'],
groupId: 'my-group',
// Disable auto-commit when you want to commit manually
configuration: {
'enable.auto.commit': 'false'
}
};
async function consumeMessages() {
const consumer = new KafkaConsumer(config);
try {
await consumer.subscribe('my-topic');
while (true) {
const message = await consumer.recv();
if (message === null) {
console.log('Consumer disconnected');
break;
}
console.log('Received:', message);
// Manual commit example
await consumer.commit(
message.topic,
message.partition,
message.offset,
'Sync' // or 'Async'
);
}
} finally {
await consumer.disconnect();
}
}
import { KafkaProducer, KafkaClientConfig } from 'kafka-crab-js';
const config: KafkaClientConfig = {
brokers: ['localhost:9092']
};
async function produceMessages() {
const producer = new KafkaProducer(config);
await producer.send({
topic: 'my-topic',
message: 'Hello World'
});
await producer.flush();
}
new KafkaConsumer(config: KafkaClientConfig)
-
subscribe(topic: string | TopicPartitionConfig[]): Promise Subscribe to topics
-
recv(): Promise<Message | null> Receive next message
-
on_events(callback: (error: Error | undefined, event: KafkaEvent) => void): void Subscribe to consumer events
-
disconnect(): Promise Disconnect and cleanup consumer
-
pause(): Promise Pause consumption
-
resume(): Promise Resume consumption
-
seek(topic: string, partition: number, offset: OffsetModel): Promise Seek to specific offset
-
commit(topic: string, partition: number, offset: number, mode: CommitMode): Promise Commit offsets
enum KafkaEventName {
PreRebalance,
PostRebalance,
CommitCallback
}
interface TopicPartition {
topic: string;
partition: number;
offset: number;
}
interface KafkaEventPayload {
action?: string;
tpl: Array<TopicPartition>;
error?: string;
}
interface KafkaEvent {
name: KafkaEventName;
payload: KafkaEventPayload;
}
-
Resource Management
- Always call
disconnect()
when done - Use try/finally blocks
- Handle cleanup properly
- Always call
-
Event Handling
- Subscribe to events before consuming
- Handle all event types
- Check for errors in callbacks
-
Error Handling
- Use try/catch blocks
- Implement proper error recovery
- Check null returns from recv()
-
Type Safety
- Use TypeScript for better type checking
- Leverage provided type definitions
- Use enum values for event names
-
Commit Strategy
- By default, auto-commit is enabled (every 5 seconds)
- Disable auto-commit when you need manual control
- Use manual commits for:
- At-least-once delivery guarantee
- Custom commit intervals
- Batch processing
- Choose commit mode based on your needs:
- 'Sync': Ensures commit is complete before continuing
- 'Async': Better performance but no immediate confirmation
interface KafkaClientConfig {
brokers: string[];
groupId?: string;
clientId?: string;
configuration?: {
'enable.auto.commit'?: 'true' | 'false'; // Defaults to 'true'
'auto.commit.interval.ms'?: string; // Defaults to '5000'
// For all available configuration options, see:
// https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
}
}
For detailed API documentation, see the Complete API Reference
Note: All configuration attributes in the
configuration
object are passed directly to librdkafka. For a complete list of available options, refer to the librdkafka Configuration Properties.
-
Connection Issues
- Verify broker addresses
- Check network connectivity
- Ensure proper security settings
-
Performance Tuning
- Adjust batch sizes
- Configure appropriate timeouts
- Monitor memory usage
-
Commit Issues
- Remember to disable auto-commit when using manual commits
- Use Sync commit mode when you need confirmation
- Consider commit frequency impact on performance
- Watch for commit errors in event callbacks
Contributions are welcome! Please see our Contributing Guide for more details.
This project is licensed under the MIT License - see the LICENSE file for details.