Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: implement grpc reconnect for inprocess mode #1150

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -32,65 +34,58 @@ public static void monitorChannelState(
ConnectivityState currentState = channel.getState(true);
log.info("Channel state changed to: {}", currentState);
if (currentState == ConnectivityState.READY) {
onConnectionReady.run();
if (onConnectionReady != null) {
onConnectionReady.run();
} else {
log.debug("onConnectionReady is null");
}
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
|| currentState == ConnectivityState.SHUTDOWN) {
onConnectionLost.run();
if (onConnectionLost != null) {
onConnectionLost.run();
} else {
log.debug("onConnectionLost is null");
}
}
// Re-register the state monitor to watch for the next state transition.
monitorChannelState(currentState, channel, onConnectionReady, onConnectionLost);
});
}

/**
* Waits for the channel to reach a desired state within a specified timeout period.
* Waits for the channel to reach the desired connectivity state within the specified timeout.
*
* @param channel the ManagedChannel to monitor.
* @param desiredState the ConnectivityState to wait for.
* @param connectCallback callback invoked when the desired state is reached.
* @param timeout the maximum amount of time to wait.
* @param unit the time unit of the timeout.
* @throws InterruptedException if the current thread is interrupted while waiting.
* @param desiredState the desired {@link ConnectivityState} to wait for
* @param channel the {@link ManagedChannel} to monitor
* @param connectCallback the {@link Runnable} to execute when the desired state is reached
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @throws InterruptedException if the current thread is interrupted while waiting
* @throws GeneralError if the desired state is not reached within the timeout
*/
public static void waitForDesiredState(
ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
long timeout,
TimeUnit unit)
throws InterruptedException {
waitForDesiredState(channel, desiredState, connectCallback, new CountDownLatch(1), timeout, unit);
}

private static void waitForDesiredState(
ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
CountDownLatch latch,
long timeout,
TimeUnit unit)
throws InterruptedException {
channel.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> {
try {
ConnectivityState state = channel.getState(true);
log.debug("Channel state changed to: {}", state);
CountDownLatch latch = new CountDownLatch(1);

if (state == desiredState) {
connectCallback.run();
latch.countDown();
return;
}
waitForDesiredState(channel, desiredState, connectCallback, latch, timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Thread interrupted while waiting for desired state", e);
} catch (Exception e) {
log.error("Error occurred while waiting for desired state", e);
Runnable waitForStateTask = () -> {
ConnectivityState currentState = channel.getState(true);
if (currentState == desiredState) {
connectCallback.run();
latch.countDown();
}
});
};

ScheduledFuture<?> scheduledFuture = Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS);

// Await the latch or timeout for the state change
if (!latch.await(timeout, unit)) {
boolean success = latch.await(timeout, unit);
scheduledFuture.cancel(true);
if (!success) {
throw new GeneralError(String.format(
"Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.grpc;
package dev.openfeature.contrib.providers.flagd.resolver.common;

import com.google.common.annotations.VisibleForTesting;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelMonitor;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState;
import dev.openfeature.sdk.ImmutableStructure;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -125,8 +120,7 @@ public GrpcConnector(
* @param onConnectionEvent a consumer to handle connection events
* @param eventStreamObserver a consumer to handle the event stream
*/
@VisibleForTesting
GrpcConnector(
public GrpcConnector(
final FlagdOptions options,
final Function<ManagedChannel, T> stub,
final Function<ManagedChannel, K> blockingStub,
Expand All @@ -143,7 +137,7 @@ public GrpcConnector(
public void initialize() throws Exception {
log.info("Initializing GRPC connection...");
ChannelMonitor.waitForDesiredState(
channel, ConnectivityState.READY, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS);
ConnectivityState.READY, channel, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS);
ChannelMonitor.monitorChannelState(ConnectivityState.READY, channel, this::onReady, this::onConnectionLost);
}

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading
Loading