Skip to content

Commit

Permalink
feat: dal retry when init failed
Browse files Browse the repository at this point in the history
  • Loading branch information
gxkl committed Dec 19, 2024
1 parent b310ec3 commit d6ee030
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 132 deletions.
1 change: 1 addition & 0 deletions core/dal-runtime/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@types/node": "^20.2.4",
"@types/nunjucks": "^3.2.6",
"cross-env": "^7.0.3",
"mm": "^3.2.1",
"mocha": "^10.2.0",
"ts-node": "^10.9.1",
"typescript": "^5.0.4"
Expand Down
24 changes: 23 additions & 1 deletion core/dal-runtime/src/MySqlDataSource.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { RDSClient } from '@eggjs/rds';
import type { RDSClientOptions } from '@eggjs/rds';
import Base from 'sdk-base';
import { Logger } from '@eggjs/tegg-types';

export interface DataSourceOptions extends RDSClientOptions {
name: string;
// default is select 1 + 1;
initSql?: string;
forkDb?: boolean;
initRetryTimes?: number;
logger?: Logger;
}

const DEFAULT_OPTIONS: RDSClientOptions = {
Expand All @@ -22,12 +25,16 @@ export class MysqlDataSource extends Base {
readonly timezone?: string;
readonly rdsOptions: RDSClientOptions;
readonly forkDb?: boolean;
readonly #initRetryTimes?: number;
readonly #logger?: Logger;

constructor(options: DataSourceOptions) {
super({ initMethod: '_init' });
const { name, initSql, forkDb, ...mysqlOptions } = options;
const { name, initSql, forkDb, initRetryTimes, logger, ...mysqlOptions } = options;
this.#logger = logger;
this.forkDb = forkDb;
this.initSql = initSql ?? 'SELECT 1 + 1';
this.#initRetryTimes = initRetryTimes;
this.name = name;
this.timezone = options.timezone;
this.rdsOptions = Object.assign({}, DEFAULT_OPTIONS, mysqlOptions);
Expand All @@ -36,7 +43,22 @@ export class MysqlDataSource extends Base {

protected async _init() {
if (this.initSql) {
await this.#doInit(1);
}
}

async #doInit(tryTimes: number): Promise<void> {
try {
this.#logger?.log(`${tryTimes} try to initialize dataSource ${this.name}`);
const st = Date.now();
await this.client.query(this.initSql);
this.#logger?.info(`dataSource initialization cost: ${Date.now() - st}, tryTimes: ${tryTimes}`);
} catch (e) {
this.#logger?.warn(`failed to initialize dataSource ${this.name}, tryTimes ${tryTimes}`, e);
if (!this.#initRetryTimes || tryTimes >= this.#initRetryTimes) {
throw e;
}
await this.#doInit(tryTimes + 1);
}
}

Expand Down
300 changes: 174 additions & 126 deletions core/dal-runtime/test/DataSource.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import assert from 'node:assert';
import path from 'node:path';
import mm from 'mm';
import { RDSClient } from '@eggjs/rds';
import { DeleteResult, InsertResult, UpdateResult } from '@eggjs/rds/lib/types';
import { TableModel } from '@eggjs/dal-decorator';
import { MysqlDataSource } from '../src/MySqlDataSource';
Expand All @@ -11,143 +13,189 @@ import { DatabaseForker } from '../src/DatabaseForker';
import { BaseFooDAO } from './fixtures/modules/dal/dal/dao/base/BaseFooDAO';

describe('test/Datasource.test.ts', () => {
let dataSource: DataSource<Foo>;
let tableModel: TableModel<Foo>;
let forker: DatabaseForker;

before(async () => {
const mysqlOptions = {
name: 'foo',
host: '127.0.0.1',
user: 'root',
database: 'test_runtime',
timezone: '+08:00',
initSql: 'SET GLOBAL time_zone = \'+08:00\';',
forkDb: true,
};
forker = new DatabaseForker('unittest', mysqlOptions);
await forker.forkDb(path.join(__dirname, './fixtures/modules/dal'));
const mysql = new MysqlDataSource(mysqlOptions);
await mysql.ready();

tableModel = TableModel.build(Foo);
const sqlMapLoader = new SqlMapLoader(tableModel, BaseFooDAO, console as any);
const sqlMap = sqlMapLoader.load();
dataSource = new DataSource(tableModel, mysql, sqlMap);
});
const mysqlOptions = {
name: 'foo',
host: '127.0.0.1',
user: 'root',
database: 'test_runtime',
timezone: '+08:00',
initSql: 'SET GLOBAL time_zone = \'+08:00\';',
forkDb: true,
};

describe('init', () => {
afterEach(() => {
mm.restore();
});

it('init failed should throw error', async () => {
mm.errorOnce(RDSClient.prototype, 'query', new Error('fake error'));
const query: any = RDSClient.prototype.query;

const mysql = new MysqlDataSource(mysqlOptions);
await assert.rejects(mysql.ready(), /fake error/);
assert.strictEqual(query.called, 1);
assert.deepStrictEqual(query.lastCalledArguments, [ mysqlOptions.initSql ]);
});

it('init should retry', async () => {
let i = 0;
mm(RDSClient.prototype, 'query', () => {
throw new Error(`fake error ${++i}`);
});
const query: any = RDSClient.prototype.query;

const mysql = new MysqlDataSource({ ...mysqlOptions, initRetryTimes: 3 });
await assert.rejects(mysql.ready(), /fake error 3/);
assert.strictEqual(query.called, 3);
});

after(async () => {
await forker.destroy();
it('should success after retry', async () => {
let i = 0;
mm(RDSClient.prototype, 'query', async () => {
if (i === 0) {
i++;
throw new Error('fake error');
}
});
const query: any = RDSClient.prototype.query;

const mysql = new MysqlDataSource({ ...mysqlOptions, initRetryTimes: 2 });
await assert.doesNotReject(mysql.ready());
assert.strictEqual(query.called, 2);
});
});

it('execute should work', async () => {
const foo = new Foo();
foo.name = 'name';
foo.col1 = 'col1';
foo.bitColumn = Buffer.from([ 0, 0 ]);
foo.boolColumn = 0;
foo.tinyIntColumn = 0;
foo.smallIntColumn = 1;
foo.mediumIntColumn = 3;
foo.intColumn = 3;
foo.bigIntColumn = '00099';
foo.decimalColumn = '00002.33333';
foo.floatColumn = 2.3;
foo.doubleColumn = 2.3;
foo.dateColumn = new Date('2024-03-16T16:00:00.000Z');
foo.dateTimeColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timestampColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timeColumn = '838:59:50.123';
foo.yearColumn = 2024;
foo.varCharColumn = 'var_char';
foo.binaryColumn = Buffer.from('b');
foo.varBinaryColumn = Buffer.from('var_binary');
foo.tinyBlobColumn = Buffer.from('tiny_blob');
foo.tinyTextColumn = 'text';
foo.blobColumn = Buffer.from('blob');
foo.textColumn = 'text';
foo.mediumBlobColumn = Buffer.from('medium_blob');
foo.longBlobColumn = Buffer.from('long_blob');
foo.mediumTextColumn = 'medium_text';
foo.longTextColumn = 'long_text';
foo.enumColumn = 'A';
foo.setColumn = 'B';
foo.geometryColumn = { x: 10, y: 10 };
foo.pointColumn = { x: 10, y: 10 };
foo.lineStringColumn = [
{ x: 15, y: 15 },
{ x: 20, y: 20 },
];
foo.polygonColumn = [
[
{ x: 0, y: 0 }, { x: 10, y: 0 }, { x: 10, y: 10 }, { x: 0, y: 10 }, { x: 0, y: 0 },
], [
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
],
];
foo.multipointColumn = [
{ x: 0, y: 0 }, { x: 20, y: 20 }, { x: 60, y: 60 },
];
foo.multiLineStringColumn = [
[
{ x: 10, y: 10 }, { x: 20, y: 20 },
], [
{ x: 15, y: 15 }, { x: 30, y: 15 },
],
];
foo.multiPolygonColumn = [
[
describe('execute', () => {
let dataSource: DataSource<Foo>;
let tableModel: TableModel<Foo>;
let forker: DatabaseForker;

before(async () => {
forker = new DatabaseForker('unittest', mysqlOptions);
await forker.forkDb(path.join(__dirname, './fixtures/modules/dal'));
const mysql = new MysqlDataSource(mysqlOptions);
await mysql.ready();

tableModel = TableModel.build(Foo);
const sqlMapLoader = new SqlMapLoader(tableModel, BaseFooDAO, console as any);
const sqlMap = sqlMapLoader.load();
dataSource = new DataSource(tableModel, mysql, sqlMap);
});

after(async () => {
await forker.destroy();
});

it('execute should work', async () => {
const foo = new Foo();
foo.name = 'name';
foo.col1 = 'col1';
foo.bitColumn = Buffer.from([ 0, 0 ]);
foo.boolColumn = 0;
foo.tinyIntColumn = 0;
foo.smallIntColumn = 1;
foo.mediumIntColumn = 3;
foo.intColumn = 3;
foo.bigIntColumn = '00099';
foo.decimalColumn = '00002.33333';
foo.floatColumn = 2.3;
foo.doubleColumn = 2.3;
foo.dateColumn = new Date('2024-03-16T16:00:00.000Z');
foo.dateTimeColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timestampColumn = new Date('2024-03-16T01:26:58.677Z');
foo.timeColumn = '838:59:50.123';
foo.yearColumn = 2024;
foo.varCharColumn = 'var_char';
foo.binaryColumn = Buffer.from('b');
foo.varBinaryColumn = Buffer.from('var_binary');
foo.tinyBlobColumn = Buffer.from('tiny_blob');
foo.tinyTextColumn = 'text';
foo.blobColumn = Buffer.from('blob');
foo.textColumn = 'text';
foo.mediumBlobColumn = Buffer.from('medium_blob');
foo.longBlobColumn = Buffer.from('long_blob');
foo.mediumTextColumn = 'medium_text';
foo.longTextColumn = 'long_text';
foo.enumColumn = 'A';
foo.setColumn = 'B';
foo.geometryColumn = { x: 10, y: 10 };
foo.pointColumn = { x: 10, y: 10 };
foo.lineStringColumn = [
{ x: 15, y: 15 },
{ x: 20, y: 20 },
];
foo.polygonColumn = [
[
{ x: 0, y: 0 }, { x: 10, y: 0 }, { x: 10, y: 10 }, { x: 0, y: 10 }, { x: 0, y: 0 },
], [
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
],
],
[
];
foo.multipointColumn = [
{ x: 0, y: 0 }, { x: 20, y: 20 }, { x: 60, y: 60 },
];
foo.multiLineStringColumn = [
[
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
{ x: 10, y: 10 }, { x: 20, y: 20 },
], [
{ x: 15, y: 15 }, { x: 30, y: 15 },
],
],
];
foo.geometryCollectionColumn = [
{ x: 10, y: 10 },
{ x: 30, y: 30 },
[
{ x: 15, y: 15 }, { x: 20, y: 20 },
],
];
foo.jsonColumn = {
hello: 'json',
};
const rowValue = TableModelInstanceBuilder.buildRow(foo, tableModel);
const insertResult: InsertResult = await dataSource.executeRawScalar('insert', rowValue);
assert(insertResult.insertId);
foo.id = insertResult.insertId;

const updateResult: UpdateResult = await dataSource.executeRawScalar('update', {
primary: {
id: insertResult.insertId,
},
$name: 'update_name',
});
assert.equal(updateResult.affectedRows, 1);
foo.name = 'update_name';
];
foo.multiPolygonColumn = [
[
[
{ x: 0, y: 0 }, { x: 10, y: 0 }, { x: 10, y: 10 }, { x: 0, y: 10 }, { x: 0, y: 0 },
],
],
[
[
{ x: 5, y: 5 }, { x: 7, y: 5 }, { x: 7, y: 7 }, { x: 5, y: 7 }, { x: 5, y: 5 },
],
],
];
foo.geometryCollectionColumn = [
{ x: 10, y: 10 },
{ x: 30, y: 30 },
[
{ x: 15, y: 15 }, { x: 20, y: 20 },
],
];
foo.jsonColumn = {
hello: 'json',
};
const rowValue = TableModelInstanceBuilder.buildRow(foo, tableModel);
const insertResult: InsertResult = await dataSource.executeRawScalar('insert', rowValue);
assert(insertResult.insertId);
foo.id = insertResult.insertId;

const findRow = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert(findRow);
assert.deepStrictEqual(findRow, foo);
const updateResult: UpdateResult = await dataSource.executeRawScalar('update', {
primary: {
id: insertResult.insertId,
},
$name: 'update_name',
});
assert.equal(updateResult.affectedRows, 1);
foo.name = 'update_name';

const deleteRow: DeleteResult = await dataSource.executeRawScalar('delete', {
id: insertResult.insertId,
});
assert.equal(deleteRow.affectedRows, 1);
const findRow = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert(findRow);
assert.deepStrictEqual(findRow, foo);

const findRow2 = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert.equal(findRow2, null);
const deleteRow: DeleteResult = await dataSource.executeRawScalar('delete', {
id: insertResult.insertId,
});
assert.equal(deleteRow.affectedRows, 1);

const res = await dataSource.paginate('findByPrimary', {}, 1, 10);
assert(res.total === 0);
const findRow2 = await dataSource.executeScalar('findByPrimary', {
$id: insertResult.insertId,
});
assert.equal(findRow2, null);

const res = await dataSource.paginate('findByPrimary', {}, 1, 10);
assert(res.total === 0);
});
});
});
Loading

0 comments on commit d6ee030

Please sign in to comment.