Skip to content

Commit

Permalink
feat: init commit, add orap, rek, utils
Browse files Browse the repository at this point in the history
  • Loading branch information
nom4dv3 committed Aug 8, 2024
1 parent c15ade8 commit 2593601
Show file tree
Hide file tree
Showing 26 changed files with 962 additions and 0 deletions.
59 changes: 59 additions & 0 deletions src/orap/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# ORAP: Oracle Application Framework

ORAP is an out of box, express style framework.

## Usage
```ts
import { ListenOptions, Orap, StoreManager } from "../../orap";
import { memoryStore, redisStore } from "../../utils";

const orap = new Orap()
// const sm = new StoreManager(redisStore(...)) // use redis
const sm = new StoreManager() // use memory

var eventSignalParam = {
address: "0xdAC17F958D2ee523a2206206994597C13D831ec7",
abi: {"anonymous": false,"inputs": [{ "indexed": true,"name": "from","type": "address"},{"indexed": true,"name": "to","type": "address"},{"indexed": false,"name": "value","type": "uint256"}],"name": "Transfer","type": "event"},
eventName: "Transfer",
}

const handle = (...args:any)=>{console.log("handle", args)}

orap.event(eventSignalParam, handle)
.crosscheck({intervalMsMin: 1000, batchBlocksCount: 1, blockIntervalMs: 12000})

orap.listen(
{wsProvider:"wss://127.0.0.1", httpProvider:"http://127.0.0.1"},
()=>{console.log("listening on provider.network")}
)
```

### listen options
- required: wsProvider, for subscription
- optional: httpProvider, for crosscheck only, since crosscheck is based on getLogs

## Task

### TaskBase
- provide universal `toString`, `fromString`, `stringify`

### TaskStorable
- provide store (redis) compatible features, i.e. load, save, remove, done
- overwrite when extends:
- `toKey()` (required): define the primary key that identifies each task, **doesn't** include `taskPrefix`
- `taskPrefix` (recommend): set the prefix of all tasks, also is used when `load` task
- `taskPrefixDone` (recommend): set the prefix of finished tasks, only used in `done`; no need to set if you don't use "task.done(sm)"

## Signal

all actions that arrive the oracle server and trigger actions are defined as signal, including:
- [x] event
- [ ] block
- [ ] http request
etc.

### EventSignal
- define event listener as simple as: `orap.event({address:"0x", abi:"", eventName: "Transfer"}, handleSignal)`
- provide crosschecker by `rek`, available config please checkout `AutoCrossCheckParam` in `rek`
- currently one and only one crosschecker is set to each event signal
- store: provide 2 options: use memory or redis, checkout `orap/store`
4 changes: 4 additions & 0 deletions src/orap/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from "./signal"
export * from "./store"
export * from "./task"
export * from "./orap"
36 changes: 36 additions & 0 deletions src/orap/orap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { ethers } from "ethers";
import { EventSignal } from "./signal/event";

export interface ListenOptions {
wsProvider: ethers.WebSocketProvider
httpProvider?: ethers.JsonRpcProvider
}

export class Orap {
routes: {
event: EventSignal[]
}

constructor() {
this.routes = {
event: []
}
}

event(options: any, fn: any) {
const es = new EventSignal(options, fn)
this.routes.event.push(es)
return es
}

_listenChain(wsProvider: ethers.WebSocketProvider, httpProvider?: ethers.JsonRpcProvider) {
this.routes.event.forEach(es => es.listen(wsProvider, httpProvider));
}

listen(options: ListenOptions, onListen: any = ()=>{}) {
this._listenChain(options.wsProvider, options.httpProvider);
onListen();
return this;
}
}
107 changes: 107 additions & 0 deletions src/orap/signal/event.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/* eslint-disable no-unused-vars */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { ethers, EventFragment, Log } from "ethers";
import { Signal } from "./type";
import { AutoCrossChecker } from "../../rek/event/crosschecker/autochecker";
import { AutoCrossCheckParam } from "../../rek/event/crosschecker/interface";
import { ONE_MINUTE_MS } from "../../rek/constants";

export interface EventSignalRegisterParams {
address: string,
abi: any,
eventName: string,
// esig?: string,
}

export type EventSignalCallback = ethers.Listener

