Skip to content

Commit

Permalink
feat(kafka): refactor framework and add factory service for producer …
Browse files Browse the repository at this point in the history
…and admin (#4154)

* refactor: kafka framework

* refactor: kafka framework

* refactor: kafka framework

* refactor: kafka framework

* refactor: kafka framework

* feat: support producer and admin for kafka

* fix: test

* docs: add document

(cherry picked from commit 2c71afc)
  • Loading branch information
czy88840616 committed Dec 15, 2024
1 parent d8c7802 commit 31ee217
Show file tree
Hide file tree
Showing 20 changed files with 1,676 additions and 547 deletions.
133 changes: 133 additions & 0 deletions packages/core/src/common/typedResourceManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { ClassType } from '../interface';

export class TypedResourceManager<
Resource = any,
ResourceInitializeConfig = any,
ResourceProviderType = any
> {
private resourceMap: Map<string, Resource> = new Map();
private resourceBindingMap: Map<string, any> = new Map();
constructor(
protected typedResourceInitializerOptions: {
initializeValue: {
[resourceName: string]: ResourceInitializeConfig;
};
initializeClzProvider: {
[resourceName: string]: ClassType<ResourceProviderType>;
};
resourceInitialize: (
resourceInitializeConfig: ResourceInitializeConfig,
resourceName: string
) => Promise<Resource>;
resourceBinding: (
ClzProvider: ClassType<ResourceProviderType>,
resourceInitializeConfig: ResourceInitializeConfig,
resource: Resource,
resourceName: string
) => Promise<any>;
resourceStart: (
resource: Resource,
resourceInitializeConfig: ResourceInitializeConfig,
resourceBindingResult?: any
) => Promise<void>;
resourceDestroy: (
resource: Resource,
resourceInitializeConfig: ResourceInitializeConfig
) => Promise<void>;
}
) {}

public async createResource(
resourceName: string,
resourceInitializeConfig: ResourceInitializeConfig
) {
const resource =
await this.typedResourceInitializerOptions.resourceInitialize(
resourceInitializeConfig,
resourceName
);
this.resourceMap.set(resourceName, resource);
return resource;
}

public async init() {
for (const resourceName of Object.keys(
this.typedResourceInitializerOptions.initializeValue
)) {
const resourceInitializeConfig =
this.typedResourceInitializerOptions.initializeValue[resourceName];
const ClzProvider =
this.typedResourceInitializerOptions.initializeClzProvider[
resourceName
];

const resource = await this.createResource(
resourceName,
resourceInitializeConfig
);

const bindingResult =
await this.typedResourceInitializerOptions.resourceBinding(
ClzProvider,
resourceInitializeConfig,
resource,
resourceName
);
if (bindingResult) {
this.resourceBindingMap.set(resourceName, bindingResult);
}
}
}

public async startParallel() {
const startPromises = [];
for (const [resourceName, resource] of this.resourceMap.entries()) {
startPromises.push(
this.typedResourceInitializerOptions.resourceStart(
resource,
this.typedResourceInitializerOptions.initializeValue[resourceName],
this.resourceBindingMap.get(resourceName)
)
);
}
await Promise.all(startPromises);
}

public async start() {
for (const [resourceName, resource] of this.resourceMap.entries()) {
await this.typedResourceInitializerOptions.resourceStart(
resource,
this.typedResourceInitializerOptions.initializeValue[resourceName],
this.resourceBindingMap.get(resourceName)
);
}
}

public async destroyParallel() {
const destroyPromises = [];
for (const [resourceName, resource] of this.resourceMap.entries()) {
destroyPromises.push(
this.typedResourceInitializerOptions.resourceDestroy(
resource,
this.typedResourceInitializerOptions.initializeValue[resourceName]
)
);
}
await Promise.all(destroyPromises);
}

public async destroy() {
for (const [resourceName, resource] of this.resourceMap.entries()) {
await this.typedResourceInitializerOptions.resourceDestroy(
resource,
this.typedResourceInitializerOptions.initializeValue[resourceName]
);
}
this.resourceMap.clear();
this.resourceBindingMap.clear();
}

public getResource(resourceName: string): any {
return this.resourceMap.get(resourceName);
}
}
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export { PathFileUtil } from './util/pathFileUtil';
export { FileUtils } from './util/fs';
export { FORMAT } from './util/format';
export { ServerResponse, HttpServerResponse } from './response/index';
export { TypedResourceManager } from './common/typedResourceManager';
export { MidwayPerformanceManager } from './common/performanceManager';

export * from './decorator/metadataManager';
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1201,3 +1201,4 @@ export interface InjectionConfigurationOptions {
}

export type FunctionalConfigurationOptions = InjectionConfigurationOptions & ILifeCycle;

65 changes: 65 additions & 0 deletions packages/core/test/common/typedResourceManager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { TypedResourceManager } from '../../src';

describe('test/common/typedResourceManager.test.ts', () => {
let typedResourceManager: TypedResourceManager<any, any, any>;
let mockResourceInitialize: jest.Mock;
let mockResourceBinding: jest.Mock;
let mockResourceStart: jest.Mock;
let mockResourceDestroy: jest.Mock;

beforeEach(() => {
mockResourceInitialize = jest.fn().mockResolvedValue('mockResource');
mockResourceBinding = jest.fn().mockResolvedValue('mockBindingResult');
mockResourceStart = jest.fn().mockResolvedValue(undefined);
mockResourceDestroy = jest.fn().mockResolvedValue(undefined);

typedResourceManager = new TypedResourceManager({
initializeValue: {
testResource: { configKey: 'configValue' }
},
initializeClzProvider: {
testResource: class {}
},
resourceInitialize: mockResourceInitialize,
resourceBinding: mockResourceBinding,
resourceStart: mockResourceStart,
resourceDestroy: mockResourceDestroy
});
});

it('should create and initialize resources', async () => {
await typedResourceManager.init();
expect(mockResourceInitialize).toHaveBeenCalledWith({ configKey: 'configValue' }, 'testResource');
expect(mockResourceBinding).toHaveBeenCalledWith(expect.any(Function), { configKey: 'configValue' }, 'mockResource', 'testResource');
});

it('should start resources', async () => {
await typedResourceManager.init();
await typedResourceManager.start();
expect(mockResourceStart).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' }, 'mockBindingResult');
});

it('should destroy resources', async () => {
await typedResourceManager.init();
await typedResourceManager.destroy();
expect(mockResourceDestroy).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' });
});

it('should start resources in parallel', async () => {
await typedResourceManager.init();
await typedResourceManager.startParallel();
expect(mockResourceStart).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' }, 'mockBindingResult');
});

it('should destroy resources in parallel', async () => {
await typedResourceManager.init();
await typedResourceManager.destroyParallel();
expect(mockResourceDestroy).toHaveBeenCalledWith('mockResource', { configKey: 'configValue' });
});

it('should get a resource by name', async () => {
await typedResourceManager.init();
const resource = typedResourceManager.getResource('testResource');
expect(resource).toBe('mockResource');
});
});
8 changes: 8 additions & 0 deletions packages/kafka/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import { IMidwayKafkaConfigurationOptions } from './dist';
export * from './dist/index';
export {
EachMessagePayload,
EachBatchPayload,
Consumer,
Kafka,
Producer,
Admin,
} from 'kafkajs';

declare module '@midwayjs/core/dist/interface' {
interface MidwayConfig {
Expand Down
15 changes: 11 additions & 4 deletions packages/kafka/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ import { Configuration } from '@midwayjs/core';
importConfigs: [
{
default: {
kafka: {},
kafka: {
contextLoggerApplyLogger: 'kafkaLogger',
},
midwayLogger: {
clients: {
kafkaLogger: {
fileLogName: 'midway-kafka.log',
},
},
},
},
},
],
})
export class KafkaConfiguration {
async onReady() {}
}
export class KafkaConfiguration {}
17 changes: 17 additions & 0 deletions packages/kafka/src/decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import {
Provide,
saveModule,
Scope,
ScopeEnum,
saveClassMetadata,
} from '@midwayjs/core';
export const KAFKA_DECORATOR_KEY = 'rpc:kafka';

export function KafkaConsumer(consumerName: string): ClassDecorator {
return target => {
saveModule(KAFKA_DECORATOR_KEY, target);
saveClassMetadata(KAFKA_DECORATOR_KEY, consumerName, target);
Scope(ScopeEnum.Request)(target);
Provide()(target);
};
}
Loading

0 comments on commit 31ee217

Please sign in to comment.