Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync stats expansion & refactor #844

Merged
merged 15 commits into from
Mar 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
342 changes: 107 additions & 235 deletions modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion modAionImpl/src/org/aion/zero/impl/sync/TaskGetBodies.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.aion.p2p.IP2pMgr;
import org.aion.zero.impl.sync.PeerState.State;
import org.aion.zero.impl.sync.msg.ReqBlocksBodies;
import org.aion.zero.impl.sync.statistics.RequestType;
import org.aion.zero.types.A0BlockHeader;
import org.slf4j.Logger;

Expand Down Expand Up @@ -87,7 +88,7 @@ public void run() {
new ReqBlocksBodies(
headers.stream().map(k -> k.getHash()).collect(Collectors.toList())));
stats.updateTotalRequestsToPeer(displayId, RequestType.BODIES);
stats.updateBodiesRequest(displayId, System.nanoTime());
stats.updateRequestTime(displayId, System.nanoTime(), RequestType.BODIES);

headersWithBodiesRequested.put(idHash, hw);

Expand Down
3 changes: 2 additions & 1 deletion modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.aion.p2p.INode;
import org.aion.p2p.IP2pMgr;
import org.aion.zero.impl.sync.msg.ReqBlocksHeaders;
import org.aion.zero.impl.sync.statistics.RequestType;
import org.slf4j.Logger;

