Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sync by indexer RPCs #863

Merged
merged 18 commits into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ export default class Transaction extends BaseEntity {
})
updatedAt!: string

// only used for check fork in indexer mode
@Column({
type: 'boolean',
})
confirmed: boolean = false

@OneToMany(_type => InputEntity, input => input.transaction)
inputs!: InputEntity[]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
import {MigrationInterface, QueryRunner, TableColumn, getConnection} from "typeorm";
import {MigrationInterface, QueryRunner, TableColumn, getConnection, In} from "typeorm";
import TransactionEntity from '../entities/transaction'
import { OutputStatus } from '../../../services/tx/params'
import { TransactionStatus } from '../../../types/cell-types'
import OutputEntity from 'database/chain/entities/output'

export class AddStatusToTx1562038960990 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'status' varchar NOT NULL DEFAULT '';`)
// TransactionStatus.Success = 'success'
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'status' varchar NOT NULL DEFAULT 'success';`)

const pendingTxHashes: string[] = (await getConnection()
.getRepository(OutputEntity)
.createQueryBuilder('output')
.select(`output.outPointTxHash`, 'txHash')
.where({
status: OutputStatus.Sent
})
.getRawMany())
.filter(output => output.txHash)
await getConnection()
.createQueryBuilder()
.update(TransactionEntity)
.set({ status: TransactionStatus.Pending })
.where({
hash: In(pendingTxHashes)
})
.execute()

const txs = await getConnection()
.getRepository(TransactionEntity)
.find({ relations: ['inputs', 'outputs'] })
const updatedTxs = txs.map(tx => {
tx.status = tx.outputs[0].status === OutputStatus.Sent ? TransactionStatus.Pending : TransactionStatus.Success
return tx
})
await getConnection().manager.save(updatedTxs)
await queryRunner.changeColumn('transaction', 'status', new TableColumn({
name: 'status',
type: 'varchar',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {MigrationInterface, QueryRunner} from "typeorm";

export class AddConfirmed1565693320664 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<any> {
await queryRunner.query(`ALTER TABLE 'transaction' ADD COLUMN 'confirmed' boolean NOT NULL DEFAULT false;`)
}

public async down(queryRunner: QueryRunner): Promise<any> {
await queryRunner.dropColumn('transaction', 'confirmed')
}

}
3 changes: 2 additions & 1 deletion packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import SyncInfo from './entities/sync-info'

import { InitMigration1561695143591 } from './migrations/1561695143591-InitMigration'
import { AddStatusToTx1562038960990 } from './migrations/1562038960990-AddStatusToTx'
import { AddConfirmed1565693320664 } from './migrations/1565693320664-AddConfirmed'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

Expand All @@ -31,7 +32,7 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
type: 'sqlite',
database: dbPath(genesisBlockHash),
entities: [Transaction, Input, Output, SyncInfo],
migrations: [InitMigration1561695143591, AddStatusToTx1562038960990],
migrations: [InitMigration1561695143591, AddStatusToTx1562038960990, AddConfirmed1565693320664],
logging,
}
}
Expand Down
41 changes: 41 additions & 0 deletions packages/neuron-wallet/src/services/indexer/indexer-rpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import Core from '@nervosnetwork/ckb-sdk-core'

export default class IndexerRPC {
private core: Core

constructor(url: string) {
this.core = new Core(url)
}

public deindexLockHash = async (lockHash: string) => {
return this.core.rpc.deindexLockHash(lockHash)
}

public indexLockHash = async (lockHash: string, indexFrom = '0') => {
return this.core.rpc.indexLockHash(lockHash, indexFrom)
}

public getTransactionByLockHash = async (
lockHash: string,
page: string,
per: string,
reverseOrder: boolean = false
) => {
const result = await this.core.rpc.getTransactionsByLockHash(lockHash, page, per, reverseOrder)
return result
}

public getLockHashIndexStates = async () => {
return this.core.rpc.getLockHashIndexStates()
}

public getLiveCellsByLockHash = async (
lockHash: string,
page: string,
per: string,
reverseOrder: boolean = false
) => {
const result = await this.core.rpc.getLiveCellsByLockHash(lockHash, page, per, reverseOrder)
return result
}
}
211 changes: 211 additions & 0 deletions packages/neuron-wallet/src/services/indexer/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import { Subject, Subscription } from 'rxjs'
import Utils from 'services/sync/utils'
import logger from 'utils/logger'
import GetBlocks from 'services/sync/get-blocks'
import { Transaction } from 'types/cell-types'
import TypeConvert from 'types/type-convert'
import BlockNumber from 'services/sync/block-number'
import AddressesUsedSubject from 'models/subjects/addresses-used-subject'
import LockUtils from 'models/lock-utils'
import TransactionPersistor from 'services/tx/transaction-persistor'
import IndexerTransaction from 'services/tx/indexer-transaction'

