Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add self-equivocation filter #648

Merged
merged 3 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions equivocation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package f3

import (
"bytes"
"slices"
"sync"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/libp2p/go-libp2p/core/peer"
)

// zero value is valid
type equivocationFilter struct {
lk sync.Mutex
localPID peer.ID
currentInstance uint64
// seenMessages map unique message slot to its signature
seenMessages map[equivocationKey]equivMessage
activeSenders map[gpbft.ActorID]equivSenders
}

func newEquivocationFilter(localPID peer.ID) equivocationFilter {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
return equivocationFilter{
localPID: localPID,
seenMessages: make(map[equivocationKey]equivMessage),
activeSenders: make(map[gpbft.ActorID]equivSenders),
}
}

type equivocationKey struct {
Sender gpbft.ActorID
Round uint64
Phase gpbft.Phase
Comment on lines +31 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private?

Copy link
Contributor Author

@Kubuxu Kubuxu Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It mirrors GMessage, is there a good reason to make it private?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The struct is private. Private structs with public fields usually mean "here be reflection/encoding shenanigans".

}

func (ef *equivocationFilter) formKey(m *gpbft.GMessage) equivocationKey {
return equivocationKey{
Sender: m.Sender,
Round: m.Vote.Round,
Phase: m.Vote.Phase,
}
}

type equivSenders struct {
origins []peer.ID
equivocation bool
}

type equivMessage struct {
signature []byte
origin peer.ID
}

func (es *equivSenders) addSender(id peer.ID, equivocation bool) {
if !slices.Contains(es.origins, id) {
es.origins = append(es.origins, id)
if len(es.origins) > 10 {
es.origins = es.origins[:10]

Check warning on line 58 in equivocation.go

View check run for this annotation

Codecov / codecov/patch

equivocation.go#L58

Added line #L58 was not covered by tests
}
slices.Sort(es.origins)
}
es.equivocation = es.equivocation || equivocation
}

func (ef *equivocationFilter) ProcessBroadcast(m *gpbft.GMessage) bool {
ef.lk.Lock()
defer ef.lk.Unlock()

if m.Vote.Instance < ef.currentInstance {
masih marked this conversation as resolved.
Show resolved Hide resolved
// disallow past instances
log.Warnw("disallowing broadcast for past instance", "sender", m.Sender, "instance",
m.Vote.Instance, "currentInstance", ef.currentInstance)
return false
}
// moved onto new instance
if m.Vote.Instance > ef.currentInstance {
ef.currentInstance = m.Vote.Instance
ef.seenMessages = make(map[equivocationKey]equivMessage)
ef.activeSenders = make(map[gpbft.ActorID]equivSenders)
}

key := ef.formKey(m)
msgInfo, ok := ef.seenMessages[key]
equivocationDetected := false
if ok && !bytes.Equal(msgInfo.signature, m.Signature) {
if msgInfo.origin == ef.localPID {
log.Warnw("local self-equivocation detected", "sender", m.Sender,
"instance", m.Vote.Instance, "round", m.Vote.Round, "phase", m.Vote.Phase)
return false
} else {
log.Warnw("detected equivocation during broadcast", "sender", m.Sender,
"instance", m.Vote.Instance, "round", m.Vote.Round, "phase", m.Vote.Phase)
equivocationDetected = true

Check warning on line 93 in equivocation.go

View check run for this annotation

Codecov / codecov/patch

equivocation.go#L91-L93

Added lines #L91 - L93 were not covered by tests
}
} else if !ok {
// save the signature
ef.seenMessages[key] = equivMessage{signature: m.Signature, origin: ef.localPID}
}
// save ourselves as one of the senders
senders := ef.activeSenders[m.Sender]
Kubuxu marked this conversation as resolved.
Show resolved Hide resolved
senders.addSender(ef.localPID, equivocationDetected)
ef.activeSenders[m.Sender] = senders

if !senders.equivocation {
// we are alone in this dark forest
return true
}
// if we are not alone, broadcast the message if we have the best (lowest PeerID)

log.Warnw("self-equivocation detected during broadcast", "sender", m.Sender, "instance", m.Vote.Instance,
"round", m.Vote.Round, "phase", m.Vote.Phase, "sourcers", senders, "localPID", ef.localPID)

// if there are multiple senders, only broadcast if we are the smallest one
return senders.origins[0] == ef.localPID
}

