Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update monitor with last version from freimairs repo #4685

8 changes: 5 additions & 3 deletions monitor/src/main/java/bisq/monitor/AvailableTor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package bisq.monitor;

import java.io.File;
import org.berndpruenster.netlayer.tor.Tor;
import bisq.network.p2p.network.TorMode;

import org.berndpruenster.netlayer.tor.Tor;

import java.io.File;

/**
* This class uses an already defined Tor via <code>Tor.getDefault()</code>
*
*
* @author Florian Reimair
*
*/
Expand Down
8 changes: 4 additions & 4 deletions monitor/src/main/java/bisq/monitor/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ public void run() {

// execute all the things
synchronized (this) {
log.info("{} started", getName());
execute();
log.info("{} done", getName());
log.info("{} started", getName());
execute();
log.info("{} done", getName());
}
} catch(Throwable e) {
} catch (Throwable e) {
log.error("A metric misbehaved!", e);
}
}
Expand Down
4 changes: 2 additions & 2 deletions monitor/src/main/java/bisq/monitor/OnionParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package bisq.monitor;

import bisq.network.p2p.NodeAddress;

import java.net.MalformedURLException;
import java.net.URL;

import bisq.network.p2p.NodeAddress;

/**
* Helper for parsing and pretty printing onion addresses.
*
Expand Down
6 changes: 5 additions & 1 deletion monitor/src/main/java/bisq/monitor/StatisticsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package bisq.monitor;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Calculates average, max, min, p25, p50, p75 off of a list of samples and
Expand All @@ -31,7 +33,9 @@
*/
public class StatisticsHelper {

public static Map<String, String> process(List<Long> samples) {
public static Map<String, String> process(Collection<Long> input) {

List<Long> samples = input.stream().collect(Collectors.toList());

// aftermath
Collections.sort(samples);
Expand Down
2 changes: 1 addition & 1 deletion monitor/src/main/java/bisq/monitor/ThreadGate.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void engage() {

/**
* Make everyone wait until the gate is open again.
*
*
* @param numberOfLocks how often the gate has to be unlocked until the gate
* opens.
*/
Expand Down
199 changes: 120 additions & 79 deletions monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@

package bisq.monitor.metric;

import bisq.monitor.OnionParser;
import bisq.monitor.Reporter;

import bisq.core.account.witness.AccountAgeWitnessStore;
import bisq.core.offer.OfferPayload;
import bisq.core.proto.persistable.CorePersistenceProtoResolver;
import bisq.core.trade.statistics.TradeStatistics3Store;

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
Expand All @@ -32,23 +28,14 @@
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;

import bisq.common.app.Version;
import bisq.common.config.BaseCurrencyNetwork;
import bisq.common.persistence.PersistenceManager;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistableEnvelope;

import java.io.File;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -63,16 +50,16 @@
*/
@Slf4j
public class P2PMarketStats extends P2PSeedNodeSnapshotBase {
private static final String DATABASE_DIR = "run.dbDir";

private final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);

final Map<NodeAddress, Statistics<Counter>> versionBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Aggregator>> versionBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Aggregator>> offerVolumeBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<List<Long>>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Map<NodeAddress, Aggregator>>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>();
final Map<NodeAddress, Statistics<Map<NodeAddress, Aggregator>>> volumePerTraderBucketsPerHost = new ConcurrentHashMap<>();

/**
* Efficient way to count occurrences.
* Efficient way to aggregate numbers.
*/
private class Counter {
private class Aggregator {
private long value = 0;

synchronized long value() {
Expand All @@ -82,47 +69,75 @@ synchronized long value() {
synchronized void increment() {
value++;
}
}

private class MyStatistics implements Statistics<Counter> {
private final Map<String, Counter> buckets = new HashMap<>();

@Override
public Statistics create() {
return new MyStatistics();
synchronized void add(long amount) {
value += amount;
}
}

private abstract class OfferStatistics<T> extends Statistics<T> {
@Override
public synchronized void log(Object message) {

if(message instanceof OfferPayload) {
if (message instanceof OfferPayload) {
OfferPayload currentMessage = (OfferPayload) message;
// For logging different data types
String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode();

buckets.putIfAbsent(market, new Counter());
buckets.get(market).increment();
process(market, currentMessage);
}
}

abstract void process(String market, OfferPayload currentMessage);
}

private class OfferCountStatistics extends OfferStatistics<Aggregator> {

@Override
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new Aggregator());
buckets.get(market).increment();
}
}

private class OfferVolumeStatistics extends OfferStatistics<Aggregator> {

@Override
public Map<String, Counter> values() {
return buckets;
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new Aggregator());
buckets.get(market).add(currentMessage.getAmount());
}
}

private class OfferVolumeDistributionStatistics extends OfferStatistics<List<Long>> {

@Override
public void reset() {
buckets.clear();
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new ArrayList<>());
buckets.get(market).add(currentMessage.getAmount());
}
}

private class VersionsStatistics implements Statistics<Counter> {
private final Map<String, Counter> buckets = new HashMap<>();
private class OffersPerTraderStatistics extends OfferStatistics<Map<NodeAddress, Aggregator>> {

@Override
public Statistics create() {
return new VersionsStatistics();
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new HashMap<>());
buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator());
buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment();
}
}

private class VolumePerTraderStatistics extends OfferStatistics<Map<NodeAddress, Aggregator>> {

@Override
void process(String market, OfferPayload currentMessage) {
buckets.putIfAbsent(market, new HashMap<>());
buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator());
buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount());
}
}

private class VersionsStatistics extends Statistics<Aggregator> {

@Override
public void log(Object message) {
Expand All @@ -132,47 +147,14 @@ public void log(Object message) {

String version = "v" + currentMessage.getId().substring(currentMessage.getId().lastIndexOf("-") + 1);

buckets.putIfAbsent(version, new Counter());
buckets.putIfAbsent(version, new Aggregator());
buckets.get(version).increment();
}
}

@Override
public Map<String, Counter> values() {
return buckets;
}

@Override
public void reset() {
buckets.clear();
}
}

public P2PMarketStats(Reporter graphiteReporter) {
super(graphiteReporter);

statistics = new MyStatistics();
}

@Override
public void configure(Properties properties) {
super.configure(properties);

if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) {
File dir = new File(configuration.getProperty(DATABASE_DIR));
String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString();
try {
PersistenceManager<PersistableEnvelope> persistenceManager = new PersistenceManager<>(dir, new CorePersistenceProtoResolver(null, null), null);
TradeStatistics3Store tradeStatistics3Store = (TradeStatistics3Store) persistenceManager.getPersisted(TradeStatistics3Store.class.getSimpleName() + networkPostfix);
hashes.addAll(tradeStatistics3Store.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList()));

AccountAgeWitnessStore accountAgeWitnessStore = (AccountAgeWitnessStore) persistenceManager.getPersisted(AccountAgeWitnessStore.class.getSimpleName() + networkPostfix);
hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream().map(byteArray -> byteArray.bytes).collect(Collectors.toList()));
} catch (NullPointerException e) {
// in case there is no store file
log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath());
}
}
}

