feat!: Use grpc intern reconnections for rpc event stream #1112
@@ -55,7 +57,7 @@ public final class Config {
      public static final String LRU_CACHE = CacheType.LRU.getValue();
      static final String DEFAULT_CACHE = LRU_CACHE;

-     static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 5;
+     static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 7;
As this should not be used anymore, I think we can remove it:
    static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 7;
Thank you, this looks really good, and sets the foundation for a lot of simplifications.
I added some comments, more questions out of curiosity.
Thank you
  private void handleProviderReadyEvent() {
      this.onConnectionEvent.accept(true, Collections.emptyList());
-     if (this.cache.getEnabled()) {
+     if (this.cache.getEnabled().equals(Boolean.TRUE)) {
[question] Do we always want to purge the cache on a reconnect, even when going from STALE to READY? (Side note: we need to check what the Gherkin file says about this.)
I don't know; this change was only suggested by Sonar.
I think we do need to purge here; we could have missed change events while we were stale, so now that we are connected again we should build up a new cache.
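To illustrate the reasoning for purging, here is a minimal sketch. All names in it (`CachePurgeSketch`, `setUpstream`, `onProviderReady`) are hypothetical and only model the idea; they are not the provider's real classes. A flag changes upstream while no change event reaches us, and only clearing the cache on READY makes the next resolution pick up the new value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a caching resolver; not the provider's real classes.
final class CachePurgeSketch {
    private final Map<String, String> upstream = new HashMap<>(); // stands in for flagd
    private final Map<String, String> cache = new HashMap<>();

    void setUpstream(String flag, String value) {
        upstream.put(flag, value);
    }

    // Serves from the cache when possible, otherwise fetches and caches the value.
    String resolve(String flag) {
        return cache.computeIfAbsent(flag, upstream::get);
    }

    // READY after a reconnect: drop everything, since change events may have been missed.
    void onProviderReady() {
        cache.clear();
    }

    // Returns {value served before the purge, value served after the purge}.
    static String[] demo() {
        CachePurgeSketch resolver = new CachePurgeSketch();
        resolver.setUpstream("my-flag", "on");
        resolver.resolve("my-flag");                 // value is now cached
        resolver.setUpstream("my-flag", "off");      // changed while stale, no event received
        String beforePurge = resolver.resolve("my-flag");
        resolver.onProviderReady();
        String afterPurge = resolver.resolve("my-flag");
        return new String[] {beforePurge, afterPurge};
    }
}
```

Without the `clear()` call in `onProviderReady`, the second lookup would keep serving the outdated cached value.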
Important: before merging this, open-feature/flagd#1478 should be merged first to fix the property and env var naming.
- boolean previous = connected;
- boolean current = connected = connectionEvent.isConnected();
+ final boolean wasConnected = connected;
+ final boolean isConnected = connected = connectionEvent.isConnected();
😎
        this.cache.clear();
    }
    this.onConnectionEvent.accept(false, Collections.emptyList());
    if (this.cache.getEnabled().equals(Boolean.TRUE)) {
Another nit, but generally I prefer such expressions in the reverse order in Java, since Boolean.TRUE will obviously never be null, so it's less likely to cause NPEs, whereas if this.cache.getEnabled() becomes nullable at some point (maybe it's a boxed type) you can see NPEs here when calling .equals.
Yeah, I actually just followed the Sonar linter suggestion here.
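For reference, a small sketch of the difference between the two orderings. `CacheStub` is a hypothetical stand-in that only mimics a `getEnabled()` returning a boxed `Boolean`; it is not the provider's cache class.

```java
// Hypothetical stand-in for the provider's cache; only getEnabled() matters here.
final class CacheStub {
    private final Boolean enabled; // boxed, so it may be null

    CacheStub(Boolean enabled) {
        this.enabled = enabled;
    }

    Boolean getEnabled() {
        return enabled;
    }
}

final class NullSafeBooleanCheck {
    // Constant-first comparison: Boolean.TRUE is never null, so equals() cannot throw.
    static boolean isEnabledSafe(CacheStub cache) {
        return Boolean.TRUE.equals(cache.getEnabled());
    }

    // Value-first comparison: throws NullPointerException when getEnabled() returns null.
    static boolean isEnabledUnsafe(CacheStub cache) {
        return cache.getEnabled().equals(Boolean.TRUE);
    }

    public static void main(String[] args) {
        CacheStub unset = new CacheStub(null);
        System.out.println(isEnabledSafe(unset)); // prints "false"
        try {
            isEnabledUnsafe(unset);
        } catch (NullPointerException e) {
            System.out.println("NPE from value-first comparison");
        }
    }
}
```

Both variants behave the same as long as `getEnabled()` never returns null; the constant-first form just removes that precondition.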
  @Override
  public void onCompleted() {
-     if (this.cache.getEnabled()) {
+     if (this.cache.getEnabled().equals(Boolean.TRUE)) {
    assertEquals(OBJECT_VARIANT, objectDetails.getVariant());
    assertEquals(STATIC_REASON, objectDetails.getReason());
}
// @Test
I'm nearly ready to approve this but I have a few questions/comments, mostly nits/testing.
    onConnectionLost.run();
}
// Re-register the state monitor to watch for the next state transition.
monitorChannelState(ch, onConnectionReady, onConnectionLost);
Apparently the SDK is unreliable when it comes to state changes:
"Transitions" to the same state are possible, because intermediate states may not have been observed. The API is only reliable in tracking the current state.
Can we live with that?
that's interesting. @toddbaert @aepfli
As I understand it, there is a small chance that we miss state transitions that happen between calling getState() and registering the callback via notifyWhenStateChanged(). I don't see any way out here. This could only be mitigated by checking the current state again right after calling notifyWhenStateChanged() and reacting accordingly, but this only shrinks the time frame in which we might miss a transition.
The other option would be to switch the logic to a "poll-based" approach and forget about using notifyWhenStateChanged() altogether. A nice side effect is that the code becomes quite trivial and does not involve any recursive calls to re-register the callbacks. Something like this:
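import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Polls the channel state at a fixed interval and fires the callbacks when it changes.
public static void monitorChannelState(ManagedChannel channel, Runnable onConnectionReady,
        Runnable onConnectionLost, long pollIntervalMs) {
    // Note: the scheduler is never shut down in this sketch.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicReference<ConnectivityState> lastState = new AtomicReference<>(ConnectivityState.IDLE);
    Runnable pollTask = () -> {
        // getState(true) also asks an idle channel to start connecting.
        ConnectivityState currentState = channel.getState(true);
        if (currentState != lastState.get()) {
            if (currentState == ConnectivityState.READY) {
                onConnectionReady.run();
            } else if (currentState == ConnectivityState.TRANSIENT_FAILURE) {
                onConnectionLost.run();
            }
            lastState.set(currentState);
        }
    };
    // Note: an exception thrown by a callback cancels all further polling.
    scheduler.scheduleAtFixedRate(pollTask, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
}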
wdyt?
I am not sure that polling will solve our issue here; we may have interpreted the API wrong (see grpc/grpc-java#11615 (comment)). Instead of fetching the current state at the moment we register, we should pass the state we think we are currently in to the method as its source-state parameter.
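A minimal sketch of that idea, using a hypothetical in-memory `FakeChannel` rather than the real gRPC classes. It only mimics the documented contract of notifyWhenStateChanged(source, callback): the one-off callback runs once the channel state differs from `source`, and right away if it already differs. Unlike a real channel, the fake runs callbacks inline and is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

final class StateWatchSketch {
    enum State { IDLE, CONNECTING, READY, TRANSIENT_FAILURE }

    // Hypothetical stand-in for a channel; not the gRPC implementation.
    static final class FakeChannel {
        private State state = State.IDLE;
        private final List<Runnable> pending = new ArrayList<>();

        State getState() {
            return state;
        }

        void setState(State next) {
            if (state == next) {
                return;
            }
            state = next;
            List<Runnable> toRun = new ArrayList<>(pending);
            pending.clear();
            toRun.forEach(Runnable::run);
        }

        void notifyWhenStateChanged(State source, Runnable callback) {
            if (state != source) {
                callback.run(); // already diverged from `source`: fire right away
            } else {
                pending.add(callback); // fire on the next state change
            }
        }
    }

    // Returns the states the callback observed when a transition happens
    // between "last state we handled" and "callback registered".
    static List<State> observe(boolean passLastHandledState) {
        FakeChannel channel = new FakeChannel();
        State lastHandled = channel.getState(); // IDLE is the last state we acted on
        channel.setState(State.READY);          // transition slips in before registration

        List<State> seen = new ArrayList<>();
        State source = passLastHandledState ? lastHandled : channel.getState();
        channel.notifyWhenStateChanged(source, () -> seen.add(channel.getState()));
        return seen;
    }
}
```

With `observe(true)` the callback fires immediately and sees READY; with `observe(false)` the freshly fetched state equals the current one, so the callback stays parked and the transition to READY goes unnoticed.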
As I understand it there is a small chance that we miss state transitions happening during calling getState() and registering the callback via notifyWhenStateChanged(). I don't see any way out here.
I don't either.
@chrfwow also pointed out to me that monitorChannelState is recursive and, at least as this is written currently, it will cause stack overflows eventually. We could use a runnable or something to call the callback instead, but that seems like it could be cumbersome.
I think the polling approach is better, TBH. It gets my vote for both of the above reasons.
@chrfwow did some additional tests on that. It turned out that we are not dealing with a recursion that will end up in a stack overflow, so this PR contains the implementation that uses monitorChannelState. I will also create a separate branch/PR containing the alternative polling logic and immediately close it, just in case :) I don't think making this configurable and maintaining both approaches is worth it.
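One way such a self-re-registering callback can avoid growing the stack is sketched below. It assumes the callback is handed to an executor instead of being invoked inline; whether gRPC does exactly this is an assumption here, and `ReRegistrationStackDepth` is a hypothetical class, not the PR's code. Each callback records its stack depth and re-registers itself, and the depth stays constant because the previous frame has already returned by the time the next callback runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ReRegistrationStackDepth {
    // Re-registers a callback `rounds` times through an executor and returns
    // {minDepth, maxDepth} of the call stack observed inside the callback.
    static int[] observeDepths(int rounds) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        int[] depths = {Integer.MAX_VALUE, 0};
        try {
            register(executor, rounds, depths, done);
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return depths;
    }

    // Stand-in for a monitor method whose callback re-registers itself. The call
    // only enqueues a task, so the current frame returns before the next one runs.
    private static void register(ExecutorService executor, int remaining,
            int[] depths, CountDownLatch done) {
        executor.execute(() -> {
            int depth = Thread.currentThread().getStackTrace().length;
            depths[0] = Math.min(depths[0], depth);
            depths[1] = Math.max(depths[1], depth);
            if (remaining > 0) {
                register(executor, remaining - 1, depths, done);
            } else {
                done.countDown();
            }
        });
    }
}
```

If the callback were invoked inline instead, every re-registration would add frames and the maximum depth would grow with the number of rounds.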
@@ -159,33 +159,41 @@ private boolean isConnected() {
      }

      private void onConnectionEvent(ConnectionEvent connectionEvent) {
Are we ok with this method not being thread safe?
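If we wanted to close that gap, one option would be to make the read of the previous state and the write of the new state a single atomic step. The sketch below is hypothetical: `ConnectionTracker` and its event names are made up for illustration and are much simpler than the provider.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for the provider's connection handling.
final class ConnectionTracker {
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final List<String> emitted = Collections.synchronizedList(new ArrayList<>());

    // getAndSet reads the previous value and writes the new one atomically,
    // so two concurrent callers cannot both observe the same "previous" state.
    void onConnectionEvent(boolean isConnected) {
        boolean wasConnected = connected.getAndSet(isConnected);
        if (!wasConnected && isConnected) {
            emitted.add("READY");
        } else if (wasConnected && !isConnected) {
            emitted.add("LOST");
        }
    }

    List<String> emitted() {
        return new ArrayList<>(emitted);
    }

    // Fires the same "connected" event from several threads and returns
    // how many READY events were emitted in total.
    static int readyEventsFromConcurrentConnects(int threadCount) {
        ConnectionTracker tracker = new ConnectionTracker();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(() -> tracker.onConnectionEvent(true));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return tracker.emitted().size();
    }
}
```

This only guards the state flip itself. The emission still happens after the swap, so the order of events from concurrent callers is not guaranteed; a `synchronized` method would be the simpler choice if ordering matters.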
Besides deleting some commented tests, a resolution for this is my only blocker. I've left my opinion on that one, but as long as we have a reliable re-connection and no stack overflow potential, I don't mind our approach.
I am sorry that I am causing so many conflicts. If it is okay with you, I will resolve them for you.
Signed-off-by: Bernd Warmuth <[email protected]>
@@ -80,7 +81,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
          }

          this.flagResolver.init();
-         this.initialized = true;
+         this.initialized = this.connected = true;
I think we have a potential race condition here between the event emission happening in init() and the setting of these fields, together with the logic that depends on them (onConnectionEvent(...)).
I do think that the state of initialization is not bound to the connection state. It can be initialized but not connected, or there could have been a connection issue in the middle. Maybe it would be better if the .init() method returned a boolean. In theory, the provider itself should not (and does not) know about the connected status; that only happens in this PR because we only have a gRPC connection. On the other hand, for in-process we have a file watcher, so what does connected mean there? Is the method onConnectionEvent correct, or is this more an abstraction of the underlying provider events, which we could use directly? All this discussion brings me more to the conclusion that this connection logic should be part of the connector, and not of this outside class.
As this PR is already quite big, and this is a potential race condition with limited impact, I would suggest moving forward with this PR as is and starting the migration of the in-process resolver to this really cool and neat reusable implementation of our connector. Afterwards, we can tackle this in a better and more controlled manner, as soon as both implementations are normalized.
Agreed
This PR
If the connection is re-established within FLAGD_RETRY_GRACE_PERIOD, the provider will emit a READY event and resume full operation. If reconnection is not achieved within this time frame, an ERROR event will be emitted. The default value for FLAGD_RETRY_GRACE_PERIOD is set to 5.
This PR marks the initial step in phasing out our custom gRPC backoff and reconnect logic. Instead, we are transitioning to rely fully on the default behaviour provided by the gRPC library.
This change is currently limited to the RPC mode of the provider (specifically the dev.openfeature.contrib.providers.flagd.resolver.grpc package). In this first phase, the GrpcConnector within this package has been refactored to operate independently of custom backoff logic, leveraging only gRPC internals to maintain a reliable connection.
In the next phase/PR, the refactored GrpcConnector should be reused within the dev.openfeature.contrib.providers.flagd.resolver.process package, enabling the removal of custom reconnect logic from that implementation as well.
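The grace-period behaviour described above could be sketched roughly as follows. This is a hypothetical illustration, not the code in this PR: `GracePeriodSketch`, its method names, and the use of milliseconds (chosen only to keep the demo fast) are all assumptions. A lost connection starts a timer; ERROR is emitted only if no reconnect happens before it fires, and a reconnect emits READY and invalidates the timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a reconnect grace period; not the provider's classes.
final class GracePeriodSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());
    private final long gracePeriodMs; // assumption: milliseconds, to keep the demo fast
    private long epoch;               // bumped on every connection state change

    GracePeriodSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Connection lost: start the grace period; ERROR fires only if it elapses.
    synchronized void onConnectionLost() {
        long lossEpoch = ++epoch;
        scheduler.schedule(() -> emitErrorIfStillLost(lossEpoch), gracePeriodMs, TimeUnit.MILLISECONDS);
    }

    // Connection (re-)established: invalidate any pending timer and emit READY.
    synchronized void onConnectionReady() {
        epoch++;
        events.add("READY");
    }

    private synchronized void emitErrorIfStillLost(long lossEpoch) {
        if (epoch == lossEpoch) { // no state change since the connection was lost
            events.add("ERROR");
        }
    }

    // Simulates an outage of `outageMs` and returns the emitted events in order.
    static List<String> run(long gracePeriodMs, long outageMs) {
        GracePeriodSketch sketch = new GracePeriodSketch(gracePeriodMs);
        sketch.onConnectionLost();
        sleep(outageMs);
        sketch.onConnectionReady();
        sleep(gracePeriodMs + 100); // give an outdated timer the chance to fire
        sketch.scheduler.shutdownNow();
        return new ArrayList<>(sketch.events);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

For example, `GracePeriodSketch.run(400, 10)` models an outage shorter than the grace period and yields only READY, while `GracePeriodSketch.run(50, 400)` models a longer outage and yields ERROR followed by READY.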