func (ef *equivocationFilter) ProcessReceive(peerID peer.ID, m *gpbft.GMessage) {
masih marked this conversation as resolved.
Show resolved Hide resolved
ef.lk.Lock()
defer ef.lk.Unlock()

if m.Vote.Instance != ef.currentInstance {
masih marked this conversation as resolved.
Show resolved Hide resolved
// the instance does not match
return

Check warning on line 123 in equivocation.go

View check run for this annotation

Codecov / codecov/patch

equivocation.go#L123

Added line #L123 was not covered by tests
}
senders, ok := ef.activeSenders[m.Sender]
if !ok {
// we do not track the sender because we didn't send any messages from that ID
// otherwise we would have to track all messages
return

Check warning on line 129 in equivocation.go

View check run for this annotation

Codecov / codecov/patch

equivocation.go#L129

Added line #L129 was not covered by tests
}
key := ef.formKey(m)
msgInfo, ok := ef.seenMessages[key]
if ok && !bytes.Equal(msgInfo.signature, m.Signature) {
Comment on lines +132 to +133
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

helper?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or even a markSeen that:

  1. Records that the message has been seen.
  2. Returns true if an equivocation.

Copy link
Contributor Author

@Kubuxu Kubuxu Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is different in the case, depending on whether it is external or not. IMO more trouble than it is worth to write a helper. (only two uses, not same exact path).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (ef *equivocationFilter) markSeen(m *gpbft.GMessage, from peer.ID) (peer.ID, bool) {
	key := ef.formKey(m)
	if msgInfo, ok := ef.seenMessages[key]; ok {
		return msgInfo.origin, !bytes.Equal(msgInfo.signature, m.Signature)
	}
	ef.seenMessages[key] = equivMessage{signature: m.Signature, origin: from}
	return from, false
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Actually, I think there's a better way. Let me play with it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, it really doesn't help much and my way doesn't work. LGTM.

// equivocation detected
senders.addSender(peerID, true)
ef.activeSenders[m.Sender] = senders
}
if !ok {
// add the message
ef.seenMessages[key] = equivMessage{signature: m.Signature, origin: peerID}

Check warning on line 140 in equivocation.go

View check run for this annotation

Codecov / codecov/patch

equivocation.go#L140

Added line #L140 was not covered by tests
}

}
161 changes: 161 additions & 0 deletions equivocation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package f3

import (
"testing"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
)

var localGoodPID = peer.ID("1local")
var remotePID0 = peer.ID("0remote")
var remotePID5 = peer.ID("5remote")

func TestEquivactionFilter_ProcessBroadcast(t *testing.T) {
localPID := localGoodPID
ef := newEquivocationFilter(localPID)

// Test case 1: First message should be processed
msg1 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg1), "First message should be processed")

// Test case 2: Duplicate message with same signature should be processed
msg2 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg2), "Duplicate message with same signature should be processed")

// Test case 3 Message with same key but different signature should not be processed
msg3 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature2"),
}
assert.False(t, ef.ProcessBroadcast(msg3), "Message with same key but different signature should not be processed")

// Test case 4: Message with new instance should be processed
msg4 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 2, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature3"),
}
assert.True(t, ef.ProcessBroadcast(msg4), "Message with new instance should be processed")

// Test case 5: Message with past instance should not be processed
msg5 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature4"),
}
assert.False(t, ef.ProcessBroadcast(msg5), "Message with past instance should not be processed")
}

func TestEquivactionFilter_formKey(t *testing.T) {
ef := newEquivocationFilter(localGoodPID)

msg := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Round: 1, Phase: gpbft.Phase(1)},
}

expectedKey := equivocationKey{
Sender: gpbft.ActorID(1),
Round: 1,
Phase: gpbft.Phase(1),
}

assert.Equal(t, expectedKey, ef.formKey(msg), "Keys should match")
}

func TestEquivactionFilter_remoteEquivocationResolution(t *testing.T) {
localPID := localGoodPID
ef := newEquivocationFilter(localPID)

msg1 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg1), "First message should be processed")
ef.ProcessReceive(localPID, msg1)

msg2 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature2"),
}
ef.ProcessReceive(remotePID5, msg2)
assert.Contains(t, ef.activeSenders[msg2.Sender].origins, remotePID5)

msg3 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 2, Phase: gpbft.Phase(1)},
Signature: []byte("signature3"),
}

assert.True(t, ef.ProcessBroadcast(msg3), "local sender should still be able to broadcast")
ef.ProcessReceive(localPID, msg1)

// lower PeerID sender comes along
msg4 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 2, Phase: gpbft.Phase(1)},
Signature: []byte("signature4"),
}
ef.ProcessReceive(remotePID0, msg4)
assert.Contains(t, ef.activeSenders[msg2.Sender].origins, remotePID0)

msg5 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 3, Phase: gpbft.Phase(1)},
Signature: []byte("signature5"),
}

assert.False(t, ef.ProcessBroadcast(msg5), "we should have backed off")

