Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(1/5) Test and Refactor P2PDataStorage Synchronization Path #3667

Merged
merged 28 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5fcd18c
[TESTS] Add tests of requestData
julianknutsen Nov 20, 2019
1e814d9
[REFACTOR] Introduce buildGetDataRequest variants
julianknutsen Nov 20, 2019
a927ed4
[TESTS] Add tests of new RequestData APIs
julianknutsen Nov 20, 2019
daffe6d
[TESTS] Add tests of GetDataRequestHandler
julianknutsen Nov 21, 2019
944b3ff
[REFACTOR] Introduce buildGetDataResponse
julianknutsen Nov 21, 2019
8208f78
[REFACTOR] Extract connectionInfo String
julianknutsen Nov 21, 2019
5402155
[REFACTOR] Extract getDataResponse logging
julianknutsen Nov 21, 2019
a6e8868
[REFACTOR] Extract truncation logging
julianknutsen Nov 21, 2019
dafc762
[REFACTOR] Pass peerCapabilities into buildGetDataResponse
julianknutsen Nov 21, 2019
5630b35
[TESTS] Unit tests of buildGetDataResponse
julianknutsen Nov 21, 2019
caf946d
Remove redundant HashSet lookups in filter functions
julianknutsen Nov 21, 2019
703a9a0
[REFACTOR] Move required capabilities log
julianknutsen Nov 21, 2019
3aaf8a2
[REFACTOR] Inline capability check for ProtectedStorageEntries
julianknutsen Nov 21, 2019
4c5d818
[REFACTOR] Inline filtering functions
julianknutsen Nov 21, 2019
e767340
[REFACTOR] Remove duplication in filtering functions
julianknutsen Nov 21, 2019
00128d9
[BUGFIX] Fix off-by-one in truncation logic
julianknutsen Nov 21, 2019
c7bce9e
[TESTS] Add test of RequestDataHandler::onMessage
julianknutsen Nov 21, 2019
873271c
[REFACTOR] Introduce processGetDataResponse
julianknutsen Nov 21, 2019
690b980
[TESTS] Make verify() functions more flexible
julianknutsen Nov 21, 2019
a34488b
[TESTS] Add unit tests for processGetDataResponse
julianknutsen Nov 21, 2019
3d6e9fb
Remove static from initialRequestApplied
julianknutsen Nov 21, 2019
f92893b
[TESTS] Write synchronization integration tests
julianknutsen Nov 21, 2019
5db1285
[REFACTOR] Clean up processGetDataResponse
julianknutsen Nov 22, 2019
ecae31e
[RENAME] LazyProcessedPayload to ProcessOncePersistableNetworkPayload
julianknutsen Nov 22, 2019
a0fae12
Remove @Nullable around persistableNetworkPayloadSet
julianknutsen Nov 22, 2019
c503bcb
Remove @Nullable around supportedCapabilities in GetDataResponse
julianknutsen Nov 22, 2019
b1a06fe
Remove @Nullable around supportedCapabilities in PreliminaryGetDataRe…
julianknutsen Nov 22, 2019
4fe19ae
[DEADCODE] Remove old request handler tests
julianknutsen Nov 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/account/sign/SignedWitness.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.DateTolerantPayload;
import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.app.Capabilities;
Expand All @@ -45,7 +45,7 @@
// Supports signatures made from EC key (arbitrators) and signature created with DSA key.
@Slf4j
@Value
public class SignedWitness implements LazyProcessedPayload, PersistableNetworkPayload, PersistableEnvelope,
public class SignedWitness implements ProcessOncePersistableNetworkPayload, PersistableNetworkPayload, PersistableEnvelope,
DateTolerantPayload, CapabilityRequiringPayload {

public enum VerificationMethod {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.DateTolerantPayload;
import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.proto.persistable.PersistableEnvelope;
Expand All @@ -40,7 +40,7 @@
// so only the newly added objects since the last release will be retrieved over the P2P network.
@Slf4j
@Value
public class AccountAgeWitness implements LazyProcessedPayload, PersistableNetworkPayload, PersistableEnvelope, DateTolerantPayload {
public class AccountAgeWitness implements ProcessOncePersistableNetworkPayload, PersistableNetworkPayload, PersistableEnvelope, DateTolerantPayload {
private static final long TOLERANCE = TimeUnit.DAYS.toMillis(1);

private final byte[] hash; // Ripemd160(Sha256(concatenated accountHash, signature and sigPubKey)); 20 bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import bisq.core.dao.state.model.governance.Proposal;

import bisq.network.p2p.storage.payload.ExpirablePayload;
import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;

import bisq.common.crypto.Sig;
Expand Down Expand Up @@ -55,7 +55,7 @@
@Getter
@EqualsAndHashCode
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class TempProposalPayload implements LazyProcessedPayload, ProtectedStoragePayload,
public class TempProposalPayload implements ProcessOncePersistableNetworkPayload, ProtectedStoragePayload,
ExpirablePayload, PersistablePayload {

protected final Proposal proposal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import bisq.core.offer.OfferPayload;

import bisq.network.p2p.storage.payload.ExpirablePayload;
import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;

import bisq.common.crypto.Sig;
Expand Down Expand Up @@ -60,7 +60,7 @@
@Slf4j
@EqualsAndHashCode(exclude = {"signaturePubKeyBytes"})
@Value
public final class TradeStatistics implements LazyProcessedPayload, ProtectedStoragePayload, ExpirablePayload, PersistablePayload {
public final class TradeStatistics implements ProcessOncePersistableNetworkPayload, ProtectedStoragePayload, ExpirablePayload, PersistablePayload {
private final OfferPayload.Direction direction;
private final String baseCurrency;
private final String counterCurrency;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import bisq.core.offer.OfferUtil;

import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.LazyProcessedPayload;
import bisq.network.p2p.storage.payload.ProcessOncePersistableNetworkPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;

import bisq.common.app.Capabilities;
Expand Down Expand Up @@ -63,7 +63,7 @@

@Slf4j
@Value
public final class TradeStatistics2 implements LazyProcessedPayload, PersistableNetworkPayload, PersistableEnvelope, CapabilityRequiringPayload {
public final class TradeStatistics2 implements ProcessOncePersistableNetworkPayload, PersistableNetworkPayload, PersistableEnvelope, CapabilityRequiringPayload {

//We don't support arbitrators anymore so this entry will be only for pre v1.2. trades
@Deprecated
Expand Down
19 changes: 7 additions & 12 deletions monitor/src/main/java/bisq/monitor/metric/P2PMarketStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,15 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne
versions.log(protectedStoragePayload);
});

Set<PersistableNetworkPayload> persistableNetworkPayloadSet = dataResponse
.getPersistableNetworkPayloadSet();
if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> {
dataResponse.getPersistableNetworkPayloadSet().forEach(persistableNetworkPayload -> {
// memorize message hashes
//Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length];
//Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]);

// memorize message hashes
//Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length];
//Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]);
//hashes.add(bytes);

//hashes.add(bytes);

hashes.add(persistableNetworkPayload.getHash());
});
}
hashes.add(persistableNetworkPayload.getHash());
});

bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions);
Expand Down
19 changes: 7 additions & 12 deletions monitor/src/main/java/bisq/monitor/metric/P2PSeedNodeSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,15 @@ protected boolean treatMessage(NetworkEnvelope networkEnvelope, Connection conne
result.log(protectedStoragePayload);
});

Set<PersistableNetworkPayload> persistableNetworkPayloadSet = dataResponse
.getPersistableNetworkPayloadSet();
if (persistableNetworkPayloadSet != null) {
persistableNetworkPayloadSet.forEach(persistableNetworkPayload -> {
dataResponse.getPersistableNetworkPayloadSet().forEach(persistableNetworkPayload -> {
// memorize message hashes
//Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length];
//Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]);

// memorize message hashes
//Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length];
//Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]);
//hashes.add(bytes);

//hashes.add(bytes);

hashes.add(persistableNetworkPayload.getHash());
});
}
hashes.add(persistableNetworkPayload.getHash());
});

bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), result);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,17 @@
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.peers.getdata.messages.GetDataRequest;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.payload.CapabilityRequiringPayload;
import bisq.network.p2p.storage.payload.PersistableNetworkPayload;
import bisq.network.p2p.storage.payload.ProtectedStorageEntry;
import bisq.network.p2p.storage.payload.ProtectedStoragePayload;

import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.util.Utilities;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -51,8 +41,8 @@
@Slf4j
public class GetDataRequestHandler {
private static final long TIMEOUT = 90;
private static final int MAX_ENTRIES = 10000;

private static final int MAX_ENTRIES = 10000;

///////////////////////////////////////////////////////////////////////////////////////////
// Listener
Expand Down Expand Up @@ -93,10 +83,35 @@ public GetDataRequestHandler(NetworkNode networkNode, P2PDataStorage dataStorage

public void handle(GetDataRequest getDataRequest, final Connection connection) {
long ts = System.currentTimeMillis();
GetDataResponse getDataResponse = new GetDataResponse(getFilteredProtectedStorageEntries(getDataRequest, connection),
getFilteredPersistableNetworkPayload(getDataRequest, connection),
getDataRequest.getNonce(),
getDataRequest instanceof GetUpdatedDataRequest);
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

AtomicBoolean outPersistableNetworkPayloadOutputTruncated = new AtomicBoolean(false);
AtomicBoolean outProtectedStoragePayloadOutputTruncated = new AtomicBoolean(false);
GetDataResponse getDataResponse = dataStorage.buildGetDataResponse(
getDataRequest,
MAX_ENTRIES,
outPersistableNetworkPayloadOutputTruncated,
outProtectedStoragePayloadOutputTruncated,
connection.getCapabilities());

if (outPersistableNetworkPayloadOutputTruncated.get()) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}

if (outProtectedStoragePayloadOutputTruncated.get()) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}

log.info("The getDataResponse to peer with {} contains {} ProtectedStorageEntries and {} PersistableNetworkPayloads",
connectionInfo,
getDataResponse.getDataSet().size(),
getDataResponse.getPersistableNetworkPayloadSet().size());

if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
Expand Down Expand Up @@ -136,81 +151,6 @@ public void onFailure(@NotNull Throwable throwable) {
log.info("handle GetDataRequest took {} ms", System.currentTimeMillis() - ts);
}

private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest,
Connection connection) {
Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<PersistableNetworkPayload> result = dataStorage.getAppendOnlyDataStoreMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.filter(connection::noCapabilityRequiredOrCapabilityIsSupported)
.filter(payload -> {
boolean notContained = tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash()));
return notContained;
})
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("The getData request from peer with {} contains {} PersistableNetworkPayload entries ",
connectionInfo, result.size());
return result;
}

private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest,
Connection connection) {
Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
Set<Integer> lookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<ProtectedStorageEntry> filteredSet = dataStorage.getMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("getFilteredProtectedStorageEntries " + filteredSet.size());

for (ProtectedStorageEntry protectedStorageEntry : filteredSet) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
boolean doAdd = false;
if (protectedStoragePayload instanceof CapabilityRequiringPayload) {
if (connection.getCapabilities().containsAll(((CapabilityRequiringPayload) protectedStoragePayload).getRequiredCapabilities()))
doAdd = true;
else
log.debug("We do not send the message to the peer because they do not support the required capability for that message type.\n" +
"storagePayload is: " + Utilities.toTruncatedString(protectedStoragePayload));
} else {
doAdd = true;
}
if (doAdd) {
boolean notContained = lookupSet.add(protectedStoragePayload.hashCode());
if (notContained)
filteredDataSet.add(protectedStorageEntry);
}
}

log.info("The getData request from peer with {} contains {} ProtectedStorageEntry entries ",
connectionInfo, filteredDataSet.size());
return filteredDataSet;
}

public void stop() {
cleanup();
}
Expand Down
Loading