Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Limit ibft msg queues #704

Merged
merged 24 commits into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4fa3284
move evicting map into it's own class
jframe Jan 29, 2019
fa03a32
use evicting map for future msgs in ibftController and roundChangeMan…
jframe Jan 29, 2019
593b7ca
add generics on evict map eviction method
jframe Jan 29, 2019
bb6532e
config for message buffer size
jframe Jan 29, 2019
968e758
test that MaxSizeEvictingMap evicts records when at capacity
jframe Jan 29, 2019
cf639a7
rename map type
jframe Jan 30, 2019
11be71c
Merge branch 'master' into limit_ibft_msg_queues
jframe Jan 30, 2019
aa3e2d6
use SizeLimitedMap in the UniqueMessageMulticaster
jframe Jan 30, 2019
16467b8
set event queue limit from the ibft message buffer size config
jframe Jan 30, 2019
d385a15
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 1, 2019
3695c51
revert map and queue changes to future messages and round change and …
jframe Feb 1, 2019
901a331
spotless
jframe Feb 1, 2019
13e3388
fix test context after config changes
jframe Feb 1, 2019
363f507
update config names
jframe Feb 1, 2019
b64829e
update config names
jframe Feb 3, 2019
f5b1e2e
Update gossiped history limit to 1000
jframe Feb 3, 2019
4afd709
Update gossiped history limit to 1000 - test
jframe Feb 3, 2019
08b79fe
comment on why default gossipped history limit was chosen
jframe Feb 4, 2019
97af876
update field names to match new config names
jframe Feb 4, 2019
be0bd99
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 4, 2019
d32c128
Merge branch 'master' into limit_ibft_msg_queues
CjHare Feb 6, 2019
96a3ab7
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 6, 2019
14ab492
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 6, 2019
66a5279
Merge branch 'master' into limit_ibft_msg_queues
jframe Feb 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public class IbftConfigOptions {
private static final long DEFAULT_EPOCH_LENGTH = 30_000;
private static final int DEFAULT_BLOCK_PERIOD_SECONDS = 1;
private static final int DEFAULT_ROUND_EXPIRY_SECONDS = 1;
// In a healthy network this can be very small. This default limit will allow for suitable
// protection for on a typical 20 node validator network with multiple rounds
private static final int DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;

private final JsonObject ibftConfigRoot;

Expand All @@ -39,4 +43,12 @@ public int getBlockPeriodSeconds() {
public int getRequestTimeoutSeconds() {
return ibftConfigRoot.getInteger("requesttimeoutseconds", DEFAULT_ROUND_EXPIRY_SECONDS);
}

public int getGossipedHistoryLimit() {
return ibftConfigRoot.getInteger("gossipedhistorylimit", DEFAULT_GOSSIPED_HISTORY_LIMIT);
}

public int getMessageQueueLimit() {
return ibftConfigRoot.getInteger("messagequeuelimit", DEFAULT_MESSAGE_QUEUE_LIMIT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class IbftConfigOptionsTest {
private static final int EXPECTED_DEFAULT_EPOCH_LENGTH = 30_000;
private static final int EXPECTED_DEFAULT_BLOCK_PERIOD = 1;
private static final int EXPECTED_DEFAULT_REQUEST_TIMEOUT = 1;
private static final int EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;

@Test
public void shouldGetEpochLengthFromConfig() {
Expand Down Expand Up @@ -80,6 +82,42 @@ public void shouldGetDefaultRequestTimeoutFromDefaultConfig() {
.isEqualTo(EXPECTED_DEFAULT_REQUEST_TIMEOUT);
}

@Test
public void shouldGetGossipedHistoryLimitFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("GossipedHistoryLimit", 100));
assertThat(config.getGossipedHistoryLimit()).isEqualTo(100);
}

@Test
public void shouldFallbackToDefaultGossipedHistoryLimit() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getGossipedHistoryLimit()).isEqualTo(EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT);
}

@Test
public void shouldGetDefaultGossipedHistoryLimitFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getGossipedHistoryLimit())
.isEqualTo(EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT);
}

@Test
public void shouldGetMessageQueueLimitFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("MessageQueueLimit", 100));
assertThat(config.getMessageQueueLimit()).isEqualTo(100);
}

@Test
public void shouldFallbackToDefaultMessageQueueLimit() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getMessageQueueLimit()).isEqualTo(EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT);
}

@Test
public void shouldGetDefaultMessageQueueLimitFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getMessageQueueLimit())
.isEqualTo(EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT);
}

