Skip to content

Commit

Permalink
feat: flagd in process offline mode (#473)
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <[email protected]>
Signed-off-by: Kavindu Dodanduwa <[email protected]>
Co-authored-by: Giovanni Liva <[email protected]>
Co-authored-by: Todd Baert <[email protected]>
  • Loading branch information
3 people authored Oct 9, 2023
1 parent 51d15d2 commit 6920557
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 21 deletions.
15 changes: 15 additions & 0 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ FlagdProvider flagdProvider = new FlagdProvider(

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).

In-process resolver can also work in an offline mode. To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`.
The file must contain a valid flagd flag source file.

```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.resolverType(Config.Evaluator.IN_PROCESS)
.offlineFlagSourcePath("PATH")
.build());
```

Provider will not detect file changes nor re-read the file after the initial read.
This mode is useful for local development, test cases and for offline application.
For a full-featured, production-ready file-based implementation, use the RPC evaluator in combination with the flagd standalone application, which can be configured to watch files for changes.

### Configuration options

Options can be defined in the constructor or as environment variables, with constructor options having the highest
Expand Down
Binary file modified providers/flagd/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import lombok.Builder;
import lombok.Getter;

import javax.annotation.Nonnull;

import static dev.openfeature.contrib.providers.flagd.Config.BASE_EVENT_STREAM_RETRY_BACKOFF_MS;
import static dev.openfeature.contrib.providers.flagd.Config.BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.CACHE_ENV_VAR_NAME;
Expand All @@ -20,78 +22,78 @@
import static dev.openfeature.contrib.providers.flagd.Config.MAX_CACHE_SIZE_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.PORT_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SOURCE_SELECTOR_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SERVER_CERT_PATH_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SOCKET_PATH_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.SOURCE_SELECTOR_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.TLS_ENV_VAR_NAME;
import static dev.openfeature.contrib.providers.flagd.Config.fallBackToEnvOrDefault;

/**
* FlagdOptions is a builder to build flagd provider options.
* */
*/
@Builder
@Getter
@SuppressWarnings("PMD.TooManyStaticImports")
public class FlagdOptions {

/**
* flagd resolving type.
* */
*/
@Builder.Default
private Config.Evaluator resolverType = DEFAULT_RESOLVER_TYPE;

/**
* flagd connection host.
* */
*/
@Builder.Default
private String host = fallBackToEnvOrDefault(HOST_ENV_VAR_NAME, DEFAULT_HOST);

/**
* flagd connection port.
* */
*/
@Builder.Default
private int port = Integer.parseInt(fallBackToEnvOrDefault(PORT_ENV_VAR_NAME, DEFAULT_PORT));

/**
* Use TLS connectivity.
* */
*/
@Builder.Default
private boolean tls = Boolean.parseBoolean(fallBackToEnvOrDefault(TLS_ENV_VAR_NAME, DEFAULT_TLS));

/**
* TLS certificate overriding if TLS connectivity is used.
* */
*/
@Builder.Default
private String certPath = fallBackToEnvOrDefault(SERVER_CERT_PATH_ENV_VAR_NAME, null);

/**
* Unix socket path to flagd.
* */
*/
@Builder.Default
private String socketPath = fallBackToEnvOrDefault(SOCKET_PATH_ENV_VAR_NAME, null);

/**
* Cache type to use. Supports - lru, disabled.
* */
*/
@Builder.Default
private String cacheType = fallBackToEnvOrDefault(CACHE_ENV_VAR_NAME, DEFAULT_CACHE);

/**
* Max cache size.
* */
*/
@Builder.Default
private int maxCacheSize = fallBackToEnvOrDefault(MAX_CACHE_SIZE_ENV_VAR_NAME, DEFAULT_MAX_CACHE_SIZE);

/**
* Max event stream connection retries.
* */
*/
@Builder.Default
private int maxEventStreamRetries =
fallBackToEnvOrDefault(MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, DEFAULT_MAX_EVENT_STREAM_RETRIES);

/**
* Backoff interval in milliseconds.
* */
*/
@Builder.Default
private int retryBackoffMs =
fallBackToEnvOrDefault(BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME, BASE_EVENT_STREAM_RETRY_BACKOFF_MS);
Expand All @@ -101,7 +103,7 @@ public class FlagdOptions {
* Connection deadline in milliseconds.
* For RPC resolving, this is the deadline to connect to flagd for flag evaluation.
* For in-process resolving, this is the deadline for sync stream termination.
* */
*/
@Builder.Default
private int deadline = fallBackToEnvOrDefault(DEADLINE_MS_ENV_VAR_NAME, DEFAULT_DEADLINE);

Expand All @@ -111,9 +113,43 @@ public class FlagdOptions {
@Builder.Default
private String selector = fallBackToEnvOrDefault(SOURCE_SELECTOR_ENV_VAR_NAME, null);

/**
* File source of flags to be used by offline mode.
* Setting this enables the offline mode of the in-process provider.
*/
private String offlineFlagSourcePath;

/**
* Flagd option to state the offline mode. Only get set with offlineFlagSourcePath.
*/
private boolean isOffline;

/**
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate distributed tracing for flagd grpc
* connectivity.
* */
*/
private OpenTelemetry openTelemetry;

/**
* Overload default lombok builder.
*/
public static class FlagdOptionsBuilder {

/**
* File source of flags to be used by offline mode.
* Setting this enables the offline mode of the in-process provider.
*/
public FlagdOptionsBuilder offlineFlagSourcePath(@Nonnull final String offlineFlagSourcePath) {
this.isOffline = true;
this.offlineFlagSourcePath = offlineFlagSourcePath;

return this;
}

// Remove the public access as this needs to be connected to offlineFlagSourcePath
@SuppressWarnings({"PMD.UnusedFormalParameter", "PMD.UnusedPrivateMethod"})
private FlagdOptionsBuilder isOffline(final boolean isOffline) {
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.FlagStore;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.Storage;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.StorageState;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file.FileConnector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamConnector;
import dev.openfeature.contrib.providers.flagd.resolver.process.targeting.Operator;
import dev.openfeature.contrib.providers.flagd.resolver.process.targeting.TargetingRuleException;
Expand Down Expand Up @@ -42,8 +44,11 @@ public class InProcessResolver implements Resolver {
* Initialize an in-process resolver.
*/
public InProcessResolver(FlagdOptions options, Consumer<ProviderState> stateConsumer) {
// currently we support gRPC connector
this.flagStore = new FlagStore(new GrpcStreamConnector(options));
final Connector connector = options.isOffline()
? new FileConnector(options.getOfflineFlagSourcePath())
: new GrpcStreamConnector(options);

this.flagStore = new FlagStore(connector);
this.deadline = options.getDeadline();
this.stateConsumer = stateConsumer;
this.operator = new Operator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public FlagStore(final Connector connector) {
/**
* Initialize storage layer.
*/
public void init() {
public void init() throws Exception {
connector.init();
Thread streamer = new Thread(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* Storage abstraction for resolver.
*/
public interface Storage {
void init();
void init() throws Exception;

void shutdown() throws InterruptedException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* {@link StreamPayload} format.
*/
public interface Connector {
void init();
void init() throws Exception;

BlockingQueue<StreamPayload> getStream();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file;

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* File connector reads flag configurations and expose the context through {@code Connector} contract.
* The implementation is kept minimal and suites testing, local development needs.
*/
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "PATH_TRAVERSAL_IN"},
justification = "File connector read feature flag from a file source.")
@Slf4j
public class FileConnector implements Connector {

private final String flagSourcePath;
private final BlockingQueue<StreamPayload> queue = new LinkedBlockingQueue<>(1);

public FileConnector(final String flagSourcePath) {
this.flagSourcePath = flagSourcePath;
}

/**
* Initialize file connector. Reads content of the provided source file and offer it through queue.
*/
public void init() throws IOException {
final String flagData = new String(Files.readAllBytes(Paths.get(flagSourcePath)), StandardCharsets.UTF_8);

if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
throw new RuntimeException("Unable to write to queue. Queue is full.");
}

log.info(String.format("Using feature flag configurations from file %s", flagSourcePath));
}

/**
* Expose the queue to fulfil the {@code Connector} contract.
*/
public BlockingQueue<StreamPayload> getStream() {
return queue;
}

/**
* NO-OP shutdown.
*/
public void shutdown() throws InterruptedException {
// NO-OP nothing to do here
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ public void TestDefaults() {
assertEquals(builder.getMaxEventStreamRetries(), DEFAULT_MAX_EVENT_STREAM_RETRIES);
assertNull(builder.getSelector());
assertNull(builder.getOpenTelemetry());
assertNull(builder.getOfflineFlagSourcePath());
assertFalse(builder.isOffline());
}

@Test
public void TestBuilderOptions(){
public void TestBuilderOptions() {
OpenTelemetry openTelemetry = Mockito.mock(OpenTelemetry.class);

FlagdOptions flagdOptions = FlagdOptions.builder()
Expand All @@ -45,6 +47,7 @@ public void TestBuilderOptions(){
.maxCacheSize(100)
.maxEventStreamRetries(1)
.selector("app=weatherApp")
.offlineFlagSourcePath("some-path")
.openTelemetry(openTelemetry)
.build();

Expand All @@ -56,6 +59,8 @@ public void TestBuilderOptions(){
assertEquals(flagdOptions.getMaxCacheSize(), 100);
assertEquals(flagdOptions.getMaxEventStreamRetries(), 1);
assertEquals(flagdOptions.getSelector(), "app=weatherApp");
assertTrue(flagdOptions.isOffline());
assertEquals("some-path", flagdOptions.getOfflineFlagSourcePath());
assertEquals(flagdOptions.getOpenTelemetry(), openTelemetry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,12 @@ public static String getFlagsFromResource(final String file) throws IOException
return new String(Files.readAllBytes(Paths.get(url.getPath())));
}
}

public static String getResourcePath(final String relativePath) {
final URL url = FlagParser.class.getClassLoader().getResource(relativePath);
if (url == null) {
throw new IllegalStateException(String.format("Resource %s not found", relativePath));
}
return url.getPath();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
class FlagStoreTest {

@Test
public void connectorHandling() throws InterruptedException {
public void connectorHandling() throws Exception {
final int maxDelay = 500;

final BlockingQueue<StreamPayload> payload = new LinkedBlockingQueue<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.file;

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;

import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_LONG;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.getResourcePath;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

class FileConnectorTest {

@Test
void readAndExposeFeatureFlagsFromSource() throws IOException {
// given
final FileConnector connector = new FileConnector(getResourcePath(VALID_LONG));

// when
connector.init();

// then
final BlockingQueue<StreamPayload> stream = connector.getStream();
final StreamPayload[] payload = new StreamPayload[1];

assertNotNull(stream);
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
payload[0] = stream.take();
});

assertNotNull(payload[0].getData());
assertEquals(StreamPayloadType.DATA, payload[0].getType());
}

@Test
void throwsErrorIfInvalidFile(){
// given
final FileConnector connector = new FileConnector("INVALID_PATH");

// then
assertThrows(IOException.class, connector::init);
}

}

0 comments on commit 6920557

Please sign in to comment.