import IndexerRPC from './indexer-rpc'

enum TxPointType {
CreatedBy = 'createdBy',
ConsumedBy = 'consumedBy',
}

export default class Queue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I understand we have different folders - to some extend they act like namespaces, having multiple Queue classes worries me that it could confuse reading and understanding our design and implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about IndexerQueue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's better. We have two queue implementations for two different modules, then we name them differently for clarification. Please do so.

private lockHashes: string[]
private indexerRPC: IndexerRPC
private getBlocksService: GetBlocks
private per = 50
private interval = 1000
private blockNumberService: BlockNumber
private tipNumberListener: Subscription
private tipBlockNumber: bigint = BigInt(-1)

private stopped = false
private indexed = false

private inProcess = false

constructor(url: string, lockHashes: string[], tipNumberSubject: Subject<string | undefined>) {
this.lockHashes = lockHashes
this.indexerRPC = new IndexerRPC(url)
this.getBlocksService = new GetBlocks()
this.blockNumberService = new BlockNumber()
this.tipNumberListener = tipNumberSubject.subscribe(async (num: string) => {
if (num) {
this.tipBlockNumber = BigInt(num)
}
})
}

public setLockHashes = (lockHashes: string[]): void => {
this.lockHashes = lockHashes
this.indexed = false
}

/* eslint no-await-in-loop: "off" */
/* eslint no-restricted-syntax: "off" */
public start = async () => {
while (!this.stopped) {
try {
this.inProcess = true
const { lockHashes } = this
const currentBlockNumber: bigint = await this.blockNumberService.getCurrent()
if (!this.indexed || currentBlockNumber !== this.tipBlockNumber) {
if (!this.indexed) {
await this.indexLockHashes(lockHashes)
this.indexed = true
}
const minBlockNumber = await this.getCurrentBlockNumber(lockHashes)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between currentBlockNumber and minBlockNumber?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minBlockNumber means the min block number in get_lock_hash_index_states of all lock hashes.

for (const lockHash of lockHashes) {
await this.pipeline(lockHash, TxPointType.CreatedBy, currentBlockNumber)
}
for (const lockHash of lockHashes) {
await this.pipeline(lockHash, TxPointType.ConsumedBy, currentBlockNumber)
}
if (minBlockNumber) {
await this.blockNumberService.updateCurrent(minBlockNumber)
}
}
await this.yield(this.interval)
} catch (err) {
if (err.message.startsWith('connect ECONNREFUSED')) {
logger.debug('sync indexer error:', err)
} else {
logger.error('sync indexer error:', err)
}
} finally {
await this.yield()
this.inProcess = false
}
}
}

public processFork = async () => {
while (!this.stopped) {
try {
const tip = this.tipBlockNumber
const txs = await IndexerTransaction.txHashes()
for (const tx of txs) {
const result = await this.getBlocksService.getTransaction(tx.hash)
if (!result) {
shaojunda marked this conversation as resolved.
Show resolved Hide resolved
await IndexerTransaction.deleteTxWhenFork(tx.hash)
} else if (tip - BigInt(tx.blockNumber) >= 1000) {
await IndexerTransaction.confirm(tx.hash)
}
}
} catch (err) {
logger.error(`indexer delete forked tx:`, err)
} finally {
await this.yield(10000)
}
}
}

public getCurrentBlockNumber = async (lockHashes: string[]) => {
// get lock hash indexer status
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates()
const blockNumbers = lockHashIndexStates
.filter(state => lockHashes.includes(state.lockHash))
.map(state => state.blockNumber)
const uniqueBlockNumbers = [...new Set(blockNumbers)]
const blockNumbersBigInt = uniqueBlockNumbers.map(num => BigInt(num))
const minBlockNumber = blockNumbersBigInt.sort()[0]
return minBlockNumber
}

