Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix two stream close issues (b/73167987, b/73382103). #810

Merged
merged 4 commits into from
Feb 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions Firestore/Example/Tests/Integration/FSTStreamTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#import <XCTest/XCTest.h>

#import <GRPCClient/GRPCCall.h>

#import <FirebaseFirestore/FIRFirestoreSettings.h>

#import "Firestore/Example/Tests/Util/FSTHelpers.h"
Expand All @@ -35,7 +37,7 @@

/** Exposes otherwise private methods for testing. */
@interface FSTStream (Testing)
- (void)writesFinishedWithError:(NSError *_Nullable)error;
@property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
@end

/**
Expand Down Expand Up @@ -202,7 +204,9 @@ - (void)testWatchStreamStopBeforeHandshake {
}];

// Simulate a final callback from GRPC
[watchStream writesFinishedWithError:nil];
[_workerDispatchQueue dispatchAsync:^{
[watchStream.callbackFilter writesFinishedWithError:nil];
}];

[self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
}
Expand All @@ -224,7 +228,9 @@ - (void)testWriteStreamStopBeforeHandshake {
}];

// Simulate a final callback from GRPC
[writeStream writesFinishedWithError:nil];
[_workerDispatchQueue dispatchAsync:^{
[writeStream.callbackFilter writesFinishedWithError:nil];
}];

[self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
}
Expand Down
101 changes: 51 additions & 50 deletions Firestore/Source/Remote/FSTStream.mm
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ @interface FSTStream () <GRXWriteable>

#pragma mark - FSTCallbackFilter

/** Filter class that allows disabling of GRPC callbacks. */
/**
* Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
* FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
* and be able to completely prevent any subsequent events from gRPC from calling back into the
* FSTSTream.
*/
@interface FSTCallbackFilter : NSObject <GRXWriteable>

- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
Expand Down Expand Up @@ -269,12 +274,12 @@ - (void)startWithDelegate:(id)delegate {

/** Add an access token to our RPC, after obtaining one from the credentials provider. */
- (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {
[self.workerDispatchQueue verifyIsCurrentQueue];

if (self.state == FSTStreamStateStopped) {
// Streams can be stopped while waiting for authorization.
return;
}

[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
(long)self.state);

Expand All @@ -288,6 +293,8 @@ - (void)resumeStartWithToken:(const Token &)token error:(NSError *)error {

self.requestsWriter = [[FSTBufferedWriter alloc] init];
_rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
[_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];

[FSTDatastore prepareHeadersForRPC:_rpc
databaseID:&self.databaseInfo->database_id()
token:(token.is_valid() ? token.token() : absl::string_view())];
Expand Down Expand Up @@ -369,7 +376,10 @@ - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *
[self.backoff resetToMax];
}

[self tearDown];
if (finalState != FSTStreamStateError) {
FSTLog(@"%@ %p Performing stream teardown", [self class], (__bridge void *)self);
[self tearDown];
}

if (self.requestsWriter) {
// Clean up the underlying RPC. If this close: is in response to an error, don't attempt to
Expand Down Expand Up @@ -400,8 +410,9 @@ - (void)closeWithFinalState:(FSTStreamState)finalState error:(nullable NSError *
[self notifyStreamInterruptedWithError:error];
}

// Clear the delegates to avoid any possible bleed through of events from GRPC.
_delegate = nil;
// PORTING NOTE: notifyStreamInterruptedWithError may have restarted the stream with a new
Copy link
Contributor

@wilhuff wilhuff Feb 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same problem we keep having: we can't notify our delegate in an inconsistent state. We shouldn't interleave mutating actions and notifcations. Therefore [self notifyStreamInterruptedWIthError:error] must be the last action we take here.

We've actually have observed that callback filter is ineffective. There's a race condition it doesn't handle, which is why #790 was required.

The whole thing needs to be redesigned, but in the meantime I think we should clear the delegate, but just above the notification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, wow. Thanks for flagging this. I didn't know there was history here. I assume the problem is that the callbackFilter is applied /before/ the messages are dispatched to our queue and so even though we suppress callbacks in this code there could still be callbacks already waiting on our queue.

I don't think setting delegate to nil above notifyStreamInterruptedWithError helps any, since in the case I'm fixing the delegate is getting immediately re-assigned to a non-nil value (by notifyStreamInterruptedWithError). So those pending callbacks would just be delivered to the new delegate, which is exactly what we want to prevent with the callbackFilter. :-/

I think the fix is probably to make callbackFilter do the dispatch onto the dispatch queue in addition to doing the filtering so it can actually do its job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the other options would be to remove the dispatch in writeValue: (there's a "TODO(mcg): remove the double-dispatch once GRPCCall at head is released.")... If you think we can actually do that, that would also be sufficient to fix the race condition if I'm understanding it correctly.

Copy link
Contributor

@wilhuff wilhuff Feb 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I had to implement it convince myself it would work, so it to you in a PR.

  1. You're right, we can't nil out the delegate above the call to notifyStreamInterruptedWithError: because the delegate must be non-nil for that to even do anything useful. The two interesting implementations nil it out for themselves anyway, though they are careful to do so before the invocation, which is correct.
  2. Getting gRPC to call us back on our dispatch queue fixes the race condition in FSTCallbackFilter so perhaps new further action is required.

// delegate so we do /not/ want to clear the delegate here. And since we've already suppressed
// callbacks via our callbackFilter, there is no worry about bleed through of events from GRPC.
}

- (void)stop {
Expand Down Expand Up @@ -530,11 +541,7 @@ - (void)handleStreamMessage:(id)value {
*/
- (void)handleStreamClose:(nullable NSError *)error {
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);

if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
return;
}
FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");

// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
Expand All @@ -547,56 +554,50 @@ - (void)handleStreamClose:(nullable NSError *)error {
// The GRXWriteable implementation defines the receive side of the RPC stream.

/**
* Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
* redispatch back onto our own worker queue.
* Called by GRPC when it publishes a value.
*
* GRPC must be configured to use our worker queue by calling
* `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
* the RPC.
*/
- (void)writeValue:(id)value __used {
// TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
// Once released we can set the responseDispatchQueue property on the GRPCCall and then this
// method can call handleStreamMessage directly.
FSTWeakify(self);
[self.workerDispatchQueue dispatchAsync:^{
FSTStrongify(self);
if (![self isStarted]) {
FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
return;
}

if (!self.messageReceived) {
self.messageReceived = YES;
if ([FIRFirestore isLoggingEnabled]) {
FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
(__bridge void *)self,
[FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
}
NSError *error;
id proto = [self parseProto:self.responseMessageClass data:value error:&error];
if (proto) {
[self handleStreamMessage:proto];
} else {
[_rpc finishWithError:error];
- (void)writeValue:(id)value {
[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert([self isStarted], @"writeValue: called for stopped stream.");

if (!self.messageReceived) {
self.messageReceived = YES;
if ([FIRFirestore isLoggingEnabled]) {
FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
(__bridge void *)self,
[FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
}];
}
NSError *error;
id proto = [self parseProto:self.responseMessageClass data:value error:&error];
if (proto) {
[self handleStreamMessage:proto];
} else {
[_rpc finishWithError:error];
}
}

/**
* Called by GRPC when it closed the stream with an error representing the final state of the
* stream.
*
* Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
* directly inform stream-specific logic, or call stop to tear down the stream.
* GRPC must be configured to use our worker queue by calling
* `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
* the RPC.
*
* Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
* stop to tear down the stream.
*/
- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
FSTWeakify(self);
[self.workerDispatchQueue dispatchAsync:^{
FSTStrongify(self);
if (!self || self.state == FSTStreamStateStopped) {
return;
}
[self handleStreamClose:error];
}];
[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");

[self handleStreamClose:error];
}

@end
Expand Down