Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor events handlers #342

Merged

Conversation

hannahhoward
Copy link
Collaborator

Goals

Complete the transport refactor, moving several items below the transport boundary, cleaning up event flow, fixing issues in the state machine, making transport interface more high level, and limit data transfer manager's responbilities to voucher processing and state handling.

Implementation

  1. Cleaning up the initiating states flow
  • We've added a "TransferInitaited" step for when the request begins processing. We've removed the "TransferRequestQueued". TransferInitaited is called when a graphsync (or other transport) actually begins executing a request.
  • Recently, we merged Accept/Reject requests up front ipfs/go-graphsync#384. We now evaluate whether to accept requests immediately, then queue them for later processing. Previously we had queued for processing without evaluating whether to ultimately accept a request.
  • Combined with the new event, this means a high bandwidth data sender using graphsync will see a Requested -> Queued -> Ongoing sequence, where Queued indicates a request has been accepted but we are waiting for it to reach the top of the graphsync processing queue. This is a long requested feature to be able to know when requests have been received and accepted, even when they make take a long time to start processing. Additionally rejected requests will fail much faster
  • What if we receive TransferInitaited before Accept? In a graphsync pull request, the graphsync requestor will begin "processing" their request -- producing a TransferIntiaited -- and then only later receive a reply back from the server accepting the request. For this sequence (first TransferInitated then Accept), the intermediate state is called AwaitingAcceptance.
  1. Cleaning up pause / resumes and the state machine.
  • Previous the state machine's process for coming out of Pause/Resume was simply to go to Ongoing, regardless of the state it had been in before pause resume. The proper behavior would be to return to the state it was in before the pause / resume. While we've gotten away with it largely, there are some rough behaviors that emerge from it.
  • Essentially, pausing and resuming is not great for an FSM where state is represented by a single value. In FSM terminology Pause/Resume is what I believe is referred to as a "orthogonal region" (though possibly a hierarchical state machine... who knows).
  • I briefly explored adding more features to go-statemachine then took the simple path of simply not making pause resume part of the state name, and rather seperate attributes on the channel state.
  1. Moving data limit tracking into the transport

There was a ton of awful logic in the channel state machine plus some memory caches trying to track which new "I sent/received data" events from the transport were actually new, as well as when exactly when we hit our data limits so we can tell the transport to pause. The really simple solution here is the transport should just handle this. It now does. In fact, we could make this even simpler and remove the "DataSentProgress" / "DataQueuedProgress" / "DataReceivedProgress" events because we now know for sure that they will always happen one-to-one with the corresponding "DataSent/DataQueued/DataReceived". I've left them there for now cause existing code may be expecting these. But, also, maybe just fuck-it -- let's make it simple!

Similarly, the transport now tracks data limits and pauses itself as needed.

  1. Making index tracking abstract

Previously, every piece of data sent (i.e. each block) had an integer index. I did a lot of thinking about what an "index" actually is, and I finally came to the conclusion it's simply the transports way of knowing how to pick up again after a restart. In the case of graphsync and selectors, I think the ultimate ideal way to restart is "restart form this path in a selector traversal" (hey this even exists actually already -- ipld/go-ipld-prime#358). I thought about what I would do to eventually evolve the graphsync transport to eventually do resume at path, and I realized I'd need to switch the index type. So with that in mind, I decided to just make indexes IPLD nodes. The value of the IPLD here is simply that it can be serialized into the transfer state. Otherwise it's effectively type "any", with the transport being the one that knows how to interpret it.

  1. The new transport events interface

I went through several iterations on the transport events handler interface before finally settling on what I ended up with.

I wanted to distinguish between "blocking" events -- where the transport expects a response from go-data-transfer -- and non-blocking events that could potentially be dispatched asynchronously, and to limit the number of blocking events. I attempted to make almost everything asychronous at one point, then pulled back a bit at the end. I ultimately decided new requests and responses needed to be processed and return errors as needed, because the transport uses those errors to accept or reject requests. Everything else is now dispatched through an OnTransportEvent() callback, which has no return value. While it's actually called synchronously for the moment, theoretically it could be asynchronous, though attention would need to be paid to ordering with the OnRequestReceived/OnResponseReceived synchronous events.

I also went through several iterations as well trying to think about how data transfer would pass on things like adjusted data limits and updated pause / resume states. I went through various interations of a "channel update" data structure building on the one I had in earlier transport refactor PRs. I ultimately decided the absolute simpest thing was to let go-data-transfer tell the transport when state had been updated and let the transport simply pull ChannelState as needed. The resulting code so far seems relatively reasonable.

  1. I fixed the flaky integraiton test with pause/resume

This was driving me crazy

  1. I removed errors on a bunch of the message creation functions, cause they never returned erros

It's a simple way to clean up code

  1. I redid the testing infrastructure for the transport tests and wrote new ones

Transport unit testing is really weird without a real graphsync below or a responsive data transfer above. But the current versions seem slightly more comprehesible?

For Discussion

  • TODO before merge:

    • the transport tests need error cases added. I left the old tests commented out in one of the files for reference
    • there are some legacy "Transport Events" from experiments with making more stuff asynchronous commented out
    • I'm not sure the state store migrations I wrote cover everything -- I need to review them cause I wrote them early but I think I made more changes since then
  • TODOS before data transfer v2.0 shipping

    • This PR deprecates some events, and there are more already deprecated. Seems like a good time to actually delete them, given v2 is intended to be a breaking code change (but not protocol breaking). Unfortunately, currently they are a numbered sequence, which suggests a more radical and much needed change -- making our events and states string based. This would save so many people so much trouble. I have no idea why they're number based other than when they were being developed before launch people actually thought these numbers would have great significance.
  • Open Questions

    • Was it worth it?

refactors the transport events interface
- moves data limit tracking into the transport
- makes index tracking opaque work inside the transport
- refactors event flow -- we now have an initiated event + queued &
  awaiting acceptance states
- leans into async communication between transport and data transfer
- adds a bunch of transport tests
- refactor pausing to support more correct resumption in state machine
@hannahhoward hannahhoward requested review from rvagg and dirkmc June 30, 2022 09:51
@hannahhoward hannahhoward force-pushed the feat/refactor-transport-events-handlers branch from 49bb7d7 to 087a0dc Compare June 30, 2022 09:58
@hannahhoward
Copy link
Collaborator Author

Oh one more comment -- the code in the graphsync transport is still messy, and there are various things I'd like to consider changing there. However, the good news is, if the interfaces are right, we clean this up later as a non-breaking change.

@hannahhoward
Copy link
Collaborator Author

One more proposal, which could be a challenge, but which I think is ideal in the long term, is have everything be asychronous -- send commands down, events up. It's a bit tricky to coordinate with graphsync, but boy this would make everything clearer in the logic.

@rvagg
Copy link
Member

rvagg commented Jul 1, 2022

I fixed the flaky integraiton test with pause/resume

yay

chst.AddLog("")
return nil
}),
FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance).ToJustRecord().
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So these 5 pause/resume related events are now not mutating state, just triggering the action functions that set state on the associated ChannelState, correct? That seems pretty reasonable I think. I could imagine some kind of hierarchical fsm setup working for this, but would it buy us much? Are there more complex mutations that we'd make if we had the ability to do so? Maybe this is almost at the level of pausing the fsm itself and that's the cross-cutting concern here—fsm is in a paused state so certain actions should be invalid while that's the case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea exactly. I'm not convinced FSM is the right model in a larger sense anyway, so I'd rather not further invest in even more complicated options.

Comment on lines +96 to +98
ReceivedIndex: internal.CborGenCompatibleNode{Node: receivedIndex},
SentIndex: internal.CborGenCompatibleNode{Node: sentIndex},
QueuedIndex: internal.CborGenCompatibleNode{Node: queuedIndex},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine the thinking about adding migration testing is that this transition is pretty straightforward and the absolute hassle of having to come up with fixtures for this is just not worth it? It seems good from eyeballing it but we'll probably want to see it actually working in practice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no I should probably do that!

@@ -0,0 +1,876 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the diff of this with the previous internalchannel_cbor_gen.go show no meaningful change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably? it's all auto generated scripts but the structs are copy paste.

transport.go Outdated
Comment on lines 56 to 71
type TransportReceivedVoucherRequest struct {
Request Request
}

type TransportReceivedUpdateRequest struct {
Request Request
}

type TransportReceivedCancelRequest struct {
Request Request
}

// TransportReceivedResponse occurs when we receive a response to a request
type TransportReceivedResponse struct {
Response Response
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the story with these? are they are TODO in order to finish this work? they seem like important pieces of the puzzle

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I think these come under:

there are some legacy "Transport Events" from experiments with making more stuff asynchronous commented out

But I'd like to hear more about these specific events - were they never actually used? Are we losing anything by not having them?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so my original plan, which I'd love your feedback on, was to make the whole thing an event stream, which would remove the requirement of any synchronous hooks. As it turns out, that's kinda hard. This was an in between point when I thought may be I could make this all work for just incoming new or restart requests, which I may still consider. But, honestly, this was the first final interface I could get stood up and running after trying several times!

err = m.recordAcceptedValidationEvents(chst, result)
}
// dispatch channel events and generate a response message
err = m.processValidationUpdate(ctx, chst, result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this refactor buys you anything tbh, it's fine, but how much I miss ternaries in Go! In a sane programming language this would simply be:

err = !result.Accepted ? m.recordRejectedValidationEvents(chid, result) : m.recordAcceptedValidationEvents(chst, result)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate the lack of terneries in go too. on this we agree!

var chid datatransfer.ChannelID

gsData.Gs1.RegisterOutgoingBlockHook(func(p peer.ID, r graphsync.RequestData, block graphsync.BlockData, ha graphsync.OutgoingBlockHookActions) {
if block.Index() == 5 && block.BlockSizeOnWire() > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the flake in here was a race on where the pause was triggered? and shifting further down the pipe to the hook, with added delay, fixed it?

@@ -76,7 +76,7 @@ func UpdateRequest(id datatransfer.TransferID, isPaused bool) datatransfer.Reque
}

// VoucherRequest generates a new request for the data transfer protocol
func VoucherRequest(id datatransfer.TransferID, voucher *datatransfer.TypedVoucher) (datatransfer.Request, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

statuses.go Outdated Show resolved Hide resolved
func (c *Channel) updateFromChannelState(chst datatransfer.ChannelState) error {
// read the sent value
sentNode := chst.SentIndex()
if !sentNode.IsNull() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More of a comment than a change request: these IsNulls dictate the contract on ChannelState that is currently fulfilled by CborGenCompatibleNode. If we were to move away from cbor-gen then that contract would need to follow. By default the Nodes present here would need to satisfy IsNull()==true in their default state—which isn't necessarily natural (e.g. maybe Absent or simply nil would be more natural choices).

🤷 I think I'm just feeling the mild awkwardness of the implicit datatransfer.ChannelState contract being dictated by use this far down in the stack. Hopefully tests would pick up any problems if we changed later on without realising this.

// ActionFromChannelState comparse internal graphsync channel state with the data transfer
// state and determines what if any action should be taken on graphsync
func (c *Channel) ActionFromChannelState(chst datatransfer.ChannelState) Action {
c.lk.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just an RLock() would be OK here wouldn't it?


// extension not found; probably not our request.
if message == nil {
chid, ok := t.requestIDToChannelID.load(request.ID())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add (back) a comment here to say why we might not get one for this request.ID()? is it simply going to be a spoof (or mistake) from the peer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough

})
}

/* "gs outgoing request with recognized dt push channel will record incoming blocks": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there still pending tests in this comment block to be ported?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no they should go. finishing those out right now.

if receiveErr == datatransfer.ErrResume && ch.Paused() {

var extensions []graphsync.ExtensionData
if receiveErr != nil {
if response != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's happening in here now if response == nil, is it right that we continue on (i.e. that this is really if receiveErr != nil && response != nil {. It looks like we can get all the way down to the end and do a r.transport.processAction() with a nil response.

Copy link
Member

@rvagg rvagg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is fine I guess; it's way too large with too much change to properly review without a more intimate understanding of the codebase so I've done my best to keep up but mostly I'm left commenting on isolated pieces rather than the architectural aspects of it. Overall it seems good, there's some good simplification and pushing some concerns down into the transports seem like they're done well. The channel state migration looks good I think. There's some pieces of commented out code that makes it look incomplete (tests, and the "legacy" transport events which should probably be removed if we're not seeing a path to using them).

Was it worth it?

Feels like an incremental improvement from an architectural quality standpoint. But it's too complex for me to give a judgement beyond that. I hope you think it was worth it since you put in the effort!

Co-authored-by: Rod Vagg <[email protected]>
@hannahhoward
Copy link
Collaborator Author

A bit of background on the commented out events.

Where I would like to get to is no synchronous hooks. Just one OnTransportEvent function.

My baseline experience with these layers of transports is as follows: the more we expect/need to handle events synchronously, the more inflexible the code becomes, and the more likely we end up with race conditions. My idea for the future is one kinda like React in JS if you've worked with that -> data/parameters down + events up. So go-data-transfer should say to the transport: here's what I want to happen, and the transport should say: here's what actually did happen.

The problem is we just aren't quire there yet. I tried to figure this one out and just wasn't prepared to deal with the actuality that GraphSync's current hooks are synchronous. Or, at least, not in a single PR that was huge right?

@hannahhoward
Copy link
Collaborator Author

Was it worth it?
Feels like an incremental improvement from an architectural quality standpoint. But it's too complex for me to give a judgement beyond that. I hope you think it was worth it since you put in the effort!

The hardest part here is I definitely can't keep banging away at this, but this is definitely not complete, and won't be complete till we implement a second transport. That's where the rubber will hit the road. Still, overall, I feel better about this. I just wish it weren't so hard. To some extent, it may be an argument that go-data-transfers goals are overly ambitious.

Comment on lines +123 to +126
// However, the other handler functions may ONLY be called on the same channel
// after all events are dispatched. In other words, the transport MUST allow
// the handler to process all events before calling the other functions which
// have a synchronous return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From your latest comments in the thread, I think this comment is getting at the point you made:

Where I would like to get to is no synchronous hooks. Just one OnTransportEvent function.

Which points to the deficiency in the current half-way state? I take it that you've chewed off the doable events into OnTransportEvent(), with the remaining ones TODO.

So we still have:

  • OnRequestReceived
  • OnRequestReceived
  • OnContextAugment

And you've experimented with splitting OnRequestReceived into Update, Pause and Voucher forms.

The context augment one being synchronous makes sense, the other ones do stand out a little now so I see the awkwardness of not going the full way on bundling everything into the single event channel. But, I think it's reasonable to stop where you have just to get this over the line.

However, it would be good to add some more inline comments about this state as a reminder for later or a hint for the next person picking this up about (a) the fact of the half-way state and (b) the ideal of moving toward fully async eventing.

Copy link
Collaborator Author

@hannahhoward hannahhoward Jul 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

absolutely. yes, the hard part about the others being asynchronous is that graphsync has synchronous hooks. so we'll need a bit of channel magic. messing with all this at once was just creating a back and forth with no progress being made -- honestly, I just needed to get to passing tests at some point.

impl/events.go Outdated Show resolved Hide resolved
Co-authored-by: Rod Vagg <[email protected]>
add additional tests, remove commented out code, improve migrations
…filecoin-project/go-data-transfer into feat/refactor-transport-events-handlers
@codecov-commenter
Copy link

Codecov Report

Merging #342 (e93d876) into feat/refactor-transport (c5f91ae) will increase coverage by 11.31%.
The diff coverage is 69.40%.

Impacted file tree graph

@@                     Coverage Diff                      @@
##           feat/refactor-transport     #342       +/-   ##
============================================================
+ Coverage                    55.43%   66.74%   +11.31%     
============================================================
  Files                           26       25        -1     
  Lines                         2751     2556      -195     
============================================================
+ Hits                          1525     1706      +181     
+ Misses                        1063      674      -389     
- Partials                       163      176       +13     
Impacted Files Coverage Δ
message/message1_1prime/message.go 57.34% <33.33%> (+0.96%) ⬆️
impl/impl.go 52.96% <40.90%> (-0.40%) ⬇️
impl/events.go 40.00% <44.44%> (-6.31%) ⬇️
transport/graphsync/receiver.go 53.39% <54.83%> (+53.39%) ⬆️
channels/channels.go 72.83% <68.75%> (-0.50%) ⬇️
channels/channel_state.go 72.22% <75.00%> (-0.36%) ⬇️
impl/utils.go 75.00% <75.00%> (ø)
transport/graphsync/graphsync.go 62.98% <76.47%> (+62.98%) ⬆️
channels/channels_fsm.go 61.93% <84.09%> (-9.33%) ⬇️
transport/graphsync/hooks.go 67.45% <88.13%> (+67.45%) ⬆️
... and 12 more

@hannahhoward hannahhoward merged commit 80a1331 into feat/refactor-transport Jul 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants