-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor events handlers #342
Refactor events handlers #342
Conversation
refactors the transport events interface - moves data limit tracking into the transport - makes index tracking opaque work inside the transport - refactors event flow -- we now have an initiated event + queued & awaiting acceptance states - leans into async communication between transport and data transfer - adds a bunch of transport tests - refactor pausing to support more correct resumption in state machine
49bb7d7
to
087a0dc
Compare
Oh one more comment -- the code in the graphsync transport is still messy, and there are various things I'd like to consider changing there. However, the good news is, if the interfaces are right, we clean this up later as a non-breaking change. |
One more proposal, which could be a challenge, but which I think is ideal in the long term, is have everything be asychronous -- send commands down, events up. It's a bit tricky to coordinate with graphsync, but boy this would make everything clearer in the logic. |
chst.AddLog("") | ||
return nil | ||
}), | ||
FromMany(datatransfer.Ongoing, datatransfer.Requested, datatransfer.Queued, datatransfer.AwaitingAcceptance).ToJustRecord(). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So these 5 pause/resume related events are now not mutating state, just triggering the action functions that set state on the associated ChannelState, correct? That seems pretty reasonable I think. I could imagine some kind of hierarchical fsm setup working for this, but would it buy us much? Are there more complex mutations that we'd make if we had the ability to do so? Maybe this is almost at the level of pausing the fsm itself and that's the cross-cutting concern here—fsm is in a paused state so certain actions should be invalid while that's the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea exactly. I'm not convinced FSM is the right model in a larger sense anyway, so I'd rather not further invest in even more complicated options.
ReceivedIndex: internal.CborGenCompatibleNode{Node: receivedIndex}, | ||
SentIndex: internal.CborGenCompatibleNode{Node: sentIndex}, | ||
QueuedIndex: internal.CborGenCompatibleNode{Node: queuedIndex}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I imagine the thinking about adding migration testing is that this transition is pretty straightforward and the absolute hassle of having to come up with fixtures for this is just not worth it? It seems good from eyeballing it but we'll probably want to see it actually working in practice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no I should probably do that!
@@ -0,0 +1,876 @@ | |||
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the diff of this with the previous internalchannel_cbor_gen.go show no meaningful change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably? it's all auto generated scripts but the structs are copy paste.
transport.go
Outdated
type TransportReceivedVoucherRequest struct { | ||
Request Request | ||
} | ||
|
||
type TransportReceivedUpdateRequest struct { | ||
Request Request | ||
} | ||
|
||
type TransportReceivedCancelRequest struct { | ||
Request Request | ||
} | ||
|
||
// TransportReceivedResponse occurs when we receive a response to a request | ||
type TransportReceivedResponse struct { | ||
Response Response | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the story with these? are they are TODO in order to finish this work? they seem like important pieces of the puzzle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, I think these come under:
there are some legacy "Transport Events" from experiments with making more stuff asynchronous commented out
But I'd like to hear more about these specific events - were they never actually used? Are we losing anything by not having them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so my original plan, which I'd love your feedback on, was to make the whole thing an event stream, which would remove the requirement of any synchronous hooks. As it turns out, that's kinda hard. This was an in between point when I thought may be I could make this all work for just incoming new or restart requests, which I may still consider. But, honestly, this was the first final interface I could get stood up and running after trying several times!
err = m.recordAcceptedValidationEvents(chst, result) | ||
} | ||
// dispatch channel events and generate a response message | ||
err = m.processValidationUpdate(ctx, chst, result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this refactor buys you anything tbh, it's fine, but how much I miss ternaries in Go! In a sane programming language this would simply be:
err = !result.Accepted ? m.recordRejectedValidationEvents(chid, result) : m.recordAcceptedValidationEvents(chst, result)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hate the lack of terneries in go too. on this we agree!
var chid datatransfer.ChannelID | ||
|
||
gsData.Gs1.RegisterOutgoingBlockHook(func(p peer.ID, r graphsync.RequestData, block graphsync.BlockData, ha graphsync.OutgoingBlockHookActions) { | ||
if block.Index() == 5 && block.BlockSizeOnWire() > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the flake in here was a race on where the pause was triggered? and shifting further down the pipe to the hook, with added delay, fixed it?
@@ -76,7 +76,7 @@ func UpdateRequest(id datatransfer.TransferID, isPaused bool) datatransfer.Reque | |||
} | |||
|
|||
// VoucherRequest generates a new request for the data transfer protocol | |||
func VoucherRequest(id datatransfer.TransferID, voucher *datatransfer.TypedVoucher) (datatransfer.Request, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👌
func (c *Channel) updateFromChannelState(chst datatransfer.ChannelState) error { | ||
// read the sent value | ||
sentNode := chst.SentIndex() | ||
if !sentNode.IsNull() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More of a comment than a change request: these IsNull
s dictate the contract on ChannelState
that is currently fulfilled by CborGenCompatibleNode
. If we were to move away from cbor-gen then that contract would need to follow. By default the Node
s present here would need to satisfy IsNull()==true
in their default state—which isn't necessarily natural (e.g. maybe Absent
or simply nil
would be more natural choices).
🤷 I think I'm just feeling the mild awkwardness of the implicit datatransfer.ChannelState
contract being dictated by use this far down in the stack. Hopefully tests would pick up any problems if we changed later on without realising this.
// ActionFromChannelState comparse internal graphsync channel state with the data transfer | ||
// state and determines what if any action should be taken on graphsync | ||
func (c *Channel) ActionFromChannelState(chst datatransfer.ChannelState) Action { | ||
c.lk.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just an RLock()
would be OK here wouldn't it?
|
||
// extension not found; probably not our request. | ||
if message == nil { | ||
chid, ok := t.requestIDToChannelID.load(request.ID()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add (back) a comment here to say why we might not get one for this request.ID()
? is it simply going to be a spoof (or mistake) from the peer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough
}) | ||
} | ||
|
||
/* "gs outgoing request with recognized dt push channel will record incoming blocks": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are there still pending tests in this comment block to be ported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no they should go. finishing those out right now.
if receiveErr == datatransfer.ErrResume && ch.Paused() { | ||
|
||
var extensions []graphsync.ExtensionData | ||
if receiveErr != nil { | ||
if response != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's happening in here now if response == nil
, is it right that we continue on (i.e. that this is really if receiveErr != nil && response != nil {
. It looks like we can get all the way down to the end and do a r.transport.processAction()
with a nil
response
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this is fine I guess; it's way too large with too much change to properly review without a more intimate understanding of the codebase so I've done my best to keep up but mostly I'm left commenting on isolated pieces rather than the architectural aspects of it. Overall it seems good, there's some good simplification and pushing some concerns down into the transports seem like they're done well. The channel state migration looks good I think. There's some pieces of commented out code that makes it look incomplete (tests, and the "legacy" transport events which should probably be removed if we're not seeing a path to using them).
Was it worth it?
Feels like an incremental improvement from an architectural quality standpoint. But it's too complex for me to give a judgement beyond that. I hope you think it was worth it since you put in the effort!
Co-authored-by: Rod Vagg <[email protected]>
A bit of background on the commented out events. Where I would like to get to is no synchronous hooks. Just one OnTransportEvent function. My baseline experience with these layers of transports is as follows: the more we expect/need to handle events synchronously, the more inflexible the code becomes, and the more likely we end up with race conditions. My idea for the future is one kinda like React in JS if you've worked with that -> data/parameters down + events up. So go-data-transfer should say to the transport: here's what I want to happen, and the transport should say: here's what actually did happen. The problem is we just aren't quire there yet. I tried to figure this one out and just wasn't prepared to deal with the actuality that GraphSync's current hooks are synchronous. Or, at least, not in a single PR that was huge right? |
The hardest part here is I definitely can't keep banging away at this, but this is definitely not complete, and won't be complete till we implement a second transport. That's where the rubber will hit the road. Still, overall, I feel better about this. I just wish it weren't so hard. To some extent, it may be an argument that go-data-transfers goals are overly ambitious. |
// However, the other handler functions may ONLY be called on the same channel | ||
// after all events are dispatched. In other words, the transport MUST allow | ||
// the handler to process all events before calling the other functions which | ||
// have a synchronous return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From your latest comments in the thread, I think this comment is getting at the point you made:
Where I would like to get to is no synchronous hooks. Just one OnTransportEvent function.
Which points to the deficiency in the current half-way state? I take it that you've chewed off the doable events into OnTransportEvent()
, with the remaining ones TODO.
So we still have:
- OnRequestReceived
- OnRequestReceived
- OnContextAugment
And you've experimented with splitting OnRequestReceived
into Update
, Pause
and Voucher
forms.
The context augment one being synchronous makes sense, the other ones do stand out a little now so I see the awkwardness of not going the full way on bundling everything into the single event channel. But, I think it's reasonable to stop where you have just to get this over the line.
However, it would be good to add some more inline comments about this state as a reminder for later or a hint for the next person picking this up about (a) the fact of the half-way state and (b) the ideal of moving toward fully async eventing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
absolutely. yes, the hard part about the others being asynchronous is that graphsync has synchronous hooks. so we'll need a bit of channel magic. messing with all this at once was just creating a back and forth with no progress being made -- honestly, I just needed to get to passing tests at some point.
Co-authored-by: Rod Vagg <[email protected]>
add additional tests, remove commented out code, improve migrations
…filecoin-project/go-data-transfer into feat/refactor-transport-events-handlers
Codecov Report
@@ Coverage Diff @@
## feat/refactor-transport #342 +/- ##
============================================================
+ Coverage 55.43% 66.74% +11.31%
============================================================
Files 26 25 -1
Lines 2751 2556 -195
============================================================
+ Hits 1525 1706 +181
+ Misses 1063 674 -389
- Partials 163 176 +13
|
Goals
Complete the transport refactor, moving several items below the transport boundary, cleaning up event flow, fixing issues in the state machine, making transport interface more high level, and limit data transfer manager's responbilities to voucher processing and state handling.
Implementation
There was a ton of awful logic in the channel state machine plus some memory caches trying to track which new "I sent/received data" events from the transport were actually new, as well as when exactly when we hit our data limits so we can tell the transport to pause. The really simple solution here is the transport should just handle this. It now does. In fact, we could make this even simpler and remove the "DataSentProgress" / "DataQueuedProgress" / "DataReceivedProgress" events because we now know for sure that they will always happen one-to-one with the corresponding "DataSent/DataQueued/DataReceived". I've left them there for now cause existing code may be expecting these. But, also, maybe just fuck-it -- let's make it simple!
Similarly, the transport now tracks data limits and pauses itself as needed.
Previously, every piece of data sent (i.e. each block) had an integer index. I did a lot of thinking about what an "index" actually is, and I finally came to the conclusion it's simply the transports way of knowing how to pick up again after a restart. In the case of graphsync and selectors, I think the ultimate ideal way to restart is "restart form this path in a selector traversal" (hey this even exists actually already -- ipld/go-ipld-prime#358). I thought about what I would do to eventually evolve the graphsync transport to eventually do resume at path, and I realized I'd need to switch the index type. So with that in mind, I decided to just make indexes IPLD nodes. The value of the IPLD here is simply that it can be serialized into the transfer state. Otherwise it's effectively type "any", with the transport being the one that knows how to interpret it.
I went through several iterations on the transport events handler interface before finally settling on what I ended up with.
I wanted to distinguish between "blocking" events -- where the transport expects a response from go-data-transfer -- and non-blocking events that could potentially be dispatched asynchronously, and to limit the number of blocking events. I attempted to make almost everything asychronous at one point, then pulled back a bit at the end. I ultimately decided new requests and responses needed to be processed and return errors as needed, because the transport uses those errors to accept or reject requests. Everything else is now dispatched through an OnTransportEvent() callback, which has no return value. While it's actually called synchronously for the moment, theoretically it could be asynchronous, though attention would need to be paid to ordering with the OnRequestReceived/OnResponseReceived synchronous events.
I also went through several iterations as well trying to think about how data transfer would pass on things like adjusted data limits and updated pause / resume states. I went through various interations of a "channel update" data structure building on the one I had in earlier transport refactor PRs. I ultimately decided the absolute simpest thing was to let go-data-transfer tell the transport when state had been updated and let the transport simply pull ChannelState as needed. The resulting code so far seems relatively reasonable.
This was driving me crazy
It's a simple way to clean up code
Transport unit testing is really weird without a real graphsync below or a responsive data transfer above. But the current versions seem slightly more comprehesible?
For Discussion
TODO before merge:
TODOS before data transfer v2.0 shipping
Open Questions