Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Remove reference counting from MessageData #264

Merged
merged 6 commits into from
Nov 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,13 @@
package tech.pegasys.pantheon.consensus.ibft.ibftmessage;

import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.wire.AbstractMessageData;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;

import io.netty.buffer.ByteBuf;
import tech.pegasys.pantheon.util.bytes.BytesValue;

public abstract class AbstractIbftMessage extends AbstractMessageData {
protected AbstractIbftMessage(final ByteBuf data) {
protected AbstractIbftMessage(final BytesValue data) {
super(data);
}

public abstract IbftSignedMessageData<?> decode();

protected static ByteBuf writeMessageToByteBuf(
final IbftSignedMessageData<?> ibftSignedMessageData) {

BytesValueRLPOutput rlpEncode = new BytesValueRLPOutput();
ibftSignedMessageData.writeTo(rlpEncode);

final ByteBuf data = NetworkMemoryPool.allocate(rlpEncode.encodedSize());
data.writeBytes(rlpEncode.encoded().extractArray());

return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,20 @@

import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedPrePrepareMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import io.netty.buffer.ByteBuf;

public class IbftPrePrepareMessage extends AbstractIbftMessage {

private static final int MESSAGE_CODE = IbftV2.PRE_PREPARE;

private IbftPrePrepareMessage(final ByteBuf data) {
private IbftPrePrepareMessage(final BytesValue data) {
super(data);
}

public static IbftPrePrepareMessage fromMessage(final MessageData message) {
if (message instanceof IbftPrePrepareMessage) {
message.retain();
return (IbftPrePrepareMessage) message;
}
final int code = message.getCode();
Expand All @@ -40,21 +36,18 @@ public static IbftPrePrepareMessage fromMessage(final MessageData message) {
String.format("Message has code %d and thus is not a PrePrepareMessage", code));
}

final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new IbftPrePrepareMessage(data);
return new IbftPrePrepareMessage(message.getData());
}

@Override
public IbftSignedMessageData<IbftUnsignedPrePrepareMessageData> decode() {
return IbftSignedMessageData.readIbftSignedPrePrepareMessageDataFrom(
RLP.input(BytesValue.wrapBuffer(data)));
return IbftSignedMessageData.readIbftSignedPrePrepareMessageDataFrom(RLP.input(data));
}

public static IbftPrePrepareMessage create(
final IbftSignedMessageData<IbftUnsignedPrePrepareMessageData> ibftPrepareMessageDecoded) {

return new IbftPrePrepareMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded));
return new IbftPrePrepareMessage(ibftPrepareMessageDecoded.encode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,20 @@

import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedPrepareMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import io.netty.buffer.ByteBuf;

public class IbftPrepareMessage extends AbstractIbftMessage {

private static final int MESSAGE_CODE = IbftV2.PREPARE;

private IbftPrepareMessage(final ByteBuf data) {
private IbftPrepareMessage(final BytesValue data) {
super(data);
}

public static IbftPrepareMessage fromMessage(final MessageData message) {
if (message instanceof IbftPrepareMessage) {
message.retain();
return (IbftPrepareMessage) message;
}
final int code = message.getCode();
Expand All @@ -40,21 +36,18 @@ public static IbftPrepareMessage fromMessage(final MessageData message) {
String.format("Message has code %d and thus is not a PrepareMessage", code));
}

final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new IbftPrepareMessage(data);
return new IbftPrepareMessage(message.getData());
}

@Override
public IbftSignedMessageData<IbftUnsignedPrepareMessageData> decode() {
return IbftSignedMessageData.readIbftSignedPrepareMessageDataFrom(
RLP.input(BytesValue.wrapBuffer(data)));
return IbftSignedMessageData.readIbftSignedPrepareMessageDataFrom(RLP.input(data));
}

public static IbftPrepareMessage create(
final IbftSignedMessageData<IbftUnsignedPrepareMessageData> ibftPrepareMessageDecoded) {

return new IbftPrepareMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded));
return new IbftPrepareMessage(ibftPrepareMessageDecoded.encode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,20 @@

import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftSignedMessageData;
import tech.pegasys.pantheon.consensus.ibft.ibftmessagedata.IbftUnsignedRoundChangeMessageData;
import tech.pegasys.pantheon.ethereum.p2p.NetworkMemoryPool;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import io.netty.buffer.ByteBuf;

public class IbftRoundChangeMessage extends AbstractIbftMessage {

private static final int MESSAGE_CODE = IbftV2.ROUND_CHANGE;

private IbftRoundChangeMessage(final ByteBuf data) {
private IbftRoundChangeMessage(final BytesValue data) {
super(data);
}

public static IbftRoundChangeMessage fromMessage(final MessageData message) {
if (message instanceof IbftRoundChangeMessage) {
message.retain();
return (IbftRoundChangeMessage) message;
}
final int code = message.getCode();
Expand All @@ -40,21 +36,18 @@ public static IbftRoundChangeMessage fromMessage(final MessageData message) {
String.format("Message has code %d and thus is not a RoundChangeMessage", code));
}

final ByteBuf data = NetworkMemoryPool.allocate(message.getSize());
message.writeTo(data);
return new IbftRoundChangeMessage(data);
return new IbftRoundChangeMessage(message.getData());
}

@Override
public IbftSignedMessageData<IbftUnsignedRoundChangeMessageData> decode() {
return IbftSignedMessageData.readIbftSignedRoundChangeMessageDataFrom(
RLP.input(BytesValue.wrapBuffer(data)));
return IbftSignedMessageData.readIbftSignedRoundChangeMessageDataFrom(RLP.input(data));
}

public static IbftRoundChangeMessage create(
final IbftSignedMessageData<IbftUnsignedRoundChangeMessageData> ibftPrepareMessageDecoded) {

return new IbftRoundChangeMessage(writeMessageToByteBuf(ibftPrepareMessageDecoded));
return new IbftRoundChangeMessage(ibftPrepareMessageDecoded.encode());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import tech.pegasys.pantheon.crypto.SECP256K1.Signature;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Util;
import tech.pegasys.pantheon.ethereum.rlp.BytesValueRLPOutput;
import tech.pegasys.pantheon.ethereum.rlp.RLPInput;
import tech.pegasys.pantheon.ethereum.rlp.RLPOutput;
import tech.pegasys.pantheon.util.bytes.BytesValue;

public class IbftSignedMessageData<M extends AbstractIbftUnsignedMessageData> {

Expand Down Expand Up @@ -51,6 +53,12 @@ public void writeTo(final RLPOutput output) {
output.endList();
}

public BytesValue encode() {
final BytesValueRLPOutput rlpEncode = new BytesValueRLPOutput();
writeTo(rlpEncode);
return rlpEncode.encoded();
}

public static IbftSignedMessageData<IbftUnsignedPrePrepareMessageData>
readIbftSignedPrePrepareMessageDataFrom(final RLPInput rlpInput) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.consensus.ibft.network;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand All @@ -29,6 +28,7 @@
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.PeerInfo;
import tech.pegasys.pantheon.ethereum.p2p.wire.RawMessage;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.math.BigInteger;
import java.util.List;
Expand Down Expand Up @@ -74,7 +74,7 @@ public void onlyValidatorsAreSentAMessage() throws PeerNotConnected {
peers.peerAdded(peer);
}

final MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER);
final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY);
peers.multicastToValidators(messageToSend);

verify(peerConnections.get(0), times(1)).sendForProtocol("IBF", messageToSend);
Expand All @@ -93,9 +93,9 @@ public void doesntSendToValidatorsWhichAreNotDirectlyConnected() throws PeerNotC
final IbftNetworkPeers peers = new IbftNetworkPeers(validatorProvider);

// only add peer connections 1, 2 & 3, none of which should be invoked.
Lists.newArrayList(1, 2, 3).stream().forEach(i -> peers.peerAdded(peerConnections.get(i)));
Lists.newArrayList(1, 2, 3).forEach(i -> peers.peerAdded(peerConnections.get(i)));

final MessageData messageToSend = new RawMessage(1, EMPTY_BUFFER);
final MessageData messageToSend = new RawMessage(1, BytesValue.EMPTY);
peers.multicastToValidators(messageToSend);

verify(peerConnections.get(0), never()).sendForProtocol(any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,6 @@ private void handleStatusMessage(final EthPeer peer, final MessageData data) {
// Parsing errors can happen when clients broadcast network ids outside of the int range,
// So just disconnect with "subprotocol" error rather than "breach of protocol".
peer.disconnect(DisconnectReason.SUBPROTOCOL_TRIGGERED);
} finally {
status.release();
}
}

Expand All @@ -266,15 +264,10 @@ public void blockMined(final Block block) {
.forEach(
peer -> {
try {
// Send(msg) will release the NewBlockMessage's internal buffer, thus it must be
// retained
// prior to transmission - then released on exit from function.
newBlockMessage.retain();
peer.send(newBlockMessage);
} catch (final PeerNotConnected ex) {
// Peers may disconnect while traversing the list, this is a normal occurrence.
}
});
newBlockMessage.release();
}
}
Loading