Skip to content

Commit

Permalink
Merge pull request #863 from nervosnetwork/impl-indexer
Browse files Browse the repository at this point in the history
feat: sync by indexer RPCs
  • Loading branch information
classicalliu authored Aug 19, 2019
2 parents ce9597b + ad1ffc9 commit 6f3b695
Show file tree
Hide file tree
Showing 13 changed files with 592 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ export default class Transaction extends BaseEntity {
})
updatedAt!: string

// only used for check fork in indexer mode
@Column({
type: 'boolean',
})
confirmed: boolean = false

@OneToMany(_type => InputEntity, input => input.transaction)
inputs!: InputEntity[]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
import {MigrationInterface, QueryRunner, TableColumn, getConnection} from "typeorm";
import {MigrationInterface, QueryRunner, TableColumn, getConnection, In} from "typeorm";
import TransactionEntity from '../entities/transaction'
import { OutputStatus } from '../../../services/tx/params'
import { TransactionStatus } from '../../../types/cell-types'
import OutputEntity from 'database/chain/entities/output'

export class AddStatusToTx1562038960990 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'status' varchar NOT NULL DEFAULT '';`)
// TransactionStatus.Success = 'success'
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'status' varchar NOT NULL DEFAULT 'success';`)

const pendingTxHashes: string[] = (await getConnection()
.getRepository(OutputEntity)
.createQueryBuilder('output')
.select(`output.outPointTxHash`, 'txHash')
.where({
status: OutputStatus.Sent
})
.getRawMany())
.filter(output => output.txHash)
await getConnection()
.createQueryBuilder()
.update(TransactionEntity)
.set({ status: TransactionStatus.Pending })
.where({
hash: In(pendingTxHashes)
})
.execute()

