feat: Multipart subscriptions #2870
Conversation
I'm putting this PR into review so that the general approach and implementation can be commented on. Once I'm back next week I'll finish up the tests for the new …
@@ -35,6 +35,7 @@ open class DefaultInterceptorProvider: InterceptorProvider {
      CacheReadInterceptor(store: self.store),
      NetworkFetchInterceptor(client: self.client),
      ResponseCodeInterceptor(),
      MultipartResponseParsingInterceptor(),
This is in the interceptor chain for all operation types. Right now it is applicable only to `Subscription`, but supporting `@defer` will require it for `Query` too. There is a short-circuit to check for the `multipart/mixed` header and exit early if it's not present in the response.
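For illustration, a minimal sketch of that short-circuit, assuming an `isMultipart` helper on `HTTPURLResponse` along the lines of the one referenced in the diff below; the actual Apollo iOS implementation may differ.

```swift
import Foundation

// Sketch only: assumed helper names, not the actual Apollo iOS source.
extension HTTPURLResponse {
  /// True when the response declares a multipart/mixed payload.
  var isMultipart: Bool {
    (value(forHTTPHeaderField: "Content-Type") ?? "")
      .lowercased()
      .contains("multipart/mixed")
  }
}

/// Early-exit check: non-multipart responses skip the multipart parsing work entirely.
func shouldParseAsMultipart(_ response: URLResponse?) -> Bool {
  guard let httpResponse = response as? HTTPURLResponse else { return false }
  return httpResponse.isMultipart
}
```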
taskData.append(additionalData: data)

if let httpResponse = dataTask.response as? HTTPURLResponse, httpResponse.isMultipart {
This multipart boundary check is to ensure that only complete chunks are sent to the next interceptor. Any remaining data that needs the next incoming data to form a complete chunk is held back, as sketched below.
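A rough sketch of that hold-back behaviour, assuming the boundary string is already known from the `Content-Type` header; the names and structure are illustrative rather than the actual interceptor code.

```swift
import Foundation

// Sketch only: complete chunks (terminated by a boundary delimiter) are returned for
// parsing, while any trailing partial chunk is held back until more data arrives.
func splitCompleteChunks(from buffer: Data, boundary: String) -> (complete: [Data], remainder: Data) {
  guard let delimiter = "--\(boundary)".data(using: .utf8) else { return ([], buffer) }

  var chunks: [Data] = []
  var chunkStart = buffer.startIndex
  var searchRange = buffer.startIndex..<buffer.endIndex

  while let match = buffer.range(of: delimiter, in: searchRange) {
    if match.lowerBound > chunkStart {
      chunks.append(buffer.subdata(in: chunkStart..<match.lowerBound))
    }
    chunkStart = match.upperBound
    searchRange = match.upperBound..<buffer.endIndex
  }

  // Whatever follows the last boundary is not yet a complete chunk; keep it for next time.
  let remainder = buffer.subdata(in: chunkStart..<buffer.endIndex)
  return (chunks, remainder)
}
```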
Will that chunk get automatically included in the `dataTask` next time this function is called? I'm not seeing anywhere that you are holding onto the `dataTask` to merge them together when the next chunk comes in.
Got it. To me, this makes it even more important that task data is mutated atomically, then. I'm totally good with this implementation in every other regard, but it seems very easy for this to get corrupted by parallel threads executing in a weird order.
👍🏻 - created #2900 to handle it.
@AnthonyMDev this PR is now ready for review. Note - there is a lot of …
This looks awesome! I have a few questions and suggestions, but very minor. Great job on this. It was not a simple task...
guard let taskData = $0[dataTask.taskIdentifier] else {
  assertionFailure("No data found for task \(dataTask.taskIdentifier), cannot append received data")

guard let taskData = self.tasks[dataTask.taskIdentifier] else {
By removing the `self.$tasks.mutate` block, this is no longer atomic. I get why it's not necessary for updating the `TaskData` in place, because `TaskData` is a `class`. But does this mean that this is not guaranteed to happen sequentially? Can these create problematic race conditions?
`tasks` is annotated with the `@Atomic` property wrapper so all access to the task list should still be atomic; we're assured that the task won't be removed, at least. We only need `mutate` when we want to alter the task list.

`data` within each `TaskData` is not protected by anything though, so yes, I suppose there is a chance of a race condition when working with incoming data for a particular task. We shouldn't be using the list's atomic lock to ensure consistent data access for a single task though. I suspect it was either a mistake to have `mutate` there in the first place or it was being abused for the purpose of single-task data access.

The default callback queue is `main` which, being a serial queue, will ensure there is no race condition. Anyone who cares about a performant UI though will be giving us their own queue, which could be concurrent and introduce the potential for a race condition.

We could use `@Atomic` on `TaskData.data` to prevent this, but I'd want a performance benchmark test to measure the cost of doing so.
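For reference, a simplified sketch of what guarding `TaskData.data` could look like; the `Atomic` wrapper and `TaskData` below are illustrative stand-ins rather than the actual Apollo iOS types, and any real change would want the benchmarking mentioned above.

```swift
import Foundation

// Sketch only: a lock-based property wrapper guarding the per-task buffer.
@propertyWrapper
final class Atomic<Value> {
  private let lock = NSLock()
  private var value: Value

  init(wrappedValue: Value) {
    self.value = wrappedValue
  }

  var wrappedValue: Value {
    lock.lock()
    defer { lock.unlock() }
    return value
  }

  func mutate(_ transform: (inout Value) -> Void) {
    lock.lock()
    defer { lock.unlock() }
    transform(&value)
  }
}

// Hypothetical TaskData whose received bytes are guarded independently of the task list.
final class TaskData {
  @Atomic private var data = Data()

  func append(additionalData: Data) {
    _data.mutate { $0.append(additionalData) }
  }

  /// Reads also go through the lock via the wrapped property.
  var bufferedData: Data { data }
}
```

Guarding the per-task buffer with its own lock keeps contention local to a single task instead of reusing the task-list lock, which matches the separation described above.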
Yes, I agree with your assessment here. I do think we should be making the mutation of each individual `TaskData` atomic though. This is far too likely to cause a race condition for someone in its current state.
👍🏻 - created #2900 to handle it.
This is both the RFC and implementation. I'm capturing the design decision (RFC) in the PR because the original suggested design differs from the final solution. This provides a high-level overview of the design instead of having to read the code to fully understand the change.
Goal

Support GraphQL subscriptions over HTTP using a multipart response (incremental delivery) format similar to that defined in the `@defer`/`@stream` RFC.

Existing
GraphQL subscriptions are currently supported in Apollo iOS but only over WebSocket - documentation. Coincidentally, a subscription can be sent over HTTP using `RequestChainNetworkTransport`, but the response would never be returned because the current request chain is designed for a single response; the technical reason is that `urlSession(_:task:didCompleteWithError:)` would not be called because of the multipart data transfer (the request never actually completes), and consequently the completion block of the subscription request is never called.

Solution
Discarded

The first proposed design was to create an entirely new transport that would operate differently than the current concept of a 'request chain' and implement something closer to Apollo Link. The scope of this change is quite large and it doesn't offer an easy path for users to migrate from existing WebSocket-based subscriptions to HTTP-based subscriptions; additional work would be required to let the user choose which transport a particular subscription should use, and `SplitNetworkTransport` would need to juggle another transport.

Preferred
The implemented design is to adapt the existing request chain to be re-entrant; the re-entrant behaviour is key to making this design possible. Once the request chain becomes re-entrant all the current interceptors can be reused and it fixes the currently broken behaviour of subscriptions over HTTP. The migration path becomes easier too since users would simply need to send a subscription over the `RequestChainNetworkTransport` instead of the `WebSocketTransport`, as sketched below.
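For illustration, a minimal sketch of that migration path, assuming a code-generated `MySubscription` operation and a placeholder endpoint URL; configuration details and error handling are elided.

```swift
import Apollo
import Foundation

// Build a client whose only transport is the HTTP request chain (no WebSocketTransport).
let store = ApolloStore()
let transport = RequestChainNetworkTransport(
  interceptorProvider: DefaultInterceptorProvider(store: store),
  endpointURL: URL(string: "https://example.com/graphql")!
)
let client = ApolloClient(networkTransport: transport, store: store)

// Keep a reference to the returned Cancellable for as long as the subscription should stay active.
let activeSubscription = client.subscribe(subscription: MySubscription()) { result in
  switch result {
  case .success(let graphQLResult):
    // Each multipart chunk surfaces here as another result callback.
    print("Received payload: \(String(describing: graphQLResult.data))")
  case .failure(let error):
    print("Subscription error: \(error)")
  }
}
```

As described above, the caller-side change is limited to which transport the subscription is sent over.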
Technical details
The current limitation of the request chain is that progress through the interceptors is tracked with an index. This makes it impossible to handle a multipart response where network responses need to move forward at different times from the same interceptor, with no guarantee that the chain has completed before the next response arrives. An index is required because in the `proceedAsync(request:response:completion:)` callback there is no indication of which interceptor has finished processing. This is further complicated by the fact that these interfaces/classes/structs are all public and there could be any number of custom interceptors that users have built specific to their data flow.

To make the request chain support multipart responses it has to become re-entrant, which separates each multipart response into its own forward continuation of the request chain. To do this in a way that is backwards compatible with custom interceptors means we need to hide the abstraction and handle it internally. This is done by wrapping each interceptor in the supplied interceptor list in a wrapper that functions both as the request chain the interceptor calls back to and as the 'forwarder' of request chain progress to each wrapped interceptor. On interceptor callback the wrapper will actually use a new function that requires the interceptor to identify itself; this interceptor identification is what enables independent forward continuation of the request chain, as sketched below.
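To make the identification idea concrete, here is a heavily simplified, hypothetical sketch; the protocol shape and type names are invented for illustration and do not match the public Apollo iOS interceptor APIs.

```swift
import Foundation

// Hypothetical types for illustration only; not the Apollo iOS request chain API.
protocol ChunkInterceptor: AnyObject {
  func intercept(chunk: Data, proceed: @escaping (Data) -> Void)
}

/// Wraps a single interceptor and remembers its position, so every callback identifies
/// which interceptor just finished and the chain can continue independently per chunk.
final class InterceptorWrapper {
  private let interceptor: ChunkInterceptor
  private let index: Int
  private unowned let chain: ReentrantChain

  init(interceptor: ChunkInterceptor, index: Int, chain: ReentrantChain) {
    self.interceptor = interceptor
    self.index = index
    self.chain = chain
  }

  func handle(chunk: Data) {
    interceptor.intercept(chunk: chunk) { processed in
      // The wrapper, not the interceptor, reports its identity back to the chain.
      self.chain.proceed(with: processed, after: self.index)
    }
  }
}

final class ReentrantChain {
  private var wrappers: [InterceptorWrapper] = []

  init(interceptors: [ChunkInterceptor]) {
    for (index, interceptor) in interceptors.enumerated() {
      wrappers.append(InterceptorWrapper(interceptor: interceptor, index: index, chain: self))
    }
  }

  /// Each incoming chunk starts its own pass through the interceptors; passes can overlap
  /// because progress is keyed to the interceptor that reported back, not one shared index.
  func kickoff(chunk: Data) {
    wrappers.first?.handle(chunk: chunk)
  }

  func proceed(with chunk: Data, after index: Int) {
    let next = index + 1
    guard next < wrappers.count else { return }
    wrappers[next].handle(chunk: chunk)
  }
}
```

In the real change the existing public interceptor protocol is left untouched; the wrapping happens internally so custom interceptors keep working unchanged.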
There is also a new interceptor, `MultipartResponseParsingInterceptor`, that is inserted before the `JSONResponseParsingInterceptor` in the interceptor chain. This new interceptor parses the multipart response at the given boundary and forwards each complete chunk on to the next interceptor.

Will be done in subsequent PRs: