Skip to content

Commit

Permalink
Merge pull request #844 from aionnetwork/sync-stats-expansion
Browse files Browse the repository at this point in the history
Sync stats expansion & refactor
  • Loading branch information
AionJayT authored Mar 20, 2019
2 parents 0e7522b + eb86526 commit 453156b
Show file tree
Hide file tree
Showing 19 changed files with 1,038 additions and 744 deletions.
342 changes: 107 additions & 235 deletions modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion modAionImpl/src/org/aion/zero/impl/sync/TaskGetBodies.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.aion.p2p.IP2pMgr;
import org.aion.zero.impl.sync.PeerState.State;
import org.aion.zero.impl.sync.msg.ReqBlocksBodies;
import org.aion.zero.impl.sync.statistics.RequestType;
import org.aion.zero.types.A0BlockHeader;
import org.slf4j.Logger;

Expand Down Expand Up @@ -87,7 +88,7 @@ public void run() {
new ReqBlocksBodies(
headers.stream().map(k -> k.getHash()).collect(Collectors.toList())));
stats.updateTotalRequestsToPeer(displayId, RequestType.BODIES);
stats.updateBodiesRequest(displayId, System.nanoTime());
stats.updateRequestTime(displayId, System.nanoTime(), RequestType.BODIES);

headersWithBodiesRequested.put(idHash, hw);

Expand Down
3 changes: 2 additions & 1 deletion modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.aion.p2p.INode;
import org.aion.p2p.IP2pMgr;
import org.aion.zero.impl.sync.msg.ReqBlocksHeaders;
import org.aion.zero.impl.sync.statistics.RequestType;
import org.slf4j.Logger;

