Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mitch-lbw committed Dec 11, 2023
0 parents commit 7ebe07d
Show file tree
Hide file tree
Showing 17 changed files with 2,716 additions and 0 deletions.
27 changes: 27 additions & 0 deletions .env_template
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
LOG_LEVEL=info

# node API endpoint (http(s)://...)
EOSIO_NODE_API=
# node shipreader endpoint (ws(s)://...)
# NOTE(review): no assignment follows this comment — the shipreader endpoint
# variable appears to be missing here; confirm the expected name (e.g. EOSIO_SHIP_API=)
# block to start initially from
EOSIO_START_BLOCK=
# flag read by the processor as EOSIO_CONFIG.log_head_diff — presumably
# enables logging of the head-block difference; verify expected values
EOSIO_LOG_HEAD_DIFF=

# unique client_id for interacting with Kafka
KAFKA_CLIENT_ID=
# comma separated list of <hostname:port> pairs of brokers
KAFKA_BROKERS=
# timeout for connections to Kafka brokers
KAFKA_CONNECTION_TIMEOUT=10000
# Kafka log level (1=error)
KAFKA_LOG_LEVEL=1
# max number of retries to establish a connection to Kafka brokers (with increasing retry delay)
KAFKA_RETRY_RETRIES=5

# sink topic for messages
KAFKA_CONTRACT_TOPIC=
# default topic partition
KAFKA_CONTRACT_TOPIC_PARTITION=0
# wait until all in-sync replicas acknowledge record received
KAFKA_CONTRACT_TOPIC_ACKS=-1
# unique group_id for consumers reading this topic
KAFKA_CONTRACT_CONSUMER_GROUP_ID=
3 changes: 3 additions & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build/
dist/
.lh/
14 changes: 14 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 12,
"sourceType": "module"
},
"plugins": ["@typescript-eslint"],
"extends": ["eslint:recommended", "plugin:@typescript-eslint/recommended"],
"rules": {},
"env": {
"browser": true,
"es2021": true
}
}
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.env_local
node_modules/
dist/
.lh/
46 changes: 46 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

# github actions & gitops
.github
.argo
charts

# dependencies
/node_modules
/.pnp
.pnp.js

# testing
/coverage

# next.js
.next
/.next/
/out/

# production
/build

# misc
.DS_Store
*.pem

# debug
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# local env files
.env.local
.env.development.local
.env.test.local
.env.production.local

# vercel
.vercel

# changelog
CHANGELOG.md

/dist
/.lh
9 changes: 9 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"arrowParens": "avoid",
"singleQuote": true,
"jsxSingleQuote": true,
"tabWidth": 2,
"semi": true,
"proseWrap": "always",
"printWidth": 100
}
27 changes: 27 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# --- build stage: compile TypeScript sources to ./dist ---
FROM node:21.1.0-alpine3.18 AS builder

WORKDIR /node-event-processor
COPY package.json ./
# NOTE(review): yarn.lock is not copied into the image, so `--frozen-lockfile`
# cannot pin dependency versions — confirm whether the lockfile should be added
COPY tsconfig.json ./
COPY src ./src
COPY examples ./examples
RUN yarn install --frozen-lockfile
RUN yarn build

# --- runtime stage: production dependencies only, unprivileged user ---
FROM node:21.1.0-alpine3.18 AS runner

# production mode: yarn skips devDependencies, libraries disable debug paths
ENV NODE_ENV production

RUN adduser --disabled-password node_user && \
    mkdir -p /node-event-processor && \
    chown -R node_user:node_user /node-event-processor

WORKDIR /node-event-processor

USER node_user

COPY package.json ./
RUN yarn install --frozen-lockfile
COPY --from=builder /node-event-processor/dist .

# Exec-form ENTRYPOINT does not invoke a shell, so a pipe such as
# `" | npx pino-pretty --colorize"` is passed to node as a literal argument
# instead of being executed. Pretty-printing must be applied by the caller:
#   docker run <image> | npx pino-pretty --colorize
ENTRYPOINT ["node", "./examples/atomicmarket.js"]
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# event-processor-node library

