Skip to content

JohnApache/egg-kafka-node

Repository files navigation

egg-kafka-node

NPM version build status Test coverage Known Vulnerabilities npm download

This plug-in is a package of [kafka-node] (https://github.com/SOHU-Co/kafka-node). It is an egg-style plug-in for easy use in the environment of egg.js. It also provides a detailed configuration of methods for sending Kafka messages. Refer to [https://github.com/SOHU-Co/kafka-node] (https://github.com/SOHU-Co/kafka-node).

Install

$ npm i egg-kafka-node --save

Usage

// {app_root}/config/plugin.js
exports.kafkaNode = {
  enable: true,
  package: 'egg-kafka-node',
};

Configuration

// {app_root}/config/config.default.js
exports.kafkaNode = {
  kafkaHost: '127.0.0.1:9092', // kafka connect host
  clientOption: {}, // KafkaClient option, more documentation please visit kafka-node
  consumerOption: [{
    groupId: 'group1', // consumerGroup's groupId
    topics: [ 'testTopic1' ], // topics under the same consumer group 
    options: {
      fetchMaxWaitMs: 100,
      fetchMinBytes: 1,
      fetchMaxBytes: 1024 * 1024,
    }, // relevant configuration for each consumer group, more documentation please visit kafka-node
  }, {
    groupId: 'group2',
    topics: [ 'testTopic2' ],
    options: {},
  }, {
    groupId: 'group3',
    topics: [ 'testTopic3' ],
  }],
  
  // HighLevelProducer option, more documentation please visit kafka-node
  producerOption: {
    requireAcks: 1, 
    ackTimeoutMs: 100, 
    partitionerType: 2, 
    autoCreateTopic: true, // Whether to turn on automatic topic creation. default true
    topics: [ 'testTopic1', 'testTopic2', 'testTopic3' ], // Topics that all consumers need to consume
  },
  messageOption: {
    partition: 0,
    attributes: 0, // send message option
  },
  // NewConfig 
  baseConsumersDir: './app/kafka', // support read consumers files base dir
};

see config/config.default.js for more detail.

Structure

egg-project
├── package.json
├── app.js (optional)
├── app
|   ├── router.js
   ├── controller
   |   └── home.js
   ├── service (optional)
   |   └── user.js
   |   └── response_time.js
   └── kafka (optional)  --------> like `controller, service...`
       ├── someTopic (optional)  -------> topic name of kafka
            ├── someKey1Consumer.js(optional)  ------> `someKey1` is a key of someTopic
|            └── someKey2Consumer.js(optional)  ------> `someKey2` is an another key of someTopic
|            └── someKey3Consumer.ts(optional)  ------> support load ts file consumers
├── config
|   ├── plugin.js
|   ├── config.default.js
   ├── config.prod.js
|   ├── config.test.js (optional)
|   ├── config.local.js (optional)
|   └── config.unittest.js (optional)

USE TIPS

Note: The producer option topics of the kafkaNode configuration must create a corresponding topic directory under the {app-root}/kafka directory. Kafka-node automatically reads the file containing the'Consumers'filename under the topic, and the key needs to be passed in when sendMessage to facilitate business differentiation.

Note: You must set app.config.baseDir, kafka need to load consumers base on the baseDir.

Note: SendMessage messages max bytes depending on the configuration of you set.

Example

// {app_root}/controller/index.js
class IndexController extends Controller {
  async index() {
    await this.ctx.kafka.sendMessage({
      topic: 'someTopic', // Specify topics in the Kafka directory
      key: 'someKey', // Specify consumer for the corresponding key under topic
      messages: JSON.stringify({
        username: 'JohnApache',
        userId: 10001,
        gender: 0
      })
    });
  }

  async some() {
    this.ctx.kafka.sendMessageSync({
      topic: 'someTopic', // Specify topics in the Kafka directory
      key: 'someKey', // Specify consumer for the corresponding key under topic
      messages: JSON.stringify({
        username: 'JohnApache',
        userId: 10001,
        gender: 0
      })
    }, () => {
      // success callback 
    }, () => {
      // error callback 
    })
  }
}

// {app_root}/kafka/someTopic/someKeyConsumer.js
class SomeKeySubscription extends Subscription {
  async subscribe(message) {
    const {value, topic, key} = message;
    this.ctx.logger.info(`consume message ${value} by topic ${topic} key ${key} consumer`);
    await asyncTask();
  }
}

Questions & Suggestions

Please open an issue here.

License

MIT