Skip to content

Commit

Permalink
feat: only_irreversible_blocks flag, node sync state event publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
mitch-lbw committed Mar 4, 2024
1 parent 3c098ee commit 01b679b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const wrapper: ShipReaderWrapper = new ShipReaderWrapper({
message_header_prefix: 'atomicmarket',
table_rows_whitelist: table_rows_whitelist,
action_handler: handleAction,
only_irreversible_blocks: false
});
wrapper.startProcessing();
```
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@kryptokrauts/event-processor-node-lib",
"version": "1.0.3",
"version": "1.0.4",
"description": "Wrapper for ease listening on antelope blockchain based on @blockmatic/antelope-ship-reader",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
10 changes: 10 additions & 0 deletions src/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,21 @@ export interface ResetEvent {
clean_database: boolean;
}

export interface NodeSyncStatusEvent {
timestamp: number;
head_block: number;
current_block: number;
diff: number;
in_sync: boolean;
current_sync_date: string;
}

export interface ShipReaderWrapperConfig {
action_handler: (data: ActionData) => ActionHandlerResult;
message_header_prefix: string;
table_rows_whitelist: () => EosioReaderTableRowFilter[];
actions_whitelist: () => EosioReaderActionFilter[];
only_irreversible_blocks: boolean;
}

export interface ActionData {
Expand Down
50 changes: 46 additions & 4 deletions src/eosio/ship-reader-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { EOSIO_CONFIG, getLogger, KAFKA_CONFIG, KAFKA_TOPIC_CONFIG } from '../co
import {
ActionHandlerResult,
delta_whitelist,
NodeSyncStatusEvent,
ResetEvent,
ShipReaderWrapperConfig,
} from '../common/types';
Expand Down Expand Up @@ -83,7 +84,7 @@ export class ShipReaderWrapper {

if (this.start_block !== -1) {
this.current_block = this.start_block;
this.checkReaderSyncState(this.start_block);
this.checkReaderSyncState(this.start_block, undefined);

// start listening to XPR Network node
this.startShipReader();
Expand Down Expand Up @@ -157,7 +158,7 @@ export class ShipReaderWrapper {

// since replaying blocks is much faster, check within greater block-span
if (!this.reader_in_sync && block.block_num % syncStateCheckInterval === 0) {
this.checkReaderSyncState(block.block_num);
this.checkReaderSyncState(block.block_num, block.timestamp);
}

if (block.actions?.length > 0) {
Expand Down Expand Up @@ -209,7 +210,10 @@ export class ShipReaderWrapper {
);
}

private async checkReaderSyncState(current_block: number): Promise<void> {
private async checkReaderSyncState(
current_block: number,
current_block_timestamp: string,
): Promise<void> {
const head_block = Number(await getHeadBlockNum());
const head_diff = head_block - current_block;
this.reader_in_sync = head_diff - num_blocks_to_finality <= 0;
Expand All @@ -219,6 +223,16 @@ export class ShipReaderWrapper {
} else {
logger.info(`Reader is at block height ${current_block}, diff to head is ${head_diff}`);
}

await this.kafka_wrapper.sendEvent(
this.createNodeSyncStatusEvent(
head_block,
current_block,
current_block_timestamp,
this.reader_in_sync,
),
'node_sync_status_event',
);
}

/**
Expand All @@ -227,6 +241,10 @@ export class ShipReaderWrapper {
* @returns
*/
private async getShipReader() {
if (this.config.only_irreversible_blocks) {
logger.info(`Configuration is set to fetch irreversible blocks only`);
}

const uniqueContractNames = [
...new Set(this.config.table_rows_whitelist().map(row => row.code)),
];
Expand Down Expand Up @@ -254,7 +272,7 @@ export class ShipReaderWrapper {
end_block_num: 0xffffffff,
max_messages_in_flight: 50,
have_positions: [],
irreversible_only: false,
irreversible_only: this.config.only_irreversible_blocks,
fetch_block: true,
fetch_traces: true,
fetch_deltas: true,
Expand Down Expand Up @@ -300,6 +318,30 @@ export class ShipReaderWrapper {
return undefined;
}

/**
* Create a node sync status event
* @param reset_type
* @param restart_at_block
* @param clean_database
* @returns
*/
private createNodeSyncStatusEvent(
head_block: number,
current_block: number,
current_block_timestamp: string,
in_sync: boolean,
): string {
const nodeSyncStatusEvent: NodeSyncStatusEvent = {
timestamp: Date.now(),
head_block,
current_block,
diff: head_block - current_block,
in_sync,
current_sync_date: current_block_timestamp,
};
return JSON.stringify(nodeSyncStatusEvent);
}

/**
* Dedicated shutdown procedure
*
Expand Down

0 comments on commit 01b679b

Please sign in to comment.