#### This library provides the following capabilities

- establish communication to Kafka broker network
- read last message of defined output topic to get last successfully processed blocknum
- if topic is empty (initial), use env variable
- connect and listen to XPR Network based on [blockmatic/antelope-ship-reader](https://github.com/blockmatic/antelope-ship-reader)
- start reading at resolved blocknum above
- listening on user defined contract actions as well as forks
- provide a `handleAction` callback to implement message transformation
- transformed messages are automatically sent to the configured Kafka topic with a dedicated header `<message_prefix_header>.<action_name>`

In case of expected shutdown, a SIGTERM/SIGINT listener generates a dedicated `ResetInfo` message on the output topic with the last processed blocknum from the ship reader. This allows a reset at this point in case no relevant block data was written to the output topic and thus avoids the necessity of reading irrelevant / empty blocks again on restart (in case of bigger gaps).

Using this mechanism also allows a manual reset to a certain blocknum by manually creating such a `ResetInfo` message within the output topic.

In case of forks, the ship reader stops and creates a `ResetInfo` with the last irreversible block as starting point. All subsequent processes can also react on this message.

#### Configuration
Refer to [.env_template](.env_template).

#### Sample usage
The example implementation [atomicmarket.ts](/examples/atomicmarket.ts) shows how the necessary configuration is provided:
- `table_rows_whitelist` events on contract and contract tables
- `actions_whitelist` contract actions

The `handleAction` callback is called whenever one of the whitelisted actions occurs.
The wrapper is configured and started as follows
```
const wrapper: ShipReaderWrapper = new ShipReaderWrapper({
actions_whitelist: actions_whitelist,
message_header_prefix: 'atomicmarket',
table_rows_whitelist: table_rows_whitelist,
action_handler: handleAction,
});
wrapper.startProcessing();
```
The [Dockerfile](Dockerfile) contains the build library and entrypoint for the example.
71 changes: 71 additions & 0 deletions examples/atomicmarket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
EosioReaderAction,
EosioReaderActionFilter,
EosioReaderTableRowFilter,
} from '@blockmatic/eosio-ship-reader';
import { getLogger } from '../src/common/config';
import { ActionData, ActionHandlerResult } from '../src/common/types';
import ShipReaderWrapper from '../src/eosio/ship-reader-wrapper';

const logger = getLogger('atomicmarket_example');

/**
 * Tables of the atomicmarket contract whose row updates are listened for.
 * Built from a plain list of table names to avoid repeating the contract code.
 */
const table_rows_whitelist: () => EosioReaderTableRowFilter[] = () =>
  ['auctions', 'balances', 'buyoffers', 'config', 'counters', 'marketplaces', 'sales'].map(
    table => ({ code: 'atomicmarket', table }),
  );

/**
 * Actions to listen for — the wildcard subscribes to every atomicmarket action.
 */
const actions_whitelist: () => EosioReaderActionFilter[] = () => {
  return [{ code: 'atomicmarket', action: '*' }];
};

/**
 * Handler which reacts on actions contained in a block; the message returned
 * by this method will be sent to the contract topic.
 * @param actionData block metadata plus the raw eosio reader action
 * @returns the Kafka message payload for relevant actions; implicitly
 *          undefined for ignored actions (nothing is published then)
 */
const handleAction = (actionData: ActionData): ActionHandlerResult => {
  const readerAction: EosioReaderAction = actionData.eosio_reader_action;

  if (readerAction.name === 'lognewsale') {
    // only react when the contract itself received the action; a matching
    // name with a different receiver is silently ignored (no debug log)
    if (readerAction.receipt.receiver === 'atomicmarket') {
      const payload = {
        blocknum: actionData.blocknum,
        timestamp: actionData.timestamp,
        type: readerAction.name,
        transaction_id: readerAction.transaction_id,
        data: readerAction.data,
      };
      return {
        msg: JSON.stringify(payload),
        action_type: readerAction.name,
      };
    }
  } else {
    logger.debug(`Ignoring action ${readerAction.name}`);
  }
};

