diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java index 0878ce910..8ccb73c15 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java @@ -4,6 +4,8 @@ import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -32,10 +34,18 @@ public static void monitorChannelState( ConnectivityState currentState = channel.getState(true); log.info("Channel state changed to: {}", currentState); if (currentState == ConnectivityState.READY) { - onConnectionReady.run(); + if (onConnectionReady != null) { + onConnectionReady.run(); + } else { + log.debug("onConnectionReady is null"); + } } else if (currentState == ConnectivityState.TRANSIENT_FAILURE || currentState == ConnectivityState.SHUTDOWN) { - onConnectionLost.run(); + if (onConnectionLost != null) { + onConnectionLost.run(); + } else { + log.debug("onConnectionLost is null"); + } } // Re-register the state monitor to watch for the next state transition. monitorChannelState(currentState, channel, onConnectionReady, onConnectionLost); @@ -43,54 +53,39 @@ public static void monitorChannelState( } /** - * Waits for the channel to reach a desired state within a specified timeout period. + * Waits for the channel to reach the desired connectivity state within the specified timeout. * - * @param channel the ManagedChannel to monitor. - * @param desiredState the ConnectivityState to wait for. - * @param connectCallback callback invoked when the desired state is reached. - * @param timeout the maximum amount of time to wait. - * @param unit the time unit of the timeout. - * @throws InterruptedException if the current thread is interrupted while waiting. + * @param desiredState the desired {@link ConnectivityState} to wait for + * @param channel the {@link ManagedChannel} to monitor + * @param connectCallback the {@link Runnable} to execute when the desired state is reached + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @throws InterruptedException if the current thread is interrupted while waiting + * @throws GeneralError if the desired state is not reached within the timeout */ public static void waitForDesiredState( - ManagedChannel channel, ConnectivityState desiredState, - Runnable connectCallback, - long timeout, - TimeUnit unit) - throws InterruptedException { - waitForDesiredState(channel, desiredState, connectCallback, new CountDownLatch(1), timeout, unit); - } - - private static void waitForDesiredState( ManagedChannel channel, - ConnectivityState desiredState, Runnable connectCallback, - CountDownLatch latch, long timeout, TimeUnit unit) throws InterruptedException { - channel.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> { - try { - ConnectivityState state = channel.getState(true); - log.debug("Channel state changed to: {}", state); + CountDownLatch latch = new CountDownLatch(1); - if (state == desiredState) { - connectCallback.run(); - latch.countDown(); - return; - } - waitForDesiredState(channel, desiredState, connectCallback, latch, timeout, unit); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Thread interrupted while waiting for desired state", e); - } catch (Exception e) { - log.error("Error occurred while waiting for desired state", e); + Runnable waitForStateTask = () -> { + ConnectivityState currentState = channel.getState(true); + if (currentState == desiredState) { + connectCallback.run(); + latch.countDown(); } - }); + }; + + ScheduledFuture scheduledFuture = Executors.newSingleThreadScheduledExecutor() + .scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS); - // Await the latch or timeout for the state change - if (!latch.await(timeout, unit)) { + boolean success = latch.await(timeout, unit); + scheduledFuture.cancel(true); + if (!success) { throw new GeneralError(String.format( "Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout)); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnector.java similarity index 94% rename from providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java rename to providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnector.java index 9508c521b..d5ca69aff 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnector.java @@ -1,11 +1,6 @@ -package dev.openfeature.contrib.providers.flagd.resolver.grpc; +package dev.openfeature.contrib.providers.flagd.resolver.common; -import com.google.common.annotations.VisibleForTesting; import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; -import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelMonitor; -import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent; -import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState; import dev.openfeature.sdk.ImmutableStructure; import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; @@ -125,8 +120,7 @@ public GrpcConnector( * @param onConnectionEvent a consumer to handle connection events * @param eventStreamObserver a consumer to handle the event stream */ - @VisibleForTesting - GrpcConnector( + public GrpcConnector( final FlagdOptions options, final Function stub, final Function blockingStub, @@ -143,7 +137,7 @@ public GrpcConnector( public void initialize() throws Exception { log.info("Initializing GRPC connection..."); ChannelMonitor.waitForDesiredState( - channel, ConnectivityState.READY, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS); + ConnectivityState.READY, channel, this::onInitialConnect, deadline, TimeUnit.MILLISECONDS); ChannelMonitor.monitorChannelState(ConnectivityState.READY, channel, this::onReady, this::onConnectionLost); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffService.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffService.java deleted file mode 100644 index 1bf183c69..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffService.java +++ /dev/null @@ -1,86 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import java.util.concurrent.ThreadLocalRandom; -import lombok.Getter; - -/** A service that provides backoff functionality. */ -public class BackoffService { - public static final int DEFAULT_MAX_JITTER = 0x1 << 8; // 256; Random likes boundaries that are a power of 2 - - @Getter - private final BackoffStrategy strategy; - - @Getter - private final int maxJitter; - - /** - * Creates a new BackoffService with the given strategy and default maximum jitter. The default - * maximum jitter is 256. - * - * @param strategy The backoff strategy to use - */ - public BackoffService(BackoffStrategy strategy) { - this(strategy, DEFAULT_MAX_JITTER); - } - - /** - * Creates a new BackoffService with the given strategy and maximum jitter. - * - * @param strategy The backoff strategy to use - * @param maxJitter The maximum jitter value - */ - public BackoffService(BackoffStrategy strategy, int maxJitter) { - this.strategy = strategy; - this.maxJitter = maxJitter; - } - - /** - * Returns the current backoff time in milliseconds. This backoff time will be used in - * waitUntilNextAttempt. - * - * @return the current backoff time in milliseconds - */ - public long getCurrentBackoffMillis() { - return strategy.getCurrentBackoffMillis(); - } - - /** - * Returns a random jitter value between 0 and maxJitter. - * - * @return a random jitter value - */ - public long getRandomJitter() { - if (maxJitter == 0) { - return 0; - } - - return ThreadLocalRandom.current().nextInt(maxJitter); - } - - /** Resets the backoff strategy to its initial state. */ - public void reset() { - strategy.reset(); - } - - /** - * Returns whether the backoff strategy has more attempts left. - * - * @return true if the backoff strategy has more attempts left, false otherwise - */ - public boolean shouldRetry() { - return !strategy.isExhausted(); - } - - /** - * Bolocks the current thread until the next attempt should be made. The time to wait is - * determined by the backoff strategy and a random jitter. - * - * @throws InterruptedException if the thread is interrupted while waiting - */ - public void waitUntilNextAttempt() throws InterruptedException { - long retryDelay = getCurrentBackoffMillis() + getRandomJitter(); - strategy.nextBackoff(); - - Thread.sleep(retryDelay); - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategies.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategies.java deleted file mode 100644 index 3827a7617..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategies.java +++ /dev/null @@ -1,27 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -/** A factory class for creating common backoff strategies. */ -public class BackoffStrategies { - private BackoffStrategies() {} - - public static BackoffStrategy exponentialTimeBackoff(long initialBackoffMillis) { - return new ExponentialTimeBackoff(initialBackoffMillis); - } - - public static BackoffStrategy exponentialTimeBackoff(long initialBackoffMillis, long maxBackoffMillis) { - return new ExponentialTimeBackoff(initialBackoffMillis, maxBackoffMillis); - } - - public static BackoffStrategy constantTimeBackoff(long millis) { - return new ConstantTimeBackoff(millis); - } - - public static BackoffStrategy noBackoff() { - return new ConstantTimeBackoff(0L); - } - - public static BackoffStrategy maxRetriesWithExponentialTimeBackoffStrategy( - int maxRetries, long initialBackoffMillis) { - return new NumberOfRetriesBackoff(maxRetries, exponentialTimeBackoff(initialBackoffMillis)); - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategy.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategy.java deleted file mode 100644 index 31224de4b..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffStrategy.java +++ /dev/null @@ -1,26 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -/** A strategy interface for determining how long to backoff before retrying a failed operation. */ -public interface BackoffStrategy { - - /** - * The current backoff time in milliseconds. This value should be used to determine how long to - * wait before retrying. - * - * @return the current backoff time in milliseconds - */ - long getCurrentBackoffMillis(); - - /** - * Determines if the backoff strategy has been exhausted. - * - * @return true if the operation should backoff, false otherwise - */ - boolean isExhausted(); - - /** Move to the next backoff time. */ - void nextBackoff(); - - /** Reset the backoff strategy to its initial state. */ - void reset(); -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoff.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoff.java deleted file mode 100644 index 89aee161a..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoff.java +++ /dev/null @@ -1,69 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import lombok.Getter; - -/** - * A backoff strategy that combines multiple backoff strategies. The strategy starts with the first - * provided strategy and will switch to the next backoff strategy in the list when the current one - * is exhausted. - */ -public class CombinedBackoff implements BackoffStrategy { - private final BackoffStrategy[] backoffStrategies; - private int currentStrategyIndex; - - @Getter - private BackoffStrategy currentStrategy; - - /** - * Creates a new combined backoff strategy. The strategy starts with the first provided strategy - * and will switch to the next backoff strategy in the list when the current one is exhausted. - * - * @param backoffStrategies the list of backoff strategies to combine - */ - public CombinedBackoff(BackoffStrategy[] backoffStrategies) { - this.backoffStrategies = backoffStrategies.clone(); - this.currentStrategyIndex = 0; - this.currentStrategy = this.backoffStrategies[currentStrategyIndex]; - updateCurrentStrategy(); - } - - @Override - public long getCurrentBackoffMillis() { - return currentStrategy.getCurrentBackoffMillis(); - } - - @Override - public boolean isExhausted() { - updateCurrentStrategy(); - return currentStrategy.isExhausted(); - } - - @Override - public void nextBackoff() { - updateCurrentStrategy(); - currentStrategy.nextBackoff(); - } - - /** Switches to the next backoff strategy if the current one is exhausted. */ - private void updateCurrentStrategy() { - // Move to the next non-exhausted strategy if the current one is exhausted - while (!isLastStrategy() && currentStrategy.isExhausted()) { - currentStrategyIndex++; - currentStrategy = backoffStrategies[currentStrategyIndex]; - } - } - - private boolean isLastStrategy() { - return currentStrategyIndex + 1 >= backoffStrategies.length; - } - - @Override - public void reset() { - for (int i = 0; i <= currentStrategyIndex; i++) { - backoffStrategies[i].reset(); - } - - currentStrategyIndex = 0; - currentStrategy = backoffStrategies[currentStrategyIndex]; - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoff.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoff.java deleted file mode 100644 index f84ec50f5..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoff.java +++ /dev/null @@ -1,28 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -/** - * A backoff strategy that always returns the same backoff time. This backoff is never exhausted. - */ -public class ConstantTimeBackoff implements BackoffStrategy { - final long millis; - - public ConstantTimeBackoff(long millis) { - this.millis = millis; - } - - @Override - public long getCurrentBackoffMillis() { - return millis; - } - - @Override - public boolean isExhausted() { - return false; - } - - @Override - public void nextBackoff() {} - - @Override - public void reset() {} -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ExponentialTimeBackoff.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ExponentialTimeBackoff.java deleted file mode 100644 index 8dffbc778..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ExponentialTimeBackoff.java +++ /dev/null @@ -1,57 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -/** - * A backoff strategy that exponentially increases the backoff time. This backoff is never - * exhausted. - */ -public class ExponentialTimeBackoff implements BackoffStrategy { - public static final long DEFAULT_MAX_BACK_OFF = 120 * 1000; - - private final long initialBackoff; - private final long maxBackoff; - private long currentBackoff; - - /** - * A backoff strategy that exponentially increases the backoff time. This backoff will double the - * backoff time until the DEFAULT_MAX_BACK_OFF is reached. - * - * @param initialBackoffMillis the initial backoff time in milliseconds - */ - public ExponentialTimeBackoff(long initialBackoffMillis) { - this(initialBackoffMillis, DEFAULT_MAX_BACK_OFF); - } - - /** - * A backoff strategy that exponentially increases the backoff time. This backoff will double the - * backoff time until the maximum backoff time is reached. It is never exhausted but will stale at - * the maximum backoff time. - * - * @param initialBackoffMillis the initial backoff time in milliseconds - * @param maxBackoffMillis the maximum backoff time in milliseconds - */ - public ExponentialTimeBackoff(long initialBackoffMillis, long maxBackoffMillis) { - this.initialBackoff = initialBackoffMillis; - this.maxBackoff = maxBackoffMillis; - reset(); - } - - @Override - public long getCurrentBackoffMillis() { - return currentBackoff; - } - - @Override - public boolean isExhausted() { - return false; - } - - @Override - public void nextBackoff() { - currentBackoff = Math.min(currentBackoff * 2, maxBackoff); - } - - @Override - public void reset() { - currentBackoff = Math.min(initialBackoff, maxBackoff); - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/GrpcStreamConnectorBackoffService.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/GrpcStreamConnectorBackoffService.java deleted file mode 100644 index 01152a972..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/GrpcStreamConnectorBackoffService.java +++ /dev/null @@ -1,36 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -/** Backoff service that supports "silent" backoff. */ -public class GrpcStreamConnectorBackoffService extends BackoffService { - private final BackoffStrategy silentRecoverBackoff; - - /** - * Create a new backoff service that will not backoff (0ms) on first attempt. Subsequent attempts - * will backoff exponentially. - * - * @param initialBackoffMillis initial backoff time in milliseconds used for exponential error - * backoff - */ - public GrpcStreamConnectorBackoffService(long initialBackoffMillis) { - this(BackoffStrategies.exponentialTimeBackoff(initialBackoffMillis)); - } - - /** - * Create a new backoff service that will not backoff (0ms) on first attempt. Subsequent attempts - * will backoff using the provided backoff strategy. - * - * @param errorBackoff backoff strategy to use after the first attempt - */ - public GrpcStreamConnectorBackoffService(BackoffStrategy errorBackoff) { - this(new NumberOfRetriesBackoff(1, BackoffStrategies.noBackoff()), errorBackoff); - } - - private GrpcStreamConnectorBackoffService(BackoffStrategy silentRecoverBackoff, BackoffStrategy errorBackoff) { - super(new CombinedBackoff(new BackoffStrategy[] {silentRecoverBackoff, errorBackoff})); - this.silentRecoverBackoff = silentRecoverBackoff; - } - - public boolean shouldRetrySilently() { - return ((CombinedBackoff) getStrategy()).getCurrentStrategy() == silentRecoverBackoff; - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/NumberOfRetriesBackoff.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/NumberOfRetriesBackoff.java deleted file mode 100644 index 16a956e01..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/NumberOfRetriesBackoff.java +++ /dev/null @@ -1,54 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import lombok.Getter; - -/** - * This strategy will backoff for a fixed number of retries before being exhausted. The backoff time - * is determined by the provided {@link BackoffStrategy}. - */ -public class NumberOfRetriesBackoff implements BackoffStrategy { - private final int numRetries; - private final BackoffStrategy backoffStrategy; - - @Getter - private int retryCount; - - /** - * Creates a new backoff strategy that will backoff for a fixed number of retries before being - * exhausted. The backoff time is determined by the provided {@link BackoffStrategy}. - * - * @param numRetries the number of retries before the backoff is exhausted - * @param backoffStrategy the backoff strategy to use for determining the backoff time - */ - public NumberOfRetriesBackoff(int numRetries, BackoffStrategy backoffStrategy) { - this.numRetries = numRetries; - this.backoffStrategy = backoffStrategy; - this.retryCount = 0; - } - - @Override - public long getCurrentBackoffMillis() { - return backoffStrategy.getCurrentBackoffMillis(); - } - - @Override - public boolean isExhausted() { - return retryCount >= numRetries; - } - - @Override - public void nextBackoff() { - if (isExhausted()) { - return; - } - - retryCount++; - backoffStrategy.nextBackoff(); - } - - @Override - public void reset() { - retryCount = 0; - backoffStrategy.reset(); - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java index a64275c2b..5c8ad3ea1 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java @@ -12,6 +12,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.Resolver; import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent; import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState; +import dev.openfeature.contrib.providers.flagd.resolver.common.GrpcConnector; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; import dev.openfeature.contrib.providers.flagd.resolver.grpc.strategy.ResolveFactory; import dev.openfeature.contrib.providers.flagd.resolver.grpc.strategy.ResolveStrategy; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index fd617af1f..86111e055 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -10,6 +10,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage; +import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageStateChange; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector; @@ -57,7 +58,7 @@ public InProcessResolver( FlagdOptions options, final Supplier connectedSupplier, Consumer onConnectionEvent) { - this.flagStore = new FlagStore(getConnector(options)); + this.flagStore = new FlagStore(getConnector(options, onConnectionEvent)); this.deadline = options.getDeadline(); this.onConnectionEvent = onConnectionEvent; this.operator = new Operator(); @@ -79,20 +80,15 @@ public void init() throws Exception { while (true) { final StorageStateChange storageStateChange = flagStore.getStateQueue().take(); - switch (storageStateChange.getStorageState()) { - case OK: - onConnectionEvent.accept(new ConnectionEvent( - ConnectionState.CONNECTED, - storageStateChange.getChangedFlagsKeys(), - storageStateChange.getSyncMetadata())); - break; - case ERROR: - onConnectionEvent.accept(new ConnectionEvent(false)); - break; - default: - log.info(String.format( - "Storage emitted unhandled status: %s", storageStateChange.getStorageState())); + if (storageStateChange.getStorageState() != StorageState.OK) { + log.info( + String.format("Storage returned NOK status: %s", storageStateChange.getStorageState())); + continue; } + onConnectionEvent.accept(new ConnectionEvent( + ConnectionState.CONNECTED, + storageStateChange.getChangedFlagsKeys(), + storageStateChange.getSyncMetadata())); } } catch (InterruptedException e) { log.warn("Storage state watcher interrupted", e); @@ -160,14 +156,14 @@ public ProviderEvaluation objectEvaluation(String key, Value defaultValue .build(); } - static Connector getConnector(final FlagdOptions options) { + static Connector getConnector(final FlagdOptions options, Consumer onConnectionEvent) { if (options.getCustomConnector() != null) { return options.getCustomConnector(); } return options.getOfflineFlagSourcePath() != null && !options.getOfflineFlagSourcePath().isEmpty() ? new FileConnector(options.getOfflineFlagSourcePath()) - : new GrpcStreamConnector(options); + : new GrpcStreamConnector(options, onConnectionEvent); } private ProviderEvaluation resolve(Class type, String key, EvaluationContext ctx) { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java index d0e2083fd..e48a65211 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java @@ -1,14 +1,12 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc; import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder; -import dev.openfeature.contrib.providers.flagd.resolver.common.backoff.GrpcStreamConnectorBackoffService; +import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent; +import dev.openfeature.contrib.providers.flagd.resolver.common.GrpcConnector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc; -import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub; -import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub; import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataRequest; import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; @@ -16,13 +14,12 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.grpc.Context; import io.grpc.Context.CancellableContext; -import io.grpc.ManagedChannel; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; -import org.slf4j.event.Level; /** * Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract. @@ -36,42 +33,34 @@ public class GrpcStreamConnector implements Connector { private final AtomicBoolean shutdown = new AtomicBoolean(false); private final BlockingQueue blockingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final ManagedChannel channel; - private final FlagSyncServiceStub serviceStub; - private final FlagSyncServiceBlockingStub serviceBlockingStub; private final int deadline; - private final int streamDeadlineMs; private final String selector; - private final int retryBackoffMillis; + private final GrpcConnector< + FlagSyncServiceGrpc.FlagSyncServiceStub, FlagSyncServiceGrpc.FlagSyncServiceBlockingStub> + grpcConnector; + private final LinkedBlockingQueue streamReceiver; /** - * Construct a new GrpcStreamConnector. - * - * @param options flagd options + * Creates a new GrpcStreamConnector responsible for observing the event stream. */ - public GrpcStreamConnector(final FlagdOptions options) { - channel = ChannelBuilder.nettyChannel(options); - serviceStub = FlagSyncServiceGrpc.newStub(channel); - serviceBlockingStub = FlagSyncServiceGrpc.newBlockingStub(channel); + public GrpcStreamConnector(final FlagdOptions options, Consumer onConnectionEvent) { deadline = options.getDeadline(); - streamDeadlineMs = options.getStreamDeadlineMs(); selector = options.getSelector(); - retryBackoffMillis = options.getRetryBackoffMs(); + streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); + grpcConnector = new GrpcConnector<>( + options, + FlagSyncServiceGrpc::newStub, + FlagSyncServiceGrpc::newBlockingStub, + onConnectionEvent, + stub -> stub.syncFlags(SyncFlagsRequest.getDefaultInstance(), new GrpcStreamHandler(streamReceiver))); } /** Initialize gRPC stream connector. */ - public void init() { + public void init() throws Exception { + grpcConnector.initialize(); Thread listener = new Thread(() -> { try { - observeEventStream( - blockingQueue, - shutdown, - serviceStub, - serviceBlockingStub, - selector, - deadline, - streamDeadlineMs, - retryBackoffMillis); + observeEventStream(blockingQueue, shutdown, selector, deadline); } catch (InterruptedException e) { log.warn("gRPC event stream interrupted, flag configurations are stale", e); Thread.currentThread().interrupt(); @@ -96,37 +85,17 @@ public void shutdown() throws InterruptedException { if (shutdown.getAndSet(true)) { return; } - - try { - if (this.channel != null && !this.channel.isShutdown()) { - this.channel.shutdown(); - this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS); - } - } finally { - if (this.channel != null && !this.channel.isShutdown()) { - this.channel.shutdownNow(); - this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS); - log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline)); - } - } + this.grpcConnector.shutdown(); } /** Contains blocking calls, to be used concurrently. */ - static void observeEventStream( + void observeEventStream( final BlockingQueue writeTo, final AtomicBoolean shutdown, - final FlagSyncServiceStub serviceStub, - final FlagSyncServiceBlockingStub serviceBlockingStub, final String selector, - final int deadline, - final int streamDeadlineMs, - int retryBackoffMillis) + final int deadline) throws InterruptedException { - final BlockingQueue streamReceiver = new LinkedBlockingQueue<>(QUEUE_SIZE); - final GrpcStreamConnectorBackoffService backoffService = - new GrpcStreamConnectorBackoffService(retryBackoffMillis); - log.info("Initializing sync stream observer"); while (!shutdown.get()) { @@ -143,15 +112,10 @@ static void observeEventStream( } try (CancellableContext context = Context.current().withCancellation()) { - FlagSyncServiceStub localServiceStub = serviceStub; - if (streamDeadlineMs > 0) { - localServiceStub = localServiceStub.withDeadlineAfter(streamDeadlineMs, TimeUnit.MILLISECONDS); - } - - localServiceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver)); try { - metadataResponse = serviceBlockingStub + metadataResponse = grpcConnector + .getResolver() .withDeadlineAfter(deadline, TimeUnit.MILLISECONDS) .getMetadata(metadataRequest.build()); } catch (Exception e) { @@ -164,27 +128,18 @@ static void observeEventStream( while (!shutdown.get()) { final GrpcResponseModel response = streamReceiver.take(); - if (response.isComplete()) { log.info("Sync stream completed"); - // The stream is complete, this isn't really an error but we should try to + // The stream is complete, this isn't really an error, but we should try to // reconnect break; } Throwable streamException = response.getError(); if (streamException != null || metadataException != null) { - long retryDelay = backoffService.getCurrentBackoffMillis(); - - // if we are in silent recover mode, we should not expose the error to the client - if (backoffService.shouldRetrySilently()) { - logExceptions(Level.INFO, streamException, metadataException, retryDelay); - } else { - logExceptions(Level.ERROR, streamException, metadataException, retryDelay); - if (!writeTo.offer(new QueuePayload( - QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) { - log.error("Failed to convey ERROR status, queue is full"); - } + if (!writeTo.offer(new QueuePayload( + QueuePayloadType.ERROR, "Error from stream or metadata", metadataResponse))) { + log.error("Failed to convey ERROR status, queue is full"); } // close the context to cancel the stream in case just the metadata call failed @@ -199,34 +154,10 @@ static void observeEventStream( if (!writeTo.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) { log.error("Stream writing failed"); } - - // reset backoff if we succeeded in a retry attempt - backoffService.reset(); } } - - // check for shutdown and avoid sleep - if (!shutdown.get()) { - log.debug("Stream failed, retrying in {}ms", backoffService.getCurrentBackoffMillis()); - backoffService.waitUntilNextAttempt(); - } } log.info("Shutdown invoked, exiting event stream listener"); } - - private static void logExceptions( - Level logLevel, Throwable streamException, Exception metadataException, long retryDelay) { - if (streamException != null) { - log.atLevel(logLevel) - .setCause(streamException) - .log("Error initializing stream, retrying in {}ms", retryDelay); - } - - if (metadataException != null) { - log.atLevel(logLevel) - .setCause(metadataException) - .log("Error initializing metadata, retrying in {}ms", retryDelay); - } - } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index b01cead09..c223cbc31 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -19,7 +19,7 @@ import dev.openfeature.contrib.providers.flagd.resolver.Resolver; import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent; import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionState; -import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcConnector; +import dev.openfeature.contrib.providers.flagd.resolver.common.GrpcConnector; import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitorTest.java new file mode 100644 index 000000000..539ef8d86 --- /dev/null +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitorTest.java @@ -0,0 +1,90 @@ +package dev.openfeature.contrib.providers.flagd.resolver.common; + +import static dev.openfeature.contrib.providers.flagd.resolver.common.ChannelMonitor.monitorChannelState; +import static dev.openfeature.contrib.providers.flagd.resolver.common.ChannelMonitor.waitForDesiredState; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import dev.openfeature.sdk.exceptions.GeneralError; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +class ChannelMonitorTest { + @Test + void testWaitForDesiredState() throws InterruptedException { + ManagedChannel channel = mock(ManagedChannel.class); + Runnable connectCallback = mock(Runnable.class); + + // Set up the desired state + ConnectivityState desiredState = ConnectivityState.READY; + when(channel.getState(anyBoolean())).thenReturn(desiredState); + + // Call the method + waitForDesiredState(desiredState, channel, connectCallback, 1, TimeUnit.SECONDS); + + // Verify that the callback was run + verify(connectCallback, times(1)).run(); + } + + @Test + void testWaitForDesiredStateTimeout() { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Runnable connectCallback = mock(Runnable.class); + + // Set up the desired state + ConnectivityState desiredState = ConnectivityState.READY; + when(channel.getState(anyBoolean())).thenReturn(ConnectivityState.IDLE); + + // Call the method and expect a timeout + assertThrows(GeneralError.class, () -> { + waitForDesiredState(desiredState, channel, connectCallback, 1, TimeUnit.SECONDS); + }); + } + + @ParameterizedTest + @EnumSource(ConnectivityState.class) + void testMonitorChannelState(ConnectivityState state) { + ManagedChannel channel = Mockito.mock(ManagedChannel.class); + Runnable onConnectionReady = mock(Runnable.class); + Runnable onConnectionLost = mock(Runnable.class); + + // Set up the expected state + ConnectivityState expectedState = ConnectivityState.IDLE; + when(channel.getState(anyBoolean())).thenReturn(state); + + // Capture the callback + ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Runnable.class); + doNothing().when(channel).notifyWhenStateChanged(eq(expectedState), callbackCaptor.capture()); + + // Call the method + monitorChannelState(expectedState, channel, onConnectionReady, onConnectionLost); + + // Simulate state change + callbackCaptor.getValue().run(); + + // Verify the callbacks based on the state + if (state == ConnectivityState.READY) { + verify(onConnectionReady, times(1)).run(); + verify(onConnectionLost, never()).run(); + } else if (state == ConnectivityState.TRANSIENT_FAILURE || state == ConnectivityState.SHUTDOWN) { + verify(onConnectionReady, never()).run(); + verify(onConnectionLost, times(1)).run(); + } else { + verify(onConnectionReady, never()).run(); + verify(onConnectionLost, never()).run(); + } + } +} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnectorTest.java similarity index 95% rename from providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java rename to providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnectorTest.java index c76963ad4..4c417f957 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnectorTest.java @@ -1,8 +1,7 @@ -package dev.openfeature.contrib.providers.flagd.resolver.grpc; +package dev.openfeature.contrib.providers.flagd.resolver.common; import com.google.common.collect.Lists; import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent; import dev.openfeature.flagd.grpc.evaluation.Evaluation; import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc; import io.grpc.ManagedChannel; @@ -30,7 +29,7 @@ class GrpcConnectorTest { private static final boolean DISCONNECTED = false; @Mock - private EventStreamObserver mockEventStreamObserver; + private StreamObserver mockEventStreamObserver; private final ServiceGrpc.ServiceImplBase testServiceImpl = new ServiceGrpc.ServiceImplBase() { @Override diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffServiceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffServiceTest.java deleted file mode 100644 index 76fc20caa..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/BackoffServiceTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - -import java.time.Instant; -import org.junit.jupiter.api.Test; - -class BackoffServiceTest { - @Test - void getCurrentBackoffMillisReturnsBackoffMillisFromStrategy() { - BackoffStrategy mockStrategy = mock(BackoffStrategy.class); - when(mockStrategy.getCurrentBackoffMillis()).thenReturn(1000L); - - BackoffService backoffService = new BackoffService(mockStrategy); - - assertEquals(1000, backoffService.getCurrentBackoffMillis()); - verify(mockStrategy).getCurrentBackoffMillis(); - } - - @Test - void resetCallsResetOnBackoffStrategy() { - BackoffStrategy mockStrategy = mock(BackoffStrategy.class); - - BackoffService backoffService = new BackoffService(mockStrategy); - backoffService.reset(); - - verify(mockStrategy).reset(); - } - - @Test - void waitUntilNextAttemptBlocksForBackoffTimeAndIncreasesBackoff() throws InterruptedException { - BackoffStrategy mockStrategy = mock(BackoffStrategy.class); - when(mockStrategy.getCurrentBackoffMillis()).thenReturn(250L); - - BackoffService backoffService = new BackoffService(mockStrategy, 0); - - Instant beforeWait = Instant.now(); - backoffService.waitUntilNextAttempt(); - Instant afterWait = Instant.now(); - - long timeElapsed = afterWait.toEpochMilli() - beforeWait.toEpochMilli(); - assertTrue(timeElapsed >= 250); - - verify(mockStrategy).getCurrentBackoffMillis(); - verify(mockStrategy).nextBackoff(); - } - - @Test - void shouldRetryReturnsTrueIfStrategyIsNotExhausted() { - BackoffStrategy mockStrategy = mock(BackoffStrategy.class); - when(mockStrategy.isExhausted()).thenReturn(false); - - BackoffService backoffService = new BackoffService(mockStrategy); - - assertTrue(backoffService.shouldRetry()); - verify(mockStrategy).isExhausted(); - } - - @Test - void shouldRetryReturnsFalseIfStrategyIsExhausted() { - BackoffStrategy mockStrategy = mock(BackoffStrategy.class); - when(mockStrategy.isExhausted()).thenReturn(true); - - BackoffService backoffService = new BackoffService(mockStrategy); - - assertFalse(backoffService.shouldRetry()); - verify(mockStrategy).isExhausted(); - } -} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoffTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoffTest.java deleted file mode 100644 index 793239e7c..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/CombinedBackoffTest.java +++ /dev/null @@ -1,120 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - -import org.junit.jupiter.api.Test; - -class CombinedBackoffTest { - @Test - void currentBackoffIsFirstBackoff() { - BackoffStrategy[] backoffStrategies = - new BackoffStrategy[] {new ConstantTimeBackoff(1), new ConstantTimeBackoff(2)}; - - CombinedBackoff target = new CombinedBackoff(backoffStrategies); - - assertSame(backoffStrategies[0], target.getCurrentStrategy()); - } - - @Test - void currentBackoffIsFirstNonExhaustedBackoff() { - BackoffStrategy exhaustedBackoff = mock(BackoffStrategy.class); - when(exhaustedBackoff.isExhausted()).thenReturn(true); - - BackoffStrategy[] backoffStrategies = - new BackoffStrategy[] {exhaustedBackoff, exhaustedBackoff, new ConstantTimeBackoff(3)}; - - CombinedBackoff target = new CombinedBackoff(backoffStrategies); - - assertSame(backoffStrategies[2], target.getCurrentStrategy()); - } - - @Test - void currentBackoffIsLastBackoffIfAllBackoffsAreExhausted() { - BackoffStrategy firstBackoff = mock(BackoffStrategy.class); - when(firstBackoff.isExhausted()).thenReturn(true); - - BackoffStrategy secondBackoff = mock(BackoffStrategy.class); - when(secondBackoff.isExhausted()).thenReturn(true); - - BackoffStrategy[] backoffStrategies = new BackoffStrategy[] {firstBackoff, secondBackoff}; - - CombinedBackoff target = new CombinedBackoff(backoffStrategies); - - assertSame(backoffStrategies[1], target.getCurrentStrategy()); - } - - @Test - void currentBackoffSwitchesToNextBackoffWhenExhausted() { - BackoffStrategy[] backoffStrategies = new BackoffStrategy[] { - new NumberOfRetriesBackoff(2, new ConstantTimeBackoff(1)), new ConstantTimeBackoff(2) - }; - - CombinedBackoff target = new CombinedBackoff(backoffStrategies); - - target.nextBackoff(); - assertEquals(backoffStrategies[0], target.getCurrentStrategy()); - assertEquals(1, target.getCurrentBackoffMillis()); - - target.nextBackoff(); - assertEquals(backoffStrategies[0], target.getCurrentStrategy()); - assertEquals(1, target.getCurrentBackoffMillis()); - - target.nextBackoff(); - assertEquals(backoffStrategies[1], target.getCurrentStrategy()); - assertEquals(2, target.getCurrentBackoffMillis()); - } - - @Test - void isExhaustedIsTrueAfterAllBackoffsAreExhausted() { - BackoffStrategy[] backoffStrategies = new BackoffStrategy[] { - new NumberOfRetriesBackoff(2, new ConstantTimeBackoff(1)), - new NumberOfRetriesBackoff(2, new ConstantTimeBackoff(2)) - }; - - CombinedBackoff target = new CombinedBackoff(backoffStrategies); - - // Backoff 1 - assertFalse(target.isExhausted()); - - target.nextBackoff(); - assertFalse(target.isExhausted()); - - target.nextBackoff(); - assertFalse(target.isExhausted()); - - // Backoff 2 - target.nextBackoff(); - assertFalse(target.isExhausted()); - - target.nextBackoff(); - assertTrue(target.isExhausted()); - } - - @Test - void resetCallsResetOnAllUsedBackoffsAndswitchesBacktoFirstBackoff() { - BackoffStrategy firstBackoff = mock(BackoffStrategy.class); - BackoffStrategy secondBackoff = mock(BackoffStrategy.class); - BackoffStrategy thirdBackoff = mock(BackoffStrategy.class); - - BackoffStrategy[] backoffStrategies = new BackoffStrategy[] { - new NumberOfRetriesBackoff(1, firstBackoff), - new NumberOfRetriesBackoff(1, secondBackoff), - new NumberOfRetriesBackoff(1, thirdBackoff) - }; - - CombinedBackoff target = new CombinedBackoff(backoffStrategies); - - target.nextBackoff(); // Backoff 1 - target.nextBackoff(); // Backoff 2 - assertEquals(backoffStrategies[1], target.getCurrentStrategy()); - - target.reset(); - - assertEquals(backoffStrategies[0], target.getCurrentStrategy()); - - verify(firstBackoff).reset(); - verify(secondBackoff).reset(); - verify(thirdBackoff, never()).reset(); - } -} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoffTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoffTest.java deleted file mode 100644 index 1d4e2feae..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ConstantTimeBackoffTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; - -import org.junit.jupiter.api.Test; - -class ConstantTimeBackoffTest { - @Test - void isNotExhaustedInitially() { - ConstantTimeBackoff constantTimeBackoff = new ConstantTimeBackoff(1000); - assertFalse(constantTimeBackoff.isExhausted()); - } - - @Test - void isNotExhaustedAfterNextBackoff() { - ConstantTimeBackoff constantTimeBackoff = new ConstantTimeBackoff(1000); - constantTimeBackoff.nextBackoff(); - assertFalse(constantTimeBackoff.isExhausted()); - } - - @Test - void getCurrentBackoffMillis() { - ConstantTimeBackoff constantTimeBackoff = new ConstantTimeBackoff(1000); - assertEquals(1000, constantTimeBackoff.getCurrentBackoffMillis()); - } - - @Test - void nextBackoffDoesNotChangeCurrentValue() { - ConstantTimeBackoff constantTimeBackoff = new ConstantTimeBackoff(1000); - constantTimeBackoff.nextBackoff(); - assertEquals(1000, constantTimeBackoff.getCurrentBackoffMillis()); - } - - @Test - void resetDoesNotChangeCurrentValue() { - ConstantTimeBackoff constantTimeBackoff = new ConstantTimeBackoff(1000); - constantTimeBackoff.nextBackoff(); - constantTimeBackoff.reset(); - assertEquals(1000, constantTimeBackoff.getCurrentBackoffMillis()); - } - - @Test - void resetDoesNotChangeIsExhausted() { - ConstantTimeBackoff constantTimeBackoff = new ConstantTimeBackoff(1000); - constantTimeBackoff.nextBackoff(); - constantTimeBackoff.reset(); - assertFalse(constantTimeBackoff.isExhausted()); - } -} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ExponentialTimeBackoffTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ExponentialTimeBackoffTest.java deleted file mode 100644 index 8c07d6115..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/ExponentialTimeBackoffTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; - -class ExponentialTimeBackoffTest { - @Test - void isNotExhaustedInitially() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000); - assertFalse(target.isExhausted()); - } - - @Test - void isNotExhaustedAfterNextBackoff() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000); - target.nextBackoff(); - assertFalse(target.isExhausted()); - } - - @Test - void getCurrentBackoffMillis() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000); - assertEquals(1000, target.getCurrentBackoffMillis()); - } - - @ParameterizedTest(name = "{0} times backoff") - @ValueSource(ints = {1, 2, 3, 4, 5}) - void backoffIncreasesExponentially(int iteration) { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(2000, Long.MAX_VALUE); - for (int i = 0; i < iteration; i++) { - target.nextBackoff(); - } - - long expectedValue = ((long) Math.pow(2, iteration)) * 2000; - assertEquals(expectedValue, target.getCurrentBackoffMillis()); - } - - @Test - void getCurrentBackoffMillisDoesNotIncreaseBeyondMaxBackoff() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000, 5000); - - target.nextBackoff(); // 2000 - target.nextBackoff(); // 4000 - target.nextBackoff(); // 5000 (8000) - - assertEquals(5000, target.getCurrentBackoffMillis()); - } - - @Test - void maxDefaultBackoffIsDefinedWhenNoBoundaryIsSet() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000); - - // ~7 iterations == 128000; DEFAULT_MAX_BACK_OFF == 120000 - for (int i = 0; i < 7; i++) { - target.nextBackoff(); - } - - assertEquals(ExponentialTimeBackoff.DEFAULT_MAX_BACK_OFF, target.getCurrentBackoffMillis()); - } - - @Test - void resetResetsToInitialValue() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000); - target.nextBackoff(); - target.reset(); - assertEquals(1000, target.getCurrentBackoffMillis()); - } - - @Test - void resetDoesNotChangeIsExhausted() { - ExponentialTimeBackoff target = new ExponentialTimeBackoff(1000); - target.nextBackoff(); - target.reset(); - assertFalse(target.isExhausted()); - } -} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/NumberOfRetriesBackoffTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/NumberOfRetriesBackoffTest.java deleted file mode 100644 index ba3fdf514..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/common/backoff/NumberOfRetriesBackoffTest.java +++ /dev/null @@ -1,112 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common.backoff; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; - -import org.junit.jupiter.api.Test; - -class NumberOfRetriesBackoffTest { - @Test - void currentBackoffMillisIsReadFromInnerBackoff() { - final long expectedBackoffMillis = 1000; - - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - when(backoffMock.getCurrentBackoffMillis()).thenReturn(expectedBackoffMillis); - - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(1, backoffMock); - - long actualBackoffMillis = target.getCurrentBackoffMillis(); - - assertEquals(expectedBackoffMillis, actualBackoffMillis); - verify(backoffMock).getCurrentBackoffMillis(); - } - - @Test - void initialRetryCountIsZero() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(1, backoffMock); - - assertEquals(0, target.getRetryCount()); - } - - @Test - void nextBackoffIncreasesRetryCount() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(1, backoffMock); - - target.nextBackoff(); - assertEquals(1, target.getRetryCount()); - } - - @Test - void nextBackoffCallsInnerBackoff() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(1, backoffMock); - - target.nextBackoff(); - verify(backoffMock).nextBackoff(); - } - - @Test - void nextBackoffIsNotIncreasedIfRetryCountReached() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(2, backoffMock); - - target.nextBackoff(); - target.nextBackoff(); - target.nextBackoff(); - - assertEquals(2, target.getRetryCount()); - } - - @Test - void BackoffIsExhaustedIfRetryCountReached() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(2, backoffMock); - - assertFalse(target.isExhausted()); - target.nextBackoff(); - - assertFalse(target.isExhausted()); - target.nextBackoff(); - - assertTrue(target.isExhausted()); - target.nextBackoff(); - - assertTrue(target.isExhausted()); - } - - @Test - void nextBackoffDoesNotCallInnerBackoffIfRetryCountReached() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(2, backoffMock); - - target.nextBackoff(); - target.nextBackoff(); - verify(backoffMock, times(2)).nextBackoff(); - - target.nextBackoff(); - verifyNoMoreInteractions(backoffMock); - } - - @Test - void resetCallsInnerBackoff() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(1, backoffMock); - - target.reset(); - verify(backoffMock).reset(); - } - - @Test - void resetResetsRetryCount() { - BackoffStrategy backoffMock = mock(BackoffStrategy.class); - NumberOfRetriesBackoff target = new NumberOfRetriesBackoff(1, backoffMock); - - // increase retry count - target.nextBackoff(); - - target.reset(); - assertEquals(0, target.getRetryCount()); - } -} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java index 08c758c0b..e49046d13 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java @@ -67,9 +67,9 @@ public void connectorSetup() { .build(); // then - assertInstanceOf(GrpcStreamConnector.class, InProcessResolver.getConnector(forGrpcOptions)); - assertInstanceOf(FileConnector.class, InProcessResolver.getConnector(forOfflineOptions)); - assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions)); + assertInstanceOf(GrpcStreamConnector.class, InProcessResolver.getConnector(forGrpcOptions, e -> {})); + assertInstanceOf(FileConnector.class, InProcessResolver.getConnector(forOfflineOptions, e -> {})); + assertInstanceOf(MockConnector.class, InProcessResolver.getConnector(forCustomConnectorOptions, e -> {})); } @Test @@ -114,10 +114,6 @@ public void eventHandling() throws Throwable { assertEquals(StorageState.OK, storageState.getStorageState()); assertEquals(val, storageState.getSyncMetadata().getValue(key).asString()); }); - - assertTimeoutPreemptively(Duration.ofMillis(200), () -> { - assertEquals(StorageState.ERROR, receiver.take().getStorageState()); - }); } @Test diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java deleted file mode 100644 index f17e09304..000000000 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java +++ /dev/null @@ -1,233 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc; - -import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.*; - -import com.google.protobuf.Struct; -import dev.openfeature.contrib.providers.flagd.FlagdOptions; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; -import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; -import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceBlockingStub; -import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc.FlagSyncServiceStub; -import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; -import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsRequest; -import dev.openfeature.flagd.grpc.sync.Sync.SyncFlagsResponse; -import java.lang.reflect.Field; -import java.time.Duration; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.Test; - -class GrpcStreamConnectorTest { - - private static final Duration MAX_WAIT_MS = Duration.ofMillis(500); - - @Test - public void connectionParameters() throws Throwable { - // given - final FlagdOptions options = FlagdOptions.builder() - .selector("selector") - .deadline(1337) - .streamDeadlineMs(87699) - .build(); - - final GrpcStreamConnector connector = new GrpcStreamConnector(options); - final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); - final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); - final SyncFlagsRequest[] request = new SyncFlagsRequest[1]; - - doAnswer(invocation -> { - request[0] = invocation.getArgument(0, SyncFlagsRequest.class); - return null; - }) - .when(stubMock) - .syncFlags(any(), any()); - - // when - connector.init(); - verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); - verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS); - verify(stubMock).withDeadlineAfter(87699, TimeUnit.MILLISECONDS); - - // then - final SyncFlagsRequest flagsRequest = request[0]; - assertNotNull(flagsRequest); - assertEquals("selector", flagsRequest.getSelector()); - } - - @Test - public void disableStreamDeadline() throws Throwable { - // given - final FlagdOptions options = - FlagdOptions.builder().selector("selector").streamDeadlineMs(0).build(); - - final GrpcStreamConnector connector = new GrpcStreamConnector(options); - final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); - final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); - final SyncFlagsRequest[] request = new SyncFlagsRequest[1]; - - doAnswer(invocation -> { - request[0] = invocation.getArgument(0, SyncFlagsRequest.class); - return null; - }) - .when(stubMock) - .syncFlags(any(), any()); - - // when - connector.init(); - verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); - verify(stubMock, never()).withDeadlineAfter(anyLong(), any()); - - // then - final SyncFlagsRequest flagsRequest = request[0]; - assertNotNull(flagsRequest); - assertEquals("selector", flagsRequest.getSelector()); - } - - @Test - public void grpcConnectionStatus() throws Throwable { - // given - final String key = "key1"; - final String val = "value1"; - final GrpcStreamConnector connector = - new GrpcStreamConnector(FlagdOptions.builder().build()); - final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); - final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); - final Struct metadata = Struct.newBuilder() - .putFields( - key, - com.google.protobuf.Value.newBuilder() - .setStringValue(val) - .build()) - .build(); - - when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock); - when(blockingStubMock.getMetadata(any())) - .thenReturn( - GetMetadataResponse.newBuilder().setMetadata(metadata).build()); - - final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; - - doAnswer(invocation -> { - injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class); - return null; - }) - .when(stubMock) - .syncFlags(any(), any()); - - // when - connector.init(); - // verify and wait for initialization - verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); - verify(blockingStubMock).getMetadata(any()); - - // then - final GrpcStreamHandler grpcStreamHandler = injectedHandler[0]; - assertNotNull(grpcStreamHandler); - - final BlockingQueue streamPayloads = connector.getStream(); - - // accepted data - grpcStreamHandler.onNext(SyncFlagsResponse.newBuilder().build()); - - assertTimeoutPreemptively(MAX_WAIT_MS, () -> { - QueuePayload payload = streamPayloads.take(); - assertEquals(QueuePayloadType.DATA, payload.getType()); - assertEquals( - val, - convertProtobufMapToStructure( - payload.getMetadataResponse().getMetadata().getFieldsMap()) - .asObjectMap() - .get(key)); - }); - - // ping must be ignored - grpcStreamHandler.onNext(SyncFlagsResponse.newBuilder().build()); - - // accepted data - grpcStreamHandler.onNext(SyncFlagsResponse.newBuilder().build()); - - assertTimeoutPreemptively(MAX_WAIT_MS, () -> { - QueuePayload payload = streamPayloads.take(); - assertEquals(QueuePayloadType.DATA, payload.getType()); - }); - } - - @Test - public void listenerExitOnShutdown() throws Throwable { - // given - final GrpcStreamConnector connector = - new GrpcStreamConnector(FlagdOptions.builder().build()); - final FlagSyncServiceStub stubMock = mockStubAndReturn(connector); - final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector); - final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1]; - final Struct metadata = Struct.newBuilder().build(); - - when(blockingStubMock.withDeadlineAfter(anyLong(), any())).thenReturn(blockingStubMock); - when(blockingStubMock.getMetadata(any())) - .thenReturn( - GetMetadataResponse.newBuilder().setMetadata(metadata).build()); - when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); - doAnswer(invocation -> { - injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class); - return null; - }) - .when(stubMock) - .syncFlags(any(), any()); - - // when - connector.init(); - // verify and wait for initialization - verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any()); - verify(blockingStubMock).getMetadata(any()); - - // then - final GrpcStreamHandler grpcStreamHandler = injectedHandler[0]; - assertNotNull(grpcStreamHandler); - - // invoke shutdown - connector.shutdown(); - // mock channel close of gRPC handler - grpcStreamHandler.onError(new Exception("Channel closed, exiting")); - - // Validate mock calls & no more event propagation - verify(stubMock, times(1)).syncFlags(any(), any()); - - grpcStreamHandler.onNext(SyncFlagsResponse.newBuilder() - // .setState(SyncService.SyncState.SYNC_STATE_ALL) - .build()); - - // there should be no data - assertNull(connector.getStream().poll(100, TimeUnit.MILLISECONDS)); - } - - private static FlagSyncServiceStub mockStubAndReturn(final GrpcStreamConnector connector) throws Throwable { - final Field serviceStubField = GrpcStreamConnector.class.getDeclaredField("serviceStub"); - serviceStubField.setAccessible(true); - - final FlagSyncServiceStub stubMock = mock(FlagSyncServiceStub.class); - when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock); - - serviceStubField.set(connector, stubMock); - - return stubMock; - } - - private static FlagSyncServiceBlockingStub mockBlockingStubAndReturn(final GrpcStreamConnector connector) - throws Throwable { - final Field blockingStubField = GrpcStreamConnector.class.getDeclaredField("serviceBlockingStub"); - blockingStubField.setAccessible(true); - - final FlagSyncServiceBlockingStub blockingStubMock = mock(FlagSyncServiceBlockingStub.class); - - blockingStubField.set(connector, blockingStubMock); - - return blockingStubMock; - } -}