assert.False(t, ef.ProcessBroadcast(msg3), "trying to re-broadcast is now not allowed")
}

func TestEquivocationFilter_PeerIDBasedEquivocationHandling(t *testing.T) {
localPID := localGoodPID
ef := newEquivocationFilter(localPID)

// Local broadcast
msg1 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg1), "Local message should be processed")

// Remote equivocation with higher PeerID
msg2 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature2"),
}
ef.ProcessReceive(remotePID5, msg2)
assert.Contains(t, ef.activeSenders[msg2.Sender].origins, remotePID5, "Higher PeerID should be recorded")

// Local broadcast after higher PeerID equivocation
assert.True(t, ef.ProcessBroadcast(msg1), "Local message should still be processed after higher PeerID equivocation")

// Remote equivocation with lower PeerID
msg3 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature3"),
}
ef.ProcessReceive(remotePID0, msg3)
assert.Contains(t, ef.activeSenders[msg3.Sender].origins, remotePID0, "Lower PeerID should be recorded")

// Local broadcast after lower PeerID equivocation
assert.False(t, ef.ProcessBroadcast(msg1), "Local message should not be processed after lower PeerID equivocation")
}
42 changes: 22 additions & 20 deletions f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

mu sync.Mutex
cs *certstore.Store
wal *writeaheadlog.WriteAheadLog[walEntry, *walEntry]
manifest *manifest.Manifest
runner *gpbftRunner
ps *powerstore.Store
Expand Down Expand Up @@ -92,23 +91,28 @@
}

func (m *F3) Broadcast(ctx context.Context, signatureBuilder *gpbft.SignatureBuilder, msgSig []byte, vrf []byte) {
msg := signatureBuilder.Build(msgSig, vrf)

m.mu.Lock()
runner := m.runner
wal := m.wal
manifest := m.manifest
m.mu.Unlock()

if runner == nil {
log.Error("attempted to broadcast message while F3 wasn't running")
return
}
err := wal.Append(walEntry{*msg})
if err != nil {
log.Error("appending to WAL: %+v", err)
if manifest == nil {
log.Error("attempted to broadcast message while manifest is nil")
return

Check warning on line 106 in f3.go

View check run for this annotation

Codecov / codecov/patch

f3.go#L105-L106

Added lines #L105 - L106 were not covered by tests
}
if manifest.NetworkName != signatureBuilder.NetworkName {
log.Errorw("attempted to broadcast message for a wrong network",
"manifestNetwork", manifest.NetworkName, "messageNetwork", signatureBuilder.NetworkName)
return

Check warning on line 111 in f3.go

View check run for this annotation

Codecov / codecov/patch

f3.go#L109-L111

Added lines #L109 - L111 were not covered by tests
}

err = runner.BroadcastMessage(msg)
msg := signatureBuilder.Build(msgSig, vrf)
err := runner.BroadcastMessage(msg)
if err != nil {
log.Warnf("failed to broadcast message: %+v", err)
}
Expand Down Expand Up @@ -336,12 +340,6 @@
}
m.certserv = nil
}
if m.wal != nil {
if serr := m.wal.Flush(); serr != nil {
err = multierr.Append(err, fmt.Errorf("failed to flush WAL: %w", serr))
}
m.wal = nil
}
return err
}

Expand All @@ -364,12 +362,6 @@

m.cs = cs
}
walPath := filepath.Join(m.diskPath, "wal", strings.ReplaceAll(string(m.manifest.NetworkName), "/", "-"))
var err error
m.wal, err = writeaheadlog.Open[walEntry](walPath)
if err != nil {
return fmt.Errorf("opening WAL: %w", err)
}

if m.ps == nil {
pds := measurements.NewMeteredDatastore(meter, "f3_ohshitstore_datastore_", m.ds)
Expand Down Expand Up @@ -410,9 +402,19 @@
return err
}

cleanName := strings.ReplaceAll(string(m.manifest.NetworkName), "/", "-")
cleanName = strings.ReplaceAll(cleanName, ".", "")
cleanName = strings.ReplaceAll(cleanName, "\u0000", "")

walPath := filepath.Join(m.diskPath, "wal", cleanName)
wal, err := writeaheadlog.Open[walEntry](walPath)
if err != nil {
return fmt.Errorf("opening WAL: %w", err)

Check warning on line 412 in f3.go

View check run for this annotation

Codecov / codecov/patch

f3.go#L412

Added line #L412 was not covered by tests
}

if runner, err := newRunner(
ctx, m.cs, m.ps, m.pubsub, m.verifier,
m.outboundMessages, m.manifest, m.wal,
m.outboundMessages, m.manifest, wal, m.host.ID(),
); err != nil {
return err
} else {
Expand Down
Loading
Loading