const txs = await getConnection()
.getRepository(TransactionEntity)
.find({ relations: ['inputs', 'outputs'] })
const updatedTxs = txs.map(tx => {
tx.status = tx.outputs[0].status === OutputStatus.Sent ? TransactionStatus.Pending : TransactionStatus.Success
return tx
})
await getConnection().manager.save(updatedTxs)
await queryRunner.changeColumn('transaction', 'status', new TableColumn({
name: 'status',
type: 'varchar',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {MigrationInterface, QueryRunner} from "typeorm";

export class AddConfirmed1565693320664 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'confirmed' boolean NOT NULL DEFAULT false;`)
}

public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.dropColumn('transaction', 'confirmed')
}

}
3 changes: 2 additions & 1 deletion packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import SyncInfo from './entities/sync-info'

import { InitMigration1561695143591 } from './migrations/1561695143591-InitMigration'
import { AddStatusToTx1562038960990 } from './migrations/1562038960990-AddStatusToTx'
import { AddConfirmed1565693320664 } from './migrations/1565693320664-AddConfirmed'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

Expand All @@ -31,7 +32,7 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
type: 'sqlite',
database: dbPath(genesisBlockHash),
entities: [Transaction, Input, Output, SyncInfo],
migrations: [InitMigration1561695143591, AddStatusToTx1562038960990],
migrations: [InitMigration1561695143591, AddStatusToTx1562038960990, AddConfirmed1565693320664],
logging,
}
}
Expand Down
41 changes: 41 additions & 0 deletions packages/neuron-wallet/src/services/indexer/indexer-rpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Core from '@nervosnetwork/ckb-sdk-core'

export default class IndexerRPC {
private core: Core

constructor(url: string) {
this.core = new Core(url)
}

public deindexLockHash = async (lockHash: string) => {
return this.core.rpc.deindexLockHash(lockHash)
}

public indexLockHash = async (lockHash: string, indexFrom = '0') => {
return this.core.rpc.indexLockHash(lockHash, indexFrom)
}

public getTransactionByLockHash = async (
lockHash: string,
page: string,
per: string,
reverseOrder: boolean = false
) => {
const result = await this.core.rpc.getTransactionsByLockHash(lockHash, page, per, reverseOrder)
return result
}

public getLockHashIndexStates = async () => {
return this.core.rpc.getLockHashIndexStates()
}

public getLiveCellsByLockHash = async (
lockHash: string,
page: string,
per: string,
reverseOrder: boolean = false
) => {
const result = await this.core.rpc.getLiveCellsByLockHash(lockHash, page, per, reverseOrder)
return result
}
}
221 changes: 221 additions & 0 deletions packages/neuron-wallet/src/services/indexer/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import { Subject, Subscription } from 'rxjs'
import Utils from 'services/sync/utils'
import logger from 'utils/logger'
import GetBlocks from 'services/sync/get-blocks'
import { Transaction } from 'types/cell-types'
import TypeConvert from 'types/type-convert'
import BlockNumber from 'services/sync/block-number'
import AddressesUsedSubject from 'models/subjects/addresses-used-subject'
import LockUtils from 'models/lock-utils'
import TransactionPersistor from 'services/tx/transaction-persistor'
import IndexerTransaction from 'services/tx/indexer-transaction'

import IndexerRPC from './indexer-rpc'

enum TxPointType {
CreatedBy = 'createdBy',
ConsumedBy = 'consumedBy',
}

export default class IndexerQueue {
private lockHashes: string[]
private indexerRPC: IndexerRPC
private getBlocksService: GetBlocks
private per = 50
private interval = 1000
private blockNumberService: BlockNumber
private tipNumberListener: Subscription
private tipBlockNumber: bigint = BigInt(-1)

private stopped = false
private indexed = false

private inProcess = false

private resetFlag = false

constructor(url: string, lockHashes: string[], tipNumberSubject: Subject<string | undefined>) {
this.lockHashes = lockHashes
this.indexerRPC = new IndexerRPC(url)
this.getBlocksService = new GetBlocks()
this.blockNumberService = new BlockNumber()
this.tipNumberListener = tipNumberSubject.subscribe(async (num: string) => {
if (num) {
this.tipBlockNumber = BigInt(num)
}
})
}

public setLockHashes = (lockHashes: string[]): void => {
this.lockHashes = lockHashes
this.indexed = false
}

public reset = () => {
this.resetFlag = true
}

/* eslint no-await-in-loop: "off" */
/* eslint no-restricted-syntax: "off" */
public start = async () => {
while (!this.stopped) {
try {
this.inProcess = true
if (this.resetFlag) {
await this.blockNumberService.updateCurrent(BigInt(0))
this.resetFlag = false
}
const { lockHashes } = this
const currentBlockNumber: bigint = await this.blockNumberService.getCurrent()
if (!this.indexed || currentBlockNumber !== this.tipBlockNumber) {
if (!this.indexed) {
await this.indexLockHashes(lockHashes)
this.indexed = true
}
const minBlockNumber = await this.getCurrentBlockNumber(lockHashes)
for (const lockHash of lockHashes) {
await this.pipeline(lockHash, TxPointType.CreatedBy, currentBlockNumber)
}
for (const lockHash of lockHashes) {
await this.pipeline(lockHash, TxPointType.ConsumedBy, currentBlockNumber)
}
if (minBlockNumber) {
await this.blockNumberService.updateCurrent(minBlockNumber)
}
}
await this.yield(this.interval)
} catch (err) {
if (err.message.startsWith('connect ECONNREFUSED')) {
logger.debug('sync indexer error:', err)
} else {
logger.error('sync indexer error:', err)
}
} finally {
await this.yield()
this.inProcess = false
}
}
}

public processFork = async () => {
while (!this.stopped) {
try {
const tip = this.tipBlockNumber
const txs = await IndexerTransaction.txHashes()
for (const tx of txs) {
const result = await this.getBlocksService.getTransaction(tx.hash)
if (!result) {
await IndexerTransaction.deleteTxWhenFork(tx.hash)
} else if (tip - BigInt(tx.blockNumber) >= 1000) {
await IndexerTransaction.confirm(tx.hash)
}
}
} catch (err) {
logger.error(`indexer delete forked tx:`, err)
} finally {
await this.yield(10000)
}
}
}

public getCurrentBlockNumber = async (lockHashes: string[]) => {
// get lock hash indexer status
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates()
const blockNumbers = lockHashIndexStates
.filter(state => lockHashes.includes(state.lockHash))
.map(state => state.blockNumber)
const uniqueBlockNumbers = [...new Set(blockNumbers)]
const blockNumbersBigInt = uniqueBlockNumbers.map(num => BigInt(num))
const minBlockNumber = Utils.min(blockNumbersBigInt)
return minBlockNumber
}

public indexLockHashes = async (lockHashes: string[]) => {
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates()
const indexedLockHashes: string[] = lockHashIndexStates.map(state => state.lockHash)
const nonIndexedLockHashes = lockHashes.filter(i => !indexedLockHashes.includes(i))

await Utils.mapSeries(nonIndexedLockHashes, async (lockHash: string) => {
await this.indexerRPC.indexLockHash(lockHash)
})
}

// type: 'createdBy' | 'consumedBy'
public pipeline = async (lockHash: string, type: TxPointType, startBlockNumber: bigint) => {
let page = 0
let stopped = false
while (!stopped) {
const txs = await this.indexerRPC.getTransactionByLockHash(lockHash, page.toString(), this.per.toString())
if (txs.length < this.per) {
stopped = true
}
for (const tx of txs) {
let txPoint: CKBComponents.TransactionPoint | null = null
if (type === TxPointType.CreatedBy) {
txPoint = tx.createdBy
} else if (type === TxPointType.ConsumedBy) {
txPoint = tx.consumedBy
}

if (
txPoint &&
(BigInt(txPoint.blockNumber) >= startBlockNumber || this.tipBlockNumber - BigInt(txPoint.blockNumber) < 1000)
) {
const transactionWithStatus = await this.getBlocksService.getTransaction(txPoint.txHash)
const ckbTransaction: CKBComponents.Transaction = transactionWithStatus.transaction
const transaction: Transaction = TypeConvert.toTransaction(ckbTransaction)
// tx timestamp / blockNumber / blockHash
const { blockHash } = transactionWithStatus.txStatus
if (blockHash) {
const blockHeader = await this.getBlocksService.getHeader(blockHash)
transaction.blockHash = blockHash
transaction.blockNumber = blockHeader.number
transaction.timestamp = blockHeader.timestamp
}
// broadcast address used
const txEntity = await TransactionPersistor.saveFetchTx(transaction)

let address: string | undefined
if (type === TxPointType.CreatedBy) {
address = LockUtils.lockScriptToAddress(transaction.outputs![+txPoint.index].lock)
} else if (type === TxPointType.ConsumedBy) {
const input = txEntity.inputs[+txPoint.index]
const output = await IndexerTransaction.updateInputLockHash(input.outPointTxHash!, input.outPointIndex!)
if (output) {
address = LockUtils.lockScriptToAddress(output.lock)
}
}
if (address) {
AddressesUsedSubject.getSubject().next([address])
}
}
}
page += 1
}
}

public stop = () => {
this.tipNumberListener.unsubscribe()
this.stopped = true
}

public waitForDrained = async (timeout: number = 5000) => {
const startAt: number = +new Date()
while (this.inProcess) {
const now: number = +new Date()
if (now - startAt > timeout) {
return
}
await this.yield(50)
}
}

public stopAndWait = async () => {
this.stop()
await this.waitForDrained()
}

private yield = async (millisecond: number = 1) => {
await Utils.sleep(millisecond)
}
}
12 changes: 11 additions & 1 deletion packages/neuron-wallet/src/services/sync/get-blocks.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Core from '@nervosnetwork/ckb-sdk-core'

import { Block } from 'types/cell-types'
import { Block, BlockHeader } from 'types/cell-types'
import TypeConvert from 'types/type-convert'
import { NetworkWithID } from 'services/networks'
import CheckAndSave from './check-and-save'
Expand Down Expand Up @@ -55,6 +55,16 @@ export default class GetBlocks {
return block
}

public getTransaction = async (hash: string): Promise<CKBComponents.TransactionWithStatus> => {
const tx = await core.rpc.getTransaction(hash)
return tx
}

public getHeader = async (hash: string): Promise<BlockHeader> => {
const result = await core.rpc.getHeader(hash)
return TypeConvert.toBlockHeader(result)
}

public static getBlockByNumber = async (num: string): Promise<Block> => {
const block = await core.rpc.getBlockByNumber(num)
return TypeConvert.toBlock(block)
Expand Down
16 changes: 16 additions & 0 deletions packages/neuron-wallet/src/services/sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,20 @@ export default class Utils {
}
return result
}

public static min = (array: bigint[]): bigint | undefined => {
let minValue = array[0]
if (!minValue) {
return undefined
}

for (let i = 1; i < array.length; ++i) {
const value = array[i]
if (value < minValue) {
minValue = value
}
}

return minValue
}
}
Loading

0 comments on commit 6f3b695

Please sign in to comment.