/**
* configure and start event-processor-node
*/
const run = async () => {
const wrapper: ShipReaderWrapper = new ShipReaderWrapper({
actions_whitelist: actions_whitelist,
message_header_prefix: 'atomicmarket',
table_rows_whitelist: table_rows_whitelist,
action_handler: handleAction,
});
wrapper.startProcessing();
};

run();
46 changes: 46 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"name": "@kryptokrauts/event-processor-node-lib",
"version": "1.0.0",
"description": "Library for processing XPR Network contract events and publishing them to Kafka",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": [
"dist",
"src"
],
"scripts": {
"dev-example": "env-cmd -f ./.env_local tsnd examples/atomicmarket.ts | npx pino-pretty --colorize",
"lint": "eslint --ignore-path .gitignore \"**/*.+(js|ts|tsx)\"",
"format": "prettier --ignore-path .gitignore --write \"**/*.+(js|json|ts|tsx)\"",
"build": "tsc -p .",
"prepare": "yarn run build",
"prepublishOnly": "yarn run format",
"preversion": "yarn run lint",
"version": "yarn run format && git add -A src",
"postversion": "git push && git push --tags"
},
"author": "kryptokrauts",
"license": "MIT",
"devDependencies": {
"@typescript-eslint/eslint-plugin": "6.13.2",
"@typescript-eslint/parser": "6.13.2",
"eslint": "8.55.0",
"pino-pretty": "10.2.3",
"prettier": "3.1.0",
"ts-node-dev": "2.0.0",
"typescript": "5.3.2"
},
"dependencies": {
"@blockmatic/eosio-ship-reader": "1.1.0",
"@types/node": "20.10.3",
"@types/node-fetch": "2.6.9",
"dotenv": "16.3.1",
"env-cmd": "10.1.0",
"kafkajs": "2.2.4",
"pino": "8.16.2"
},
"resolutions": {
"@eosrio/node-abieos": "2.1.1",
"rxjs": "6.6.7"
}
}
34 changes: 34 additions & 0 deletions src/common/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import dotenv from 'dotenv';
import { KafkaConfig, logLevel } from 'kafkajs';
import { Logger, pino } from 'pino';

dotenv.config();

// reader settings derived from the environment (see .env_template)
export const EOSIO_CONFIG = {
  // block number to start reading from when the output topic is empty
  start_block: Number(process.env.EOSIO_START_BLOCK),
  // compare against the literal 'true': `Boolean('false')` is true, so the
  // previous Boolean(...) cast could never disable this flag once the
  // variable was set to any non-empty string
  log_head_diff: process.env.EOSIO_LOG_HEAD_DIFF === 'true',
};

// connection settings for the Kafka client (see .env_template)
export const KAFKA_CONFIG: KafkaConfig = {
  // unique client_id for interacting with Kafka
  clientId: process.env.KAFKA_CLIENT_ID,
  // comma separated <hostname:port> pairs; guarded so a missing variable
  // yields an empty list instead of a TypeError on `undefined.split`
  brokers: (process.env.KAFKA_BROKERS ?? '')
    .split(',')
    .filter(broker => broker.length > 0),
  // default matches .env_template (10s); previously an unset variable became NaN
  connectionTimeout: Number(process.env.KAFKA_CONNECTION_TIMEOUT ?? 10000),
  // kafkajs log levels are numeric (1 = logLevel.ERROR). The previous
  // `logLevel[env]` lookup hit the numeric enum's reverse mapping and
  // produced the level *name* (e.g. 'ERROR') instead of its numeric value.
  logLevel: Number(process.env.KAFKA_LOG_LEVEL ?? 1),
  retry: {
    // max number of attempts to establish a connection (with increasing delay)
    retries: Number(process.env.KAFKA_RETRY_RETRIES || 5),
  },
};

