Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): refactor framework and add factory service for producer and admin #4154

Merged
merged 9 commits into from
Nov 8, 2024

Conversation

czy88840616
Copy link
Member

@czy88840616 czy88840616 commented Oct 30, 2024

基本重构了原有的代码,原有装饰器和配置会保留,文档会干掉(废弃)。

1、针对多个topic的问题

其实单个文件在 kafka 上比较正常,topics 是可以配数组以及正则的,所以其实没啥问题。
从逻辑上来说, 一个 kafka 实例可能可以对应多个 consumer 以及多个 topic,这里是1:n:n,文件代表的是 kafka 实例还是 consumer,或者是 topic,就成了一个问题。

const kafka = new Kafka();
const consumer = kafka.consumer();
await consumer.connect();
await consumer.subscribe({ topics: ['topic-B', 'topic-C'] })
await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
        // ...
    },
})

而 mqtt 这种就一层,全局一个实例,只要监听 message 就行,本质上服务实例、 consumer和 topic 是 1:1:1 的关系,这里的多个文件天然就代表了多个服务实例(consumer 了)

const mqtt = require("mqtt");
const client = mqtt.connect("mqtt://test.mosquitto.org");

client.on("connect", () => {
  client.subscribe("presence", (err) => {
    if (!err) {
      client.publish("presence", "Hello mqtt");
    }
  });
});

client.on("message", (topic, message) => {
  // message is Buffer
  console.log(message.toString());
  client.end();
});

目前的方案是折中, 一个文件实体代表一个 consumer。

@KafkaSubscriber('sub1')
export class UserConsumer implements IKafkaSubscriber {
  async eachMessage(payload: EachMessagePayload) {
    // ...
  }
}

@KafkaSubscriber('sub2')
export class UserConsumer2 implements IKafkaSubscriber {
  async eachMessage(payload: EachMessagePayload) {
    // ...
  }
}

之前的回调封装的比较厉害,这次直接暴露 eachMessageeachBatch 两个方法,和原始 SDK 保持一致。

2、共享实例

多个文件,多个 consumer,就等价于多个 kafka 实例,其实是有点浪费的。

所以这里会考虑复用性设计。

sub1: {
  connectionOptions: {
    clientId: 'my-app',
    brokers: [process.env.KAFKA_URL || 'localhost:9092'],
  },
  consumerOptions: {
    groupId: 'groupId-1',
  },
  // ...
},
sub2: {
  kafkaInstanceRef: 'sub1',
  consumerOptions: {
    groupId: 'groupId-2',
  },
}

kafkaInstanceRef 会在创建时处理,指代同一个实例。
必须创建不同的消费分组,不然 kafka 会重复做消费组平衡,持续报错。

3、日志

kafka 的日志之前是自带的,且输出是对象形式,这次一并修改,会使用 midway/logger 打到 midway-kafka.log

4、获取 consumer 对象

原来没有提供这样的方法,这次打算加上,目前 framework.getComsumer('sub1') 能拿到,至于装饰器,暂时不加了。在 ctx 中,也可以使用 ctx.consumer 来获取,方便 pause 或者 commit。

5、移除 autoCommit 功能

原来返回 result 会一次 commitOffsets,事实上 kafka 客户端自己就会做 auto commit,也提供了手动 commit 的功能,交给用户判断会更合理。另外返回值可能有别的用途不应该占用。

@czy88840616
Copy link
Member Author

related: #4136 #4048 #4042

@czy88840616 czy88840616 added the pr: polish This PR adds a very minor behavior improvement that users will enjoy. label Oct 30, 2024
@czy88840616 czy88840616 added this to the v3.19 milestone Nov 1, 2024
@czy88840616
Copy link
Member Author

支持了 kafka producer 和 admin 的创建。

@czy88840616
Copy link
Member Author

czy88840616 commented Nov 3, 2024

剩下的部分:

  • kafka 的日志格式
  • 新文档更新

@czy88840616 czy88840616 changed the title refactor: kafka framework feat: refactor kafka framework and add factory service for producer and admin Nov 3, 2024
@czy88840616 czy88840616 changed the title feat: refactor kafka framework and add factory service for producer and admin feat: refactor kafka and add factory service for producer and admin Nov 3, 2024
@czy88840616 czy88840616 changed the title feat: refactor kafka and add factory service for producer and admin feat(kafka): refactor framework and add factory service for producer and admin Nov 3, 2024
@czy88840616 czy88840616 merged commit 2c71afc into main Nov 8, 2024
7 checks passed
@czy88840616 czy88840616 deleted the refactor_kafka branch November 8, 2024 15:19
czy88840616 added a commit that referenced this pull request Dec 15, 2024
…and admin (#4154)

* refactor: kafka framework

* refactor: kafka framework

* refactor: kafka framework

* refactor: kafka framework

* refactor: kafka framework

* feat: support producer and admin for kafka

* fix: test

* docs: add document

(cherry picked from commit 2c71afc)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
pr: polish This PR adds a very minor behavior improvement that users will enjoy.
Development

Successfully merging this pull request may close these issues.

1 participant