Skip to content

Commit

Permalink
feat: control api
Browse files Browse the repository at this point in the history
  • Loading branch information
mitch-lbw committed Mar 12, 2024
1 parent a8defdd commit 90d7545
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 7 deletions.
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@kryptokrauts/event-processor-node-lib",
"version": "1.0.5",
"version": "1.0.6",
"description": "Wrapper for ease listening on antelope blockchain based on @blockmatic/antelope-ship-reader",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -9,7 +9,8 @@
"src"
],
"scripts": {
"dev-example": "env-cmd -f ./.env_local tsnd examples/atomicmarket.ts | npx pino-pretty --colorize",
"dev-aa": "env-cmd -f ./.env_local tsnd examples/atomicassets.ts | npx pino-pretty --colorize",
"dev-am": "env-cmd -f ./.env_local_am tsnd examples/atomicmarket.ts | npx pino-pretty --colorize",
"lint": "eslint --ignore-path .eslintignore \"**/*.+(js|ts|tsx)\"",
"format": "prettier --ignore-path .gitignore --write \"**/*.+(js|json|ts|tsx)\"",
"format:check": "prettier -c .",
Expand Down Expand Up @@ -38,6 +39,7 @@
"@types/node-fetch": "2.6.9",
"dotenv": "16.3.1",
"env-cmd": "10.1.0",
"express": "4.18.3",
"kafkajs": "2.2.4",
"pino": "8.16.2"
},
Expand Down
2 changes: 2 additions & 0 deletions src/common/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { Logger, pino } from 'pino';

dotenv.config();

export const control_api_port = process.env.PORT || 8000;

export const EOSIO_CONFIG = {
start_block: Number(process.env.EOSIO_START_BLOCK),
eosio_node_api: process.env.EOSIO_NODE_API,
Expand Down
30 changes: 30 additions & 0 deletions src/control-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import express from 'express';
import { control_api_port, getLogger } from './common/config';

const api = express();
const logger = getLogger('reset-api');

export async function startControlApi(reset_handler) {
api.get('/reset', (request, response) => {
const restart_at_block = request.query.blocknum;
const reset_database = request.query.reset_db || false;

if (restart_at_block === undefined) {
response.status(404).send('no blocknum defined, aborting');
} else {
logger.info(
`Parameters provided: blocknum: ${restart_at_block}, reset_db: ${reset_database}`,
);
reset_handler(restart_at_block, reset_database);
response
.status(200)
.send(
`event to restart at block ${restart_at_block} and reset database = ${reset_database} emitted`,
);
}
});

api.listen(control_api_port, (): void => {
logger.info(`processor-node control api started, running on port ${control_api_port}`);
});
}
21 changes: 20 additions & 1 deletion src/eosio/ship-reader-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
ResetEvent,
ShipReaderWrapperConfig,
} from '../common/types';
import { startControlApi } from '../control-api';
import KafkaWrapper from '../kafka/kafka-wrapper';
import { fetchAbi, getHeadBlockNum } from './chain-api';

Expand Down Expand Up @@ -63,8 +64,10 @@ export class ShipReaderWrapper {
logger.info(KAFKA_CONFIG);
logger.info(KAFKA_TOPIC_CONFIG);

// connect to kafka and retrieve last processed message data
// start control api for sending reset message
startControlApi(this.handleExternalReset);

// connect to kafka and retrieve last processed message data
this.kafka_wrapper = new KafkaWrapper({ header_prefix: this.config.message_header_prefix });

const { last_blocknum: last_blocknum, type } = await this.kafka_wrapper.connect();
Expand Down Expand Up @@ -376,4 +379,20 @@ export class ShipReaderWrapper {
process.kill(process.pid, type);
}
}

private async handleExternalReset(restart_at_block: number, reset_db: boolean) {
try {
logger.warn(`Got external reset event to restart at blockum ${restart_at_block}`);
const resetEvent: string = this.createResetEvent(
'external_restart',
`caused by control api call`,
restart_at_block,
reset_db,
);
resetEvent && (await this.kafka_wrapper.sendEvent(resetEvent, 'reset_event'));
await this.gracefulShutdown();
} finally {
process.kill(process.pid, -1);
}
}
}
Loading

0 comments on commit 90d7545

Please sign in to comment.