From 70347ed989b557076a9eb45bbd70f6ea88403a1a Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Thu, 20 Feb 2020 14:03:20 +0100 Subject: [PATCH 01/10] Fix connection config for monitor The monitor does not use the common bisq app base featuring guice. Hence, the `config` in the `Connection` class is never injected and leads to NullPointerExceptions and ultimately breaks the monitor. This workaround initilizes default values in case guice isn't there. --- p2p/src/main/java/bisq/network/p2p/network/Connection.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index c6d3277ee35..70463aceeed 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -186,6 +186,10 @@ public static int getPermittedMessageSize() { addMessageListener(messageListener); + if (config == null) { + config = new Config(); + } + this.networkProtoResolver = networkProtoResolver; init(peersNodeAddress); } From 7c5043063a9958c6a447299415980e1716825293 Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Thu, 20 Feb 2020 13:36:55 +0100 Subject: [PATCH 02/10] Code cleanup --- .../bisq/monitor/metric/P2PMarketStats.java | 81 +------------- .../monitor/metric/P2PSeedNodeSnapshot.java | 102 ++++-------------- .../metric/P2PSeedNodeSnapshotBase.java | 54 ++++++++-- 3 files changed, 74 insertions(+), 163 deletions(-) diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java index 0f7e8afb83c..ea99f39055a 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java @@ -20,10 +20,8 @@ import bisq.monitor.OnionParser; import bisq.monitor.Reporter; -import bisq.core.account.witness.AccountAgeWitnessStore; + import bisq.core.offer.OfferPayload; -import bisq.core.proto.persistable.CorePersistenceProtoResolver; -import bisq.core.trade.statistics.TradeStatistics3Store; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.Connection; @@ -32,25 +30,15 @@ import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStoragePayload; -import bisq.common.app.Version; -import bisq.common.config.BaseCurrencyNetwork; -import bisq.common.persistence.PersistenceManager; import bisq.common.proto.network.NetworkEnvelope; -import bisq.common.proto.persistable.PersistableEnvelope; - -import java.io.File; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -63,10 +51,6 @@ */ @Slf4j public class P2PMarketStats extends P2PSeedNodeSnapshotBase { - private static final String DATABASE_DIR = "run.dbDir"; - - private final Set hashes = new TreeSet<>(Arrays::compare); - final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); /** @@ -84,13 +68,7 @@ synchronized void increment() { } } - private class MyStatistics implements Statistics { - private final Map buckets = new HashMap<>(); - - @Override - public Statistics create() { - return new MyStatistics(); - } + private class MyStatistics extends Statistics { @Override public synchronized void log(Object message) { @@ -104,25 +82,9 @@ public synchronized void log(Object message) { buckets.get(market).increment(); } } - - @Override - public Map values() { - return buckets; - } - - @Override - public void reset() { - buckets.clear(); - } } - private class VersionsStatistics implements Statistics { - private final Map buckets = new HashMap<>(); - - @Override - public Statistics create() { - return new VersionsStatistics(); - } + private class VersionsStatistics extends Statistics { @Override public void log(Object message) { @@ -136,43 +98,10 @@ public void log(Object message) { buckets.get(version).increment(); } } - - @Override - public Map values() { - return buckets; - } - - @Override - public void reset() { - buckets.clear(); - } } public P2PMarketStats(Reporter graphiteReporter) { super(graphiteReporter); - - statistics = new MyStatistics(); - } - - @Override - public void configure(Properties properties) { - super.configure(properties); - - if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) { - File dir = new File(configuration.getProperty(DATABASE_DIR)); - String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString(); - try { - PersistenceManager persistenceManager = new PersistenceManager<>(dir, new CorePersistenceProtoResolver(null, null), null); - TradeStatistics3Store tradeStatistics3Store = (TradeStatistics3Store) persistenceManager.getPersisted(TradeStatistics3Store.class.getSimpleName() + networkPostfix); - hashes.addAll(tradeStatistics3Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); - - AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) persistenceManager.getPersisted(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix); - hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); - } catch (NullPointerException e) { - // in case there is no store file - log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath()); - } - } } @Override @@ -204,8 +133,8 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne if (networkEnvelope instanceof GetDataResponse) { - Statistics result = this.statistics.create(); - VersionsStatistics versions = new VersionsStatistics(); + Statistics result = new MyStatistics(); + Statistics versions = new VersionsStatistics(); GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; final Set dataSet = dataResponse.getDataSet(); diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java index 2ca12716f04..cc06144c520 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java @@ -21,26 +21,27 @@ import bisq.monitor.Reporter; import bisq.core.account.witness.AccountAgeWitnessStore; +import bisq.common.config.BaseCurrencyNetwork; import bisq.core.dao.monitoring.model.StateHash; import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; import bisq.core.dao.monitoring.network.messages.GetStateHashesResponse; import bisq.core.proto.persistable.CorePersistenceProtoResolver; -import bisq.core.trade.statistics.TradeStatistics3Store; +import bisq.core.trade.statistics.TradeStatistics2Store; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.Connection; import bisq.network.p2p.peers.getdata.messages.GetDataResponse; import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest; +import bisq.network.p2p.storage.payload.PersistableNetworkPayload; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStoragePayload; import bisq.common.app.Version; -import bisq.common.config.BaseCurrencyNetwork; -import bisq.common.persistence.PersistenceManager; import bisq.common.proto.network.NetworkEnvelope; import bisq.common.proto.persistable.PersistableEnvelope; +import bisq.common.storage.Storage; import java.net.MalformedURLException; @@ -80,11 +81,8 @@ */ @Slf4j public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase { - private static final String DATABASE_DIR = "run.dbDir"; - Statistics statistics; final Map>> bucketsPerHost = new ConcurrentHashMap<>(); - protected final Set hashes = new TreeSet<>(Arrays::compare); private int daostateheight = 594000; private int proposalheight = daostateheight; private int blindvoteheight = daostateheight; @@ -92,14 +90,7 @@ public class P2PSeedNodeSnapshot extends P2PSeedNodeSnapshotBase { /** * Use a counter to do statistics. */ - private class MyStatistics implements Statistics> { - - private final Map> buckets = new HashMap<>(); - - @Override - public Statistics create() { - return new MyStatistics(); - } + private class MyStatistics extends Statistics> { @Override public synchronized void log(Object message) { @@ -110,43 +101,10 @@ public synchronized void log(Object message) { buckets.putIfAbsent(className, new HashSet<>()); buckets.get(className).add(message.hashCode()); } - - @Override - public Map> values() { - return buckets; - } - - @Override - public synchronized void reset() { - buckets.clear(); - } } public P2PSeedNodeSnapshot(Reporter reporter) { super(reporter); - - statistics = new MyStatistics(); - } - - @Override - public void configure(Properties properties) { - super.configure(properties); - - if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) { - File dir = new File(configuration.getProperty(DATABASE_DIR)); - String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString(); - try { - PersistenceManager persistenceManager = new PersistenceManager<>(dir, new CorePersistenceProtoResolver(null, null), null); - TradeStatistics3Store tradeStatistics3Store = (TradeStatistics3Store) persistenceManager.getPersisted(TradeStatistics3Store.class.getSimpleName() + networkPostfix); - hashes.addAll(tradeStatistics3Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); - - AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) persistenceManager.getPersisted(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix); - hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); - } catch (NullPointerException e) { - // in case there is no store file - log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath()); - } - } } protected List getRequests() { @@ -190,21 +148,21 @@ void report() { // - calculate diffs messagesPerHost.forEach( - (host, statistics) -> { - statistics.values().forEach((messageType, set) -> { - try { - report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType, - String.valueOf(set.size() - referenceValues.get(messageType).size())); - } catch (MalformedURLException | NullPointerException ignore) { - log.error("we should never have gotten here", ignore); - } - }); + (host, statistics) -> { + statistics.values().forEach((messageType, set) -> { try { - report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost); - } catch (MalformedURLException ignore) { - log.error("we should never got here"); + report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType, + String.valueOf(set.size() - referenceValues.get(messageType).size())); + } catch (MalformedURLException | NullPointerException ignore) { + log.error("we should never have gotten here", ignore); } }); + try { + report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost); + } catch (MalformedURLException ignore) { + log.error("we should never got here"); + } + }); // cleanup for next run bucketsPerHost.forEach((host, statistics) -> statistics.reset()); @@ -233,9 +191,9 @@ void report() { int oldest = (int) nodeAddressTupleMap.values().stream().min(Comparator.comparingLong(Tuple::getHeight)).get().height; // - update queried height - if (type.contains("DaoState")) + if(type.contains("DaoState")) daostateheight = oldest - 20; - else if (type.contains("Proposal")) + else if(type.contains("Proposal")) proposalheight = oldest - 20; else blindvoteheight = oldest - 20; @@ -255,6 +213,7 @@ else if (type.contains("Proposal")) List states = hitcount.entrySet().stream().sorted((o1, o2) -> o2.getValue().compareTo(o1.getValue())).map(byteBufferIntegerEntry -> byteBufferIntegerEntry.getKey()).collect(Collectors.toList()); hitcount.clear(); + nodeAddressTupleMap.forEach((nodeAddress, tuple) -> daoreport.put(type + "." + OnionParser.prettyPrint(nodeAddress) + ".hash", Integer.toString(Arrays.asList(states.toArray()).indexOf(ByteBuffer.wrap(tuple.hash))))); // - report reference head @@ -278,14 +237,7 @@ private class Tuple { } } - private class DaoStatistics implements Statistics { - - Map buckets = new ConcurrentHashMap<>(); - - @Override - public Statistics create() { - return new DaoStatistics(); - } + private class DaoStatistics extends Statistics { @Override public void log(Object message) { @@ -297,16 +249,6 @@ public void log(Object message) { buckets.putIfAbsent(className, new Tuple(last.getHeight(), last.getHash())); } - - @Override - public Map values() { - return buckets; - } - - @Override - public void reset() { - buckets.clear(); - } } private Map> daoData = new ConcurrentHashMap<>(); @@ -317,7 +259,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne if (networkEnvelope instanceof GetDataResponse) { - Statistics result = this.statistics.create(); + Statistics result = new MyStatistics(); GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; final Set dataSet = dataResponse.getDataSet(); diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java index b458cfe6e79..631e0154e06 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java @@ -24,7 +24,10 @@ import bisq.monitor.Reporter; import bisq.monitor.ThreadGate; +import bisq.core.account.witness.AccountAgeWitnessStore; import bisq.core.proto.network.CoreNetworkProtoResolver; +import bisq.core.proto.persistable.CorePersistenceProtoResolver; +import bisq.core.trade.statistics.TradeStatistics2Store; import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.NodeAddress; @@ -33,7 +36,11 @@ import bisq.network.p2p.network.NetworkNode; import bisq.network.p2p.network.TorNetworkNode; +import bisq.common.app.Version; +import bisq.common.config.BaseCurrencyNetwork; import bisq.common.proto.network.NetworkEnvelope; +import bisq.common.proto.persistable.PersistableEnvelope; +import bisq.common.storage.Storage; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -42,10 +49,18 @@ import java.time.Clock; +import java.io.File; + import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -65,30 +80,55 @@ public abstract class P2PSeedNodeSnapshotBase extends Metric implements MessageL private static final String HOSTS = "run.hosts"; private static final String TOR_PROXY_PORT = "run.torProxyPort"; - Statistics statistics; + private static final String DATABASE_DIR = "run.dbDir"; final Map> bucketsPerHost = new ConcurrentHashMap<>(); private final ThreadGate gate = new ThreadGate(); + protected final Set hashes = new TreeSet<>(Arrays::compare); /** * Statistics Interface for use with derived classes. * * @param the value type of the statistics implementation */ - protected interface Statistics { - - Statistics create(); + protected abstract class Statistics { + protected final Map buckets = new HashMap<>(); - void log(Object message); + abstract void log(Object message); - Map values(); + Map values() { + return buckets; + } - void reset(); + void reset() { + buckets.clear(); + } } public P2PSeedNodeSnapshotBase(Reporter reporter) { super(reporter); } + @Override + public void configure(Properties properties) { + super.configure(properties); + + if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) { + File dir = new File(configuration.getProperty(DATABASE_DIR)); + String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString(); + try { + Storage storage = new Storage<>(dir, new CorePersistenceProtoResolver(null, null, null, null), null); + TradeStatistics2Store tradeStatistics2Store = (TradeStatistics2Store) storage.initAndGetPersistedWithFileName(TradeStatistics2Store.class.getSimpleName() + networkPostfix, 0); + hashes.addAll(tradeStatistics2Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); + + AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) storage.initAndGetPersistedWithFileName(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix, 0); + hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); + } catch (NullPointerException e) { + // in case there is no store file + log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath()); + } + } + } + @Override protected void execute() { // start the network node From ac27ea69ec57fb82da0191b4f7c949657e1a0e94 Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Thu, 20 Feb 2020 15:24:47 +0100 Subject: [PATCH 03/10] Added offer volume feed metric --- .../bisq/monitor/metric/P2PMarketStats.java | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java index ea99f39055a..c4181fd2f38 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java @@ -17,7 +17,6 @@ package bisq.monitor.metric; -import bisq.monitor.OnionParser; import bisq.monitor.Reporter; @@ -52,6 +51,7 @@ @Slf4j public class P2PMarketStats extends P2PSeedNodeSnapshotBase { final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); + final Map> offerVolumeBucketsPerHost = new ConcurrentHashMap<>(); /** * Efficient way to count occurrences. @@ -68,12 +68,27 @@ synchronized void increment() { } } - private class MyStatistics extends Statistics { + /** + * Efficient way to aggregate numbers. + */ + private class Aggregator { + private long value = 0; + + synchronized long value() { + return value; + } + + synchronized void add(long amount) { + value += amount; + } + } + + private class OfferCountStatistics extends Statistics { @Override public synchronized void log(Object message) { - if(message instanceof OfferPayload) { + if (message instanceof OfferPayload) { OfferPayload currentMessage = (OfferPayload) message; // For logging different data types String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); @@ -84,6 +99,19 @@ public synchronized void log(Object message) { } } + private class OfferVolumeStatistics extends Statistics { + @Override + public synchronized void log(Object message) { + if (message instanceof OfferPayload) { + OfferPayload currentMessage = (OfferPayload) message; + String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); + + buckets.putIfAbsent(market, new Aggregator()); + buckets.get(market).add(currentMessage.getAmount()); + } + } + } + private class VersionsStatistics extends Statistics { @Override @@ -117,9 +145,13 @@ protected List getRequests() { @Override protected void report() { Map report = new HashMap<>(); - bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((market, numberOfOffers) -> report.put(OnionParser.prettyPrint(host) + "." + market, String.valueOf(((Counter) numberOfOffers).value())))); + bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Counter) numberOfOffers).value())))); + reporter.report(report, getName() + ".offerCount"); - reporter.report(report, getName()); + // do offerbook volume statistics + report.clear(); + offerVolumeBucketsPerHost.values().stream().findFirst().ifPresent(aggregatorStatistics -> aggregatorStatistics.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(numberOfOffers.value())))); + reporter.report(report, getName() + ".volume"); // do version statistics report.clear(); @@ -133,7 +165,8 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne if (networkEnvelope instanceof GetDataResponse) { - Statistics result = new MyStatistics(); + Statistics offerCount = new OfferCountStatistics(); + Statistics offerVolume = new OfferVolumeStatistics(); Statistics versions = new VersionsStatistics(); GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; @@ -145,7 +178,8 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne return; } - result.log(protectedStoragePayload); + offerCount.log(protectedStoragePayload); + offerVolume.log(protectedStoragePayload); versions.log(protectedStoragePayload); }); @@ -159,7 +193,8 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne hashes.add(persistableNetworkPayload.getHash()); }); - bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result); + bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerCount); + offerVolumeBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolume); versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions); return true; } From 3674d36a91746f9fcbfd99c8b955d80a14fd8ee3 Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Fri, 21 Feb 2020 09:18:34 +0100 Subject: [PATCH 04/10] Added offer-volume-distribution --- .../bisq/monitor/metric/P2PMarketStats.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java index c4181fd2f38..4472ababe4c 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java @@ -38,6 +38,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -52,6 +53,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase { final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); final Map> offerVolumeBucketsPerHost = new ConcurrentHashMap<>(); + final Map>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>(); /** * Efficient way to count occurrences. @@ -112,6 +114,20 @@ public synchronized void log(Object message) { } } + private class OfferVolumeDistributionStatistics extends Statistics> { + + @Override + public synchronized void log(Object message) { + if (message instanceof OfferPayload) { + OfferPayload currentMessage = (OfferPayload) message; + String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); + + buckets.putIfAbsent(market, new ArrayList<>()); + buckets.get(market).add(currentMessage.getAmount()); + } + } + } + private class VersionsStatistics extends Statistics { @Override @@ -153,6 +169,23 @@ protected void report() { offerVolumeBucketsPerHost.values().stream().findFirst().ifPresent(aggregatorStatistics -> aggregatorStatistics.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(numberOfOffers.value())))); reporter.report(report, getName() + ".volume"); + // do the offer vs volume histogram + report.clear(); + int numberOfBins = 5; + // - get a data set + offerVolumeDistributionBucketsPerHost.values().stream().findFirst().ifPresent(listStatistics -> listStatistics.values().forEach((market, offers) -> { + // - get biggest offer + Long max = offers.stream().max(Long::compareTo).get(); + + // - create histogram + offers.stream().collect( + Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). + forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); + + report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + })); + reporter.report(report, getName() + ".volume-per-offer-distribution"); + // do version statistics report.clear(); versionBucketsPerHost.values().stream().findAny().get().values().forEach((version, numberOfOccurrences) -> report.put(version, String.valueOf(numberOfOccurrences.value()))); @@ -167,6 +200,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne Statistics offerCount = new OfferCountStatistics(); Statistics offerVolume = new OfferVolumeStatistics(); + Statistics offerVolumeDistribution = new OfferVolumeDistributionStatistics(); Statistics versions = new VersionsStatistics(); GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; @@ -180,6 +214,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne offerCount.log(protectedStoragePayload); offerVolume.log(protectedStoragePayload); + offerVolumeDistribution.log(protectedStoragePayload); versions.log(protectedStoragePayload); }); @@ -195,6 +230,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerCount); offerVolumeBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolume); + offerVolumeDistributionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolumeDistribution); versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions); return true; } From 1f1b7390f0c57fc84a8735b8806f093bd4bc201e Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Fri, 21 Feb 2020 10:06:55 +0100 Subject: [PATCH 05/10] Added offers-per-trader metric --- .../java/bisq/monitor/StatisticsHelper.java | 6 ++- .../bisq/monitor/metric/P2PMarketStats.java | 40 ++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/monitor/src/main/java/bisq/monitor/StatisticsHelper.java b/monitor/src/main/java/bisq/monitor/StatisticsHelper.java index 90064f22781..cebd9af1304 100644 --- a/monitor/src/main/java/bisq/monitor/StatisticsHelper.java +++ b/monitor/src/main/java/bisq/monitor/StatisticsHelper.java @@ -17,11 +17,13 @@ package bisq.monitor; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.LongSummaryStatistics; import java.util.Map; +import java.util.stream.Collectors; /** * Calculates average, max, min, p25, p50, p75 off of a list of samples and @@ -31,7 +33,9 @@ */ public class StatisticsHelper { - public static Map process(List samples) { + public static Map process(Collection input) { + + List samples = input.stream().collect(Collectors.toList()); // aftermath Collections.sort(samples); diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java index 4472ababe4c..95fac81affe 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java @@ -19,7 +19,6 @@ import bisq.monitor.Reporter; - import bisq.core.offer.OfferPayload; import bisq.network.p2p.NodeAddress; @@ -54,6 +53,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase { final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); final Map> offerVolumeBucketsPerHost = new ConcurrentHashMap<>(); final Map>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>(); + final Map>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>(); /** * Efficient way to count occurrences. @@ -128,6 +128,21 @@ public synchronized void log(Object message) { } } + private class OffersPerTraderStatistics extends Statistics> { + + @Override + public synchronized void log(Object message) { + if (message instanceof OfferPayload) { + OfferPayload currentMessage = (OfferPayload) message; + String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); + + buckets.putIfAbsent(market, new HashMap<>()); + buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Counter()); + buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment(); + } + } + } + private class VersionsStatistics extends Statistics { @Override @@ -186,6 +201,26 @@ protected void report() { })); reporter.report(report, getName() + ".volume-per-offer-distribution"); + // do offers per trader + report.clear(); + + // - get a data set + offersPerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> { + List offerPerTrader = stuff.values().stream().map(Counter::value).collect(Collectors.toList()); + + // - get most active trader + double max = offerPerTrader.stream().max(Long::compareTo).get() + 0.01; + + // - create histogram + offerPerTrader.stream().collect( + Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). + forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); + + report.put(market + ".number_of_traders", String.valueOf(stuff.size())); + report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + })); + reporter.report(report, getName() + ".offersPerTrader"); + // do version statistics report.clear(); versionBucketsPerHost.values().stream().findAny().get().values().forEach((version, numberOfOccurrences) -> report.put(version, String.valueOf(numberOfOccurrences.value()))); @@ -201,6 +236,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne Statistics offerCount = new OfferCountStatistics(); Statistics offerVolume = new OfferVolumeStatistics(); Statistics offerVolumeDistribution = new OfferVolumeDistributionStatistics(); + Statistics offersPerTrader = new OffersPerTraderStatistics(); Statistics versions = new VersionsStatistics(); GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; @@ -215,6 +251,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne offerCount.log(protectedStoragePayload); offerVolume.log(protectedStoragePayload); offerVolumeDistribution.log(protectedStoragePayload); + offersPerTrader.log(protectedStoragePayload); versions.log(protectedStoragePayload); }); @@ -231,6 +268,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerCount); offerVolumeBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolume); offerVolumeDistributionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolumeDistribution); + offersPerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offersPerTrader); versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions); return true; } From 292f057f226cb427000e99139de62e1a31f10176 Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Fri, 21 Feb 2020 11:25:46 +0100 Subject: [PATCH 06/10] Added volume-per-trader metric --- .../bisq/monitor/metric/P2PMarketStats.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java index 95fac81affe..9391378cd6e 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java @@ -54,6 +54,7 @@ public class P2PMarketStats extends P2PSeedNodeSnapshotBase { final Map> offerVolumeBucketsPerHost = new ConcurrentHashMap<>(); final Map>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>(); final Map>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>(); + final Map>> volumePerTraderBucketsPerHost = new ConcurrentHashMap<>(); /** * Efficient way to count occurrences. @@ -143,6 +144,21 @@ public synchronized void log(Object message) { } } + private class VolumePerTraderStatistics extends Statistics> { + + @Override + public synchronized void log(Object message) { + if (message instanceof OfferPayload) { + OfferPayload currentMessage = (OfferPayload) message; + String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); + + buckets.putIfAbsent(market, new HashMap<>()); + buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator()); + buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount()); + } + } + } + private class VersionsStatistics extends Statistics { @Override @@ -221,6 +237,25 @@ protected void report() { })); reporter.report(report, getName() + ".offersPerTrader"); + // do volume per trader + report.clear(); + // - get a data set + volumePerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> { + List volumePerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList()); + + // - get most active trader + double max = volumePerTrader.stream().max(Long::compareTo).get() + 0.01; + + // - create histogram + volumePerTrader.stream().collect( + Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). + forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); + + report.put(market + ".number_of_traders", String.valueOf(stuff.size())); + report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + })); + reporter.report(report, getName() + ".volumePerTrader"); + // do version statistics report.clear(); versionBucketsPerHost.values().stream().findAny().get().values().forEach((version, numberOfOccurrences) -> report.put(version, String.valueOf(numberOfOccurrences.value()))); @@ -237,6 +272,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne Statistics offerVolume = new OfferVolumeStatistics(); Statistics offerVolumeDistribution = new OfferVolumeDistributionStatistics(); Statistics offersPerTrader = new OffersPerTraderStatistics(); + Statistics volumePerTrader = new VolumePerTraderStatistics(); Statistics versions = new VersionsStatistics(); GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; @@ -252,6 +288,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne offerVolume.log(protectedStoragePayload); offerVolumeDistribution.log(protectedStoragePayload); offersPerTrader.log(protectedStoragePayload); + volumePerTrader.log(protectedStoragePayload); versions.log(protectedStoragePayload); }); @@ -269,6 +306,7 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne offerVolumeBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolume); offerVolumeDistributionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolumeDistribution); offersPerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offersPerTrader); + volumePerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), volumePerTrader); versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions); return true; } From 6c1d7509a106ab9edee6f163a544cc0f61668004 Mon Sep 17 00:00:00 2001 From: Florian Reimair Date: Fri, 21 Feb 2020 13:08:38 +0100 Subject: [PATCH 07/10] Cleanup and refactoring --- .../bisq/monitor/metric/P2PMarketStats.java | 153 +++++++----------- 1 file changed, 59 insertions(+), 94 deletions(-) diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java index 9391378cd6e..846764355f7 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java @@ -50,16 +50,16 @@ */ @Slf4j public class P2PMarketStats extends P2PSeedNodeSnapshotBase { - final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); + final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); final Map> offerVolumeBucketsPerHost = new ConcurrentHashMap<>(); final Map>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>(); - final Map>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>(); + final Map>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>(); final Map>> volumePerTraderBucketsPerHost = new ConcurrentHashMap<>(); /** - * Efficient way to count occurrences. + * Efficient way to aggregate numbers. */ - private class Counter { + private class Aggregator { private long value = 0; synchronized long value() { @@ -69,97 +69,75 @@ synchronized long value() { synchronized void increment() { value++; } - } - - /** - * Efficient way to aggregate numbers. - */ - private class Aggregator { - private long value = 0; - - synchronized long value() { - return value; - } synchronized void add(long amount) { value += amount; } } - private class OfferCountStatistics extends Statistics { - + private abstract class OfferStatistics extends Statistics { @Override public synchronized void log(Object message) { - if (message instanceof OfferPayload) { OfferPayload currentMessage = (OfferPayload) message; // For logging different data types String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); - buckets.putIfAbsent(market, new Counter()); - buckets.get(market).increment(); + process(market, currentMessage); } } + + abstract void process(String market, OfferPayload currentMessage); } - private class OfferVolumeStatistics extends Statistics { - @Override - public synchronized void log(Object message) { - if (message instanceof OfferPayload) { - OfferPayload currentMessage = (OfferPayload) message; - String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); + private class OfferCountStatistics extends OfferStatistics { - buckets.putIfAbsent(market, new Aggregator()); - buckets.get(market).add(currentMessage.getAmount()); - } + @Override + void process(String market, OfferPayload currentMessage) { + buckets.putIfAbsent(market, new Aggregator()); + buckets.get(market).increment(); } } - private class OfferVolumeDistributionStatistics extends Statistics> { + private class OfferVolumeStatistics extends OfferStatistics { @Override - public synchronized void log(Object message) { - if (message instanceof OfferPayload) { - OfferPayload currentMessage = (OfferPayload) message; - String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); - - buckets.putIfAbsent(market, new ArrayList<>()); - buckets.get(market).add(currentMessage.getAmount()); - } + void process(String market, OfferPayload currentMessage) { + buckets.putIfAbsent(market, new Aggregator()); + buckets.get(market).add(currentMessage.getAmount()); } } - private class OffersPerTraderStatistics extends Statistics> { + private class OfferVolumeDistributionStatistics extends OfferStatistics> { @Override - public synchronized void log(Object message) { - if (message instanceof OfferPayload) { - OfferPayload currentMessage = (OfferPayload) message; - String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); - - buckets.putIfAbsent(market, new HashMap<>()); - buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Counter()); - buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment(); - } + void process(String market, OfferPayload currentMessage) { + buckets.putIfAbsent(market, new ArrayList<>()); + buckets.get(market).add(currentMessage.getAmount()); } } - private class VolumePerTraderStatistics extends Statistics> { + private class OffersPerTraderStatistics extends OfferStatistics> { @Override - public synchronized void log(Object message) { - if (message instanceof OfferPayload) { - OfferPayload currentMessage = (OfferPayload) message; - String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); + void process(String market, OfferPayload currentMessage) { + buckets.putIfAbsent(market, new HashMap<>()); + buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator()); + buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment(); + } + } - buckets.putIfAbsent(market, new HashMap<>()); - buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator()); - buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount()); - } + private class VolumePerTraderStatistics extends OfferStatistics> { + + @Override + void process(String market, OfferPayload currentMessage) { + buckets.putIfAbsent(market, new HashMap<>()); + buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator()); + buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount()); } } - private class VersionsStatistics extends Statistics { + private class VersionsStatistics extends Statistics { @Override public void log(Object message) { @@ -169,7 +147,7 @@ public void log(Object message) { String version = "v" + currentMessage.getId().substring(currentMessage.getId().lastIndexOf("-") + 1); - buckets.putIfAbsent(version, new Counter()); + buckets.putIfAbsent(version, new Aggregator()); buckets.get(version).increment(); } } @@ -189,10 +167,25 @@ protected List getRequests() { return result; } + protected void createHistogram(List input, String market, Map report) { + int numberOfBins = 5; + + // - get biggest offer + double max = input.stream().max(Long::compareTo).get() * 1.01; + + // - create histogram + input.stream().collect( + Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). + forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); + + report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + report.put(market + ".max", String.valueOf((int) max)); + } + @Override protected void report() { Map report = new HashMap<>(); - bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Counter) numberOfOffers).value())))); + bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Aggregator) numberOfOffers).value())))); reporter.report(report, getName() + ".offerCount"); // do offerbook volume statistics @@ -202,40 +195,21 @@ protected void report() { // do the offer vs volume histogram report.clear(); - int numberOfBins = 5; // - get a data set offerVolumeDistributionBucketsPerHost.values().stream().findFirst().ifPresent(listStatistics -> listStatistics.values().forEach((market, offers) -> { - // - get biggest offer - Long max = offers.stream().max(Long::compareTo).get(); - - // - create histogram - offers.stream().collect( - Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). - forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); - - report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + createHistogram(offers, market, report); })); reporter.report(report, getName() + ".volume-per-offer-distribution"); // do offers per trader report.clear(); - // - get a data set offersPerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> { - List offerPerTrader = stuff.values().stream().map(Counter::value).collect(Collectors.toList()); - - // - get most active trader - double max = offerPerTrader.stream().max(Long::compareTo).get() + 0.01; + List offerPerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList()); - // - create histogram - offerPerTrader.stream().collect( - Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). - forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); - - report.put(market + ".number_of_traders", String.valueOf(stuff.size())); - report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + createHistogram(offerPerTrader, market, report); })); - reporter.report(report, getName() + ".offersPerTrader"); + reporter.report(report, getName() + ".traders_by_number_of_offers"); // do volume per trader report.clear(); @@ -243,18 +217,9 @@ protected void report() { volumePerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> { List volumePerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList()); - // - get most active trader - double max = volumePerTrader.stream().max(Long::compareTo).get() + 0.01; - - // - create histogram - volumePerTrader.stream().collect( - Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). - forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); - - report.put(market + ".number_of_traders", String.valueOf(stuff.size())); - report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins)); + createHistogram(volumePerTrader, market, report); })); - reporter.report(report, getName() + ".volumePerTrader"); + reporter.report(report, getName() + ".traders_by_volume"); // do version statistics report.clear(); From f3e0696078612b04bfb5eaf97a759df272d0d5e3 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 22 Oct 2020 11:38:34 -0500 Subject: [PATCH 08/10] Reorg imports, reformat code according to your style guide --- .../main/java/bisq/monitor/AvailableTor.java | 8 ++-- .../src/main/java/bisq/monitor/Metric.java | 8 ++-- .../main/java/bisq/monitor/OnionParser.java | 4 +- .../main/java/bisq/monitor/ThreadGate.java | 2 +- .../bisq/monitor/metric/P2PNetworkLoad.java | 4 +- .../bisq/monitor/metric/P2PRoundTripTime.java | 2 +- .../monitor/metric/P2PSeedNodeSnapshot.java | 40 +++++++------------ .../monitor/reporter/ConsoleReporter.java | 3 +- .../monitor/reporter/GraphiteReporter.java | 5 +-- monitor/src/main/resources/logback.xml | 22 +++++----- .../monitor/MonitorInfrastructureTests.java | 2 +- .../bisq/monitor/P2PNetworkLoadTests.java | 9 +++-- .../bisq/monitor/P2PRoundTripTimeTests.java | 9 +++-- .../bisq/monitor/PriceNodeStatsTests.java | 1 - 14 files changed, 54 insertions(+), 65 deletions(-) diff --git a/monitor/src/main/java/bisq/monitor/AvailableTor.java b/monitor/src/main/java/bisq/monitor/AvailableTor.java index 650425fca52..0c79ec48e39 100644 --- a/monitor/src/main/java/bisq/monitor/AvailableTor.java +++ b/monitor/src/main/java/bisq/monitor/AvailableTor.java @@ -17,13 +17,15 @@ package bisq.monitor; -import java.io.File; -import org.berndpruenster.netlayer.tor.Tor; import bisq.network.p2p.network.TorMode; +import org.berndpruenster.netlayer.tor.Tor; + +import java.io.File; + /** * This class uses an already defined Tor via Tor.getDefault() - * + * * @author Florian Reimair * */ diff --git a/monitor/src/main/java/bisq/monitor/Metric.java b/monitor/src/main/java/bisq/monitor/Metric.java index df7186f2a0f..a54a99e9620 100644 --- a/monitor/src/main/java/bisq/monitor/Metric.java +++ b/monitor/src/main/java/bisq/monitor/Metric.java @@ -122,11 +122,11 @@ public void run() { // execute all the things synchronized (this) { - log.info("{} started", getName()); - execute(); - log.info("{} done", getName()); + log.info("{} started", getName()); + execute(); + log.info("{} done", getName()); } - } catch(Throwable e) { + } catch (Throwable e) { log.error("A metric misbehaved!", e); } } diff --git a/monitor/src/main/java/bisq/monitor/OnionParser.java b/monitor/src/main/java/bisq/monitor/OnionParser.java index e5a984446fd..58f186f068a 100644 --- a/monitor/src/main/java/bisq/monitor/OnionParser.java +++ b/monitor/src/main/java/bisq/monitor/OnionParser.java @@ -17,11 +17,11 @@ package bisq.monitor; +import bisq.network.p2p.NodeAddress; + import java.net.MalformedURLException; import java.net.URL; -import bisq.network.p2p.NodeAddress; - /** * Helper for parsing and pretty printing onion addresses. * diff --git a/monitor/src/main/java/bisq/monitor/ThreadGate.java b/monitor/src/main/java/bisq/monitor/ThreadGate.java index 9055ddf8760..085ee395ae1 100644 --- a/monitor/src/main/java/bisq/monitor/ThreadGate.java +++ b/monitor/src/main/java/bisq/monitor/ThreadGate.java @@ -41,7 +41,7 @@ public void engage() { /** * Make everyone wait until the gate is open again. - * + * * @param numberOfLocks how often the gate has to be unlocked until the gate * opens. */ diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PNetworkLoad.java b/monitor/src/main/java/bisq/monitor/metric/P2PNetworkLoad.java index 4a231d8da61..ae41cab8dea 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PNetworkLoad.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PNetworkLoad.java @@ -156,7 +156,7 @@ protected void execute() { // report Map report = new HashMap<>(); - if(lastRun != 0 && System.currentTimeMillis() - lastRun != 0) { + if (lastRun != 0 && System.currentTimeMillis() - lastRun != 0) { // - normalize to data/minute double perMinuteFactor = 60000.0 / (System.currentTimeMillis() - lastRun); @@ -219,7 +219,7 @@ synchronized void increment() { public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { if (networkEnvelope instanceof BroadcastMessage) { try { - if(history.get(networkEnvelope.hashCode()) == null) { + if (history.get(networkEnvelope.hashCode()) == null) { history.put(networkEnvelope.hashCode(), null); buckets.get(networkEnvelope.getClass().getSimpleName()).increment(); } diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java b/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java index a0c1fc0a96a..841c8243005 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PRoundTripTime.java @@ -57,7 +57,7 @@ private class Statistics { public synchronized void log(Object message) { Pong pong = (Pong) message; Long start = sentAt.get(pong.getRequestNonce()); - if(start != null) + if (start != null) samples.add(System.currentTimeMillis() - start); } diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java index cc06144c520..13007593d24 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java @@ -20,35 +20,25 @@ import bisq.monitor.OnionParser; import bisq.monitor.Reporter; -import bisq.core.account.witness.AccountAgeWitnessStore; -import bisq.common.config.BaseCurrencyNetwork; import bisq.core.dao.monitoring.model.StateHash; import bisq.core.dao.monitoring.network.messages.GetBlindVoteStateHashesRequest; import bisq.core.dao.monitoring.network.messages.GetDaoStateHashesRequest; import bisq.core.dao.monitoring.network.messages.GetProposalStateHashesRequest; import bisq.core.dao.monitoring.network.messages.GetStateHashesResponse; -import bisq.core.proto.persistable.CorePersistenceProtoResolver; -import bisq.core.trade.statistics.TradeStatistics2Store; import bisq.network.p2p.NodeAddress; import bisq.network.p2p.network.Connection; import bisq.network.p2p.peers.getdata.messages.GetDataResponse; import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest; -import bisq.network.p2p.storage.payload.PersistableNetworkPayload; import bisq.network.p2p.storage.payload.ProtectedStorageEntry; import bisq.network.p2p.storage.payload.ProtectedStoragePayload; -import bisq.common.app.Version; import bisq.common.proto.network.NetworkEnvelope; -import bisq.common.proto.persistable.PersistableEnvelope; -import bisq.common.storage.Storage; import java.net.MalformedURLException; import java.nio.ByteBuffer; -import java.io.File; - import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -56,10 +46,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Random; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -148,21 +136,21 @@ void report() { // - calculate diffs messagesPerHost.forEach( - (host, statistics) -> { - statistics.values().forEach((messageType, set) -> { + (host, statistics) -> { + statistics.values().forEach((messageType, set) -> { + try { + report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType, + String.valueOf(set.size() - referenceValues.get(messageType).size())); + } catch (MalformedURLException | NullPointerException ignore) { + log.error("we should never have gotten here", ignore); + } + }); try { - report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType, - String.valueOf(set.size() - referenceValues.get(messageType).size())); - } catch (MalformedURLException | NullPointerException ignore) { - log.error("we should never have gotten here", ignore); + report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost); + } catch (MalformedURLException ignore) { + log.error("we should never got here"); } }); - try { - report.put(OnionParser.prettyPrint(host) + ".referenceHost", referenceHost); - } catch (MalformedURLException ignore) { - log.error("we should never got here"); - } - }); // cleanup for next run bucketsPerHost.forEach((host, statistics) -> statistics.reset()); @@ -191,9 +179,9 @@ void report() { int oldest = (int) nodeAddressTupleMap.values().stream().min(Comparator.comparingLong(Tuple::getHeight)).get().height; // - update queried height - if(type.contains("DaoState")) + if (type.contains("DaoState")) daostateheight = oldest - 20; - else if(type.contains("Proposal")) + else if (type.contains("Proposal")) proposalheight = oldest - 20; else blindvoteheight = oldest - 20; diff --git a/monitor/src/main/java/bisq/monitor/reporter/ConsoleReporter.java b/monitor/src/main/java/bisq/monitor/reporter/ConsoleReporter.java index e6df6fa46e7..f34dc2d5679 100644 --- a/monitor/src/main/java/bisq/monitor/reporter/ConsoleReporter.java +++ b/monitor/src/main/java/bisq/monitor/reporter/ConsoleReporter.java @@ -19,9 +19,8 @@ import bisq.monitor.Reporter; -import bisq.common.config.BaseCurrencyNetwork; - import bisq.common.app.Version; +import bisq.common.config.BaseCurrencyNetwork; import java.util.HashMap; import java.util.Map; diff --git a/monitor/src/main/java/bisq/monitor/reporter/GraphiteReporter.java b/monitor/src/main/java/bisq/monitor/reporter/GraphiteReporter.java index 406704bc63e..caf20103ddd 100644 --- a/monitor/src/main/java/bisq/monitor/reporter/GraphiteReporter.java +++ b/monitor/src/main/java/bisq/monitor/reporter/GraphiteReporter.java @@ -20,11 +20,10 @@ import bisq.monitor.OnionParser; import bisq.monitor.Reporter; -import bisq.common.config.BaseCurrencyNetwork; - import bisq.network.p2p.NodeAddress; import bisq.common.app.Version; +import bisq.common.config.BaseCurrencyNetwork; import org.berndpruenster.netlayer.tor.TorSocket; @@ -81,7 +80,7 @@ public void report(String key, String value, String timeInMilliseconds, String p String report = "bisq" + (Version.getBaseCurrencyNetwork() != 0 ? "-" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].getNetwork() : "") + (prefix.isEmpty() ? "" : "." + prefix) + (key.isEmpty() ? "" : "." + key) - + " " + value + " " + Long.valueOf(timeInMilliseconds)/1000 + "\n"; + + " " + value + " " + Long.valueOf(timeInMilliseconds) / 1000 + "\n"; try { NodeAddress nodeAddress = OnionParser.getNodeAddress(configuration.getProperty("serviceUrl")); diff --git a/monitor/src/main/resources/logback.xml b/monitor/src/main/resources/logback.xml index 65f307ab1fb..28831d6fc3b 100644 --- a/monitor/src/main/resources/logback.xml +++ b/monitor/src/main/resources/logback.xml @@ -1,12 +1,12 @@ - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - \ No newline at end of file + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + diff --git a/monitor/src/test/java/bisq/monitor/MonitorInfrastructureTests.java b/monitor/src/test/java/bisq/monitor/MonitorInfrastructureTests.java index 17d0837a5b4..5e1fc248308 100644 --- a/monitor/src/test/java/bisq/monitor/MonitorInfrastructureTests.java +++ b/monitor/src/test/java/bisq/monitor/MonitorInfrastructureTests.java @@ -58,7 +58,7 @@ protected void execute() { } @ParameterizedTest - @ValueSource(strings = { "empty", "no interval", "typo" }) + @ValueSource(strings = {"empty", "no interval", "typo"}) public void basicConfigurationError(String configuration) { HashMap lut = new HashMap<>(); lut.put("empty", new Properties()); diff --git a/monitor/src/test/java/bisq/monitor/P2PNetworkLoadTests.java b/monitor/src/test/java/bisq/monitor/P2PNetworkLoadTests.java index 2011e69b0e8..25a11f976bc 100644 --- a/monitor/src/test/java/bisq/monitor/P2PNetworkLoadTests.java +++ b/monitor/src/test/java/bisq/monitor/P2PNetworkLoadTests.java @@ -20,20 +20,21 @@ import bisq.monitor.metric.P2PNetworkLoad; import bisq.monitor.reporter.ConsoleReporter; -import static com.google.common.base.Preconditions.checkNotNull; +import org.berndpruenster.netlayer.tor.NativeTor; +import org.berndpruenster.netlayer.tor.Tor; +import org.berndpruenster.netlayer.tor.TorCtlException; import java.util.Map; import java.util.Properties; -import org.berndpruenster.netlayer.tor.NativeTor; -import org.berndpruenster.netlayer.tor.Tor; -import org.berndpruenster.netlayer.tor.TorCtlException; import org.junit.Assert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Test the round trip time metric against the hidden service of tor project.org. * diff --git a/monitor/src/test/java/bisq/monitor/P2PRoundTripTimeTests.java b/monitor/src/test/java/bisq/monitor/P2PRoundTripTimeTests.java index bb976593276..a10dde58095 100644 --- a/monitor/src/test/java/bisq/monitor/P2PRoundTripTimeTests.java +++ b/monitor/src/test/java/bisq/monitor/P2PRoundTripTimeTests.java @@ -20,14 +20,13 @@ import bisq.monitor.metric.P2PRoundTripTime; import bisq.monitor.reporter.ConsoleReporter; -import static com.google.common.base.Preconditions.checkNotNull; +import org.berndpruenster.netlayer.tor.NativeTor; +import org.berndpruenster.netlayer.tor.Tor; +import org.berndpruenster.netlayer.tor.TorCtlException; import java.util.Map; import java.util.Properties; -import org.berndpruenster.netlayer.tor.NativeTor; -import org.berndpruenster.netlayer.tor.Tor; -import org.berndpruenster.netlayer.tor.TorCtlException; import org.junit.Assert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -35,6 +34,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static com.google.common.base.Preconditions.checkNotNull; + /** * Test the round trip time metric against the hidden service of tor project.org. * diff --git a/monitor/src/test/java/bisq/monitor/PriceNodeStatsTests.java b/monitor/src/test/java/bisq/monitor/PriceNodeStatsTests.java index 1024f5c9a0f..b424b9878df 100644 --- a/monitor/src/test/java/bisq/monitor/PriceNodeStatsTests.java +++ b/monitor/src/test/java/bisq/monitor/PriceNodeStatsTests.java @@ -25,7 +25,6 @@ import java.io.File; -import java.util.HashMap; import java.util.Map; import java.util.Properties; From 57064a091bf889d2d8d56ad53fb74bf087707450 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 22 Oct 2020 11:40:08 -0500 Subject: [PATCH 09/10] Remove change in Connection (issue with config when not using guice is solved in another PR which we will add to that PR in next commits) --- p2p/src/main/java/bisq/network/p2p/network/Connection.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index 70463aceeed..c6d3277ee35 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -186,10 +186,6 @@ public static int getPermittedMessageSize() { addMessageListener(messageListener); - if (config == null) { - config = new Config(); - } - this.networkProtoResolver = networkProtoResolver; init(peersNodeAddress); } From 9a018ccd477ba9c5a420e7ec5ae244115cf144f8 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Thu, 22 Oct 2020 14:49:16 -0500 Subject: [PATCH 10/10] Adopt to new PersistenceManager --- .../witness/AccountAgeWitnessStore.java | 2 +- .../statistics/TradeStatistics3Store.java | 2 +- .../metric/P2PSeedNodeSnapshotBase.java | 91 +++++++++++-------- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/bisq/core/account/witness/AccountAgeWitnessStore.java b/core/src/main/java/bisq/core/account/witness/AccountAgeWitnessStore.java index 28661102ded..349d18e80dd 100644 --- a/core/src/main/java/bisq/core/account/witness/AccountAgeWitnessStore.java +++ b/core/src/main/java/bisq/core/account/witness/AccountAgeWitnessStore.java @@ -35,7 +35,7 @@ @Slf4j public class AccountAgeWitnessStore extends PersistableNetworkPayloadStore { - AccountAgeWitnessStore() { + public AccountAgeWitnessStore() { } diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics3Store.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics3Store.java index 7c79778d4e1..53b7e3281c4 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics3Store.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics3Store.java @@ -35,7 +35,7 @@ @Slf4j public class TradeStatistics3Store extends PersistableNetworkPayloadStore { - TradeStatistics3Store() { + public TradeStatistics3Store() { } diff --git a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java index 631e0154e06..0e7384b8059 100644 --- a/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java +++ b/monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshotBase.java @@ -27,7 +27,7 @@ import bisq.core.account.witness.AccountAgeWitnessStore; import bisq.core.proto.network.CoreNetworkProtoResolver; import bisq.core.proto.persistable.CorePersistenceProtoResolver; -import bisq.core.trade.statistics.TradeStatistics2Store; +import bisq.core.trade.statistics.TradeStatistics3Store; import bisq.network.p2p.CloseConnectionMessage; import bisq.network.p2p.NodeAddress; @@ -38,9 +38,8 @@ import bisq.common.app.Version; import bisq.common.config.BaseCurrencyNetwork; +import bisq.common.persistence.PersistenceManager; import bisq.common.proto.network.NetworkEnvelope; -import bisq.common.proto.persistable.PersistableEnvelope; -import bisq.common.storage.Storage; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -90,7 +89,7 @@ public abstract class P2PSeedNodeSnapshotBase extends Metric implements MessageL * * @param the value type of the statistics implementation */ - protected abstract class Statistics { + protected abstract static class Statistics { protected final Map buckets = new HashMap<>(); abstract void log(Object message); @@ -116,12 +115,29 @@ public void configure(Properties properties) { File dir = new File(configuration.getProperty(DATABASE_DIR)); String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString(); try { - Storage storage = new Storage<>(dir, new CorePersistenceProtoResolver(null, null, null, null), null); - TradeStatistics2Store tradeStatistics2Store = (TradeStatistics2Store) storage.initAndGetPersistedWithFileName(TradeStatistics2Store.class.getSimpleName() + networkPostfix, 0); - hashes.addAll(tradeStatistics2Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); - - AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) storage.initAndGetPersistedWithFileName(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix, 0); - hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList())); + CorePersistenceProtoResolver persistenceProtoResolver = new CorePersistenceProtoResolver(null, null); + + TradeStatistics3Store tradeStatistics3Store = new TradeStatistics3Store(); + PersistenceManager tradeStatistics3PersistenceManager = new PersistenceManager<>(dir, + persistenceProtoResolver, null); + tradeStatistics3PersistenceManager.initialize(tradeStatistics3Store, PersistenceManager.Source.NETWORK); + TradeStatistics3Store persistedTradeStatistics3Store = tradeStatistics3PersistenceManager.getPersisted(); + if (persistedTradeStatistics3Store != null) { + tradeStatistics3Store.getMap().putAll(persistedTradeStatistics3Store.getMap()); + } + hashes.addAll(tradeStatistics3Store.getMap().keySet().stream() + .map(byteArray -> byteArray.bytes).collect(Collectors.toSet())); + + AccountAgeWitnessStore accountAgeWitnessStore = new AccountAgeWitnessStore(); + PersistenceManager accountAgeWitnessPersistenceManager = new PersistenceManager<>(dir, + persistenceProtoResolver, null); + accountAgeWitnessPersistenceManager.initialize(accountAgeWitnessStore, PersistenceManager.Source.NETWORK); + AccountAgeWitnessStore persistedAccountAgeWitnessStore = accountAgeWitnessPersistenceManager.getPersisted(); + if (persistedAccountAgeWitnessStore != null) { + accountAgeWitnessStore.getMap().putAll(persistedAccountAgeWitnessStore.getMap()); + } + hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream() + .map(byteArray -> byteArray.bytes).collect(Collectors.toSet())); } catch (NullPointerException e) { // in case there is no store file log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath()); @@ -156,32 +172,32 @@ protected void send(NetworkNode networkNode, NetworkEnvelope message) { for (String current : configuration.getProperty(HOSTS, "").split(",")) { threadList.add(new Thread(() -> { - try { - // parse Url - NodeAddress target = OnionParser.getNodeAddress(current); - - // do the data request - aboutToSend(message); - SettableFuture future = networkNode.sendMessage(target, message); - - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(Connection connection) { - connection.addMessageListener(P2PSeedNodeSnapshotBase.this); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - gate.proceed(); - log.error( - "Sending {} failed. That is expected if the peer is offline.\n\tException={}", message.getClass().getSimpleName(), throwable.getMessage()); - } - }, MoreExecutors.directExecutor()); - - } catch (Exception e) { - gate.proceed(); // release the gate on error - e.printStackTrace(); - } + try { + // parse Url + NodeAddress target = OnionParser.getNodeAddress(current); + + // do the data request + aboutToSend(message); + SettableFuture future = networkNode.sendMessage(target, message); + + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(Connection connection) { + connection.addMessageListener(P2PSeedNodeSnapshotBase.this); + } + + @Override + public void onFailure(@NotNull Throwable throwable) { + gate.proceed(); + log.error( + "Sending {} failed. That is expected if the peer is offline.\n\tException={}", message.getClass().getSimpleName(), throwable.getMessage()); + } + }, MoreExecutors.directExecutor()); + + } catch (Exception e) { + gate.proceed(); // release the gate on error + e.printStackTrace(); + } }, current)); } @@ -195,7 +211,8 @@ public void onFailure(@NotNull Throwable throwable) { gate.await(); } - protected void aboutToSend(NetworkEnvelope message) { }; + protected void aboutToSend(NetworkEnvelope message) { + } /** * Report all the stuff. Uses the configured reporter directly.