Skip to content

Commit

Permalink
Implement multi tenancy in Flow Framework (opensearch-project#980)
Browse files Browse the repository at this point in the history
* Import SdkClient and inject it

Signed-off-by: Daniel Widdis <[email protected]>

* Pass sdkClient to IndicesHandler and EncryptorUtils classes

Signed-off-by: Daniel Widdis <[email protected]>

* Extract tenant id from REST header into RestAction

Signed-off-by: Daniel Widdis <[email protected]>

* Pass tenant id to transport actions in template

Signed-off-by: Daniel Widdis <[email protected]>

* Validate tenant id existence in workflow transport actions

Signed-off-by: Daniel Widdis <[email protected]>

* Pass SdkClient and tenant id to util used for access control checks

Signed-off-by: Daniel Widdis <[email protected]>

* Perform tenant id validation checks for workflow APIs

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate Update workflow get action to SdkCleint

Signed-off-by: Daniel Widdis <[email protected]>

* Pass tenantId to IndicesHandler and use in EncryptorUtils

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate EncryptorUtils getting master key from index

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor fetching master key to permit reuse

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor initializeMasterKey to use common code

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate indexing new key to config

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate template indexing to sdkClient

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate template deletion to sdkClient

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate get template to sdkClient

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate provision template to sdkClient

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate max workflow search to sdkClient

Signed-off-by: Daniel Widdis <[email protected]>

* Add tenantId to GetWorkflowStateRequest

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate GetWorkflowStateRequest to multitenant client

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate getProvisioningProgress to avoid repetition

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate canDeleteWorkflowStateDoc to avoid repetition

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate initial state document creation to metadata client

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate state document deletion to metadata client

Signed-off-by: Daniel Widdis <[email protected]>

* Add Tenant aware Rest Tests for Workflows

Signed-off-by: Daniel Widdis <[email protected]>

* Fix javadocs

Signed-off-by: Daniel Widdis <[email protected]>

* Add publishToMavenLocal for more CI

Signed-off-by: Daniel Widdis <[email protected]>

* Fix some CI

Signed-off-by: Daniel Widdis <[email protected]>

* Enable tenant aware search

Signed-off-by: Daniel Widdis <[email protected]>

* Refactor state index update method using multitenant client

Signed-off-by: Daniel Widdis <[email protected]>

* Get metadata client artifacts from Maven Snapshot

Signed-off-by: Daniel Widdis <[email protected]>

* Update tests for new update async code

Signed-off-by: Daniel Widdis <[email protected]>

* Switch SdkClient to use default generic thread executor

Signed-off-by: Daniel Widdis <[email protected]>

* Migrate last updates to sdkClient

Signed-off-by: Daniel Widdis <[email protected]>

* Revert (most) changes to unit tests based on async client changes

Signed-off-by: Daniel Widdis <[email protected]>

* Pass tenant id when updating state during provisioning

Signed-off-by: Daniel Widdis <[email protected]>

* Integrate tenantId with synchronous provisioning

Signed-off-by: Daniel Widdis <[email protected]>

* Fix failing integ tests after rebase, code review updates

Signed-off-by: Daniel Widdis <[email protected]>

* Replace fakeTenantId placeholders with actual tenant id

Signed-off-by: Daniel Widdis <[email protected]>

* Use version catalog for commons-lang3 and httpcore dependencies

Signed-off-by: Daniel Widdis <[email protected]>

* Exclude transitive httpclient dependency from metadata and rest client

Signed-off-by: Daniel Widdis <[email protected]>

* Fix more test errors and tweak dependencies

Signed-off-by: Daniel Widdis <[email protected]>

* More code review comments and refactoring

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jan 25, 2025
1 parent c9442f4 commit ff500da
Show file tree
Hide file tree
Showing 84 changed files with 3,377 additions and 1,097 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,22 @@ jobs:
- name: Build and Run Tests
run: |
./gradlew integTest -PnumNodes=3
integTenantAwareTest:
needs: [spotless, javadoc]
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
java: [21]
name: Tenant Aware Integ Test JDK${{ matrix.java }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: temurin
- name: Build and Run Tests
run: |
./gradlew integTest "-Dtests.rest.tenantaware=true"
1 change: 0 additions & 1 deletion .github/workflows/test_security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ jobs:
steps:
- name: Run start commands
run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}

- name: Checkout Flow Framework
uses: actions/checkout@v4
- name: Setup Java ${{ matrix.java }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x)
### Features
- Add multitenant remote metadata client ([#980](https://github.com/opensearch-project/flow-framework/pull/980))
- Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))
- Add ApiSpecFetcher for Fetching and Comparing API Specifications ([#651](https://github.com/opensearch-project/flow-framework/issues/651))
- Add optional config field to tool step ([#899](https://github.com/opensearch-project/flow-framework/pull/899))
Expand Down
50 changes: 43 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,18 @@ configurations {

dependencies {
implementation "org.opensearch:opensearch:${opensearch_version}"
implementation 'org.junit.jupiter:junit-jupiter:5.11.4'
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
implementation "org.apache.commons:commons-lang3:${versions.commonslang}"
api(group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}") {
exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
}
api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.36'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0'
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.amazonaws:aws-encryption-sdk-java:3.0.1"
implementation "software.amazon.cryptography:aws-cryptographic-material-providers:1.8.0"
implementation "org.dafny:DafnyRuntime:4.9.1"
implementation "software.amazon.smithy.dafny:conversion:0.1.1"
implementation 'org.bouncycastle:bcprov-jdk18on:1.78.1'
implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.4"
Expand All @@ -188,7 +189,11 @@ dependencies {
implementation "io.swagger.parser.v3:swagger-parser-core:${swaggerVersion}"
implementation "io.swagger.parser.v3:swagger-parser:${swaggerVersion}"
implementation "io.swagger.parser.v3:swagger-parser-v3:${swaggerVersion}"
// Declare Jackson dependencies for tests (from OpenSearch version catalog)
// Multi-tenant SDK Client
implementation ("org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}") {
exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
}
testImplementation 'org.junit.jupiter:junit-jupiter:5.11.4'
testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
testImplementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}"
Expand All @@ -202,7 +207,6 @@ dependencies {
configurations.all {
resolutionStrategy {
force("com.google.guava:guava:33.4.0-jre") // CVE for 31.1, keep to force transitive dependencies
force("org.apache.httpcomponents.core5:httpcore5:5.3.2") // Dependency Jar Hell
}
}
}
Expand Down Expand Up @@ -262,10 +266,19 @@ integTest {
systemProperty('user', user)
systemProperty('password', password)

// Only tenant aware test if set
if (System.getProperty("tests.rest.tenantaware") == "true") {
filter {
includeTestsMatching "org.opensearch.flowframework.*TenantAwareIT"
}
systemProperty "plugins.flow_framework.multi_tenancy_enabled", "true"
}

// Only rest case can run with remote cluster
if (System.getProperty("tests.rest.cluster") != null) {
if (System.getProperty("tests.rest.cluster") != null && System.getProperty("tests.rest.tenantaware") == null) {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.*IT"
excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
}
}

Expand All @@ -288,11 +301,34 @@ integTest {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkRestApiIT"
excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
}
}

// doFirst delays this block until execution time
doFirst {
if (System.getProperty("tests.rest.tenantaware") == "true") {
def ymlFile = file("$buildDir/testclusters/integTest-0/config/opensearch.yml")
if (ymlFile.exists()) {
ymlFile.withWriterAppend {
writer ->
writer.write("\n# Set multitenancy\n")
writer.write("plugins.flow_framework.multi_tenancy_enabled: true\n")
}
// TODO this properly uses the remote client factory but needs a remote cluster set up
// TODO get the endpoint from a system property
if (System.getProperty("tests.rest.cluster") != null) {
ymlFile.withWriterAppend { writer ->
writer.write("\n# Use a remote cluster\n")
writer.write("plugins.flow_framework.remote_metadata_type: RemoteOpenSearch\n")
writer.write("plugins.flow_framework.remote_metadata_endpoint: https://127.0.0.1:9200\n")
}
}
} else {
throw new GradleException("opensearch.yml not found at: $ymlFile")
}
}

// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
// use longer timeouts for requests.
def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
Expand All @@ -75,22 +77,36 @@
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX;
import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.TENANT_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_ENDPOINT;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_REGION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_SERVICE_NAME;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_TYPE;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
Expand Down Expand Up @@ -121,9 +137,28 @@ public Collection<Object> createComponents(
Settings settings = environment.settings();
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry);
SdkClient sdkClient = SdkClientFactory.createSdkClient(
client,
xContentRegistry,
// Here we assume remote metadata client is only used with tenant awareness.
// This may change in the future allowing more options for this map
FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings)
? Map.ofEntries(
Map.entry(REMOTE_METADATA_TYPE_KEY, REMOTE_METADATA_TYPE.get(settings)),
Map.entry(REMOTE_METADATA_ENDPOINT_KEY, REMOTE_METADATA_ENDPOINT.get(settings)),
Map.entry(REMOTE_METADATA_REGION_KEY, REMOTE_METADATA_REGION.get(settings)),
Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, REMOTE_METADATA_SERVICE_NAME.get(settings)),
Map.entry(TENANT_AWARE_KEY, "true"),
Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)
)
: Collections.emptyMap(),
// TODO: Find a better thread pool or make one
client.threadPool().executor(ThreadPool.Names.GENERIC)
);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
client,
sdkClient,
clusterService,
encryptorUtils,
xContentRegistry
Expand All @@ -137,15 +172,22 @@ public Collection<Object> createComponents(
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

SearchHandler searchHandler = new SearchHandler(settings, clusterService, client, FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES);
SearchHandler searchHandler = new SearchHandler(
settings,
clusterService,
client,
sdkClient,
FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES
);

return List.of(
workflowStepFactory,
workflowProcessSorter,
encryptorUtils,
flowFrameworkIndicesHandler,
searchHandler,
flowFrameworkSettings
flowFrameworkSettings,
sdkClient
);
}

Expand Down Expand Up @@ -196,7 +238,12 @@ public List<Setting<?>> getSettings() {
MAX_WORKFLOW_STEPS,
WORKFLOW_REQUEST_TIMEOUT,
TASK_REQUEST_RETRY_DURATION,
FILTER_BY_BACKEND_ROLES
FILTER_BY_BACKEND_ROLES,
FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED,
REMOTE_METADATA_TYPE,
REMOTE_METADATA_ENDPOINT,
REMOTE_METADATA_REGION,
REMOTE_METADATA_SERVICE_NAME
);
}

Expand All @@ -206,21 +253,21 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ScalingExecutorBuilder(
WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
PROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(8, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(5),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
),
new ScalingExecutorBuilder(
DEPROVISION_WORKFLOW_THREAD_POOL,
1,
Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
TimeValue.timeValueMinutes(1),
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
)
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/opensearch/flowframework/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
package org.opensearch.flowframework.common;

import org.opensearch.Version;

/**
* Representation of common values that are used across project
*/
Expand Down Expand Up @@ -82,6 +84,10 @@ private CommonValue() {}
public static final String USE_CASE = "use_case";
/** The param name for reprovisioning, used by the create workflow API */
public static final String REPROVISION_WORKFLOW = "reprovision";
/** The REST header containing the tenant id */
public static final String TENANT_ID_HEADER = "x-tenant-id";
/** The field name containing the tenant id */
public static final String TENANT_ID_FIELD = "tenant_id";

/*
* Constants associated with plugin configuration
Expand Down Expand Up @@ -244,4 +250,9 @@ private CommonValue() {}
public static final String ML_COMMONS_API_SPEC_YAML_URI =
"https://raw.githubusercontent.com/opensearch-project/opensearch-api-specification/refs/heads/main/spec/namespaces/ml.yaml";

/*
* Constants associated with non-BWC features
*/
/** Version 2.19.0 */
public static final Version VERSION_2_19_0 = Version.fromString("2.19.0");
}
Loading

0 comments on commit ff500da

Please sign in to comment.