export class EventSignal implements Signal {

provider?: ethers.JsonRpcProvider | ethers.WebSocketProvider;
contract: ethers.Contract;
esig: string
eventFragment: EventFragment

subscribeCallback: EventSignalCallback
crosscheckCallback: EventSignalCallback

crosschecker?: AutoCrossChecker;
crosscheckerOptions?: AutoCrossCheckParam;

constructor(
public params: EventSignalRegisterParams,
public callback: EventSignalCallback
) {
this.contract = new ethers.Contract(
params.address,
params.abi,
)

// Get the event fragment by name
const iface = this.contract.interface;
const _ef = iface.getEvent(params.eventName);
if (!_ef) throw new Error("")
this.eventFragment = _ef;

this.esig = this.eventFragment.topicHash

// to align with crosschecker onMissing, parse the last arg from ContractEventPayload to EventLog
this.subscribeCallback = async (...args: Array<any>) => {
const _contractEventPayload = args.pop()
await this.callback(...args, _contractEventPayload.log);
}
// to align with subscribe listener, parse event params and add EventLog to the last
this.crosscheckCallback = async (log: Log) => {
const parsedLog = this.contract.interface.decodeEventLog(this.eventFragment, log.data, log.topics);
await this.callback(...parsedLog, log)
}
}

// TODO: how to integrate crosschecker
// TODO: should be wsProvider only?
listen(provider: ethers.WebSocketProvider, crosscheckProvider?: ethers.JsonRpcProvider) {
this.provider = provider

// start event listener
const listener = this.contract.connect(provider)
listener?.on(
this.params.eventName,
// TODO: calling this seems to be async, should we make it to sequential?
this.subscribeCallback,
)

// start cross-checker if ever set
if (this.crosscheckerOptions) {
if (!crosscheckProvider)
throw new Error("crosschecker set, please provide crosscheckProvider to listen function")
this.startCrossChecker(crosscheckProvider)
}

return this;
}

async startCrossChecker(provider: ethers.JsonRpcProvider) {
if (!this.crosscheckerOptions)
throw new Error("no crosscheck set, can't start crosschecker");
this.crosschecker = new AutoCrossChecker(provider, this.crosscheckerOptions)
await this.crosschecker.start(this.crosscheckerOptions);
}

// TODO: hide address & topics & onMissingLog from interface AutoCrossCheckParam
crosscheck(options?: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'>) {
const {
intervalMsMin = ONE_MINUTE_MS * 60,
ignoreLogs = [],
} = options ?? {}
// save crosschecker param
this.crosscheckerOptions = {
...options,
address: this.params.address,
topics: [this.esig],
onMissingLog: this.crosscheckCallback,
intervalMsMin,
ignoreLogs,
}
return this;
}
}
2 changes: 2 additions & 0 deletions src/orap/signal/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./event"
export * from "./type"
6 changes: 6 additions & 0 deletions src/orap/signal/type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable no-unused-vars */

export interface Signal {
listen(options: any): void;
}
1 change: 1 addition & 0 deletions src/orap/store/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./storemanager"
28 changes: 28 additions & 0 deletions src/orap/store/storemanager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/* eslint-disable no-unused-vars */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { retryOnRet, Store, SimpleStoreManager } from "../../utils";

export class StoreManager extends SimpleStoreManager {
queryDelay: number

constructor(
store?: Store, options?: {queryDelay?: number}
) {
super(store)
this.queryDelay = options?.queryDelay ?? 3000
}

async keys(pattern?: string, isWait = false): Promise<string[]> {
const _keys = () => {
// console.log('[cache] _keys wait non-null')
return super.keys(pattern)
}
if (isWait) {
return retryOnRet(_keys, async (rst) => {
return rst && (await rst).length > 0
}, { delay: this.queryDelay });
} else {
return _keys()
}
}
}
35 changes: 35 additions & 0 deletions src/orap/task/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { deepMerge, Constructor } from '@murongg/utils';

export abstract class TaskBase {
abstract toKey(): string

toString() {
const obj: Record<string, any> = {};
deepMerge(obj, this);
return this.stringify(obj);
}

private stringify(obj: Record<string, any>) {
const replace = (key: string, value: any) => {
if (typeof value === 'bigint') {
return value.toString();
} else {
return value
}
}
return JSON.stringify(obj, replace);
}

fromString(jsonString: string) {
const obj = JSON.parse(jsonString);
Object.assign(this, obj);
return this
}

static fromString<T extends TaskBase>(this: Constructor<T>, jsonString: string): T {
const instance = new this();
instance.fromString(jsonString)
return instance;
}
}
2 changes: 2 additions & 0 deletions src/orap/task/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./base"
export * from "./storable"
45 changes: 45 additions & 0 deletions src/orap/task/storable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Constructor } from '@murongg/utils';
import { StoreManager } from '../store/storemanager';
import { TaskBase } from './base';

