Skip to content

Commit

Permalink
bugfix: provide instance, improved error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mitch-lbw committed Mar 12, 2024
1 parent 90d7545 commit 7f3f551
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 35 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@kryptokrauts/event-processor-node-lib",
"version": "1.0.6",
"version": "1.0.7",
"description": "Wrapper for ease listening on antelope blockchain based on @blockmatic/antelope-ship-reader",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
60 changes: 30 additions & 30 deletions src/control-api.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import express from 'express';
import { control_api_port, getLogger } from './common/config';

const api = express();
const logger = getLogger('reset-api');

export async function startControlApi(reset_handler) {
api.get('/reset', (request, response) => {
const restart_at_block = request.query.blocknum;
const reset_database = request.query.reset_db || false;

if (restart_at_block === undefined) {
response.status(404).send('no blocknum defined, aborting');
} else {
logger.info(
`Parameters provided: blocknum: ${restart_at_block}, reset_db: ${reset_database}`,
);
reset_handler(restart_at_block, reset_database);
response
.status(200)
.send(
`event to restart at block ${restart_at_block} and reset database = ${reset_database} emitted`,
);
}
});

api.listen(control_api_port, (): void => {
logger.info(`processor-node control api started, running on port ${control_api_port}`);
});
}
import express from 'express';
import { control_api_port, getLogger } from './common/config';

const api = express();
const logger = getLogger('reset-api');

export async function startControlApi(reset_handler, instance) {
api.get('/reset', (request, response) => {
const restart_at_block = request.query.blocknum;
const reset_database = request.query.reset_db || false;

if (restart_at_block === undefined) {
response.status(404).send('no blocknum defined, aborting');
} else {
logger.info(
`Parameters provided: blocknum: ${restart_at_block}, reset_db: ${reset_database}`,
);
reset_handler.call(instance, restart_at_block, reset_database);
response
.status(200)
.send(
`event to restart at block ${restart_at_block} and reset database = ${reset_database} emitted`,
);
}
});

api.listen(control_api_port, (): void => {
logger.info(`processor-node control api started, running on port ${control_api_port}`);
});
}
14 changes: 10 additions & 4 deletions src/eosio/ship-reader-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class ShipReaderWrapper {
logger.info(KAFKA_TOPIC_CONFIG);

// start control api for sending reset message
startControlApi(this.handleExternalReset);
startControlApi(this.handleExternalReset, this);

// connect to kafka and retrieve last processed message data
this.kafka_wrapper = new KafkaWrapper({ header_prefix: this.config.message_header_prefix });
Expand Down Expand Up @@ -380,19 +380,25 @@ export class ShipReaderWrapper {
}
}

/**
* send external reset message
* @param restart_at_block
* @param reset_db
*/
private async handleExternalReset(restart_at_block: number, reset_db: boolean) {
try {
logger.warn(`Got external reset event to restart at blockum ${restart_at_block}`);
const resetEvent: string = this.createResetEvent(
'external_restart',
'external_blocknum_reset',
`caused by control api call`,
restart_at_block,
reset_db,
);
resetEvent && (await this.kafka_wrapper.sendEvent(resetEvent, 'reset_event'));
await this.gracefulShutdown();
} finally {
process.kill(process.pid, -1);
} catch (e) {
logger.error(e);
this.sendEventAndEndProcess('handle external reset', e);
}
}
}

0 comments on commit 7f3f551

Please sign in to comment.