@Override
Expand All @@ -185,12 +167,59 @@ protected List<NetworkEnvelope> getRequests() {
return result;
}

protected void createHistogram(List<Long> input, String market, Map<String, String> report) {
int numberOfBins = 5;

// - get biggest offer
double max = input.stream().max(Long::compareTo).get() * 1.01;

// - create histogram
input.stream().collect(
Collectors.groupingBy(aLong -> aLong == max ? (int) numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())).
forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2)));

report.put(market + ".number_of_bins", String.valueOf((int) numberOfBins));
report.put(market + ".max", String.valueOf((int) max));
}

@Override
protected void report() {
Map<String, String> report = new HashMap<>();
bucketsPerHost.forEach((host, statistics) -> statistics.values().forEach((market, numberOfOffers) -> report.put(OnionParser.prettyPrint(host) + "." + market, String.valueOf(((Counter) numberOfOffers).value()))));
bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Aggregator) numberOfOffers).value()))));
reporter.report(report, getName() + ".offerCount");

// do offerbook volume statistics
report.clear();
offerVolumeBucketsPerHost.values().stream().findFirst().ifPresent(aggregatorStatistics -> aggregatorStatistics.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(numberOfOffers.value()))));
reporter.report(report, getName() + ".volume");

// do the offer vs volume histogram
report.clear();
// - get a data set
offerVolumeDistributionBucketsPerHost.values().stream().findFirst().ifPresent(listStatistics -> listStatistics.values().forEach((market, offers) -> {
createHistogram(offers, market, report);
}));
reporter.report(report, getName() + ".volume-per-offer-distribution");

// do offers per trader
report.clear();
// - get a data set
offersPerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> {
List<Long> offerPerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList());

createHistogram(offerPerTrader, market, report);
}));
reporter.report(report, getName() + ".traders_by_number_of_offers");

// do volume per trader
report.clear();
// - get a data set
volumePerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> {
List<Long> volumePerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList());

reporter.report(report, getName());
createHistogram(volumePerTrader, market, report);
}));
reporter.report(report, getName() + ".traders_by_volume");

// do version statistics
report.clear();
Expand All @@ -204,8 +233,12 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne

if (networkEnvelope instanceof GetDataResponse) {

Statistics result = this.statistics.create();
VersionsStatistics versions = new VersionsStatistics();
Statistics offerCount = new OfferCountStatistics();
Statistics offerVolume = new OfferVolumeStatistics();
Statistics offerVolumeDistribution = new OfferVolumeDistributionStatistics();
Statistics offersPerTrader = new OffersPerTraderStatistics();
Statistics volumePerTrader = new VolumePerTraderStatistics();
Statistics versions = new VersionsStatistics();

GetDataResponse dataResponse = (GetDataResponse) networkEnvelope;
final Set<ProtectedStorageEntry> dataSet = dataResponse.getDataSet();
Expand All @@ -216,7 +249,11 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne
return;
}

result.log(protectedStoragePayload);
offerCount.log(protectedStoragePayload);
offerVolume.log(protectedStoragePayload);
offerVolumeDistribution.log(protectedStoragePayload);
offersPerTrader.log(protectedStoragePayload);
volumePerTrader.log(protectedStoragePayload);
versions.log(protectedStoragePayload);
});

Expand All @@ -230,7 +267,11 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne
hashes.add(persistableNetworkPayload.getHash());
});

bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerCount);
offerVolumeBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolume);
offerVolumeDistributionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolumeDistribution);
offersPerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offersPerTrader);
volumePerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), volumePerTrader);
versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions);
return true;
}
Expand Down
Loading