Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Update loader module in chain to use network module and fix the tests - Closes #2875 #3243

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions framework/src/controller/bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@ class Bus extends EventEmitter2 {
* @throws {Error} If event name does not exist to bus.
*/
publish(eventName, eventValue) {
if (!this.getEvents().includes(eventName)) {
shuse2 marked this conversation as resolved.
Show resolved Hide resolved
throw new Error(`Event ${eventName} is not registered to bus.`);
}

// Communicate through event emitter
this.emit(eventName, eventValue);

Expand Down
58 changes: 58 additions & 0 deletions framework/src/modules/chain/chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,29 @@ module.exports = class Chain {
this.scope.modules.peers.shared.getPeersCountByFilter(
action.params.parameters
),
getTransactions: async () =>
this.scope.modules.transactions.getMergedTransactionList(
true,
global.constants.MAX_SHARED_TRANSACTIONS
),
getSignatures: async () => {
const transactions = this.scope.modules.transactions.getMultisignatureTransactionList(
true,
global.constants.MAX_SHARED_TRANSACTIONS
);
const multisignatures = transactions
.map(transaction => {
if (transaction.signatures && transaction.signatures.length) {
return {
transaction: transaction.id,
signatures: transaction.signatures,
};
}
return null;
})
.filter(transaction => transaction !== null);
return multisignatures;
},
postSignature: async action =>
promisify(this.scope.modules.signatures.shared.postSignature)(
action.params.signature
Expand Down Expand Up @@ -270,6 +293,41 @@ module.exports = class Chain {
}),
lastBlock: this.scope.modules.blocks.lastBlock.get(),
}),
blocks: async action =>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to pass the requested number of blocks as a variable, or returning a dynamic number depending on the sizes of the requested blocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think as a new issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jondubois @diego-G same as above, we don't want to change the behavior now, so please create a new issue, and we will tackle later

new Promise(resolve => {
// Get 34 blocks with all data (joins) from provided block id
// According to maxium payload of 58150 bytes per block with every transaction being a vote
// Discounting maxium compression setting used in middleware
// Maximum transport payload = 2000000 bytes
const query = action.params[0] || {};
this.scope.modules.blocks.utils.loadBlocksData(
{
limit: 34, // 1977100 bytes
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should research and update this number. If I remember well it's not accurate. Block size varies depending on the transaction associated to it. We might be able to increase the number significantly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should raise a new issue.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this current protocol, let's keep it this way because it's not really a protocol change, and we don't want to risk here.
However, please create an issue to address this, and we can tackle this when we change the p2p protocol

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jondubois would you mind to open the issue?

lastId: query.lastBlockId,
},
(error, data) => {
(data || []).forEach(block => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer early return if data` is undefined

if (!block.tf_data) {
return;
}
try {
block.tf_data = block.tf_data.toString('utf8');
} catch (e) {
this.logger.error(
'Transport->blocks: Failed to convert data field to UTF-8',
{ block, error: e }
);
}
});
if (error) {
// TODO: Reject here.
return resolve({ blocks: [] });
}

return resolve({ blocks: data });
}
);
}),
};
}

Expand Down
3 changes: 3 additions & 0 deletions framework/src/modules/chain/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ module.exports = class ChainModule extends BaseModule {
this.chain.actions.getForgersPublicKeys(),
getTransactionsFromPool: async action =>
this.chain.actions.getTransactionsFromPool(action),
getTransactions: async () => this.chain.actions.getTransactions(),
getSignatures: async () => this.chain.actions.getSignatures(),
getLastCommit: async () => this.chain.actions.getLastCommit(),
getBuild: async () => this.chain.actions.getBuild(),
postTransaction: async action =>
Expand All @@ -75,6 +77,7 @@ module.exports = class ChainModule extends BaseModule {
getSlotNumber: async action => this.chain.actions.getSlotNumber(action),
calcSlotRound: async action => this.chain.actions.calcSlotRound(action),
getNodeStatus: async () => this.chain.actions.getNodeStatus(),
blocks: async action => this.chain.actions.blocks(action),
};
}

Expand Down
3 changes: 2 additions & 1 deletion framework/src/modules/chain/submodules/blocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class Blocks {
scope.schema,
scope.components.storage,
scope.sequence,
scope.genesisBlock
scope.genesisBlock,
scope.channel
diego-G marked this conversation as resolved.
Show resolved Hide resolved
),
utils: new BlocksUtils(
scope.components.logger,
Expand Down
174 changes: 36 additions & 138 deletions framework/src/modules/chain/submodules/blocks/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ class Process {
schema,
storage,
sequence,
genesisBlock
genesisBlock,
channel
) {
library = {
channel,
logger,
schema,
storage,
Expand Down Expand Up @@ -240,108 +242,6 @@ __private.receiveForkFive = function(block, lastBlock, cb) {
);
};

/**
* Performs chain comparison with remote peer.
* WARNING: Can trigger chain recovery.
*
* @param {Peer} peer - Peer to perform chain comparison with
* @param {number} height - Block height
* @param {function} cb - Callback function
* @returns {function} cb - Callback function from params (through setImmediate)
* @returns {Object} cb.err - Error if occurred
* @returns {Object} cb.res - Result object
*/
Process.prototype.getCommonBlock = function(peer, height, cb) {
let comparisonFailed = false;

return async.waterfall(
[
function(waterCb) {
// Get IDs sequence (comma separated list)
modules.blocks.utils.getIdSequence(height, (err, res) =>
setImmediate(waterCb, err, res)
);
},
function(res, waterCb) {
const ids = res.ids;
// Perform request to supplied remote peer
peer = library.logic.peers.create(peer);
peer.rpc.blocksCommon({ ids }, (err, blocksCommonRes) => {
if (err) {
modules.peers.remove(peer);
return setImmediate(waterCb, err);
}

if (!blocksCommonRes.common) {
// FIXME: Need better checking here, is base on 'common' property enough?
comparisonFailed = true;
return setImmediate(
waterCb,
`Chain comparison failed with peer: ${
peer.string
} using ids: ${ids}`
);
}

return setImmediate(waterCb, null, blocksCommonRes.common);
});
},
function(common, waterCb) {
// Check if we received genesis block - before response validation, as genesis block have previousBlock = null
if (common && common.height === 1) {
comparisonFailed = true;
return setImmediate(
waterCb,
'Comparison failed - received genesis as common block'
);
}
// Validate remote peer response via schema
return library.schema.validate(common, definitions.CommonBlock, err => {
if (err) {
return setImmediate(waterCb, err[0].message);
}
return setImmediate(waterCb, null, common);
});
},
function(common, waterCb) {
// Check that block with ID, previousBlock and height exists in database
library.storage.entities.Block.isPersisted({
id: common.id,
previousBlockId: common.previousBlock,
height: common.height,
})
.then(isPersisted => {
if (isPersisted) {
// Block exists - it's common between our node and remote peer
return setImmediate(waterCb, null, common);
}

// Block doesn't exists - comparison failed
comparisonFailed = true;
return setImmediate(
waterCb,
`Chain comparison failed with peer: ${
peer.string
} using block: ${JSON.stringify(common)}`
);
})
.catch(err => {
// SQL error occurred
library.logger.error(err.stack);
return setImmediate(waterCb, 'Blocks#getCommonBlock error');
});
},
],
(err, res) => {
// If comparison failed and current consensus is low - perform chain recovery
if (comparisonFailed && modules.transport.poorConsensus()) {
return modules.blocks.chain.recoverChain(cb);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic needs to stay

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@jondubois jondubois Apr 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yep, we need the recoverChain(...) feature but it should be possible to use it as part of the loadBlocksFromNetwork logic instead (we may be able to use that logic to detect a fork) - It would be nice to remove the commonBlocks logic.

}
return setImmediate(cb, err, res);
}
);
};

/**
* Loads full blocks from database, used when rebuilding blockchain, snapshotting,
* see: loader.loadBlockChain (private).
Expand Down Expand Up @@ -428,32 +328,33 @@ Process.prototype.loadBlocksOffset = function(
};

/**
* Ask remote peer for blocks and process them.
* Ask the network for blocks and process them.
*
* @param {Peer} peer - Peer to perform chain comparison with
* @param {function} cb - Callback function
* @returns {function} cb - Callback function from params (through setImmediate)
* @returns {Object} cb.err - Error if occurred
* @returns {Object} cb.lastValidBlock - Normalized new last block
*/
Process.prototype.loadBlocksFromPeer = function(peer, cb) {
Process.prototype.loadBlocksFromNetwork = function(cb) {
let lastValidBlock = modules.blocks.lastBlock.get();

peer = library.logic.peers.create(peer);
library.logger.info(`Loading blocks from: ${peer.string}`);
library.logger.debug('Loading blocks from the network');

function getFromPeer(seriesCb) {
peer.rpc.blocks(
{ lastBlockId: lastValidBlock.id, peer: library.logic.peers.me() },
(err, res) => {
err = err || res.error;
if (err) {
modules.peers.remove(peer);
return setImmediate(seriesCb, err);
}
return setImmediate(seriesCb, null, res.blocks);
}
);
async function getBlocksFromNetwork() {
shuse2 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: If there is an error, invoke the applyPenalty action on the Network module once it is implemented.
// TODO: Rename procedure to include target module name. E.g. chain:blocks
const { data } = await library.channel.invoke('network:request', {
procedure: 'blocks',
data: {
lastBlockId: lastValidBlock.id,
},
});

if (!data) {
throw new Error('Received an invalid blocks response from the network');
}

return data.blocks;
}

function validateBlocks(blocks, seriesCb) {
Expand All @@ -479,14 +380,8 @@ Process.prototype.loadBlocksFromPeer = function(peer, cb) {
return setImmediate(eachSeriesCb);
}
// ...then process block
return processBlock(block, err => {
// Ban a peer if block validation fails
// Invalid peers won't get chosen in the next sync attempt
if (err) {
library.logic.peers.ban(peer);
}
return eachSeriesCb(err);
});
// TODO: If there is an error, invoke the applyPenalty action on the Network module once it is implemented.
return processBlock(block, err => eachSeriesCb(err));
},
err => setImmediate(seriesCb, err)
);
Expand All @@ -499,7 +394,7 @@ Process.prototype.loadBlocksFromPeer = function(peer, cb) {
// Update last valid block
lastValidBlock = block;
library.logger.info(
`Block ${block.id} loaded from: ${peer.string}`,
`Block ${block.id} loaded from the network`,
`height: ${block.height}`
);
} else {
Expand All @@ -516,16 +411,19 @@ Process.prototype.loadBlocksFromPeer = function(peer, cb) {
});
}

async.waterfall([getFromPeer, validateBlocks, processBlocks], err => {
if (err) {
return setImmediate(
cb,
`Error loading blocks: ${err.message || err}`,
lastValidBlock
);
async.waterfall(
[getBlocksFromNetwork, validateBlocks, processBlocks],
err => {
if (err) {
return setImmediate(
cb,
`Error loading blocks: ${err.message || err}`,
lastValidBlock
);
}
return setImmediate(cb, null, lastValidBlock);
}
return setImmediate(cb, null, lastValidBlock);
});
);
};

/**
Expand Down
Loading