Skip to content

Commit

Permalink
raft: require app to consume result from Ready()
Browse files Browse the repository at this point in the history
I changed `(*RawNode).Ready`'s behavior in #10892 in a problematic way.
Previously, `Ready()` would create and immediately "accept" a Ready
(i.e. commit the app to actually handling it). In #10892, Ready() became
a pure read-only operation and the "accepting" was moved to
`Advance(rd)`.  As a result it was illegal to use the RawNode in certain
ways while the Ready was being handled. Failure to do so would result in
dropped messages (and perhaps worse). For example, with the following
operations

1. `rd := rawNode.Ready()`
2. `rawNode.Step(someMsg)`
3. `rawNode.Advance(rd)`

`someMsg` would be dropped, because `Advance()` would clear out the
outgoing messages thinking that they had all been handled by the client.
I mistakenly assumed that this restriction had existed prior, but this
is incorrect.

I noticed this while trying to pick up the above PR in CockroachDB,
where it caused unit test failures, precisely due to the above example.

This PR reestablishes the previous behavior (result of `Ready()` must
be handled by the app) and adds a regression test.

While I was there, I carried out a few small clarifying refactors.
  • Loading branch information
tbg committed Jul 23, 2019
1 parent a91f4e4 commit 721127d
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 25 deletions.
4 changes: 2 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (n *node) run(rn *RawNode) {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
rd = rn.readyWithoutAccept()
readyc = n.readyc
}

Expand Down Expand Up @@ -387,7 +387,7 @@ func (n *node) run(rn *RawNode) {
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
rn.commitReady(rd)
rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
Expand Down
34 changes: 11 additions & 23 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,17 @@ func (rn *RawNode) Step(m pb.Message) error {

// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. Ready() is a read-only operation, that is, it does not
// require the caller to actually handle the result. Typically, a caller will
// want to handle the Ready and must pass the Ready to Advance *after* having
// done so. While a Ready is being handled, the RawNode must not be used for
// operations that may alter its state. For example, it is illegal to call
// Ready, followed by Step, followed by Advance.
// and sending messages. The returned Ready() *must* be handled and subsequently
// passed back via Advance().
func (rn *RawNode) Ready() Ready {
rd := rn.newReady()
rd := rn.readyWithoutAccept()
rn.acceptReady(rd)
return rd
}

func (rn *RawNode) newReady() Ready {
// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}

Expand All @@ -149,15 +148,6 @@ func (rn *RawNode) acceptReady(rd Ready) {
rn.raft.msgs = nil
}

// commitReady is called when the consumer of the RawNode has successfully
// handled a Ready (having previously called acceptReady).
func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
rn.raft.advance(rd)
}

// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
Expand All @@ -183,12 +173,10 @@ func (rn *RawNode) HasReady() bool {
// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
// Advance combines accept and commit. Callers can't mutate the RawNode
// between the call to Ready and the matching call to Advance, or the work
// done in acceptReady will clobber potentially newer data that has not been
// emitted in a Ready yet.
rn.acceptReady(rd)
rn.commitReady(rd)
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
rn.raft.advance(rd)
}

// Status returns the current status of the given group. This allocates, see
Expand Down
34 changes: 34 additions & 0 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,37 @@ func BenchmarkStatus(b *testing.B) {
})
}
}

func TestRawNodeConsumeReady(t *testing.T) {
// Check that readyWithoutAccept() does not call acceptReady (which resets
// the messages) but Ready() does.
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 3, 1, s)
m1 := pb.Message{Context: []byte("foo")}
m2 := pb.Message{Context: []byte("bar")}

// Inject first message, make sure it's visible via readyWithoutAccept.
rn.raft.msgs = append(rn.raft.msgs, m1)
rd := rn.readyWithoutAccept()
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
}
// Now call Ready() which should move the message into the Ready (as opposed
// to leaving it in both places).
rd = rn.Ready()
if len(rn.raft.msgs) > 0 {
t.Fatalf("messages not reset: %+v", rn.raft.msgs)
}
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
// Add a message to raft to make sure that Advance() doesn't drop it.
rn.raft.msgs = append(rn.raft.msgs, m2)
rn.Advance(rd)
if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
}
}

0 comments on commit 721127d

Please sign in to comment.