/** @author chris */
Expand Down Expand Up @@ -172,7 +173,7 @@ public void run() {
ReqBlocksHeaders rbh = new ReqBlocksHeaders(from, size);
this.p2p.send(node.getIdHash(), node.getIdShort(), rbh);
stats.updateTotalRequestsToPeer(node.getIdShort(), RequestType.STATUS);
stats.updateHeadersRequest(node.getIdShort(), System.nanoTime());
stats.updateRequestTime(node.getIdShort(), System.nanoTime(), RequestType.HEADERS);

// update timestamp
state.setLastHeaderRequest(now);
Expand Down
4 changes: 3 additions & 1 deletion modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.aion.p2p.INode;
import org.aion.p2p.IP2pMgr;
import org.aion.zero.impl.sync.msg.ReqStatus;
import org.aion.zero.impl.sync.statistics.RequestType;
import org.slf4j.Logger;

/** @author chris long run */
Expand Down Expand Up @@ -46,7 +47,8 @@ public void run() {
// System.out.println("requesting-status from-node=" + n.getIdShort());
p2p.send(node.getIdHash(), node.getIdShort(), reqStatus);
stats.updateTotalRequestsToPeer(node.getIdShort(), RequestType.STATUS);
stats.updateStatusRequest(node.getIdShort(), System.nanoTime());
stats.updateRequestTime(
node.getIdShort(), System.nanoTime(), RequestType.STATUS);
}
Thread.sleep(interval);
} catch (Exception e) {
Expand Down
10 changes: 6 additions & 4 deletions modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.aion.types.ByteArrayWrapper;
import org.aion.mcf.core.ImportResult;
import org.aion.p2p.P2pConstant;
import org.aion.types.ByteArrayWrapper;
import org.aion.zero.impl.AionBlockchainImpl;
import org.aion.zero.impl.db.AionBlockStore;
import org.aion.zero.impl.sync.PeerState.Mode;
import org.aion.zero.impl.sync.statistics.BlockType;
import org.aion.zero.impl.types.AionBlock;
import org.slf4j.Logger;

Expand Down Expand Up @@ -257,7 +258,7 @@ private PeerState processBatch(PeerState givenState, List<AionBlock> batch, Stri

if (importResult.isStored()) {
importedBlockHashes.put(ByteArrayWrapper.wrap(b.getHash()), true);
this.syncStats.updatePeerImportedBlocks(displayId, 1);
this.syncStats.updatePeerBlocks(displayId, 1, BlockType.IMPORTED);

if (last <= b.getNumber()) {
last = b.getNumber() + 1;
Expand All @@ -280,7 +281,8 @@ private PeerState processBatch(PeerState givenState, List<AionBlock> batch, Stri

// if any block results in NO_PARENT, all subsequent blocks will too
if (importResult == ImportResult.NO_PARENT) {
executors.submit(new TaskStorePendingBlocks(chain, batch, displayId, syncStats, log));
executors.submit(
new TaskStorePendingBlocks(chain, batch, displayId, syncStats, log));

if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -581,7 +583,7 @@ private ImportResult importBlock(AionBlock b, String displayId, PeerState state)
log.info("Compacting state database due to slow IO time.");
}
t1 = System.currentTimeMillis();
//this.chain.compactState();
this.chain.compactState();
t2 = System.currentTimeMillis();
if (log.isInfoEnabled()) {
log.info("Compacting state completed in {} ms.", t2 - t1);
Expand Down
150 changes: 32 additions & 118 deletions modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ final class TaskShowStatus implements Runnable {
private final IP2pMgr p2p;

private final Map<Integer, PeerState> peerStates;

private final Set<StatsType> showStatistics;

TaskShowStatus(
Expand All @@ -64,45 +65,45 @@ final class TaskShowStatus implements Runnable {
@Override
public void run() {
Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
String requestedInfo;
String requestedStats;

while (this.start.get()) {

String status = getStatus();
p2pLOG.info(status);

if (showStatistics.contains(StatsType.PEER_STATES)) {
requestedInfo = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.REQUESTS)) {
requestedInfo = dumpRequestsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpRequestStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.SEEDS)) {
requestedInfo = dumpTopSeedsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpTopSeedsStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.LEECHES)) {
requestedInfo = dumpTopLeechesInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpTopLeechesStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

if (showStatistics.contains(StatsType.RESPONSES)) {
requestedInfo = stats.dumpResponseStats();
if (!requestedInfo.isEmpty()) {
p2pLOG.info(requestedInfo);
requestedStats = stats.dumpResponseStats();
if (!requestedStats.isEmpty()) {
p2pLOG.info(requestedStats);
}
}

Expand All @@ -121,25 +122,25 @@ public void run() {
String status = getStatus();
p2pLOG.debug(status);

requestedInfo = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = dumpPeerStateInfo(p2p.getActiveNodes().values());
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = dumpRequestsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpRequestStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = dumpTopSeedsInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpTopSeedsStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = dumpTopLeechesInfo();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpTopLeechesStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}
requestedInfo = stats.dumpResponseStats();
if (!requestedInfo.isEmpty()) {
p2pLOG.debug(requestedInfo);
requestedStats = stats.dumpResponseStats();
if (!requestedStats.isEmpty()) {
p2pLOG.debug(requestedStats);
}

p2pLOG.debug("sync-ss shutdown");
Expand Down Expand Up @@ -169,94 +170,7 @@ private String getStatus() {
+ "";
}

/**
* Returns a log stream containing statistics about the percentage of requests made to each peer
* with respect to the total number of requests made.
*
* @return log stream with requests statistical data
*/
private String dumpRequestsInfo() {
Map<String, Float> reqToPeers = this.stats.getPercentageOfRequestsToPeers();

StringBuilder sb = new StringBuilder();

if (!reqToPeers.isEmpty()) {

sb.append("\n====== sync-requests-to-peers ======\n");
sb.append(String.format(" %9s %20s\n", "peer", "% requests"));
sb.append("------------------------------------\n");

reqToPeers.forEach(
(nodeId, percReq) ->
sb.append(
String.format(
" id:%6s %20s\n",
nodeId, String.format("%.2f", percReq * 100) + " %")));
}

return sb.toString();
}

/**
* Returns a log stream containing a list of peers ordered by the total number of blocks
* received from each peer used to determine who is providing the majority of blocks, i.e. top
* seeds.
*
* @return log stream with peers statistical data on seeds
*/
private String dumpTopSeedsInfo() {
Map<String, Integer> totalBlocksByPeer = this.stats.getTotalBlocksByPeer();

StringBuilder sb = new StringBuilder();

if (!totalBlocksByPeer.isEmpty()) {

sb.append(
"\n============================= sync-top-seeds ==============================\n");
sb.append(
String.format(
" %9s %20s %19s %19s\n",
"peer", "total blocks", "imported blocks", "stored blocks"));
sb.append(
"---------------------------------------------------------------------------\n");
totalBlocksByPeer.forEach(
(nodeId, totalBlocks) ->
sb.append(
String.format(
" id:%6s %20s %19s %19s\n",
nodeId,
totalBlocks,
this.stats.getImportedBlocksByPeer(nodeId),
this.stats.getStoredBlocksByPeer(nodeId))));
}

return sb.toString();
}

/**
* Obtain log stream containing a list of peers ordered by the total number of blocks requested
* by each peer used to determine who is requesting the majority of blocks, i.e. top leeches.
*
* @return log stream with peers statistical data on leeches
*/
private String dumpTopLeechesInfo() {
Map<String, Integer> totalBlockReqByPeer = this.stats.getTotalBlockRequestsByPeer();

StringBuilder sb = new StringBuilder();

if (!totalBlockReqByPeer.isEmpty()) {

sb.append("\n========= sync-top-leeches =========\n");
sb.append(String.format(" %9s %20s\n", "peer", "total blocks"));
sb.append("------------------------------------\n");

totalBlockReqByPeer.forEach(
(nodeId, totalBlocks) ->
sb.append(String.format(" id:%6s %20s\n", nodeId, totalBlocks)));
}

return sb.toString();
}

private String dumpPeerStateInfo(Collection<INode> filtered) {
List<NodeState> sorted = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import org.aion.zero.impl.AionBlockchainImpl;
import org.aion.zero.impl.sync.statistics.BlockType;
import org.aion.zero.impl.types.AionBlock;
import org.slf4j.Logger;

Expand Down Expand Up @@ -38,7 +39,7 @@ public void run() {
Thread.currentThread().setName("sync-save:" + first.getNumber());

int stored = chain.storePendingBlockRange(batch);
this.syncStats.updatePeerStoredBlocks(displayId, stored);
this.syncStats.updatePeerBlocks(displayId, stored, BlockType.STORED);

// log operation
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.aion.zero.impl.sync.SyncStats;
import org.aion.zero.impl.sync.msg.BroadcastNewBlock;
import org.aion.zero.impl.sync.msg.ResStatus;
import org.aion.zero.impl.sync.statistics.BlockType;
import org.aion.zero.impl.types.AionBlock;
import org.aion.zero.types.A0BlockHeader;
import org.apache.commons.collections4.map.LRUMap;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void propagateNewBlock(final AionBlock block) {
}

public PropStatus processIncomingBlock(
final int nodeId, final String _displayId, final AionBlock block) {
final int nodeId, final String displayId, final AionBlock block) {
if (block == null) return PropStatus.DROPPED;

ByteArrayWrapper hashWrapped = new ByteArrayWrapper(block.getHash());
Expand Down Expand Up @@ -146,15 +147,15 @@ public PropStatus processIncomingBlock(
if (log.isInfoEnabled()) {
log.info(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, result = NOT_IN_RANGE>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
result);
} else if (log.isDebugEnabled()) {
log.debug(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, block time = {}, result = NOT_IN_RANGE>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
Expand All @@ -163,8 +164,7 @@ public PropStatus processIncomingBlock(
}
boolean stored = blockchain.storePendingStatusBlock(block);
if (stored) {
this.syncStats.updatePeerStoredBlocks(_displayId, 1);
this.syncStats.updatePeerTotalBlocks(_displayId, 1);
this.syncStats.updatePeerBlocks(displayId, 1, BlockType.STORED);
}

if (log.isDebugEnabled()) {
Expand All @@ -180,14 +180,13 @@ public PropStatus processIncomingBlock(

long t2 = System.currentTimeMillis();
if (result.isStored()) {
this.syncStats.updatePeerImportedBlocks(_displayId, 1);
this.syncStats.updatePeerTotalBlocks(_displayId, 1);
this.syncStats.updatePeerBlocks(displayId, 1, BlockType.IMPORTED);
}

if (log.isInfoEnabled()) {
log.info(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, result = {}, time elapsed = {} ms>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
Expand All @@ -196,7 +195,7 @@ public PropStatus processIncomingBlock(
} else if (log.isDebugEnabled()) {
log.debug(
"<import-status: node = {}, hash = {}, number = {}, txs = {}, block time = {}, result = {}, time elapsed = {} ms>",
_displayId,
displayId,
block.getShortHash(),
block.getNumber(),
block.getTransactionsList().size(),
Expand Down
Loading