private IbftConfigOptions fromConfigOptions(final Map<String, Object> ibftConfigOptions) {
return GenesisConfigFile.fromConfig(
new JsonObject(singletonMap("config", singletonMap("ibft", ibftConfigOptions))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ public EventMultiplexer getEventMultiplexer() {
public static final int EPOCH_LENGTH = 10_000;
public static final int BLOCK_TIMER_SEC = 3;
public static final int ROUND_TIMER_SEC = 12;
public static final int EVENT_QUEUE_SIZE = 1000;
public static final int SEEN_MESSAGE_SIZE = 100;

private Clock clock = Clock.fixed(Instant.MIN, ZoneId.of("UTC"));
private IbftEventQueue ibftEventQueue = new IbftEventQueue();
private IbftEventQueue ibftEventQueue = new IbftEventQueue(EVENT_QUEUE_SIZE);
private int validatorCount = 4;
private int indexOfFirstLocallyProposedBlock = 0; // Meaning first block is from remote peer.
private boolean useGossip = false;
Expand Down Expand Up @@ -158,7 +160,8 @@ public TestContext build() {
// Use a stubbed version of the multicaster, to prevent creating PeerConnections etc.
final StubValidatorMulticaster multicaster = new StubValidatorMulticaster();

final UniqueMessageMulticaster uniqueMulticaster = new UniqueMessageMulticaster(multicaster);
final UniqueMessageMulticaster uniqueMulticaster =
new UniqueMessageMulticaster(multicaster, SEEN_MESSAGE_SIZE);

final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
public class IbftEventQueue {
private final BlockingQueue<IbftEvent> queue = new LinkedBlockingQueue<>();

private static final int MAX_QUEUE_SIZE = 1000;
private static final Logger LOG = LogManager.getLogger();
private final int messageQueueLimit;

public IbftEventQueue(final int messageQueueLimit) {
this.messageQueueLimit = messageQueueLimit;
}

/**
* Put an Ibft event onto the queue
*
* @param event Provided ibft event
*/
public void add(final IbftEvent event) {
if (queue.size() > MAX_QUEUE_SIZE) {
if (queue.size() > messageQueueLimit) {
LOG.warn("Queue size exceeded trying to add new ibft event {}", event.toString());
} else {
queue.add(event);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import java.util.LinkedHashMap;
import java.util.Map.Entry;

/**
* Map that is limited to a specified size and will evict oldest entries when the size limit is
* reached.
*/
public class SizeLimitedMap<K, V> extends LinkedHashMap<K, V> {
private final int maxEntries;

public SizeLimitedMap(final int maxEntries) {
this.maxEntries = maxEntries;
}

@Override
protected boolean removeEldestEntry(final Entry<K, V> ignored) {
return size() > maxEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,46 +12,34 @@
*/
package tech.pegasys.pantheon.consensus.ibft;

import static java.util.Collections.newSetFromMap;

import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class UniqueMessageMulticaster implements ValidatorMulticaster {

private final int maxSeenMessages;
private final ValidatorMulticaster multicaster;

UniqueMessageMulticaster(final ValidatorMulticaster multicaster, final int maxSeenMessages) {
this.maxSeenMessages = maxSeenMessages;
this.multicaster = multicaster;
}
private final Set<Hash> seenMessages;

/**
* Constructor that attaches gossip logic to a set of multicaster
*
* @param multicaster Network connections to the remote validators
* @param gossipHistoryLimit Maximum messages to track as seen
*/
public UniqueMessageMulticaster(final ValidatorMulticaster multicaster) {
this(multicaster, 10_000);
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final int gossipHistoryLimit) {
this.multicaster = multicaster;
// Set that starts evicting members when it hits capacity
this.seenMessages = newSetFromMap(new SizeLimitedMap<>(gossipHistoryLimit));
}

// Set that starts evicting members when it hits capacity
private final Set<Hash> seenMessages =
Collections.newSetFromMap(
new LinkedHashMap<Hash, Boolean>() {
@Override
protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
return size() > maxSeenMessages;
}
});

@Override
public void send(final MessageData message) {
send(message, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void aBlockTimerExpiryEventIsAddedToTheQueueOnExpiry() throws Interrupted
mockExecutorService.schedule(any(Runnable.class), anyLong(), any()))
.thenReturn(mockedFuture);

final IbftEventQueue eventQueue = new IbftEventQueue();
final IbftEventQueue eventQueue = new IbftEventQueue(1000);
final BlockTimer timer =
new BlockTimer(
eventQueue, MINIMAL_TIME_BETWEEN_BLOCKS_SECONDS, mockExecutorService, mockClock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.Test;

public class IbftEventQueueTest {
private static final int MAX_QUEUE_SIZE = 1000;

private static class DummyIbftEvent implements IbftEvent {
@Override
Expand All @@ -35,7 +36,7 @@ public Type getType() {

@Test
public void addPoll() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftEventQueue queue = new IbftEventQueue(MAX_QUEUE_SIZE);

assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
final DummyIbftEvent dummyEvent = new DummyIbftEvent();
Expand All @@ -45,7 +46,7 @@ public void addPoll() throws InterruptedException {

@Test
public void queueOrdering() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftEventQueue queue = new IbftEventQueue(MAX_QUEUE_SIZE);

final DummyIbftEvent dummyEvent1 = new DummyIbftEvent();
final DummyIbftEvent dummyEvent2 = new DummyIbftEvent();
Expand All @@ -65,7 +66,7 @@ public void queueOrdering() throws InterruptedException {

@Test
public void addSizeLimit() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftEventQueue queue = new IbftEventQueue(MAX_QUEUE_SIZE);

for (int i = 0; i <= 1000; i++) {
final DummyIbftEvent dummyEvent = new DummyIbftEvent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void handlesStopGracefully() throws InterruptedException {
@Test
public void cleanupExecutorsAfterShutdownNow() throws InterruptedException {
final IbftProcessor processor =
new IbftProcessor(new IbftEventQueue(), mockeEventMultiplexer, mockExecutorService);
new IbftProcessor(new IbftEventQueue(1000), mockeEventMultiplexer, mockExecutorService);

// Start the IbftProcessor
final ExecutorService processorExecutor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -142,7 +142,7 @@ public void handlesQueueInterruptGracefully() throws InterruptedException {

@Test
public void drainEventsIntoStateMachine() throws InterruptedException {
final IbftEventQueue queue = new IbftEventQueue();
final IbftEventQueue queue = new IbftEventQueue(1000);
final IbftProcessor processor =
new IbftProcessor(queue, mockeEventMultiplexer, mockExecutorService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class RoundTimerTest {
@Before
public void initialise() {
mockExecutorService = mock(ScheduledExecutorService.class);
queue = new IbftEventQueue();
queue = new IbftEventQueue(1000);
timer = new RoundTimer(queue, 1, mockExecutorService);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Test;

public class SizeLimitedMapTest {

@Test
public void evictMessageRecordAtCapacity() {
SizeLimitedMap<String, Boolean> map = new SizeLimitedMap<>(5);

map.put("message1", true);
assertThat(map).hasSize(1);

// add messages so map is at capacity
for (int i = 2; i <= 5; i++) {
map.put("message" + i, true);
}
assertThat(map).hasSize(5);

map.put("message6", false);
assertThat(map).hasSize(5);
assertThat(map.keySet()).doesNotContain("message1");
assertThat(map.keySet()).contains("message2", "message3", "message4", "message5", "message6");

map.put("message7", true);
assertThat(map).hasSize(5);
assertThat(map.keySet()).doesNotContain("message1", "message2");
assertThat(map.keySet()).contains("message3", "message4", "message5", "message6", "message7");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class IbftMiningCoordinatorTest {
@Mock private Block block;
@Mock private BlockBody blockBody;
@Mock private BlockHeader blockHeader;
private final IbftEventQueue eventQueue = new IbftEventQueue();
private final IbftEventQueue eventQueue = new IbftEventQueue(1000);
private IbftMiningCoordinator ibftMiningCoordinator;

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public static PantheonController<IbftContext> init(
TransactionPoolFactory.createTransactionPool(
protocolSchedule, protocolContext, ethProtocolManager.ethContext());

final IbftEventQueue ibftEventQueue = new IbftEventQueue();
final IbftEventQueue ibftEventQueue = new IbftEventQueue(ibftConfig.getMessageQueueLimit());

final IbftBlockCreatorFactory blockCreatorFactory =
new IbftBlockCreatorFactory(
Expand All @@ -205,7 +205,8 @@ public static PantheonController<IbftContext> init(
final ValidatorPeers peers =
new ValidatorPeers(protocolContext.getConsensusState().getVoteTally());

final UniqueMessageMulticaster uniqueMessageMulticaster = new UniqueMessageMulticaster(peers);
final UniqueMessageMulticaster uniqueMessageMulticaster =
new UniqueMessageMulticaster(peers, ibftConfig.getGossipedHistoryLimit());

final IbftGossip gossiper = new IbftGossip(uniqueMessageMulticaster);

Expand Down