public indexLockHashes = async (lockHashes: string[]) => {
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates()
const indexedLockHashes: string[] = lockHashIndexStates.map(state => state.lockHash)
const nonIndexedLockHashes = lockHashes.filter(i => !indexedLockHashes.includes(i))

await Utils.mapSeries(nonIndexedLockHashes, async (lockHash: string) => {
await this.indexerRPC.indexLockHash(lockHash)
})
}

// type: 'createdBy' | 'consumedBy'
public pipeline = async (lockHash: string, type: TxPointType, startBlockNumber: bigint) => {
let page = 0
let stopped = false
while (!stopped) {
const txs = await this.indexerRPC.getTransactionByLockHash(lockHash, page.toString(), this.per.toString())
if (txs.length < this.per) {
stopped = true
}
for (const tx of txs) {
let txPoint: CKBComponents.TransactionPoint | null = null
if (type === TxPointType.CreatedBy) {
txPoint = tx.createdBy
} else if (type === TxPointType.ConsumedBy) {
txPoint = tx.consumedBy
}

if (
txPoint &&
(BigInt(txPoint.blockNumber) >= startBlockNumber || this.tipBlockNumber - BigInt(txPoint.blockNumber) < 1000)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to extract this expression into a method?

Copy link
Contributor Author

@classicalliu classicalliu Aug 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need, only use once.

) {
const transactionWithStatus = await this.getBlocksService.getTransaction(txPoint.txHash)
const ckbTransaction: CKBComponents.Transaction = transactionWithStatus.transaction
const transaction: Transaction = TypeConvert.toTransaction(ckbTransaction)
// tx timestamp / blockNumber / blockHash
const { blockHash } = transactionWithStatus.txStatus
if (blockHash) {
const blockHeader = await this.getBlocksService.getHeader(blockHash)
transaction.blockHash = blockHash
transaction.blockNumber = blockHeader.number
transaction.timestamp = blockHeader.timestamp
}
// broadcast address used
const txEntity = await TransactionPersistor.saveFetchTx(transaction)

let address: string | undefined
if (type === TxPointType.CreatedBy) {
address = LockUtils.lockScriptToAddress(transaction.outputs![+txPoint.index].lock)
} else if (type === TxPointType.ConsumedBy) {
const input = txEntity.inputs[+txPoint.index]
const output = await IndexerTransaction.updateInputLockHash(input.outPointTxHash!, input.outPointIndex!)
if (output) {
address = LockUtils.lockScriptToAddress(output.lock)
}
}
if (address) {
AddressesUsedSubject.getSubject().next([address])
}
}
}
page += 1
}
}

public stop = () => {
this.tipNumberListener.unsubscribe()
this.stopped = true
}

public waitForDrained = async (timeout: number = 5000) => {
shaojunda marked this conversation as resolved.
Show resolved Hide resolved
const startAt: number = +new Date()
while (this.inProcess) {
const now: number = +new Date()
if (now - startAt > timeout) {
return
}
await this.yield(50)
}
}

public stopAndWait = async () => {
this.stop()
await this.waitForDrained()
}

private yield = async (millisecond: number = 1) => {
await Utils.sleep(millisecond)
}
}
12 changes: 11 additions & 1 deletion packages/neuron-wallet/src/services/sync/get-blocks.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Core from '@nervosnetwork/ckb-sdk-core'

import { Block } from 'types/cell-types'
import { Block, BlockHeader } from 'types/cell-types'
import TypeConvert from 'types/type-convert'
import { NetworkWithID } from 'services/networks'
import CheckAndSave from './check-and-save'
Expand Down Expand Up @@ -55,6 +55,16 @@ export default class GetBlocks {
return block
}

public getTransaction = async (hash: string): Promise<CKBComponents.TransactionWithStatus> => {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should getTransaction be placed in a separate file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need, it's block info too.

const tx = await core.rpc.getTransaction(hash)
return tx
}

public getHeader = async (hash: string): Promise<BlockHeader> => {
const result = await core.rpc.getHeader(hash)
return TypeConvert.toBlockHeader(result)
}

public static getBlockByNumber = async (num: string): Promise<Block> => {
const block = await core.rpc.getBlockByNumber(num)
return TypeConvert.toBlock(block)
Expand Down
Loading