/** @author chris */
Expand Down Expand Up @@ -172,7 +173,7 @@ public void run() {
ReqBlocksHeaders rbh = new ReqBlocksHeaders(from, size);
this.p2p.send(node.getIdHash(), node.getIdShort(), rbh);
stats.updateTotalRequestsToPeer(node.getIdShort(), RequestType.STATUS);
stats.updateHeadersRequest(node.getIdShort(), System.nanoTime());
stats.updateRequestTime(node.getIdShort(), System.nanoTime(), RequestType.HEADERS);

// update timestamp
state.setLastHeaderRequest(now);
Expand Down
4 changes: 3 additions & 1 deletion modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.aion.p2p.INode;
import org.aion.p2p.IP2pMgr;
import org.aion.zero.impl.sync.msg.ReqStatus;
import org.aion.zero.impl.sync.statistics.RequestType;
import org.slf4j.Logger;

/** @author chris long run */
Expand Down Expand Up @@ -46,7 +47,8 @@ public void run() {
// System.out.println("requesting-status from-node=" + n.getIdShort());
p2p.send(node.getIdHash(), node.getIdShort(), reqStatus);
stats.updateTotalRequestsToPeer(node.getIdShort(), RequestType.STATUS);
stats.updateStatusRequest(node.getIdShort(), System.nanoTime());
stats.updateRequestTime(
node.getIdShort(), System.nanoTime(), RequestType.STATUS);
}
Thread.sleep(interval);
} catch (Exception e) {
Expand Down
10 changes: 6 additions & 4 deletions modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.aion.types.ByteArrayWrapper;
import org.aion.mcf.core.ImportResult;
import org.aion.p2p.P2pConstant;
import org.aion.types.ByteArrayWrapper;
import org.aion.zero.impl.AionBlockchainImpl;
import org.aion.zero.impl.db.AionBlockStore;
import org.aion.zero.impl.sync.PeerState.Mode;
import org.aion.zero.impl.sync.statistics.BlockType;
import org.aion.zero.impl.types.AionBlock;
import org.slf4j.Logger;

Expand Down Expand Up @@ -257,7 +258,7 @@ private PeerState processBatch(PeerState givenState, List<AionBlock> batch, Stri

if (importResult.isStored()) {
importedBlockHashes.put(ByteArrayWrapper.wrap(b.getHash()), true);
this.syncStats.updatePeerImportedBlocks(displayId, 1);
this.syncStats.updatePeerBlocks(displayId, 1, BlockType.IMPORTED);

if (last <= b.getNumber()) {
last = b.getNumber() + 1;
Expand All @@ -280,7 +281,8 @@ private PeerState processBatch(PeerState givenState, List<AionBlock> batch, Stri

// if any block results in NO_PARENT, all subsequent blocks will too
if (importResult == ImportResult.NO_PARENT) {
executors.submit(new TaskStorePendingBlocks(chain, batch, displayId, syncStats, log));
executors.submit(
new TaskStorePendingBlocks(chain, batch, displayId, syncStats, log));

if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -581,7 +583,7 @@ private ImportResult importBlock(AionBlock b, String displayId, PeerState state)
log.info("Compacting state database due to slow IO time.");
}
t1 = System.currentTimeMillis();
//this.chain.compactState();
this.chain.compactState();
t2 = System.currentTimeMillis();
if (log.isInfoEnabled()) {
log.info("Compacting state completed in {} ms.", t2 - t1);
Expand Down
150 changes: 32 additions & 118 deletions modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ final class TaskShowStatus implements Runnable {
private final IP2pMgr p2p;

private final Map<Integer, PeerState> peerStates;

private final Set<StatsType> showStatistics;

TaskShowStatus(
Expand All @@ -64,45 +65,45 @@ final class TaskShowStatus implements Runnable {
@Override
public void run() {
Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
String requestedInfo;
String requestedStats;

while (this.start.get()) {

String status = getStatus();
p2pLOG.info(status);

if (showStatistics.contains(StatsType.PEER_STATES)) {
requestedInfo = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.REQUESTS)) {
requestedInfo = dumpRequestsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpRequestStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.SEEDS)) {
requestedInfo = dumpTopSeedsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpTopSeedsStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.LEECHES)) {
requestedInfo = dumpTopLeechesInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpTopLeechesStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.RESPONSES)) {
requestedInfo = stats.dumpResponseStats();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpResponseStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

Expand All @@ -121,25 +122,25 @@ public void run() {
String status = getStatus();
p2pLOG.debug(status);

requestedInfo = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = dumpRequestsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpRequestStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = dumpTopSeedsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpTopSeedsStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = dumpTopLeechesInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpTopLeechesStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = stats.dumpResponseStats();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpResponseStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}

p2pLOG.debug("sync-ss shutdown");
Expand Down Expand Up @@ -169,94 +170,7 @@ private String getStatus() {
+ "";
}

/**
* Returns a log stream containing statistics about the percentage of requests made to each peer
* with respect to the total number of requests made.
*
* @return log stream with requests statistical data
*/
private String dumpRequestsInfo() {
Map<String, Float> reqToPeers = this.stats.getPercentageOfRequestsToPeers();

StringBuilder sb = new StringBuilder();

if (!reqToPeers.isEmpty()) {

sb.append("\n====== sync-requests-to-peers ======\n");
sb.append(String.format(" %9s %20s\n", "peer", "% requests"));
sb.append("------------------------------------\n");

reqToPeers.forEach(
(nodeId, percReq) ->
sb.append(
String.format(
" id:%6s %20s\n",
nodeId, String.format("%.2f", percReq * 100) + " %")));
}

return sb.toString();
}

/**
* Returns a log stream containing a list of peers ordered by the total number of blocks
* received from each peer used to determine who is providing the majority of blocks, i.e. top
* seeds.
*
* @return log stream with peers statistical data on seeds
*/
private String dumpTopSeedsInfo() {
Map<String, Integer> totalBlocksByPeer = this.stats.getTotalBlocksByPeer();

StringBuilder sb = new StringBuilder();

if (!totalBlocksByPeer.isEmpty()) {

sb.append(
"\n============================= sync-top-seeds ==============================\n");
sb.append(
String.format(
" %9s %20s %19s %19s\n",
"peer", "total blocks", "imported blocks", "stored blocks"));
sb.append(
"---------------------------------------------------------------------------\n");
totalBlocksByPeer.forEach(
(nodeId, totalBlocks) ->
sb.append(
String.format(
" id:%6s %20s %19s %19s\n",
nodeId,
totalBlocks,
this.stats.getImportedBlocksByPeer(nodeId),
this.stats.getStoredBlocksByPeer(nodeId))));
}

return sb.toString();
}

/**
* Obtain log stream containing a list of peers ordered by the total number of blocks requested
* by each peer used to determine who is requesting the majority of blocks, i.e. top leeches.
*
* @return log stream with peers statistical data on leeches
*/
private String dumpTopLeechesInfo() {
Map<String, Integer> totalBlockReqByPeer = this.stats.getTotalBlockRequestsByPeer();

StringBuilder sb = new StringBuilder();

if (!totalBlockReqByPeer.isEmpty()) {

sb.append("\n========= sync-top-leeches =========\n");
sb.append(String.format(" %9s %20s\n", "peer", "total blocks"));
sb.append("------------------------------------\n");

totalBlockReqByPeer.forEach(
(nodeId, totalBlocks) ->
sb.append(String.format(" id:%6s %20s\n", nodeId, totalBlocks)));
}

return sb.toString();
}

private String dumpPeerStateInfo(Collection<INode> filtered) {
List<NodeState> sorted = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import org.aion.zero.impl.AionBlockchainImpl;
import org.aion.zero.impl.sync.statistics.BlockType;
import org.aion.zero.impl.types.AionBlock;
import org.slf4j.Logger;

Expand Down Expand Up @@ -38,7 +39,7 @@ public void run() {
Thread.currentThread().setName("sync-save:" + first.getNumber());

int stored = chain.storePendingBlockRange(batch);
this.syncStats.updatePeerStoredBlocks(displayId, stored);
this.syncStats.updatePeerBlocks(displayId, stored, BlockType.STORED);

// log operation
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.aion.zero.impl.sync.SyncStats;
import org.aion.zero.impl.sync.msg.BroadcastNewBlock;
import org.aion.zero.impl.sync.msg.ResStatus;
import org.aion.zero.impl.sync.statistics.BlockType;
import org.aion.zero.impl.types.AionBlock;
import org.aion.zero.types.A0BlockHeader;
import org.apache.commons.collections4.map.LRUMap;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void propagateNewBlock(final AionBlock block) {
}

public PropStatus processIncomingBlock(
final int nodeId, final String _displayId, final AionBlock block) {
final int nodeId, final String displayId, final AionBlock block) {
if (block == null) return PropStatus.DROPPED;

ByteArrayWrapper hashWrapped = new ByteArrayWrapper(block.getHash());
Expand Down Expand Up @@ -146,15 +147,15 @@ public PropStatus processIncomingBlock(
if (log.isInfoEnabled()) {
log.info(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, result = NOT_IN_RANGE>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
result);
} else if (log.isDebugEnabled()) {
log.debug(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, block time = {}, result = NOT_IN_RANGE>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
Expand All @@ -163,8 +164,7 @@ public PropStatus processIncomingBlock(
}
boolean stored = blockchain.storePendingStatusBlock(block);
if (stored) {
this.syncStats.updatePeerStoredBlocks(_displayId, 1);
this.syncStats.updatePeerTotalBlocks(_displayId, 1);
this.syncStats.updatePeerBlocks(displayId, 1, BlockType.STORED);
}

if (log.isDebugEnabled()) {
Expand All @@ -180,14 +180,13 @@ public PropStatus processIncomingBlock(

long t2 = System.currentTimeMillis();
if (result.isStored()) {
this.syncStats.updatePeerImportedBlocks(_displayId, 1);
this.syncStats.updatePeerTotalBlocks(_displayId, 1);
this.syncStats.updatePeerBlocks(displayId, 1, BlockType.IMPORTED);
}

if (log.isInfoEnabled()) {
log.info(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, result = {}, time elapsed = {} ms>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
Expand All @@ -196,7 +195,7 @@ public PropStatus processIncomingBlock(
} else if (log.isDebugEnabled()) {
log.debug(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, block time = {}, result = {}, time elapsed = {} ms>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
Expand Down
Loading

0 comments on commit 453156b

Please sign in to comment.