diff --git a/.gitpod.dockerfile b/.gitpod.dockerfile new file mode 100644 index 0000000000..aca6c59500 --- /dev/null +++ b/.gitpod.dockerfile @@ -0,0 +1 @@ +FROM gitpod/workspace-postgresql \ No newline at end of file diff --git a/.gitpod.yml b/.gitpod.yml new file mode 100644 index 0000000000..e4577dc555 --- /dev/null +++ b/.gitpod.yml @@ -0,0 +1,11 @@ +# This configuration file was automatically generated by Gitpod. +# Please adjust to your needs (see https://www.gitpod.io/docs/config-gitpod-file) +# and commit this file to your remote git repository to share the goodness with others. +image: + file: .gitpod.dockerfile + +tasks: + - init: yarn install && yarn run build + + + diff --git a/package.json b/package.json index fb4ac291a9..e34a974fd2 100644 --- a/package.json +++ b/package.json @@ -8,6 +8,7 @@ "@actions/core": "^1.6.0", "@babel/preset-env": "^7.16.11", "@octokit/request": "^5.6.3", + "@types/cron-converter": "^1", "@types/node": "^14.18.10", "@typescript-eslint/eslint-plugin": "^5.10.2", "@typescript-eslint/parser": "^5.10.2", diff --git a/packages/common-substrate/src/project/models.ts b/packages/common-substrate/src/project/models.ts index 7fddeb3df8..b97db59e02 100644 --- a/packages/common-substrate/src/project/models.ts +++ b/packages/common-substrate/src/project/models.ts @@ -42,6 +42,9 @@ export class BlockFilter implements SubstrateBlockFilter { @IsOptional() @IsInt() modulo?: number; + @IsOptional() + @IsString() + timestamp?: string; } export class EventFilter extends BlockFilter implements SubstrateEventFilter { diff --git a/packages/node/package.json b/packages/node/package.json index 020d88bea4..9ac21247c3 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -33,6 +33,7 @@ "@subql/x-merkle-mountain-range": "2.0.0-0.1.2", "@willsoto/nestjs-prometheus": "^4.4.0", "app-module-path": "^2.2.0", + "cron-converter": "^1.0.2", "dayjs": "^1.10.7", "eventemitter2": "^6.4.5", "fetch-h2": "3.0.2", diff --git a/packages/node/src/configure/SubqueryProject.ts b/packages/node/src/configure/SubqueryProject.ts index 7e1ca98671..ee88eab1aa 100644 --- a/packages/node/src/configure/SubqueryProject.ts +++ b/packages/node/src/configure/SubqueryProject.ts @@ -1,6 +1,7 @@ // Copyright 2020-2022 OnFinality Limited authors & contributors // SPDX-License-Identifier: Apache-2.0 +import { ApiPromise } from '@polkadot/api'; import { RegisteredTypes } from '@polkadot/types/types'; import { ReaderFactory, @@ -18,19 +19,31 @@ import { SubstrateDataSource, FileType, ProjectManifestV1_0_0Impl, + SubstrateBlockFilter, + isRuntimeDs, + SubstrateHandlerKind, } from '@subql/common-substrate'; import { buildSchemaFromString } from '@subql/utils'; +import Cron from 'cron-converter'; import { GraphQLSchema } from 'graphql'; import { getChainTypes, getProjectRoot, updateDataSourcesV0_2_0, } from '../utils/project'; +import { getBlockByHeight, getTimestamp } from '../utils/substrate'; export type SubqlProjectDs = SubstrateDataSource & { mapping: SubstrateDataSource['mapping'] & { entryScript: string }; }; +export type SubqlProjectBlockFilter = SubstrateBlockFilter & { + cronSchedule?: { + schedule: Cron.Seeker; + next: number; + }; +}; + export type SubqlProjectDsTemplate = Omit & { name: string; }; @@ -228,3 +241,53 @@ async function loadProjectTemplates( })); } } + +// eslint-disable-next-line @typescript-eslint/require-await +export async function generateTimestampReferenceForBlockFilters( + dataSources: SubqlProjectDs[], + api: ApiPromise, +): Promise { + const cron = new Cron(); + + dataSources = await Promise.all( + dataSources.map(async (ds) => { + if (isRuntimeDs(ds)) { + const startBlock = ds.startBlock ?? 1; + let block; + let timestampReference; + + ds.mapping.handlers = await Promise.all( + ds.mapping.handlers.map(async (handler) => { + if (handler.kind === SubstrateHandlerKind.Block) { + if (handler.filter?.timestamp) { + if (!block) { + block = await getBlockByHeight(api, startBlock); + timestampReference = getTimestamp(block); + } + try { + cron.fromString(handler.filter.timestamp); + } catch (e) { + throw new Error( + `Invalid Cron string: ${handler.filter.timestamp}`, + ); + } + + const schedule = cron.schedule(timestampReference); + (handler.filter as SubqlProjectBlockFilter).cronSchedule = { + schedule: schedule, + get next() { + return Date.parse(this.schedule.next().format()); + }, + }; + } + } + return handler; + }), + ); + } + return ds; + }), + ); + + return dataSources; +} diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index 5a66175e70..e7f3569fad 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -569,7 +569,9 @@ export class FetchService implements OnApplicationShutdown { const { _metadata: metaData } = dictionary; if (metaData.genesisHash !== this.api.genesisHash.toString()) { - logger.error('The dictionary that you have specified does not match the chain you are indexing, it will be ignored. Please update your project manifest to reference the correct dictionary'); + logger.error( + 'The dictionary that you have specified does not match the chain you are indexing, it will be ignored. Please update your project manifest to reference the correct dictionary', + ); this.useDictionary = false; this.eventEmitter.emit(IndexerEvent.UsingDictionary, { value: Number(this.useDictionary), diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index eff49d4abc..9584dcb593 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -33,7 +33,11 @@ import { SubstrateExtrinsic, } from '@subql/types'; import { Sequelize } from 'sequelize'; -import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject'; +import { + generateTimestampReferenceForBlockFilters, + SubqlProjectDs, + SubqueryProject, +} from '../configure/SubqueryProject'; import * as SubstrateUtil from '../utils/substrate'; import { yargsOptions } from '../yargs'; import { ApiService } from './api.service'; @@ -176,11 +180,13 @@ export class IndexerManager { async start(): Promise { await this.projectService.init(); + logger.info('indexer manager started'); } private filterDataSources(nextProcessingHeight: number): SubqlProjectDs[] { let filteredDs: SubqlProjectDs[]; - filteredDs = this.project.dataSources.filter( + + filteredDs = this.projectService.dataSources.filter( (ds) => ds.startBlock <= nextProcessingHeight, ); diff --git a/packages/node/src/indexer/project.service.ts b/packages/node/src/indexer/project.service.ts index eba805934a..fce7d831bb 100644 --- a/packages/node/src/indexer/project.service.ts +++ b/packages/node/src/indexer/project.service.ts @@ -18,7 +18,11 @@ import { getMetaDataInfo, } from '@subql/node-core'; import { Sequelize } from 'sequelize'; -import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject'; +import { + generateTimestampReferenceForBlockFilters, + SubqlProjectDs, + SubqueryProject, +} from '../configure/SubqueryProject'; import { initDbSchema } from '../utils/project'; import { ApiService } from './api.service'; import { DsProcessorService } from './ds-processor.service'; @@ -55,6 +59,10 @@ export class ProjectService { return this._schema; } + get dataSources(): SubqlProjectDs[] { + return this.project.dataSources; + } + get blockOffset(): number { return this._blockOffset; } @@ -71,6 +79,10 @@ export class ProjectService { // Used to load assets into DS-processor, has to be done in any thread await this.dsProcessorService.validateProjectCustomDatasources(); // Do extra work on main thread to setup stuff + this.project.dataSources = await generateTimestampReferenceForBlockFilters( + this.project.dataSources, + this.apiService.getApi(), + ); if (isMainThread) { this._schema = await this.ensureProject(); await this.initDbSchema(); diff --git a/packages/node/src/utils/substrate.test.ts b/packages/node/src/utils/substrate.test.ts index b0eb05ea66..d4cf849bc2 100644 --- a/packages/node/src/utils/substrate.test.ts +++ b/packages/node/src/utils/substrate.test.ts @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 import { ApiPromise, WsProvider } from '@polkadot/api'; -import { fetchBlocks } from './substrate'; +import Cron from 'cron-converter'; +import { SubqlProjectBlockFilter } from '../configure/SubqueryProject'; +import { fetchBlocks, filterBlock } from './substrate'; const endpoint = 'wss://polkadot.api.onfinality.io/public-ws'; @@ -29,6 +31,45 @@ describe('substrate utils', () => { } }); + it('filters blocks based on timestamp', async () => { + const cronString = '*/5 * * * *'; + const cron = new Cron(); + try { + cron.fromString(cronString); + } catch (e) { + throw new Error(`invalid cron expression: ${cronString}`); + } + const blocks = await fetchBlocks(api, 100000, 100100); + const reference = blocks[0].block.timestamp; + const schedule = cron.schedule(reference); + const filter: SubqlProjectBlockFilter = { + timestamp: cronString, + cronSchedule: { + schedule: schedule, + get next() { + return Date.parse(this.schedule.next().format()); + }, + }, + }; + const filteredBlocks = blocks.filter((block) => { + return filterBlock(block.block, filter) !== undefined; + }); + + expect(filteredBlocks).toHaveLength(2); + }); + + it('invalid timestamp throws error on cron creation', () => { + const cronString = 'invalid cron'; + const cron = new Cron(); + expect(() => { + try { + cron.fromString(cronString); + } catch (e) { + throw new Error(`invalid cron expression: ${cronString}`); + } + }).toThrow(Error); + }); + it.skip('when failed to fetch, log block height and re-throw error', async () => { //some large number of block height await expect(fetchBlocks(api, 100000000, 100000019)).rejects.toThrow( diff --git a/packages/node/src/utils/substrate.ts b/packages/node/src/utils/substrate.ts index d6d13972a5..7bea4b6010 100644 --- a/packages/node/src/utils/substrate.ts +++ b/packages/node/src/utils/substrate.ts @@ -23,6 +23,7 @@ import { SubstrateExtrinsic, } from '@subql/types'; import { last, merge, range } from 'lodash'; +import { SubqlProjectBlockFilter } from '../configure/SubqueryProject'; import { BlockContent } from '../indexer/types'; const logger = getLogger('fetch'); const INTERVAL_THRESHOLD = BN_THOUSAND.div(BN_TWO); @@ -41,7 +42,7 @@ export function wrapBlock( }); } -function getTimestamp({ block: { extrinsics } }: SignedBlock): Date { +export function getTimestamp({ block: { extrinsics } }: SignedBlock): Date { for (const e of extrinsics) { const { method: { method, section }, @@ -123,6 +124,11 @@ export function filterBlock( ): SubstrateBlock | undefined { if (!filter) return block; if (!filterBlockModulo(block, filter)) return; + if (filter.timestamp) { + if (!filterBlockTimestamp(block, filter as SubqlProjectBlockFilter)) { + return; + } + } return filter.specVersion === undefined || block.specVersion === undefined || checkSpecRange(filter.specVersion, block.specVersion) @@ -139,6 +145,30 @@ export function filterBlockModulo( return block.block.header.number.toNumber() % modulo === 0; } +export function filterBlockTimestamp( + block: SubstrateBlock, + filter: SubqlProjectBlockFilter, +): boolean { + const unixTimestamp = block.timestamp.getTime(); + if (unixTimestamp > filter.cronSchedule.next) { + logger.info( + `Block with timestamp ${new Date( + unixTimestamp, + ).toString()} is about to be indexed`, + ); + logger.info( + `Next block will be indexed at ${new Date( + filter.cronSchedule.next, + ).toString()}`, + ); + filter.cronSchedule.schedule.prev(); + return true; + } else { + filter.cronSchedule.schedule.prev(); + return false; + } +} + export function filterExtrinsic( { block, extrinsic, success }: SubstrateExtrinsic, filter?: SubstrateCallFilter, @@ -252,7 +282,7 @@ export async function fetchBlocks( }); } -async function getBlockByHeight( +export async function getBlockByHeight( api: ApiPromise, height: number, ): Promise { diff --git a/packages/types/src/project.ts b/packages/types/src/project.ts index c6c4d5f672..18b7462304 100644 --- a/packages/types/src/project.ts +++ b/packages/types/src/project.ts @@ -51,6 +51,7 @@ interface SubstrateBaseHandlerFilter { export interface SubstrateBlockFilter extends SubstrateBaseHandlerFilter { modulo?: number; + timestamp?: string; } export interface SubstrateEventFilter extends SubstrateBaseHandlerFilter { diff --git a/yarn.lock b/yarn.lock index bf75031986..4c88a66348 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3801,6 +3801,7 @@ __metadata: "@types/yargs": ^16.0.4 "@willsoto/nestjs-prometheus": ^4.4.0 app-module-path: ^2.2.0 + cron-converter: ^1.0.2 dayjs: ^1.10.7 dotenv: ^15.0.1 eventemitter2: ^6.4.5 @@ -4168,6 +4169,15 @@ __metadata: languageName: node linkType: hard +"@types/cron-converter@npm:^1": + version: 1.0.1 + resolution: "@types/cron-converter@npm:1.0.1" + dependencies: + moment: ">=2.14.0" + checksum: 93a0a81d44a12d7cc748e124e459037349863ecf10c47316c7693e96367c735fe463afb2bd22f82c5bdb2679e67f247dfca71403d3e4df0bee2d24b63698004a + languageName: node + linkType: hard + "@types/debug@npm:^4.1.4, @types/debug@npm:^4.1.5, @types/debug@npm:^4.1.7": version: 4.1.7 resolution: "@types/debug@npm:4.1.7" @@ -7176,6 +7186,16 @@ __metadata: languageName: node linkType: hard +"cron-converter@npm:^1.0.2": + version: 1.0.2 + resolution: "cron-converter@npm:1.0.2" + dependencies: + moment-timezone: ~0.5 + sprintf-js: ~1 + checksum: 7bbd01f29a6fc5a8a8c8dabda3a6468b7040acd374beadbf103d6c86f9e85c961a0937cfa2a997e4bfcdcd6bd07cde658a3009b899130e69e6c9ceb5f0a82ac3 + languageName: node + linkType: hard + "cron@npm:1.8.2": version: 1.8.2 resolution: "cron@npm:1.8.2" @@ -12463,6 +12483,15 @@ __metadata: languageName: node linkType: hard +"moment-timezone@npm:~0.5": + version: 0.5.37 + resolution: "moment-timezone@npm:0.5.37" + dependencies: + moment: ">= 2.9.0" + checksum: b3fc24848a396ee3801331a4b5511591df0e22241e59f55200a475603f23a8429f01887294c7a0c005dae25b8e4bef20472fbc875ff20080f26fded066559f1a + languageName: node + linkType: hard + "moment@npm:>= 2.9.0, moment@npm:^2.29.1": version: 2.29.3 resolution: "moment@npm:2.29.3" @@ -12470,6 +12499,13 @@ __metadata: languageName: node linkType: hard +"moment@npm:>=2.14.0": + version: 2.29.4 + resolution: "moment@npm:2.29.4" + checksum: 0ec3f9c2bcba38dc2451b1daed5daded747f17610b92427bebe1d08d48d8b7bdd8d9197500b072d14e326dd0ccf3e326b9e3d07c5895d3d49e39b6803b76e80e + languageName: node + linkType: hard + "mri@npm:^1.1.5": version: 1.2.0 resolution: "mri@npm:1.2.0" @@ -15331,6 +15367,13 @@ __metadata: languageName: node linkType: hard +"sprintf-js@npm:~1": + version: 1.1.2 + resolution: "sprintf-js@npm:1.1.2" + checksum: d4bb46464632b335e5faed381bd331157e0af64915a98ede833452663bc672823db49d7531c32d58798e85236581fb7342fd0270531ffc8f914e186187bf1c90 + languageName: node + linkType: hard + "sprintf-js@npm:~1.0.2": version: 1.0.3 resolution: "sprintf-js@npm:1.0.3" @@ -15615,6 +15658,7 @@ __metadata: "@actions/core": ^1.6.0 "@babel/preset-env": ^7.16.11 "@octokit/request": ^5.6.3 + "@types/cron-converter": ^1 "@types/node": ^14.18.10 "@typescript-eslint/eslint-plugin": ^5.10.2 "@typescript-eslint/parser": ^5.10.2