export abstract class TaskStorable extends TaskBase {
// overwrite these for store key customize
static readonly taskPrefix: string = "Task:"
static readonly taskPrefixDone: string = "Done-Task:"

getTaskPrefix(): string {
return (this.constructor as typeof TaskStorable).taskPrefix;
}

getTaskPrefixDone(): string {
return (this.constructor as typeof TaskStorable).taskPrefixDone;
}

static async _load<T extends TaskStorable>(this: Constructor<T>, sm: StoreManager): Promise<T> {
const instance = new this();
// get all task keys
const keys = await sm.keys(instance.getTaskPrefix()+'*', true)
// get the first task (del when finish)
const serializedTask: string = (await sm.get(keys[0]))! // never undefined ensured by keys isWait=true

return instance.fromString(serializedTask)
}

static async load<T extends TaskStorable>(this: Constructor<T>, sm: StoreManager): Promise<T> {
return await (this as any)._load(sm)
}

async save(sm: StoreManager) {
await sm.set(this.getTaskPrefix() + this.toKey(), this.toString())
}

async remove(sm: StoreManager) {
await sm.del(this.getTaskPrefix() + this.toKey())
}

async done(sm: StoreManager) {
await sm.set(this.getTaskPrefixDone() + this.toKey(), this.toString())
await this.remove(sm)
}
}
47 changes: 47 additions & 0 deletions src/rek/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Rek: Reliable ETH Kit

## Event Cross-Check
- used for cross-checking missing events using eth getLogs
- note: cross-check interval shouldn't < 5min, otherwise rpc call would be expensive. Recommend set to 1 hour + 50 blocks (for redundancy check).

### BaseCrossChecker
- `onMissingLog: FnOnMissingLog`: the callback function that will be called when there's a missing log found;
- `ignoreLogs?: SimpleLog[]` : it allows users to pass in the txhash list (,log index list) that already processed, let the crosschecker to ignore then.

**crossCheckRange**

**crossCheckRetro**

**crossCheckFrom**

### AutoCrossCheck
Conceptually it supports 'catchup', 'realtime', 'mix' modes, controled by `fromBlock` and `toBlock`;
- realtime mode: run over the latest block data; enter when `fromBlock` and `toBlock` are NOT present; starts from `latestblocknum` and never ends, always waits until `latestblocknum >= lastcheckpoint + batchBlocksCount`;
- catchup mode: run over historical block data; enter when `fromBlock` and `toBlock` is present; starts from `fromBlock` and ends at `toBlock`;
- mix mode: start in catchup mode and transit to realtime mode; enter when `fromBlock` is present and `toBlock` is NOT; auto-transit when `lastcheckpoint > latestblocknum - batchBlocksCount`;

i.e. It starts with 'realtime' mode by default.

Options:
- `store`?: the Store used to cache the <txhash, logindex> that already processed.
- `batchBlocksCount`?: how many blocks to get per `getLogs` check, in readtime mode it waits until the new block num >= `batchBlocksCount`.
- `intervalMsMin`?: mostly for limiting getLogs calling rate in catchup mode; how long does it take at least between 2 checks
- `blockIntervalMs`?: the block interval (in ms) of the given chain, default: 12000 for eth
- `delayBlockFromLatest`?: mostly for realtime mode; each time cc wait until `latest height > toBlock + delayBlockFromLatest`
- `fromBlock`?: once specified, it means start catching up from historical blocks
- `toBlock`?: once specified, it means the crosscheck isn't infinite and will end at this height; need `fromBlock` present if this set

**Usage**
```ts
const acc = new AutoCrossChecker(provider);
await acc.start({
onMissingLog,
ignoreLogs,
fromBlock: 20003371, // optional, empty to start from latest
toBlock: 20003371, // optional, empty to enter continueous cc
address: CONTRACT_ADDRESS,
topics,
batchBlocksCount: 1,
intervalMsMin: 3000,
});
```
3 changes: 3 additions & 0 deletions src/rek/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const ETH_BLOCK_COUNT_ONE_HOUR = (60 / 12) * 60
export const ETH_BLOCK_INTERVAL_MS = 12 * 1000
export const ONE_MINUTE_MS = 60 * 1000
Loading

0 comments on commit 2593601

Please sign in to comment.