diff --git a/packages/core/src/common/typedResourceManager.ts b/packages/core/src/common/typedResourceManager.ts
new file mode 100644
index 000000000000..afb75a13b7a7
--- /dev/null
+++ b/packages/core/src/common/typedResourceManager.ts
@@ -0,0 +1,133 @@
+import { ClassType } from '../interface';
+
+export class TypedResourceManager<
+  Resource = any,
+  ResourceInitializeConfig = any,
+  ResourceProviderType = any
+> {
+  private resourceMap: Map<string, Resource> = new Map();
+  private resourceBindingMap: Map<string, any> = new Map();
+  constructor(
+    protected typedResourceInitializerOptions: {
+      initializeValue: {
+        [resourceName: string]: ResourceInitializeConfig;
+      };
+      initializeClzProvider: {
+        [resourceName: string]: ClassType<ResourceProviderType>;
+      };
+      resourceInitialize: (
+        resourceInitializeConfig: ResourceInitializeConfig,
+        resourceName: string
+      ) => Promise<Resource>;
+      resourceBinding: (
+        ClzProvider: ClassType<ResourceProviderType>,
+        resourceInitializeConfig: ResourceInitializeConfig,
+        resource: Resource,
+        resourceName: string
+      ) => Promise<any>;
+      resourceStart: (
+        resource: Resource,
+        resourceInitializeConfig: ResourceInitializeConfig,
+        resourceBindingResult?: any
+      ) => Promise<void>;
+      resourceDestroy: (
+        resource: Resource,
+        resourceInitializeConfig: ResourceInitializeConfig
+      ) => Promise<void>;
+    }
+  ) {}
+
+  public async createResource(
+    resourceName: string,
+    resourceInitializeConfig: ResourceInitializeConfig
+  ) {
+    const resource =
+      await this.typedResourceInitializerOptions.resourceInitialize(
+        resourceInitializeConfig,
+        resourceName
+      );
+    this.resourceMap.set(resourceName, resource);
+    return resource;
+  }
+
+  public async init() {
+    for (const resourceName of Object.keys(
+      this.typedResourceInitializerOptions.initializeValue
+    )) {
+      const resourceInitializeConfig =
+        this.typedResourceInitializerOptions.initializeValue[resourceName];
+      const ClzProvider =
+        this.typedResourceInitializerOptions.initializeClzProvider[
+          resourceName
+        ];
+
+      const resource = await this.createResource(
+        resourceName,
+        resourceInitializeConfig
+      );
+
+      const bindingResult =
+        await this.typedResourceInitializerOptions.resourceBinding(
+          ClzProvider,
+          resourceInitializeConfig,
+          resource,
+          resourceName
+        );
+      if (bindingResult) {
+        this.resourceBindingMap.set(resourceName, bindingResult);
+      }
+    }
+  }
+
+  public async startParallel() {
+    const startPromises = [];
+    for (const [resourceName, resource] of this.resourceMap.entries()) {
+      startPromises.push(
+        this.typedResourceInitializerOptions.resourceStart(
+          resource,
+          this.typedResourceInitializerOptions.initializeValue[resourceName],
+          this.resourceBindingMap.get(resourceName)
+        )
+      );
+    }
+    await Promise.all(startPromises);
+  }
+
+  public async start() {
+    for (const [resourceName, resource] of this.resourceMap.entries()) {
+      await this.typedResourceInitializerOptions.resourceStart(
+        resource,
+        this.typedResourceInitializerOptions.initializeValue[resourceName],
+        this.resourceBindingMap.get(resourceName)
+      );
+    }
+  }
+
+  public async destroyParallel() {
+    const destroyPromises = [];
+    for (const [resourceName, resource] of this.resourceMap.entries()) {
+      destroyPromises.push(
+        this.typedResourceInitializerOptions.resourceDestroy(
+          resource,
+          this.typedResourceInitializerOptions.initializeValue[resourceName]
+        )
+      );
+    }
+    await Promise.all(destroyPromises);
+  }
+
+  public async destroy() {
+    for (const [resourceName, resource] of this.resourceMap.entries()) {
+      await this.typedResourceInitializerOptions.resourceDestroy(
+        resource,
+        this.typedResourceInitializerOptions.initializeValue[resourceName]
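+        // each resource is torn down with the same config it was registered with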
+      );
+    }
+    this.resourceMap.clear();
+    this.resourceBindingMap.clear();
+  }
+
+  public getResource(resourceName: string): any {
+    return this.resourceMap.get(resourceName);
+  }
+}
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index fde42c743392..3ac441a6fe05 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -87,4 +87,5 @@ export { PathFileUtil } from './util/pathFileUtil';
 export { FileUtils } from './util/fs';
 export { FORMAT } from './util/format';
 export { ServerResponse, HttpServerResponse } from './response/index';
+export { TypedResourceManager } from './common/typedResourceManager';
 export { MidwayPerformanceManager } from './common/performanceManager';
diff --git a/packages/core/src/interface.ts b/packages/core/src/interface.ts
index bc3565dd0aaf..5c03fe50f4c3 100644
--- a/packages/core/src/interface.ts
+++ b/packages/core/src/interface.ts
@@ -1246,3 +1246,5 @@ export interface ServerSendEventStreamOptions {
   closeEvent?: string;
   tpl?: (data: ServerSendEventMessage, ctx: CTX) => ServerSendEventMessage;
 }
+
+export type ClassType<T = any> = new (...args: any[]) => T;
diff --git a/packages/core/test/common/typedResourceManager.test.ts b/packages/core/test/common/typedResourceManager.test.ts
new file mode 100644
index 000000000000..a1b9646310f2
--- /dev/null
+++ b/packages/core/test/common/typedResourceManager.test.ts
@@ -0,0 +1,65 @@
+import { TypedResourceManager } from '../../src';
+
+describe('test/common/typedResourceManager.test.ts', () => {
+  let typedResourceManager: TypedResourceManager;
+  let mockResourceInitialize: jest.Mock;
+  let mockResourceBinding: jest.Mock;
+  let mockResourceStart: jest.Mock;
+  let mockResourceDestroy: jest.Mock;
+
+  beforeEach(() => {
+    mockResourceInitialize = jest.fn().mockResolvedValue('mockResource');
+    mockResourceBinding = jest.fn().mockResolvedValue('mockBindingResult');
+    mockResourceStart = jest.fn().mockResolvedValue(undefined);
+    mockResourceDestroy = jest.fn().mockResolvedValue(undefined);
+
+    typedResourceManager = new TypedResourceManager({
+      initializeValue: {
+        testResource: { configKey: 'configValue' }
+      },
+      initializeClzProvider: {
+        testResource: class {}
+      },
+      resourceInitialize: mockResourceInitialize,
+      resourceBinding: mockResourceBinding,
+      resourceStart: mockResourceStart,
+      resourceDestroy: mockResourceDestroy
+    });
+  });
+
+  it('should create and initialize resources', async () => {
+    await typedResourceManager.init();
+    expect(mockResourceInitialize).toHaveBeenCalledWith({ configKey: 'configValue' }, 'testResource');
+    expect(mockResourceBinding).toHaveBeenCalledWith(expect.any(Function), { configKey: 'configValue' }, 'mockResource', 'testResource');
+  });
+
+  it('should start resources', async () => {
+    await typedResourceManager.init();
+    await typedResourceManager.start();
+    expect(mockResourceStart).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' }, 'mockBindingResult');
+  });
+
+  it('should destroy resources', async () => {
+    await typedResourceManager.init();
+    await typedResourceManager.destroy();
+    expect(mockResourceDestroy).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' });
+  });
+
+  it('should start resources in parallel', async () => {
+    await typedResourceManager.init();
+    await typedResourceManager.startParallel();
+    expect(mockResourceStart).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' }, 'mockBindingResult');
+  });
+
+  it('should destroy resources in parallel', async () => {
+    await typedResourceManager.init();
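+    // destroyParallel() collects every resourceDestroy() call and awaits Promise.all, so teardown order across resources is not guaranteed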
await typedResourceManager.destroyParallel(); + expect(mockResourceDestroy).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' }); + }); + + it('should get a resource by name', async () => { + await typedResourceManager.init(); + const resource = typedResourceManager.getResource('testResource'); + expect(resource).toBe('mockResource'); + }); +}); diff --git a/packages/kafka/index.d.ts b/packages/kafka/index.d.ts index 7ce6814cc3da..3ea9eae4f61e 100644 --- a/packages/kafka/index.d.ts +++ b/packages/kafka/index.d.ts @@ -1,5 +1,13 @@ import { IMidwayKafkaConfigurationOptions } from './dist'; export * from './dist/index'; +export { + EachMessagePayload, + EachBatchPayload, + Consumer, + Kafka, + Producer, + Admin, +} from 'kafkajs'; declare module '@midwayjs/core/dist/interface' { interface MidwayConfig { diff --git a/packages/kafka/src/configuration.ts b/packages/kafka/src/configuration.ts index b8bd89a05ee1..d148ac3bd0b2 100644 --- a/packages/kafka/src/configuration.ts +++ b/packages/kafka/src/configuration.ts @@ -5,11 +5,18 @@ import { Configuration } from '@midwayjs/core'; importConfigs: [ { default: { - kafka: {}, + kafka: { + contextLoggerApplyLogger: 'kafkaLogger', + }, + midwayLogger: { + clients: { + kafkaLogger: { + fileLogName: 'midway-kafka.log', + }, + }, + }, }, }, ], }) -export class KafkaConfiguration { - async onReady() {} -} +export class KafkaConfiguration {} diff --git a/packages/kafka/src/decorator.ts b/packages/kafka/src/decorator.ts new file mode 100644 index 000000000000..db7ddd0ffca9 --- /dev/null +++ b/packages/kafka/src/decorator.ts @@ -0,0 +1,17 @@ +import { + Provide, + saveModule, + Scope, + ScopeEnum, + saveClassMetadata, +} from '@midwayjs/core'; +export const KAFKA_DECORATOR_KEY = 'rpc:kafka'; + +export function KafkaConsumer(consumerName: string): ClassDecorator { + return target => { + saveModule(KAFKA_DECORATOR_KEY, target); + saveClassMetadata(KAFKA_DECORATOR_KEY, consumerName, target); + Scope(ScopeEnum.Request)(target); + Provide()(target); + }; +} diff --git a/packages/kafka/src/framework.ts b/packages/kafka/src/framework.ts index 22eb805d0fca..a372fe94f41c 100644 --- a/packages/kafka/src/framework.ts +++ b/packages/kafka/src/framework.ts @@ -9,13 +9,37 @@ import { MSListenerType, MS_CONSUMER_KEY, MidwayInvokeForbiddenError, + ILogger, + Logger, + TypedResourceManager, + MidwayCommonError, } from '@midwayjs/core'; import { + IKafkaConsumerInitOptions, + IKafkaConsumer, IMidwayConsumerConfig, IMidwayKafkaApplication, IMidwayKafkaContext, } from './interface'; import { KafkaConsumerServer } from './kafka'; +import { KAFKA_DECORATOR_KEY } from './decorator'; +import { ConsumerRunConfig, Kafka, Consumer, logLevel } from 'kafkajs'; +import { KafkaManager } from './manager'; + +const toMidwayLogLevel = level => { + switch (level) { + case logLevel.NOTHING: + return 'none'; + case logLevel.ERROR: + return 'error'; + case logLevel.WARN: + return 'warn'; + case logLevel.INFO: + return 'info'; + case logLevel.DEBUG: + return 'debug'; + } +}; @Framework() export class MidwayKafkaFramework extends BaseFramework< @@ -23,33 +47,141 @@ export class MidwayKafkaFramework extends BaseFramework< IMidwayKafkaContext, any > { + protected LogCreator: any; + protected typedResourceManager: TypedResourceManager< + Consumer, + IKafkaConsumerInitOptions, + IKafkaConsumer + >; configure() { return this.configService.getConfiguration('kafka'); } + @Logger('kafkaLogger') + kafKaLogger: ILogger; + async applicationInitialize() { + this.LogCreator = logLevel => { + const 
logger = this.kafKaLogger; + + return ({ level, log }) => { + const lvl = toMidwayLogLevel(level); + const { message, ...extra } = log; + logger?.[lvl](message, extra); + }; + }; + // Create a connection manager - this.app = new KafkaConsumerServer({ - logger: this.logger, - ...this.configurationOptions, - }) as unknown as IMidwayKafkaApplication; + if (this.configurationOptions['kafkaConfig']) { + this.app = new KafkaConsumerServer({ + logger: this.kafKaLogger, + ...this.configurationOptions, + }) as unknown as IMidwayKafkaApplication; + } else { + this.app = {} as any; + } } public async run(): Promise { - try { - await this.app.connect( - this.configurationOptions.kafkaConfig, - this.configurationOptions.consumerConfig - ); - await this.loadSubscriber(); - this.logger.info('Kafka consumer server start success'); - } catch (error) { - this.logger.error('Kafka consumer connect fail', error); - await this.app.closeConnection(); + if (this.configurationOptions['kafkaConfig']) { + try { + await this.app.connect( + this.configurationOptions.kafkaConfig, + this.configurationOptions.consumerConfig + ); + await this.loadLegacySubscriber(); + this.kafKaLogger.info('Kafka consumer server start success'); + } catch (error) { + this.kafKaLogger.error('Kafka consumer connect fail', error); + await this.app.closeConnection(); + } + } else { + const { consumer } = this.configurationOptions; + if (!consumer) return; + const subscriberMap = {}; + // find subscriber + const subscriberModules = listModule(KAFKA_DECORATOR_KEY); + for (const subscriberModule of subscriberModules) { + const subscriberName = getClassMetadata( + KAFKA_DECORATOR_KEY, + subscriberModule + ) as string; + subscriberMap[subscriberName] = subscriberModule; + } + + this.typedResourceManager = new TypedResourceManager< + Consumer, + IKafkaConsumerInitOptions, + IKafkaConsumer + >({ + initializeValue: consumer, + initializeClzProvider: subscriberMap, + resourceInitialize: async (resourceInitializeConfig, resourceName) => { + let client; + if (resourceInitializeConfig.kafkaInstanceRef) { + client = KafkaManager.getInstance().getKafkaInstance( + resourceInitializeConfig.kafkaInstanceRef + ); + if (!client) { + throw new MidwayCommonError( + `kafka instance ${resourceInitializeConfig.kafkaInstanceRef} not found` + ); + } + } else { + client = new Kafka({ + logCreator: this.LogCreator, + ...resourceInitializeConfig.connectionOptions, + }); + KafkaManager.getInstance().addKafkaInstance(resourceName, client); + } + + const consumer = client.consumer( + resourceInitializeConfig.consumerOptions + ); + await consumer.connect(); + await consumer.subscribe(resourceInitializeConfig.subscribeOptions); + return consumer; + }, + resourceBinding: async ( + ClzProvider, + resourceInitializeConfig, + consumer + ): Promise => { + const runMethod = ClzProvider.prototype['eachBatch'] + ? 
'eachBatch' + : 'eachMessage'; + const runConfig = { + ...resourceInitializeConfig.consumerRunConfig, + }; + runConfig[runMethod] = async payload => { + const ctx = this.app.createAnonymousContext(); + const fn = await this.applyMiddleware(async ctx => { + ctx.payload = payload; + ctx.consumer = consumer; + const instance = await ctx.requestContext.getAsync(ClzProvider); + return await instance[runMethod].call(instance, payload, ctx); + }); + return await fn(ctx); + }; + return runConfig; + }, + resourceStart: async ( + resource: Consumer, + resourceInitializeConfig, + resourceBindingResult: ConsumerRunConfig + ) => { + await resource.run(resourceBindingResult); + }, + resourceDestroy: async (resource: Consumer) => { + await resource.disconnect(); + }, + }); + await this.typedResourceManager.init(); + await this.typedResourceManager.start(); } } - private async loadSubscriber() { + private async loadLegacySubscriber() { const subscriberModules = listModule(MS_CONSUMER_KEY, module => { const metadata: ConsumerMetadata.ConsumerMetadata = getClassMetadata( MS_CONSUMER_KEY, @@ -94,7 +226,8 @@ export class MidwayKafkaFramework extends BaseFramework< ); await this.app.connection.run( Object.assign(e.runConfig, { - eachMessage: async ({ topic, partition, message }) => { + eachMessage: async payload => { + const { topic, partition, message } = payload; let propertyKey: string | number; for (const methodBindListeners of data) { for (const listenerOptions of methodBindListeners) { @@ -113,6 +246,8 @@ export class MidwayKafkaFramework extends BaseFramework< }, ]); }, + payload, + consumer: this.app.connection, } as unknown as IMidwayKafkaContext; this.app.createAnonymousContext(ctx); @@ -159,12 +294,26 @@ export class MidwayKafkaFramework extends BaseFramework< } } + public getConsumer(subscriberNameOrInstanceName: string) { + if (this.typedResourceManager) { + return this.typedResourceManager.getResource( + subscriberNameOrInstanceName + ); + } + } + + public getKafka(instanceName: string) { + return KafkaManager.getInstance().getKafkaInstance(instanceName); + } + public getFrameworkName() { return 'midway:kafka'; } protected async beforeStop(): Promise { - await this.app.close(); + if (this.typedResourceManager) { + await this.typedResourceManager.destroy(); + } } public getFrameworkType(): MidwayFrameworkType { diff --git a/packages/kafka/src/index.ts b/packages/kafka/src/index.ts index e83b8d2b08b9..faccb546cfa4 100644 --- a/packages/kafka/src/index.ts +++ b/packages/kafka/src/index.ts @@ -1,6 +1,8 @@ -import * as Kafkajs from 'kafkajs'; +import * as KafkaJS from 'kafkajs'; export { MidwayKafkaFramework as Framework } from './framework'; -export * from './interface'; export { KafkaConfiguration as Configuration } from './configuration'; -export { Kafkajs }; +export * from './interface'; +export * from './decorator'; +export * from './service'; +export { KafkaJS }; diff --git a/packages/kafka/src/interface.ts b/packages/kafka/src/interface.ts index 831689d16df6..544ba3bbc5d8 100644 --- a/packages/kafka/src/interface.ts +++ b/packages/kafka/src/interface.ts @@ -3,8 +3,23 @@ import { IMidwayApplication, IMidwayContext, NextFunction as BaseNextFunction, -} from '@midwayjs/core'; -import { Kafka, KafkaConfig } from 'kafkajs'; + ServiceFactoryConfigOption +} from "@midwayjs/core"; +import { + AdminConfig, + Consumer, + ConsumerConfig, + ConsumerRunConfig, + ConsumerSubscribeTopic, + ConsumerSubscribeTopics, + EachBatchHandler, + EachBatchPayload, + EachMessageHandler, + EachMessagePayload, + Kafka, + 
KafkaConfig, + ProducerConfig +} from 'kafkajs'; export interface IKafkaApplication { } @@ -12,15 +27,25 @@ export interface IKafkaApplication { export type IMidwayKafkaApplication = IMidwayApplication & IKafkaApplication; -export interface IMidwayKafkaConfigurationOptions - extends IConfigurationOptions, - KafkaConfig {} - export type IMidwayKafkaContext = IMidwayContext<{ - topic: any; - partition: any; - message: any; - commitOffsets(data: any): void; + /** + * @deprecated please use `ctx.payload` instead + */ + topic?: any; + /** + * @deprecated please use `ctx.payload` instead + */ + partition?: any; + /** + * @deprecated please use `ctx.payload` instead + */ + message?: any; + /** + * @deprecated please use `ctx.consumer.commitOffsets` instead + */ + commitOffsets?(data: any): void; + payload: EachMessagePayload | EachBatchPayload; + consumer: Consumer; }>; export type Application = IMidwayKafkaApplication; @@ -30,9 +55,61 @@ export type DefaultConfig = string | Kafka; /** * 客户端的相关配置,在midwayjs的自定义配置项 + * @deprecated */ export interface IMidwayConsumerConfig { topic: string; subscription: any; runConfig: any; } + +/** + * The options for the kafka consumer initialization in midway + */ +export interface IKafkaConsumerInitOptions { + /** + * The connection options for the kafka instance + */ + connectionOptions: KafkaConfig; + /** + * The consumer options for the kafka consumer + */ + consumerOptions: ConsumerConfig; + subscribeOptions: ConsumerSubscribeTopics | ConsumerSubscribeTopic; + consumerRunConfig: ConsumerRunConfig; + kafkaInstanceRef?: string; +} + +/** + * The options for the kafka producer initialization in midway + */ +export interface IMidwayKafkaProducerInitOptions { + connectionOptions: KafkaConfig; + producerOptions: ProducerConfig; + kafkaInstanceRef?: string; +} + +/** + * The options for the kafka admin initialization in midway + */ +export interface IMidwayKafkaAdminInitOptions { + kafkaInstanceRef?: string; + connectionOptions: KafkaConfig; + /** + * The options for the kafka admin initialization + */ + adminOptions: AdminConfig; +} + +export interface IMidwayKafkaConfigurationOptions extends IConfigurationOptions { + consumer: { + [name: string]: Partial; + }, + producer: ServiceFactoryConfigOption>, + admin: ServiceFactoryConfigOption> +} + +export interface IKafkaConsumer { + eachBatch?: EachBatchHandler; + eachMessage?: EachMessageHandler; +} diff --git a/packages/kafka/src/manager.ts b/packages/kafka/src/manager.ts new file mode 100644 index 000000000000..a812f23ce20b --- /dev/null +++ b/packages/kafka/src/manager.ts @@ -0,0 +1,23 @@ +import { Kafka } from 'kafkajs'; + +export class KafkaManager { + private kafkaInstanceMap: Map = new Map(); + private static instance: KafkaManager; + + private constructor() {} + + getKafkaInstance(name: string) { + return this.kafkaInstanceMap.get(name); + } + + addKafkaInstance(name: string, kafka: Kafka) { + this.kafkaInstanceMap.set(name, kafka); + } + + public static getInstance() { + if (!this.instance) { + this.instance = new KafkaManager(); + } + return this.instance; + } +} diff --git a/packages/kafka/src/service.ts b/packages/kafka/src/service.ts new file mode 100644 index 000000000000..16c4abc2d92f --- /dev/null +++ b/packages/kafka/src/service.ts @@ -0,0 +1,121 @@ +import { + Config, + Destroy, + ILogger, + Init, + Logger, + MidwayCommonError, + ServiceFactory, + ServiceFactoryConfigOption, + Singleton, +} from '@midwayjs/core'; +import { Producer, Kafka, Admin } from 'kafkajs'; +import { + 
IMidwayKafkaAdminInitOptions, + IMidwayKafkaProducerInitOptions, +} from './interface'; +import { KafkaManager } from './manager'; + +@Singleton() +export class KafkaProducerFactory extends ServiceFactory { + @Logger('kafkaLogger') + logger: ILogger; + + @Config('kafka.producer') + pubConfig: ServiceFactoryConfigOption; + + getName(): string { + return 'kafka:producer'; + } + + @Init() + async init() { + await this.initClients(this.pubConfig); + } + + protected async createClient( + config: IMidwayKafkaProducerInitOptions, + clientName: any + ): Promise { + const { connectionOptions, producerOptions, kafkaInstanceRef } = config; + let client: Kafka; + if (kafkaInstanceRef) { + client = KafkaManager.getInstance().getKafkaInstance(kafkaInstanceRef); + if (!client) { + throw new MidwayCommonError( + `[midway:kafka] kafka instance ${kafkaInstanceRef} not found` + ); + } + } else { + client = new Kafka(connectionOptions); + KafkaManager.getInstance().addKafkaInstance(kafkaInstanceRef, client); + } + const producer = client.producer(producerOptions); + + producer.on('producer.connect', () => { + this.logger.info('[midway:kafka] producer: %s is connect', clientName); + }); + await producer.connect(); + return producer; + } + + async destroyClient(producer: Producer, name: string) { + await producer.disconnect(); + this.logger.info('[midway:kafka] producer: %s is close', name); + } + + @Destroy() + async destroy() { + await super.stop(); + } +} + +@Singleton() +export class KafkaAdminFactory extends ServiceFactory { + @Logger('kafkaLogger') + logger: ILogger; + + @Config('kafka.admin') + adminConfig: ServiceFactoryConfigOption; + + getName(): string { + return 'kafka:admin'; + } + + @Init() + async init() { + await this.initClients(this.adminConfig); + } + + protected async createClient( + config: IMidwayKafkaAdminInitOptions, + clientName: any + ): Promise { + const { connectionOptions, adminOptions, kafkaInstanceRef } = config; + let client: Kafka; + if (kafkaInstanceRef) { + client = KafkaManager.getInstance().getKafkaInstance(kafkaInstanceRef); + if (!client) { + throw new MidwayCommonError( + `[midway:kafka] kafka instance ${kafkaInstanceRef} not found` + ); + } + } else { + client = new Kafka(connectionOptions); + KafkaManager.getInstance().addKafkaInstance(kafkaInstanceRef, client); + } + const admin = client.admin(adminOptions); + await admin.connect(); + return admin; + } + + async destroyClient(admin: Admin, name: string) { + await admin.disconnect(); + this.logger.info('[midway:kafka] admin: %s is close', name); + } + + @Destroy() + async destroy() { + await super.stop(); + } +} diff --git a/packages/kafka/test/fixtures/base-app-auto-commit/src/configuration.ts b/packages/kafka/test/fixtures/base-app-auto-commit/src/configuration.ts index 41c0b26dc4c3..7af0057fe82f 100644 --- a/packages/kafka/test/fixtures/base-app-auto-commit/src/configuration.ts +++ b/packages/kafka/test/fixtures/base-app-auto-commit/src/configuration.ts @@ -12,7 +12,7 @@ import { IMidwayKafkaApplication } from '../../../../src'; brokers: [process.env.KAFKA_URL || 'localhost:9092'], }, consumerConfig: { - groupId: 'groupId-test', + groupId: 'groupId-test-' + Math.random(), }, }, }, diff --git a/packages/kafka/test/fixtures/base-app-manual-committing/src/configuration.ts b/packages/kafka/test/fixtures/base-app-manual-committing/src/configuration.ts index 41c0b26dc4c3..7af0057fe82f 100644 --- a/packages/kafka/test/fixtures/base-app-manual-committing/src/configuration.ts +++ 
b/packages/kafka/test/fixtures/base-app-manual-committing/src/configuration.ts @@ -12,7 +12,7 @@ import { IMidwayKafkaApplication } from '../../../../src'; brokers: [process.env.KAFKA_URL || 'localhost:9092'], }, consumerConfig: { - groupId: 'groupId-test', + groupId: 'groupId-test-' + Math.random(), }, }, }, diff --git a/packages/kafka/test/fixtures/base-app-multi-different-topic/src/configuration.ts b/packages/kafka/test/fixtures/base-app-multi-different-topic/src/configuration.ts index 9349df6ad1ce..36e7a16a9819 100644 --- a/packages/kafka/test/fixtures/base-app-multi-different-topic/src/configuration.ts +++ b/packages/kafka/test/fixtures/base-app-multi-different-topic/src/configuration.ts @@ -12,7 +12,7 @@ import { IMidwayKafkaApplication } from '../../../../src'; brokers: [process.env.KAFKA_URL || 'localhost:9092'], }, consumerConfig: { - groupId: 'groupId-test', + groupId: 'groupId-test-' + Math.random(), } }, }, diff --git a/packages/kafka/test/fixtures/base-app-multi-some-topic/src/configuration.ts b/packages/kafka/test/fixtures/base-app-multi-some-topic/src/configuration.ts index 41c0b26dc4c3..7af0057fe82f 100644 --- a/packages/kafka/test/fixtures/base-app-multi-some-topic/src/configuration.ts +++ b/packages/kafka/test/fixtures/base-app-multi-some-topic/src/configuration.ts @@ -12,7 +12,7 @@ import { IMidwayKafkaApplication } from '../../../../src'; brokers: [process.env.KAFKA_URL || 'localhost:9092'], }, consumerConfig: { - groupId: 'groupId-test', + groupId: 'groupId-test-' + Math.random(), }, }, }, diff --git a/packages/kafka/test/fixtures/base-app/src/configuration.ts b/packages/kafka/test/fixtures/base-app/src/configuration.ts index 6dcf45cef1b0..26d6d61d90d1 100644 --- a/packages/kafka/test/fixtures/base-app/src/configuration.ts +++ b/packages/kafka/test/fixtures/base-app/src/configuration.ts @@ -12,7 +12,7 @@ import { IMidwayKafkaApplication } from '../../../../src'; brokers: [process.env.KAFKA_URL || 'localhost:9092'], }, consumerConfig: { - groupId: 'groupId-test' + groupId: 'groupId-test-' + Math.random(), } }, }, diff --git a/packages/kafka/test/index.test.ts b/packages/kafka/test/index.test.ts index 8ae521d50a37..6d6e591af587 100644 --- a/packages/kafka/test/index.test.ts +++ b/packages/kafka/test/index.test.ts @@ -1,6 +1,9 @@ -import { createKafkaProducer } from '@midwayjs/mock'; +import { createKafkaProducer, createLightApp } from '@midwayjs/mock'; import { closeApp, creatApp } from './utils'; -import { sleep } from '@midwayjs/core'; +import { sleep, Inject } from '@midwayjs/core'; +import { IKafkaConsumer, KafkaConsumer, Context } from '../src'; +import * as Kafka from '../src'; +import { EachMessagePayload, Kafka as KafkaJs, Partitioners } from 'kafkajs'; describe('/test/index.test.ts', () => { it('should test create producer with method', async () => { @@ -127,4 +130,311 @@ describe('/test/index.test.ts', () => { await closeApp(app); expect([1, 2]).toContain(app.getAttr('total')) }); + + + describe('new features', () => { + it('should test create producer and consumer with the multi different topic', async () => { + let total = 0; + @KafkaConsumer('sub1') + class UserConsumer implements IKafkaConsumer { + async eachMessage(payload: EachMessagePayload) { + total++; + } + } + + @KafkaConsumer('sub2') + class UserConsumer2 implements IKafkaConsumer { + async eachMessage(payload: EachMessagePayload) { + total++; + } + } + + const app = await createLightApp({ + imports: [ + Kafka, + ], + preloadModules: [UserConsumer, UserConsumer2], + globalConfig: { + kafka: { + 
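            // each key under 'consumer' matches a consumer name registered via @KafkaConsumer()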
consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: [process.env.KAFKA_URL || 'localhost:9092'], + }, + consumerOptions: { + groupId: 'groupId-test-' + Math.random(), + }, + subscribeOptions: { + topics: ['topic-test-1'], + fromBeginning: false, + } + }, + sub2: { + kafkaInstanceRef: 'sub1', + consumerOptions: { + groupId: 'groupId-test-' + Math.random(), + }, + subscribeOptions: { + topics: ['topic-test-2'], + fromBeginning: false, + } + } + } + }, + } + }); + + await sleep(3000); + expect(total).toEqual(0); + + // create a producer + const producer = await createKafkaProducer({ + kafkaConfig: { + clientId: 'my-app', + brokers: [process.env.KAFKA_BROKERS as string || 'localhost:9092'], + }, + mock: false, + }); + await producer.connect(); + + // send data to topic + await producer.send({ + // compression: CompressionTypes.GZIP, + topic: 'topic-test-1', + messages: [{ key: 'message-key1', value: 'hello consumer 11 !' }], + }); + await producer.send({ + // compression: CompressionTypes.GZIP, + topic: 'topic-test-2', + messages: [{ key: 'message-key2', value: 'hello consumer 22 !' }], + }); + await sleep(3000); + await producer.disconnect(); + + await closeApp(app); + expect(total).toEqual(2); + }); + + it('should test throw error in trigger', async () => { + @KafkaConsumer('sub1') + class UserConsumer implements IKafkaConsumer { + async eachMessage(payload: EachMessagePayload) { + throw new Error('test error'); + } + } + + const app = await createLightApp({ + imports: [ + Kafka, + ], + preloadModules: [UserConsumer], + globalConfig: { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: [process.env.KAFKA_URL || 'localhost:9092'], + }, + consumerOptions: { + groupId: 'groupId-test-' + Math.random(), + }, + subscribeOptions: { + topics: ['topic-test-1'], + fromBeginning: false, + } + } + } + }, + } + }); + + await sleep(3000); + + // create a producer + const producer = await createKafkaProducer({ + kafkaConfig: { + clientId: 'my-app', + brokers: [process.env.KAFKA_BROKERS as string || 'localhost:9092'], + }, + mock: false, + }); + await producer.connect(); + + // send data to topic + await producer.send({ + // compression: CompressionTypes.GZIP, + topic: 'topic-test-1', + messages: [{ key: 'message-key1', value: 'hello consumer 11 !' 
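        // this message triggers the eachMessage handler above, which throws 'test error'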
}], + }); + await sleep(3000); + await producer.disconnect(); + + await closeApp(app); + }); + + it('should test create producer and send', async () => { + const app = await createLightApp({ + imports: [ + Kafka, + ], + globalConfig: { + kafka: { + producer: { + clients: { + producer1: { + connectionOptions: { + clientId: 'my-app', + brokers: [process.env.KAFKA_BROKERS as string || 'localhost:9092'], + }, + } + } + } + } + } + }); + + const kafka = new KafkaJs({ + clientId: 'my-app', + brokers: [process.env.KAFKA_BROKERS as string || 'localhost:9092'], + }); + + const consumer = kafka.consumer({ groupId: 'my-group' }); + await consumer.connect(); + await consumer.subscribe({ topics: ['topic-test-1'] }); + + await new Promise(async (resolve, reject) => { + + await consumer.run({ + eachMessage: async ({ topic, partition, message, heartbeat, pause }) => { + if (topic === 'topic-test-1' && message?.value?.toString() === 'hello consumer 11 !') { + resolve(); + } else { + reject(new Error('test error')); + } + }, + }); + + const producerFactory = await app.getApplicationContext().getAsync(Kafka.KafkaProducerFactory); + const producer = producerFactory.get('producer1'); + await producer.send({ + topic: 'topic-test-1', + messages: [{ key: 'message-key1', value: 'hello consumer 11 !' }], + }); + }); + + await consumer.disconnect(); + await closeApp(app); + }); + + it('should test create admin', async () => { + const app = await createLightApp({ + imports: [ + Kafka, + ], + globalConfig: { + kafka: { + admin: { + clients: { + admin1: { + connectionOptions: { + clientId: 'my-app', + brokers: [process.env.KAFKA_BROKERS as string || 'localhost:9092'], + }, + }, + } + } + } + } + }); + + const adminFactory = await app.getApplicationContext().getAsync(Kafka.KafkaAdminFactory); + const admin = adminFactory.get('admin1'); + await admin.createTopics({ + topics: [{ + topic: 'my-topic', + numPartitions: 3, + replicationFactor: 1 + }] + }); + + // 查看所有主题 + const topics = await admin.listTopics(); + expect(topics).toContain('my-topic'); + // 删除主题 + await admin.deleteTopics({ + topics: ['my-topic'] + }); + + // 查看消费者组 + const groups = await admin.listGroups(); + expect(groups).not.toContain('my-group'); + await closeApp(app); + }); + + it('should test share same kafka instance with consumer and producer', async () => { + let total = 0; + @KafkaConsumer('sub1') + class UserConsumer implements IKafkaConsumer { + @Inject() + ctx: Context; + async eachMessage(payload: EachMessagePayload) { + this.ctx.logger.info(payload.message.value?.toString()); + total++; + } + } + + const app = await createLightApp({ + imports: [ + Kafka, + ], + preloadModules: [UserConsumer], + globalConfig: { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: [process.env.KAFKA_BROKERS as string || 'localhost:9092'], + }, + consumerOptions: { + groupId: 'groupId-test-' + Math.random(), + }, + subscribeOptions: { + topics: ['topic-test-1'], + fromBeginning: false, + } + } + }, + producer: { + clients: { + producer1: { + kafkaInstanceRef: 'sub1', + producerOptions: { + createPartitioner: Partitioners.DefaultPartitioner, + } + } + } + } + } + } + }); + + await sleep(1000); + + // get producer + const producerFactory = await app.getApplicationContext().getAsync(Kafka.KafkaProducerFactory); + const producer = producerFactory.get('producer1'); + await producer.send({ + topic: 'topic-test-1', + messages: [{ key: 'message-key1', value: 'hello consumer 11 !' 
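        // consumed by sub1's eachMessage through the shared Kafka instance, incrementing total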
}], + }); + + await sleep(3000); + expect(total).toEqual(1); + + await closeApp(app); + }); + }) }); diff --git a/site/docs/extensions/kafka.md b/site/docs/extensions/kafka.md index 4e154038f543..a16f959239f1 100644 --- a/site/docs/extensions/kafka.md +++ b/site/docs/extensions/kafka.md @@ -17,7 +17,7 @@ | 可用于 Serverless | ❌ | | 可用于一体化 | ✅ | | 包含独立主框架 | ✅ | -| 包含独立日志 | ❌ | +| 包含独立日志 | ✅ | @@ -29,7 +29,7 @@ * 容错(故障转移)存储信息(流),存储事件流 * 在消息流发生的时候进行处理,处理事件流 -理解Producer(生产者) +理解 Producer(生产者) * 发布消息到一个主题或多个 topic (主题)。 @@ -45,18 +45,18 @@ ![image.png](https://kafka.apache.org/images/streams-and-tables-p1_p4.png) +:::tip +从 v3.19 开始,Kafka 组件做了一次重构,Kafka 组件的配置、使用方法和之前都有较大差异,原有使用方式兼容,但是文档不再保留。 +::: -## 消费者(Consumer)使用方法 +## 安装依赖 -### 安装依赖 +安装 `@midwayjs/kafka` 模块。 - -Midway 提供了订阅 Kafka 的能力,并能够独立部署和使用。安装 `@midwayjs/kafka` 模块及其定义。 ```bash $ npm i @midwayjs/kafka --save -$ npm i kafkajs --save ``` 或者在 `package.json` 中增加如下依赖后,重新安装。 @@ -65,7 +65,6 @@ $ npm i kafkajs --save { "dependencies": { "@midwayjs/kafka": "^3.0.0", - "kafka": "^2.0.0", // ... } } @@ -115,13 +114,14 @@ export class MainConfiguration { } ``` -### 目录结构 +由于 Kafka 分为 **消费者(Consumer)** 和 **生产者(Producer)** 两部分,两个可以独立使用,我们将分别介绍。 +## 消费者(Consumer) -我们一般把能力分为生产者和消费者,而订阅正是消费者的能力。 +### 目录结构 -我们一般把消费者放在 consumer 目录。比如 `src/consumer/userConsumer.ts` 。 +我们一般把消费者放在 consumer 目录。比如 `src/consumer/user.consumer.ts` 。 ``` ➜ my_midway_app tree . @@ -135,338 +135,455 @@ export class MainConfiguration { ├── package.json └── tsconfig.json ``` -代码示例如下。 -```typescript -@Provide() -@Consumer(MSListenerType.KAFKA) -export class UserConsumer { - @Inject() - ctx: IMidwayKafkaContext; +### 基础配置 - @Inject() - logger; +通过 `consumer` 字段和 `@KafkaConsumer` 装饰器,我们可以配置多个消费者。 + +比如,下面的 `sub1` 和 `sub2` 就是两个不同的消费者。 - @KafkaListener('topic-test') - async gotData(message: KafkaMessage) { - this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8')); +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + // ... + }, + sub2: { + // ... + }, + } } } ``` -`@Consumer` 装饰器,提供消费者标识,并且它的参数,指定了某种消费框架的类型,比如,我们这里指定了 `MSListenerType.KFAKA` 这个类型,指的就是 kafka 类型。 +最简单的消费者配置需要几个字段,Kafka 的连接配置、消费者配置以及订阅配置。 -标识了 `@Consumer` 的类,对方法使用 `@KafkaListener` 装饰器后,可以绑定一个topic。 +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + // ... + }, + consumerOptions: { + // ... + }, + subscribeOptions: { + // ... 
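+        // 即 consumer.subscribe() 的参数,如 { topics: ['topic-test-1'] }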
+ }, + }, + } + } +} +``` +比如: -方法的参数为接收到的消息,类型为 `ConsumeMessage` 。默认设置了自动确认,什么时候设置手动确认?当出现异常的时候,需要设置commitOffsets偏移到异常的位置,用于重新执行消费。 +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: ['localhost:9092'], + }, + consumerOptions: { + groupId: 'groupId-test-1', + }, + subscribeOptions: { + topics: ['topic-test-1'], + } + }, + } + } +} +``` -如果需要订阅多个topic,可以使用多个方法,也可以使用多个文件。 +完整可配置参数包括: +- `connectionOptions`:Kafka 的连接配置,即 `new Kafka(consumerOptions)` 的参数 +- `consumerOptions`:Kafka 的消费者配置,即 `kafka.consumer(consumerOptions)` 的参数 +- `subscribeOptions`:Kafka 的订阅配置,即 `consumer.subscribe(subscribeOptions)` 的参数 +- `consumerRunConfig`:消费者运行配置,即 `consumer.run(consumerRunConfig)` 的参数 -### Kafka 消息上下文 +这些参数的详细说明,可以参考 [KafkaJS Consumer](https://kafka.js.org/docs/consuming) 文档。 +### 复用 Kafka 实例 -订阅 `Kafka` 数据的上下文,和 Web 同样的,其中包含一个 `requestContext` ,和每次接收消息的数据绑定。 +如果如果需要复用 Kafka 实例,可以通过 `kafkaInstanceRef` 字段来指定。 -整个 ctx 的定义为: ```typescript -export type Context = { - requestContext: IMidwayContainer; -}; +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: ['localhost:9092'], + }, + consumerOptions: { + groupId: 'groupId-test-1', + }, + subscribeOptions: { + topics: ['topic-test-1'], + } + }, + sub2: { + kafkaInstanceRef: 'sub1', + consumerOptions: { + groupId: 'groupId-test-2', + }, + subscribeOptions: { + topics: ['topic-test-2'], + } + } + } + } +} ``` +注意,上述的 `sub1` 和 `sub2` 是两个不同的消费者,但是它们共享同一个 Kafka 实例,且 `sub2` 的 `groupId` 需要和 `sub1` 不同。 + +用 Kafka SDK 写法类似如下: -可以从框架获取定义 ```typescript -import { Context } from '@midwayjs/kafka'; -``` +const kafka = new Kafka({ + clientId: 'my-app', + brokers: ['localhost:9092'], +}); +const consumer1 = kafka.consumer({ groupId: 'groupId-test-1' }); +const consumer2 = kafka.consumer({ groupId: 'groupId-test-2' }); +``` -### 配置消费者 +### 消费者实现 -我们需要在配置中指定 kafka 的地址。 +我们可以在目录中提供一个标准的消费者实现,比如 `src/consumer/sub1.consumer.ts`。 ```typescript -// src/config/config.default -import { MidwayConfig } from '@midwayjs/core'; +// src/consumer/sub1.consumer.ts +import { KafkaConsumer, IKafkaConsumer, EachMessagePayload } from '@midwayjs/kafka'; -export default { - // ... - kafka: { - kafkaConfig: { - clientId: 'my-app', - brokers: [process.env.KAFKA_URL || 'localhost:9092'], - }, - consumerConfig: { - groupId: 'groupId-test' - } - }, -} as MidwayConfig; +@KafkaConsumer('sub1') +class Sub1Consumer implements IKafkaConsumer { + async eachMessage(payload: EachMessagePayload) { + // ... + } +} ``` -更多配置(更详细的配置,参考 https://kafka.js.org/docs/consuming): +`sub1` 是消费者名称,使用的是配置中的 `sub1` 消费者。 -| 属性 | 描述 | -| --- | --- | -| kafkaConfig | kafka 的连接信息 | -| - clientId | 指定客户端ID | -| - brokers | Kafka集群brokers | -| consumerConfig | 消费者配置 | -| - groupId | 分组ID | +也可以实现 `eachBatch` 方法,处理批量消息。 +```typescript +// src/consumer/sub1.consumer.ts +import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka'; +@KafkaConsumer('sub1') +class Sub1Consumer implements IKafkaConsumer { + async eachBatch(payload: EachBatchPayload) { + // ... 
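+    // payload.batch.messages 为本批消息,可配合 payload.resolveOffset() 与 payload.heartbeat() 控制偏移提交和心跳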
+ } +} +``` -### Manual-committing -手动提交设置,组件默认是自动提交。 +### 消息上下文 -```typescript -import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/core'; -import { KafkaMessage } from 'kafkajs'; -import { Context, Application } from '../../../../../src'; -@Provide() -@Consumer(MSListenerType.KAFKA) -export class UserConsumer { +和其他消息订阅机制一样,消息本身通过 `Context` 字段来传递。 + +```typescript +// src/consumer/sub1.consumer.ts +import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka'; +import { Inject } from '@midwayjs/core'; - @App() - app: Application; +@KafkaConsumer('sub1') +class Sub1Consumer implements IKafkaConsumer { @Inject() ctx: Context; - @Inject() - logger; - - @KafkaListener('topic-test0', { - subscription: { - fromBeginning: false, - }, - runConfig: { - autoCommit: false, - } - }) - async gotData(message: KafkaMessage) { - console.info('gotData info'); - this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8')); - try { - // 抛出异常,当出现异常的时候,需要设置commitOffsets偏移到异常的位置,用于重新执行消费,所以这里应该出现的消费是2次,total为2 - throw new Error("error"); - } catch (error) { - this.ctx.commitOffsets(message.offset); - } - this.app.setAttr('total', this.app.getAttr('total') + 1); + async eachMessage(payload: EachMessagePayload) { + // ... } } ``` -### Multi different Topic -订阅的 topic1 和 topic2, 两个主题的消费都会被调用。 +`Context` 字段包括几个属性: -```typescript -import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/core'; -import { KafkaMessage } from 'kafkajs'; -import { Context, Application } from '../../../../../src'; +| 属性 | 类型 | 描述 | +| ----------- | ------------------------------ | ---------------- | +| ctx.payload | EachMessagePayload, EachBatchPayload | 消息内容 | +| ctx.consumer | Consumer | 消费者实例 | -@Provide() -@Consumer(MSListenerType.KAFKA) -export class UserConsumer { - @App() - app: Application; +你可以通过 `ctx.consumer` 来调用 Kafka 的 API,比如 `ctx.consumer.commitOffsets` 来手动提交偏移量或者 `ctx.consumer.pause` 来暂停消费。 - @Inject() - ctx: Context; - @Inject() - logger; +## 生产者(Producer) - @KafkaListener('topic-test') - async gotData(message: KafkaMessage) { - console.info('gotData info'); - this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8')); - this.app.setAttr('total', this.app.getAttr('total') + 1); - } +### 基础配置 - @KafkaListener('topic-test2') - async gotData2(message: KafkaMessage) { - console.info('gotData2 info'); - this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8')); - this.app.setAttr('total', this.app.getAttr('total') + 1); - } +服务生产者也需要创建实例,配置本身使用了 [服务工厂](/docs/service_factory) 的设计模式。 -} +配置如下: +```typescript +// src/config/config.default +export default { + kafka: { + producer: { + clients: { + pub1: { + // ... + }, + pub2: { + // ... + } + } + } + } +} ``` -### 装饰器参数 +每个 Producer 实例的配置,同样包括 `connectionOptions` 和 `producerOptions`。 +```typescript +// src/config/config.default +export default { + kafka: { + producer: { + clients: { + pub1: { + connectionOptions: { + clientId: 'my-app', + brokers: ['localhost:9092'], + }, + producerOptions: { + // ... 
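+            // 即 kafka.producer(producerOptions) 的参数,如 createPartitioner 等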
+ } + } + } + } + } +} +``` -`@kafkaListener` 装饰器的第一个参数为 topic ,代表需要消费的主题。 +具体参数可以参考 [KafkaJS Producer](https://kafka.js.org/docs/producing) 文档。 +此外,由于 Kafka Consumer 和 Producer 都可以从同一个 Kafka 实例创建,所以它们可以复用同一个 Kafka 实例。 -第二个参数是一个对象,包含注册的配置`subscription`、运行的配置`runConfig`等参数,详细定义如下: +Producer 后于 Consumer 创建,也同样可以使用 `kafkaInstanceRef` 字段来复用 Kafka 实例。 ```typescript -export interface KafkaListenerOptions { - propertyKey?: string; - topic?: string; - - subscription?: ConsumerSubscribeTopics | ConsumerSubscribeTopic; - runConfig?: ConsumerRunConfig; +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: ['localhost:9092'], + }, + } + }, + producer: { + clients: { + pub1: { + kafkaInstanceRef: 'sub1', + } + } + } + } } ``` +### 使用 Producer +Producer 不存在默认实例,由于使用了服务工厂的设计模式,所以可以通过 `@InjectClient()` 来注入。 -**示例一** - -创建一个手动提交,设置消费者在开始获取消息时将使用最新提交的偏移量`fromBeginning: false`,设置运行时的提交方式为手动提交`autoCommit: false` -```typeScript -import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/core'; -import { KafkaMessage } from 'kafkajs'; -import { Context, Application } from '../../../../../src'; +```typescript +// src/service/user.service.ts +import { Provide, InjectClient } from '@midwayjs/core'; +import { KafkaProducerFactory, Producer } from '@midwayjs/kafka'; @Provide() -@Consumer(MSListenerType.KAFKA) -export class UserConsumer { +export class UserService { + + @InjectClient(KafkaProducerFactory, 'pub1') + producer: Producer; + + async invoke() { + await this.producer.send({ + topic: 'topic-test-1', + messages: [{ key: 'message-key1', value: 'hello consumer 11 !' }], + }); + } +} +``` - @App() - app: Application; +## Admin - @Inject() - ctx: Context; +Kafka 的 Admin 功能,可以用来创建、删除、查看主题,查看配置和 ACL 等。 - @Inject() - logger; +### 基础配置 - @KafkaListener('topic-test0', { - subscription: { - fromBeginning: false, - }, - runConfig: { - autoCommit: false, - } - }) - async gotData(message: KafkaMessage) { - console.info('gotData info'); - this.logger.info('test output =>', message.offset + ' ' + message.key + ' ' + message.value.toString('utf8')); - try { - // 抛出异常,当出现异常的时候,需要设置commitOffsets偏移到异常的位置,用于重新执行消费 - throw new Error("error"); - } catch (error) { - this.ctx.commitOffsets(message.offset); +和 Producer 类似,Admin 也使用了服务工厂的设计模式。 + +```typescript +// src/config/config.default +export default { + kafka: { + admin: { + clients: { + admin1: { + // ... 
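+          // 可配置 connectionOptions 与 adminOptions,或通过 kafkaInstanceRef 复用已有实例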
+ } + } } } } - ``` +同样的,Admin 也可以复用 Kafka 实例。 +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: ['localhost:9092'], + }, + } + }, + admin: { + clients: { + admin1: { + kafkaInstanceRef: 'sub1', + } + } + } + } +} +``` +### 使用 Admin -## 生产者(Producer)使用方法 +Admin 不存在默认实例,由于使用了服务工厂的设计模式,所以可以通过 `@InjectClient()` 来注入。 +```typescript +// src/service/admin.service.ts +import { Provide, InjectClient } from '@midwayjs/core'; +import { KafkaAdminFactory, Admin } from '@midwayjs/kafka'; -生产者(Producer)也就是第一节中的消息生产者,简单的来说就是会创建一个客户端,将消息发送到 Kafka 服务。 +@Provide() +export class AdminService { + + @InjectClient(KafkaAdminFactory, 'admin1') + admin: Admin; +} +``` +更多的 Admin 使用方法,可以参考 [KafkaJS Admin](https://kafka.js.org/docs/admin) 文档。 -注意:当前 Midway 并没有使用组件来支持消息发送,这里展示的示例只是使用纯 SDK 在 Midway 中的写法。 +## 组件日志 -### 安装依赖 +Kafka 组件默认使用 `kafkaLogger` 日志,默认会将 `ctx.logger` 记录在 `midway-kafka.log`。 +你可以通过配置修改。 -```bash -$ npm i kafkajs --save +```typescript +// src/config/config.default +export default { + midwayLogger: { + clients: { + kafkaLogger: { + fileLogName: 'midway-kafka.log', + }, + }, + }, +} ``` +这个日志的输出格式,我们也可以单独配置。 -### 调用服务发送消息 - - -比如,我们在 service 文件下,新增一个 `kafka.ts` 文件。 ```typescript -import { - Provide, - Scope, - ScopeEnum, - Init, - Autoload, - Destroy, - Config, -} from '@midwayjs/core'; -import { ProducerRecord } from 'kafkajs'; -const { Kafka } = require('kafkajs'); - -@Autoload() -@Provide() -@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别) -export class KafkaService { - @Config('kafka') - kafkaConfig: any; - - private producer; - - @Init() - async connect() { - // 创建连接,你可以把配置放在 Config 中,然后注入进来 - const { brokers, clientId, producerConfig = {} } = this.kafkaConfig; - const client = new Kafka({ - clientId: clientId, - brokers: brokers, - }); - this.producer = client.producer(producerConfig); - await this.producer.connect(); +export default { + kafka: { + // ... + contextLoggerFormat: info => { + const { jobId, from } = info.ctx; + return `${info.timestamp} ${info.LEVEL} ${info.pid} ${info.message}`; + }, } +} +``` - // 发送消息 - public async send(producerRecord: ProducerRecord) { - return this.producer.send(producerRecord); - } - @Destroy() - async close() { - await this.producer.disconnect(); - } -} +## 获取 KafkaJS 模块 + +KafkaJS 模块,可以通过 `@midwayjs/kafka` 的 `KafkaJS` 字段来获取。 +```typescript +import { KafkaJS } from '@midwayjs/kafka'; + +const { ConfigResourceTypes } = KafkaJS; +// ... ``` -大概就是创建了一个用来封装消息通信的 service,同时他是全局唯一的 Singleton 单例。由于增加了 `@AutoLoad` 装饰器,可以自执行初始化。 +## 关于分区的警告 -这样基础的调用服务就抽象好了,我们只需要在用到的地方,调用 `send` 方法即可。 +如果你使用的是 KafkaJS 的 v2.0.0 版本,你可能会看到如下的警告: +``` +2024-11-04 23:47:28.228 WARN 31729 KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. 
Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" { timestamp: '2024-11-04T15:47:28.228Z', logger: 'kafkajs' } +``` -比如: +这个警告是由于 KafkaJS 的 v2.0.0 版本默认使用了新的分区器,如果接受新的分区器行为,但想要关闭这个警告消息,可以通过设置环境变量 `KAFKAJS_NO_PARTITIONER_WARNING=1` 来消除这个警告。 +或者显示声明分区器。 ```typescript -@Provide() -export class UserService { - - @Inject() - kafkaService: KafkaService; +// src/config/config.default +import { KafkaJS } from '@midwayjs/kafka'; +const { Partitioners } = KafkaJS; - async invoke() { - // TODO - - // 发送消息 - const result = this.kafkaService.send({ - topic: 'test', - messages: [ - { - value: JSON.stringify(messageValue), +export default { + kafka: { + producer: { + clients: { + pub1: { + // ... + producerOptions: { + createPartitioner: Partitioners.DefaultPartitioner, + // ... + createPartitioner: Partitioners.LegacyPartitioner, + }, }, - ], - }); + }, + }, } } ``` +建议你查看 KafkaJS v2.0.0 的 [迁移指南](https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner) 了解更多细节。 + + ## 参考文档 diff --git a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/kafka.md b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/kafka.md index 31130e98429e..ad567e039c0d 100644 --- a/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/kafka.md +++ b/site/i18n/en/docusaurus-plugin-content-docs/current/extensions/kafka.md @@ -1,79 +1,71 @@ # Kafka -In the architecture of complex systems, event flow is a very important part, including real-time capture of data from event sources (databases, sensors, mobile devices, etc.) in the form of event flow, persistence of event flow for easy retrieval, and real-time and review of operations to process response event flow. +In the architecture of complex systems, event streams are a crucial part, including capturing data in real-time from event sources (databases, sensors, mobile devices, etc.) as event streams, persisting event streams for easy retrieval, and processing and responding to event streams in real-time and retrospectively. -It is used for payment and financial transactions, tracking and monitoring the flow of information in industries such as automobiles, capturing and analyzing Internet of Things data, and so on. +Applicable to industries such as payment and financial transactions, implementing tracking and monitoring of automotive information flow, capturing and analyzing IoT data, etc. +In Midway, we provide the ability to subscribe to Kafka to meet such user needs. -In Midway, we provide the ability to subscribe to Kafka to meet such needs of users. +Related Information: -Related information: +**Subscription Service** -**Subscribe to service** - -| Description | | +| Description | | | ----------------- | ---- | -| Can be used for standard projects | ✅ | -| Can be used for Serverless | ❌ | -| Can be used for integration | ✅ | -| Contains independent main framework | ✅ | -| Contains independent logs | ❌ | - - - -## Basic concept +| Available for standard projects | ✅ | +| Available for Serverless | ❌ | +| Available for integrated projects | ✅ | +| Includes standalone main framework | ✅ | +| Includes standalone logging | ✅ | +## Basic Concepts Distributed stream processing platform -* Publish and subscribe (stream) information -* Fault tolerance (failover) Store information (flow), store event flow -* When the message flow occurs, handle the event flow. 
+* Publish-subscribe (stream) information +* Fault-tolerant (failover) storage of information (streams), storing event streams +* Process event streams as they occur -Understanding Producer (Producer) +Understanding Producer -* Publish messages to one topic or topics. +* Publish messages to one or more topics. -Understanding Consumer (Subject Consumers) +Understanding Consumer * Subscribe to one or more topics and process the generated information. -Understand Stream API -* Act as a stream processor, consume input streams from one or more topics, and produce an output stream to one or more output topics, effectively converting the input stream to the output stream. +Understanding Stream API +* Acts as a stream processor, consuming input streams from one or more topics and producing an output stream to one or more output topics, effectively transforming input streams into output streams. Understanding Broker -* Published messages are kept in a group of servers, called a Kafka cluster. Each server in the cluster is a broker. Consumers can consume these published messages by subscribing to one or more topics and pulling data from the Broker. - +* Published messages are stored in a set of servers called a Kafka cluster. Each server in the cluster is a broker. Consumers can subscribe to one or more topics and pull data from brokers to consume these published messages. ![image.png](https://kafka.apache.org/images/streams-and-tables-p1_p4.png) +:::tip +From v3.19, the Kafka component has been refactored, and the configuration and usage methods of the Kafka component have changed significantly from before. The original usage method is compatible, but the documentation is no longer retained. +::: +## Install Dependencies -## Consumer Usage - +Install the `@midwayjs/kafka` module. -### Installation dependency - - -Midway provides the ability to subscribe to Kafka and can be deployed and used independently. Install the `@midwayjs/kafka` module and its definition. ```bash -$ npm i @midwayjs/kafka@3 --save -$ npm i kafkajs --save +$ npm i @midwayjs/kafka --save ``` -Or reinstall the following dependencies in `package.json`. +Or add the following dependency to `package.json` and reinstall. ```json { "dependencies": { "@midwayjs/kafka": "^3.0.0", - "kafka": "^2.0.0 ", // ... } } ``` -## Open the component +## Enable Component -`@midwayjs/kafka` can be used as an independent main framework. +`@midwayjs/kafka` can be used as a standalone main framework. ```typescript // src/configuration.ts @@ -82,7 +74,7 @@ import * as kafka from '@midwayjs/kafka'; @Configuration({ imports: [ - Kafka + kafka ], // ... }) @@ -115,15 +107,15 @@ export class MainConfiguration { } ``` -### Directory structure +Since Kafka is divided into **Consumer** and **Producer** parts, both can be used independently, and we will introduce them separately. +## Consumer -We generally divide capabilities into producers and consumers, and subscriptions are the capabilities of consumers. +### Directory Structure - -We usually put consumers in consumer catalogues. For example, `src/consumer/userConsumer.ts`. +We usually place consumers in the consumer directory, such as `src/consumer/user.consumer.ts`. ``` -➜ my_midway_app tree +➜ my_midway_app tree . ├── src │ ├── consumer @@ -135,340 +127,447 @@ We usually put consumers in consumer catalogues. For example, `src/consumer/user ├── package.json └── tsconfig.json ``` -The code example is as follows. 
-```typescript -@Provide() -@Consumer(MSListenerType.KAFKA) -export class UserConsumer { +### Basic Configuration - @Inject() - ctx: IMidwayKafkaContext; +We can configure multiple consumers through the `consumer` field and the `@KafkaConsumer` decorator. - @Inject() - logger; +For example, `sub1` and `sub2` below are two different consumers. - @KafkaListener('topic-test') - async gotData(message: KafkaMessage) { - this.logger.info('test output =>', message.offset + '' + message.key + '' + message.value.toString('utf8')); +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + // ... + }, + sub2: { + // ... + }, + } } } ``` -The `@Consumer` decorator provides the consumer identifier, and its parameters specify the type of a certain consumption framework. For example, here we specify the `MSListenerType.KFAKA` type, which refers to the kafka type. +The simplest consumer configuration requires several fields: Kafka connection configuration, consumer configuration, and subscription configuration. -The class that identifies the `@Consumer` can bind a topic after using the `@KafkaListener` decorator for the method. +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + // ... + }, + consumerOptions: { + // ... + }, + subscribeOptions: { + // ... + }, + }, + } + } +} +``` +For example: -The parameter of the method is the received message of type `ConsumeMessage`. Automatic confirmation is set by default. When is manual confirmation set? When an exception occurs, it is necessary to set the commitOffsets offset to the abnormal position for re-consumption. +```typescript +// src/config/config.default +export default { + kafka: { + consumer: { + sub1: { + connectionOptions: { + clientId: 'my-app', + brokers: ['localhost:9092'], + }, + consumerOptions: { + groupId: 'groupId-test-1', + }, + subscribeOptions: { + topics: ['topic-test-1'], + } + }, + } + } +} +``` -If you need to subscribe to multiple topics, you can use multiple methods or multiple files. +Complete configurable parameters include: +- `connectionOptions`: Kafka connection configuration, i.e., parameters for `new Kafka(consumerOptions)` +- `consumerOptions`: Kafka consumer configuration, i.e., parameters for `kafka.consumer(consumerOptions)` +- `subscribeOptions`: Kafka subscription configuration, i.e., parameters for `consumer.subscribe(subscribeOptions)` +- `consumerRunConfig`: Consumer run configuration, i.e., parameters for `consumer.run(consumerRunConfig)` -### Kafka message context +For detailed explanations of these parameters, refer to the [KafkaJS Consumer](https://kafka.js.org/docs/consuming) documentation. +### Reuse Kafka Instance -The context for subscribing to `Kafka` data is the same as the Web, which contains a `requestContext` and a data binding for each message received. +If you need to reuse a Kafka instance, you can specify it through the `kafkaInstanceRef` field. 
+### Reuse Kafka Instance
+
+If you need to reuse a Kafka instance, you can specify it through the `kafkaInstanceRef` field.
 
-The context for subscribing to `Kafka` data is the same as the Web, which contains a `requestContext` and a data binding for each message received.
-
-The entire ctx is defined:
-
-```typescript
-export type Context = {
-  requestContext: IMidwayContainer;
-};
-```
-
-You can get the definition from the framework
-
-```typescript
-import { Context } from '@midwayjs/kafka';
-```
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    consumer: {
+      sub1: {
+        connectionOptions: {
+          clientId: 'my-app',
+          brokers: ['localhost:9092'],
+        },
+        consumerOptions: {
+          groupId: 'groupId-test-1',
+        },
+        subscribeOptions: {
+          topics: ['topic-test-1'],
+        }
+      },
+      sub2: {
+        kafkaInstanceRef: 'sub1',
+        consumerOptions: {
+          groupId: 'groupId-test-2',
+        },
+        subscribeOptions: {
+          topics: ['topic-test-2'],
+        }
+      }
+    }
+  }
+}
+```
+
+Note that `sub1` and `sub2` above are two different consumers sharing the same Kafka instance, and `sub2`'s `groupId` must be different from `sub1`'s.
+
+The equivalent Kafka SDK code looks like this:
+
+```typescript
+import { Kafka } from 'kafkajs';
+
+const kafka = new Kafka({
+  clientId: 'my-app',
+  brokers: ['localhost:9092'],
+});
+
+const consumer1 = kafka.consumer({ groupId: 'groupId-test-1' });
+const consumer2 = kafka.consumer({ groupId: 'groupId-test-2' });
+```
 
-### Configure consumers
-
-We need to specify the address of Kafka in the configuration.
-
-```typescript
-// src/config/config.default
-import { MidwayConfig } from '@midwayjs/core';
-
-export default {
-  // ...
-  kafka: {
-    kafkaConfig: {
-      clientId: 'my-app',
-      brokers: [process.env.KAFKA_URL || 'localhost:9092']
-    },
-    consumerConfig: {
-      groupId: 'groupId-test'
-    }
-  },
-} as MidwayConfig;
-```
-
-More configurations (see https://kafka.js.org/docs/consuming for more detailed configurations):
-
-| Property | Description |
-| --- | --- |
-| kafkaConfig | Kafka connection information |
-| - clientId | Specify client ID |
-| - brokers | Kafka cluster brokers |
-| consumerConfig | Consumer Configuration |
-| - groupId | Packet ID |
+### Consumer Implementation
+
+We can provide a standard consumer implementation in the directory, such as `src/consumer/sub1.consumer.ts`.
+
+```typescript
+// src/consumer/sub1.consumer.ts
+import { KafkaConsumer, IKafkaConsumer, EachMessagePayload } from '@midwayjs/kafka';
+
+@KafkaConsumer('sub1')
+class Sub1Consumer implements IKafkaConsumer {
+  async eachMessage(payload: EachMessagePayload) {
+    // ...
+  }
+}
+```
+
+Here `sub1` is the consumer name and refers to the `sub1` consumer defined in the configuration.
+
+You can also implement the `eachBatch` method to process messages in batches.
+
+```typescript
+// src/consumer/sub1.consumer.ts
+import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka';
+
+@KafkaConsumer('sub1')
+class Sub1Consumer implements IKafkaConsumer {
+  async eachBatch(payload: EachBatchPayload) {
+    // ...
+  }
+}
+```
 
-### Manual-committing
-
-Manually submit settings. By default, components submit automatically.
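+
+If you take over batch processing, offset management and heartbeats become your responsibility. Assuming the payload mirrors KafkaJS's `eachBatch` signature, a fuller handler might look like this sketch:
+
+```typescript
+// src/consumer/sub1.consumer.ts; a sketch of batch handling. resolveOffset,
+// heartbeat, isRunning, and isStale are the standard KafkaJS eachBatch helpers
+import { KafkaConsumer, IKafkaConsumer, EachBatchPayload } from '@midwayjs/kafka';
+
+@KafkaConsumer('sub1')
+class Sub1Consumer implements IKafkaConsumer {
+  async eachBatch({ batch, resolveOffset, heartbeat, isRunning, isStale }: EachBatchPayload) {
+    for (const message of batch.messages) {
+      // stop early when the consumer is shutting down or the batch is stale
+      if (!isRunning() || isStale()) break;
+
+      // ... process message.value here ...
+
+      resolveOffset(message.offset); // mark this offset as processed
+      await heartbeat();             // keep the group session alive
+    }
+  }
+}
+```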
+### Message Context
+
+Like other message subscription mechanisms, the message itself is passed through the `Context` field.
+
 ```typescript
-import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/core';
-import { KafkaMessage } from 'kafkajs';
-import { Context, Application } from '../../../../../src';
+// src/consumer/sub1.consumer.ts
+import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka';
+import { Inject } from '@midwayjs/core';
 
-@Provide()
-@Consumer(MSListenerType.KAFKA)
-export class UserConsumer {
-
-  @App()
-  app: Application;
+@KafkaConsumer('sub1')
+class Sub1Consumer implements IKafkaConsumer {
 
   @Inject()
   ctx: Context;
 
-  @Inject()
-  logger;
-
-  @KafkaListener('topic-test0', {
-    subscription: {
-      fromBeginning: false
-    },
-    runConfig: {
-      autoCommit: false
-    }
-  })
-  async gotData(message: KafkaMessage) {
-    console.info('gotData info');
-    this.logger.info('test output =>', message.offset + '' + message.key + '' + message.value.toString('utf8'));
-    try {
-      // Throws an exception. When an exception occurs, you need to set the commitOffsets offset to the location of the exception to re-execute the consumption, so the consumption that should occur here is 2 times, and the total is 2
-      throw new Error("error");
-    } catch (error) {
-      this.ctx.commitOffsets(message.offset);
-    }
-    this.app.setAttr('total', this.app.getAttr('total') + 1);
+  async eachMessage(payload: EachMessagePayload) {
+    // ...
   }
 }
 ```
 
-### Multi different Topic
-
-the subscription of topic1 and topic2, and the consumption of both topics are called.
-
-```typescript
-import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/core';
-import { KafkaMessage } from 'kafkajs';
-import { Context, Application } from '../../../../../src';
-
-@Provide()
-@Consumer(MSListenerType.KAFKA)
-export class UserConsumer {
-
-  @App()
-  app: Application;
-
-  @Inject()
-  ctx: Context;
-
-  @Inject()
-  logger;
-
-  @KafkaListener('topic-test')
-  async gotData(message: KafkaMessage) {
-    console.info('gotData info');
-    this.logger.info('test output =>', message.offset + '' + message.key + '' + message.value.toString('utf8'));
-    this.app.setAttr('total', this.app.getAttr('total') + 1);
-  }
-
-  @KafkaListener('topic-test2')
-  async gotData2(message: KafkaMessage) {
-    console.info('gotData2 info');
-    this.logger.info('test output =>', message.offset + '' + message.key + '' + message.value.toString('utf8'));
-    this.app.setAttr('total', this.app.getAttr('total') + 1);
-  }
-}
-```
+The `Context` field includes the following properties:
+
+| Property | Type | Description |
+| --- | --- | --- |
+| ctx.payload | EachMessagePayload or EachBatchPayload | Message content |
+| ctx.consumer | Consumer | Consumer instance |
+
+You can call Kafka's API through `ctx.consumer`, such as `ctx.consumer.commitOffsets` to manually commit offsets or `ctx.consumer.pause` to pause consumption.
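+
+For example, with `autoCommit` disabled in `consumerRunConfig`, you can commit offsets yourself. The following is a sketch that assumes `ctx.consumer` exposes the standard KafkaJS Consumer API as described above; note that KafkaJS expects the committed offset to be the next offset to read.
+
+```typescript
+// src/consumer/sub1.consumer.ts; a sketch of manual offset control
+import { KafkaConsumer, IKafkaConsumer, EachMessagePayload, Context } from '@midwayjs/kafka';
+import { Inject } from '@midwayjs/core';
+
+@KafkaConsumer('sub1')
+class Sub1Consumer implements IKafkaConsumer {
+
+  @Inject()
+  ctx: Context;
+
+  async eachMessage({ topic, partition, message }: EachMessagePayload) {
+    // ... process message.value ...
+
+    // commit the offset of the next message to be read
+    await this.ctx.consumer.commitOffsets([
+      { topic, partition, offset: (Number(message.offset) + 1).toString() },
+    ]);
+  }
+}
+```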
+## Producer
+
+### Basic Configuration
+
+Producers also need instances to be created, and the configuration uses the [Service Factory](/docs/service_factory) design pattern.
+
+The configuration is as follows:
+
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    producer: {
+      clients: {
+        pub1: {
+          // ...
+        },
+        pub2: {
+          // ...
+        }
+      }
+    }
+  }
+}
+```
+
+Each Producer instance's configuration also includes `connectionOptions` and `producerOptions`.
+
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    producer: {
+      clients: {
+        pub1: {
+          connectionOptions: {
+            clientId: 'my-app',
+            brokers: ['localhost:9092'],
+          },
+          producerOptions: {
+            // ...
+          }
+        }
+      }
+    }
+  }
+}
+```
+
+For specific parameters, refer to the [KafkaJS Producer](https://kafka.js.org/docs/producing) documentation.
 
-### Decorator parameters
-
-`@kafkaListener` the first parameter of the decorator is topic, which represents the topic to be consumed.
-
-The second parameter is an object, including the registered configuration `subscription`, the running configuration `runConfig` and other parameters. The detailed definition is as follows:
-
-```typescript
-export interface KafkaListenerOptions {
-  propertyKey?: string;
-  topic?: string;
-
-  subscription?: ConsumerSubscribeTopics | ConsumerSubscribeTopic;
-  runConfig?: ConsumerRunConfig;
-}
-```
-
-**Example 1**
-
-Create a manual submission, set the offset of the latest submission to be used by the consumer when starting to get the message `fromBeginning: false`, and set the submission method at runtime to manual submission `autoCommit: false`
-
-```typeScript
-import { Provide, Consumer, MSListenerType, Inject, App, KafkaListener } from '@midwayjs/core';
-import { KafkaMessage } from 'kafkajs';
-import { Context, Application } from '../../../../../src';
-
-@Provide()
-@Consumer(MSListenerType.KAFKA)
-export class UserConsumer {
-
-  @App()
-  app: Application;
-
-  @Inject()
-  ctx: Context;
-
-  @Inject()
-  logger;
-
-  @KafkaListener('topic-test0', {
-    subscription: {
-      fromBeginning: false
-    },
-    runConfig: {
-      autoCommit: false
-    }
-  })
-  async gotData(message: KafkaMessage) {
-    console.info('gotData info');
-    this.logger.info('test output =>', message.offset + '' + message.key + '' + message.value.toString('utf8'));
-    try {
-      // Throws an exception. When an exception occurs, you need to set the commitOffsets offset to the location of the exception to re-execute the consumption.
-      throw new Error("error");
-    } catch (error) {
-      this.ctx.commitOffsets(message.offset);
-    }
-  }
-}
-```
+Additionally, since a Consumer and a Producer can be created from the same Kafka instance, a Producer can reuse an existing Kafka instance.
+
+If the Producer is created after the Consumer, it can reuse the Kafka instance through the `kafkaInstanceRef` field.
+
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    consumer: {
+      sub1: {
+        connectionOptions: {
+          clientId: 'my-app',
+          brokers: ['localhost:9092'],
+        },
+      }
+    },
+    producer: {
+      clients: {
+        pub1: {
+          kafkaInstanceRef: 'sub1',
+        }
+      }
+    }
+  }
+}
+```
+
+### Using Producer
+
+There is no default instance for a Producer. Since the service factory design pattern is used, it can be injected through `@InjectClient()`.
+
+```typescript
+// src/service/user.service.ts
+import { Provide, InjectClient } from '@midwayjs/core';
+import { KafkaProducerFactory, Producer } from '@midwayjs/kafka';
+
+@Provide()
+export class UserService {
+
+  @InjectClient(KafkaProducerFactory, 'pub1')
+  producer: Producer;
+
+  async invoke() {
+    await this.producer.send({
+      topic: 'topic-test-1',
+      messages: [{ key: 'message-key1', value: 'hello consumer 11 !' }],
+    });
+  }
+}
+```
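+
+`send` accepts more than a topic and a message list. The sketch below shows a few commonly used options; `acks`, `compression`, and `headers` are standard KafkaJS `send()` and message fields, and it assumes the `KafkaJS` re-export described in the "Access KafkaJS Module" section below includes KafkaJS's `CompressionTypes`.
+
+```typescript
+// src/service/user.service.ts; a sketch using standard KafkaJS send options
+import { Provide, InjectClient } from '@midwayjs/core';
+import { KafkaProducerFactory, Producer, KafkaJS } from '@midwayjs/kafka';
+
+@Provide()
+export class UserService {
+
+  @InjectClient(KafkaProducerFactory, 'pub1')
+  producer: Producer;
+
+  async invokeWithOptions() {
+    await this.producer.send({
+      topic: 'topic-test-1',
+      acks: -1,                                   // wait for all in-sync replicas
+      compression: KafkaJS.CompressionTypes.GZIP, // compress the batch
+      messages: [
+        {
+          key: 'message-key1',
+          value: JSON.stringify({ hello: 'consumer' }),
+          headers: { 'trace-id': 'abc123' },      // per-message metadata
+        },
+      ],
+    });
+  }
+}
+```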
+## Admin
+
+Kafka's Admin functionality can be used to create, delete, and view topics, as well as inspect configurations and ACLs.
+
+### Basic Configuration
+
+Like Producer, Admin also uses the service factory design pattern.
+
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    admin: {
+      clients: {
+        admin1: {
+          // ...
+        }
+      }
+    }
+  }
+}
+```
+
+Similarly, Admin can also reuse an existing Kafka instance.
+
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    consumer: {
+      sub1: {
+        connectionOptions: {
+          clientId: 'my-app',
+          brokers: ['localhost:9092'],
+        },
+      }
+    },
+    admin: {
+      clients: {
+        admin1: {
+          kafkaInstanceRef: 'sub1',
+        }
+      }
+    }
+  }
+}
+```
+
+### Using Admin
 
-## Producer Usage Method
-
-The producer (Producer) is also the message producer in the first section. In short, it will create a client and send messages to the Kafka service.
-
-Note: Midway currently does not use components to support message sending. The example shown here is only the writing method of pure SDK in Midway.
-
-### Install dependencies
-
-```bash
-$ npm i kafkajs --save
-```
-
-### Call the service to send a message
-
-For example, we add a `Kafka.ts` file under the service file.
-
-```typescript
-import {
-  Provide,
-  Scope,
-  ScopeEnum,
-  Init,
-  Autoload,
-  Destroy,
-  Config,
-} from '@midwayjs/core';
-import { ProducerRecord } from 'kafkajs';
-const { Kafka } = require('kafkajs');
-
-@Autoload()
-@Provide()
-@Scope(ScopeEnum.Singleton) // Singleton singleton, globally unique (process level)
-export class KafkaService {
-  @Config('kafka')
-  kafkaConfig: any;
-
-  private producer;
-
-  @Init()
-  async connect() {
-    // To create a connection, you can put the configuration in Config and inject it into it.
-    const { brokers, clientId, producerConfig = {} } = this.kafkaConfig;
-    const client = new Kafka({
-      clientId: clientId,
-      brokers: brokers
-    });
-    this.producer = client.producer(producerConfig);
-    await this.producer.connect();
-  }
-
-  // Send a message
-  public async send(producerRecord: ProducerRecord) {
-    return this.producer.send(producerRecord);
-  }
-
-  @Destroy()
-  async close() {
-    await this.producer.disconnect();
-  }
-}
-```
-
-Probably created a service to encapsulate message communication, and it is the only Singleton singleton in the world. Due to the addition of `@AutoLoad` decorator, initialization can be self-executed.
-
-In this way, the basic calling service is abstract. You only need to call the `send` method where it is used.
+There is no default instance for Admin. Since the service factory design pattern is used, it can be injected through `@InjectClient()`.
+
+```typescript
+// src/service/admin.service.ts
+import { Provide, InjectClient } from '@midwayjs/core';
+import { KafkaAdminFactory, Admin } from '@midwayjs/kafka';
+
+@Provide()
+export class AdminService {
+
+  @InjectClient(KafkaAdminFactory, 'admin1')
+  admin: Admin;
+}
+```
+
+For more Admin usage, refer to the [KafkaJS Admin](https://kafka.js.org/docs/admin) documentation.
+
+## Component Logging
+
+The Kafka component uses the `kafkaLogger` logger by default, which writes `ctx.logger` output to `midway-kafka.log`.
+
+You can modify this through configuration.
+
+```typescript
+// src/config/config.default
+export default {
+  midwayLogger: {
+    clients: {
+      kafkaLogger: {
+        fileLogName: 'midway-kafka.log',
+      },
+    },
+  },
+}
+```
+
+The output format of this log can also be configured separately.
+
+```typescript
+// src/config/config.default
+export default {
+  kafka: {
+    // ...
+    contextLoggerFormat: info => {
+      return `${info.timestamp} ${info.LEVEL} ${info.pid} ${info.message}`;
+    },
+  }
+}
+```
+
+## Access KafkaJS Module
+
+The KafkaJS module can be accessed through the `KafkaJS` field of `@midwayjs/kafka`.
+
+```typescript
+import { KafkaJS } from '@midwayjs/kafka';
+
+const { ConfigResourceTypes } = KafkaJS;
+// ...
+```
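+
+For example, combining the re-exported constants with the Admin instance from the previous section, you might inspect a topic's configuration as in the sketch below; `describeConfigs` and `ConfigResourceTypes` are standard KafkaJS Admin APIs.
+
+```typescript
+// src/service/admin.service.ts; a sketch reusing the 'admin1' instance above
+import { Provide, InjectClient } from '@midwayjs/core';
+import { KafkaAdminFactory, Admin, KafkaJS } from '@midwayjs/kafka';
+
+const { ConfigResourceTypes } = KafkaJS;
+
+@Provide()
+export class AdminService {
+
+  @InjectClient(KafkaAdminFactory, 'admin1')
+  admin: Admin;
+
+  async describeTopic(topic: string) {
+    // fetch the broker-side configuration entries for a single topic
+    return this.admin.describeConfigs({
+      includeSynonyms: false,
+      resources: [{ type: ConfigResourceTypes.TOPIC, name: topic }],
+    });
+  }
+}
+```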
+## Warning About Partitions
+
+If you are using KafkaJS v2.0.0, you may see the following warning:
+
+```
+2024-11-04 23:47:28.228 WARN 31729 KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" { timestamp: '2024-11-04T15:47:28.228Z', logger: 'kafkajs' }
+```
+
-For example:
-
+This warning appears because KafkaJS v2.0.0 switched to a new default partitioner. If you accept the new behavior and only want to silence the warning, set the environment variable `KAFKAJS_NO_PARTITIONER_WARNING=1`.
+
+Alternatively, declare the partitioner explicitly.
+
 ```typescript
-@Provide()
-export class UserService {
-
-  @Inject()
-  kafkaService: KafkaService;
-
-  async invoke() {
-    // TODO
-
-    // Send a message
-    const result = this.kafkaService.send({
-      topic: 'test',
-      messages: [
-        {
-          value: JSON.stringify(messageValue)
-        },
-      ],
-    });
-  }
-}
+// src/config/config.default
+import { KafkaJS } from '@midwayjs/kafka';
+const { Partitioners } = KafkaJS;
+
+export default {
+  kafka: {
+    producer: {
+      clients: {
+        pub1: {
+          // ...
+          producerOptions: {
+            // keep the new default partitioner explicitly
+            createPartitioner: Partitioners.DefaultPartitioner,
+            // or retain the pre-v2.0.0 behavior instead:
+            // createPartitioner: Partitioners.LegacyPartitioner,
+          },
+        },
+      },
+    },
+  }
+}
 ```
 
+It is recommended to read the KafkaJS v2.0.0 [migration guide](https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner) for more details.
+
-## Reference document
+## Reference Documentation
 
 - [KafkaJS](https://kafka.js.org/docs/introduction)
-- [apache kafka official website](https://kafka.apache.org/intro)
+- [Apache Kafka Official Website](https://kafka.apache.org/intro)