-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: sync by indexer RPCs #863
Changes from 13 commits
16fa2a4
ece382f
81de9aa
f913275
61e67a1
96c4977
a038e18
a7ad2d5
8ce59ef
c231ad1
14331b5
0888077
a087114
6be360e
acbafd0
2ec2529
537dcbf
ad1ffc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import {MigrationInterface, QueryRunner} from "typeorm"; | ||
|
||
export class AddConfirmed1565693320664 implements MigrationInterface { | ||
|
||
public async up(queryRunner: QueryRunner): Promise<any> { | ||
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'confirmed' boolean NOT NULL DEFAULT false;`) | ||
} | ||
|
||
public async down(queryRunner: QueryRunner): Promise<any> { | ||
await queryRunner.dropColumn('transaction', 'confirmed') | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
import Core from '@nervosnetwork/ckb-sdk-core' | ||
|
||
export default class IndexerRPC { | ||
private core: Core | ||
|
||
constructor(url: string) { | ||
this.core = new Core(url) | ||
} | ||
|
||
public deindexLockHash = async (lockHash: string) => { | ||
return this.core.rpc.deindexLockHash(lockHash) | ||
} | ||
|
||
public indexLockHash = async (lockHash: string, indexFrom = '0') => { | ||
return this.core.rpc.indexLockHash(lockHash, indexFrom) | ||
} | ||
|
||
public getTransactionByLockHash = async ( | ||
lockHash: string, | ||
page: string, | ||
per: string, | ||
reverseOrder: boolean = false | ||
) => { | ||
const result = await this.core.rpc.getTransactionsByLockHash(lockHash, page, per, reverseOrder) | ||
return result | ||
} | ||
|
||
public getLockHashIndexStates = async () => { | ||
return this.core.rpc.getLockHashIndexStates() | ||
} | ||
|
||
public getLiveCellsByLockHash = async ( | ||
lockHash: string, | ||
page: string, | ||
per: string, | ||
reverseOrder: boolean = false | ||
) => { | ||
const result = await this.core.rpc.getLiveCellsByLockHash(lockHash, page, per, reverseOrder) | ||
return result | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
import { Subject, Subscription } from 'rxjs' | ||
import Utils from 'services/sync/utils' | ||
import logger from 'utils/logger' | ||
import GetBlocks from 'services/sync/get-blocks' | ||
import { Transaction } from 'types/cell-types' | ||
import TypeConvert from 'types/type-convert' | ||
import BlockNumber from 'services/sync/block-number' | ||
import AddressesUsedSubject from 'models/subjects/addresses-used-subject' | ||
import LockUtils from 'models/lock-utils' | ||
import TransactionPersistor from 'services/tx/transaction-persistor' | ||
import IndexerTransaction from 'services/tx/indexer-transaction' | ||
|
||
import IndexerRPC from './indexer-rpc' | ||
|
||
enum TxPointType { | ||
CreatedBy = 'createdBy', | ||
ConsumedBy = 'consumedBy', | ||
} | ||
|
||
export default class Queue { | ||
private lockHashes: string[] | ||
private indexerRPC: IndexerRPC | ||
private getBlocksService: GetBlocks | ||
private per = 50 | ||
private interval = 1000 | ||
private blockNumberService: BlockNumber | ||
private tipNumberListener: Subscription | ||
private tipBlockNumber: bigint = BigInt(-1) | ||
|
||
private stopped = false | ||
private indexed = false | ||
|
||
private inProcess = false | ||
|
||
constructor(url: string, lockHashes: string[], tipNumberSubject: Subject<string | undefined>) { | ||
this.lockHashes = lockHashes | ||
this.indexerRPC = new IndexerRPC(url) | ||
this.getBlocksService = new GetBlocks() | ||
this.blockNumberService = new BlockNumber() | ||
this.tipNumberListener = tipNumberSubject.subscribe(async (num: string) => { | ||
if (num) { | ||
this.tipBlockNumber = BigInt(num) | ||
} | ||
}) | ||
} | ||
|
||
public setLockHashes = (lockHashes: string[]): void => { | ||
this.lockHashes = lockHashes | ||
this.indexed = false | ||
} | ||
|
||
/* eslint no-await-in-loop: "off" */ | ||
/* eslint no-restricted-syntax: "off" */ | ||
public start = async () => { | ||
while (!this.stopped) { | ||
try { | ||
this.inProcess = true | ||
const { lockHashes } = this | ||
const currentBlockNumber: bigint = await this.blockNumberService.getCurrent() | ||
if (!this.indexed || currentBlockNumber !== this.tipBlockNumber) { | ||
if (!this.indexed) { | ||
await this.indexLockHashes(lockHashes) | ||
this.indexed = true | ||
} | ||
const minBlockNumber = await this.getCurrentBlockNumber(lockHashes) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the difference between There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
for (const lockHash of lockHashes) { | ||
await this.pipeline(lockHash, TxPointType.CreatedBy, currentBlockNumber) | ||
} | ||
for (const lockHash of lockHashes) { | ||
await this.pipeline(lockHash, TxPointType.ConsumedBy, currentBlockNumber) | ||
} | ||
if (minBlockNumber) { | ||
await this.blockNumberService.updateCurrent(minBlockNumber) | ||
} | ||
} | ||
await this.yield(this.interval) | ||
} catch (err) { | ||
if (err.message.startsWith('connect ECONNREFUSED')) { | ||
logger.debug('sync indexer error:', err) | ||
} else { | ||
logger.error('sync indexer error:', err) | ||
} | ||
} finally { | ||
await this.yield() | ||
this.inProcess = false | ||
} | ||
} | ||
} | ||
|
||
public processFork = async () => { | ||
while (!this.stopped) { | ||
try { | ||
const tip = this.tipBlockNumber | ||
const txs = await IndexerTransaction.txHashes() | ||
for (const tx of txs) { | ||
const result = await this.getBlocksService.getTransaction(tx.hash) | ||
if (!result) { | ||
shaojunda marked this conversation as resolved.
Show resolved
Hide resolved
|
||
await IndexerTransaction.deleteTxWhenFork(tx.hash) | ||
} else if (tip - BigInt(tx.blockNumber) >= 1000) { | ||
await IndexerTransaction.confirm(tx.hash) | ||
} | ||
} | ||
} catch (err) { | ||
logger.error(`indexer delete forked tx:`, err) | ||
} finally { | ||
await this.yield(10000) | ||
} | ||
} | ||
} | ||
|
||
public getCurrentBlockNumber = async (lockHashes: string[]) => { | ||
// get lock hash indexer status | ||
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates() | ||
const blockNumbers = lockHashIndexStates | ||
.filter(state => lockHashes.includes(state.lockHash)) | ||
.map(state => state.blockNumber) | ||
const uniqueBlockNumbers = [...new Set(blockNumbers)] | ||
const blockNumbersBigInt = uniqueBlockNumbers.map(num => BigInt(num)) | ||
const minBlockNumber = blockNumbersBigInt.sort()[0] | ||
return minBlockNumber | ||
} | ||
|
||
public indexLockHashes = async (lockHashes: string[]) => { | ||
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates() | ||
const indexedLockHashes: string[] = lockHashIndexStates.map(state => state.lockHash) | ||
const nonIndexedLockHashes = lockHashes.filter(i => !indexedLockHashes.includes(i)) | ||
|
||
await Utils.mapSeries(nonIndexedLockHashes, async (lockHash: string) => { | ||
await this.indexerRPC.indexLockHash(lockHash) | ||
}) | ||
} | ||
|
||
// type: 'createdBy' | 'consumedBy' | ||
public pipeline = async (lockHash: string, type: TxPointType, startBlockNumber: bigint) => { | ||
let page = 0 | ||
let stopped = false | ||
while (!stopped) { | ||
const txs = await this.indexerRPC.getTransactionByLockHash(lockHash, page.toString(), this.per.toString()) | ||
if (txs.length < this.per) { | ||
stopped = true | ||
} | ||
for (const tx of txs) { | ||
let txPoint: CKBComponents.TransactionPoint | null = null | ||
if (type === TxPointType.CreatedBy) { | ||
txPoint = tx.createdBy | ||
} else if (type === TxPointType.ConsumedBy) { | ||
txPoint = tx.consumedBy | ||
} | ||
|
||
if ( | ||
txPoint && | ||
(BigInt(txPoint.blockNumber) >= startBlockNumber || this.tipBlockNumber - BigInt(txPoint.blockNumber) < 1000) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to extract this expression into a method? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need, only use once. |
||
) { | ||
const transactionWithStatus = await this.getBlocksService.getTransaction(txPoint.txHash) | ||
const ckbTransaction: CKBComponents.Transaction = transactionWithStatus.transaction | ||
const transaction: Transaction = TypeConvert.toTransaction(ckbTransaction) | ||
// tx timestamp / blockNumber / blockHash | ||
const { blockHash } = transactionWithStatus.txStatus | ||
if (blockHash) { | ||
const blockHeader = await this.getBlocksService.getHeader(blockHash) | ||
transaction.blockHash = blockHash | ||
transaction.blockNumber = blockHeader.number | ||
transaction.timestamp = blockHeader.timestamp | ||
} | ||
// broadcast address used | ||
const txEntity = await TransactionPersistor.saveFetchTx(transaction) | ||
|
||
let address: string | undefined | ||
if (type === TxPointType.CreatedBy) { | ||
address = LockUtils.lockScriptToAddress(transaction.outputs![+txPoint.index].lock) | ||
} else if (type === TxPointType.ConsumedBy) { | ||
const input = txEntity.inputs[+txPoint.index] | ||
const output = await IndexerTransaction.updateInputLockHash(input.outPointTxHash!, input.outPointIndex!) | ||
if (output) { | ||
address = LockUtils.lockScriptToAddress(output.lock) | ||
} | ||
} | ||
if (address) { | ||
AddressesUsedSubject.getSubject().next([address]) | ||
} | ||
} | ||
} | ||
page += 1 | ||
} | ||
} | ||
|
||
public stop = () => { | ||
this.tipNumberListener.unsubscribe() | ||
this.stopped = true | ||
} | ||
|
||
public waitForDrained = async (timeout: number = 5000) => { | ||
shaojunda marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const startAt: number = +new Date() | ||
while (this.inProcess) { | ||
const now: number = +new Date() | ||
if (now - startAt > timeout) { | ||
return | ||
} | ||
await this.yield(50) | ||
} | ||
} | ||
|
||
public stopAndWait = async () => { | ||
this.stop() | ||
await this.waitForDrained() | ||
} | ||
|
||
private yield = async (millisecond: number = 1) => { | ||
await Utils.sleep(millisecond) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
import Core from '@nervosnetwork/ckb-sdk-core' | ||
|
||
import { Block } from 'types/cell-types' | ||
import { Block, BlockHeader } from 'types/cell-types' | ||
import TypeConvert from 'types/type-convert' | ||
import { NetworkWithID } from 'services/networks' | ||
import CheckAndSave from './check-and-save' | ||
|
@@ -55,6 +55,16 @@ export default class GetBlocks { | |
return block | ||
} | ||
|
||
public getTransaction = async (hash: string): Promise<CKBComponents.TransactionWithStatus> => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need, it's block info too. |
||
const tx = await core.rpc.getTransaction(hash) | ||
return tx | ||
} | ||
|
||
public getHeader = async (hash: string): Promise<BlockHeader> => { | ||
const result = await core.rpc.getHeader(hash) | ||
return TypeConvert.toBlockHeader(result) | ||
} | ||
|
||
public static getBlockByNumber = async (num: string): Promise<Block> => { | ||
const block = await core.rpc.getBlockByNumber(num) | ||
return TypeConvert.toBlock(block) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I understand we have different folders - to some extend they act like namespaces, having multiple
Queue
classes worries me that it could confuse reading and understanding our design and implementation.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about
IndexerQueue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's better. We have two queue implementations for two different modules, then we name them differently for clarification. Please do so.