// output topic settings (see .env_template)
export const KAFKA_TOPIC_CONFIG = {
  // sink topic for messages
  contract_topic: process.env.KAFKA_CONTRACT_TOPIC,
  // default topic partition
  contract_topic_partition: Number(process.env.KAFKA_CONTRACT_TOPIC_PARTITION || 0),
  // unique group_id for consumers reading this topic
  contract_consumer_group_id: process.env.KAFKA_CONTRACT_CONSUMER_GROUP_ID,
  // -1 = wait until all in-sync replicas acknowledge (template default);
  // previously an unset variable produced NaN here, unlike its siblings
  contract_topic_acks: Number(process.env.KAFKA_CONTRACT_TOPIC_ACKS || -1),
};

/**
 * Create a named pino logger honouring the LOG_LEVEL environment variable
 * (pino falls back to its own default level when the variable is unset).
 * @param name logger name included in every log line
 */
export function getLogger(name: string): Logger {
  const options = {
    name,
    level: process.env.LOG_LEVEL,
  };
  return pino(options);
}
55 changes: 55 additions & 0 deletions src/common/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import {
EosioReaderAction,
EosioReaderActionFilter,
EosioReaderTableRowFilter,
ShipTableDeltaName,
} from '@blockmatic/eosio-ship-reader';

/**
 * Configuration for the Kafka wrapper.
 */
export interface KafkaWrapperConfig {
  // prefix used when building message header keys (<header_prefix>.<action_name>)
  header_prefix: string;
}

/**
 * Marker message written to the output topic on graceful shutdown so the
 * next run can resume from the last processed block.
 */
export interface ResetInfo {
  // last processed blocknum
  // NOTE(review): typed as string here while ResetEvent carries blocknums as
  // number — confirm which representation is intended
  last_blocknum: string;
  // message type discriminator
  type: string;
}

/**
 * Event describing a restart from an earlier block (fork detected or manual reset).
 */
export interface ResetEvent {
  // type of event, can be fork or manual reset
  reset_type: string;
  // timestamp the reset event occurred
  timestamp: string;
  // the block to restart from on the next run
  restart_at_block: number;
  // blocknum the reset event occurred
  reset_blocknum: number;
  // flag for cleaning the internal database, all data after this block will be cleansed
  clean_database: boolean;
}

/**
 * Configuration for the ship reader wrapper.
 */
export interface ShipReaderWrapperConfig {
  /**
   * Callback invoked for whitelisted actions; returns the message to publish
   * to the contract topic. Widened to allow `undefined`: the shipped example
   * handler returns nothing for actions it ignores, so a handler may
   * legitimately produce no message (backward compatible for implementers).
   */
  action_handler: (data: ActionData) => ActionHandlerResult | undefined;
  // prefix used when building message header keys (<prefix>.<action_name>)
  message_header_prefix: string;
  // contract tables to listen on for row updates
  table_rows_whitelist: () => EosioReaderTableRowFilter[];
  // contract actions to listen on
  actions_whitelist: () => EosioReaderActionFilter[];
}

/**
 * Payload passed to the configured action handler.
 */
export interface ActionData {
  // the raw action as emitted by the eosio ship reader
  eosio_reader_action: EosioReaderAction;
  // number of the block containing the action
  blocknum: number;
  // timestamp of the containing block
  timestamp: string;
}

/**
 * Result produced by an action handler.
 */
export interface ActionHandlerResult {
  // serialized (JSON string) message body sent to the contract topic
  msg: string;
  // action name, combined with the configured prefix into the message header
  action_type: string;
}

/**
 * Ship table deltas the reader subscribes to.
 */
export const delta_whitelist: () => ShipTableDeltaName[] = () => {
  const deltas: ShipTableDeltaName[] = [
    'account_metadata',
    'contract_table',
    'contract_row',
    'contract_index64',
    'resource_usage',
    'resource_limits_state',
  ];
  return deltas;
};
Loading

0 comments on commit 7ebe07d

Please sign in to comment.