Implement multi tenancy in Flow Framework #980

Merged
merged 43 commits into from
Jan 25, 2025
b735dc9
Import SdkClient and inject it
dbwiddis Dec 12, 2024
d6ad44e
Pass sdkClient to IndicesHandler and EncryptorUtils classes
dbwiddis Dec 12, 2024
62d037f
Extract tenant id from REST header into RestAction
dbwiddis Dec 13, 2024
19ee59c
Pass tenant id to transport actions in template
dbwiddis Dec 13, 2024
7233b07
Validate tenant id existence in workflow transport actions
dbwiddis Dec 13, 2024
d9444c2
Pass SdkClient and tenant id to util used for access control checks
dbwiddis Dec 14, 2024
4e80fd5
Perform tenant id validation checks for workflow APIs
dbwiddis Dec 16, 2024
de2ef7d
Migrate Update workflow get action to SdkClient
dbwiddis Dec 18, 2024
0516813
Pass tenantId to IndicesHandler and use in EncryptorUtils
dbwiddis Dec 18, 2024
d3e7afd
Migrate EncryptorUtils getting master key from index
dbwiddis Dec 20, 2024
4b04466
Refactor fetching master key to permit reuse
dbwiddis Dec 20, 2024
2dd8659
Refactor initializeMasterKey to use common code
dbwiddis Dec 20, 2024
b9ce179
Migrate indexing new key to config
dbwiddis Dec 20, 2024
ae081da
Migrate template indexing to sdkClient
dbwiddis Dec 20, 2024
aa6478f
Migrate template deletion to sdkClient
dbwiddis Dec 21, 2024
634e493
Migrate get template to sdkClient
dbwiddis Dec 21, 2024
0208fcd
Migrate provision template to sdkClient
dbwiddis Dec 21, 2024
d627aee
Migrate max workflow search to sdkClient
dbwiddis Dec 22, 2024
2537ef3
Add tenantId to GetWorkflowStateRequest
dbwiddis Dec 22, 2024
43a2b1a
Migrate GetWorkflowStateRequest to multitenant client
dbwiddis Dec 23, 2024
c3e991b
Migrate getProvisioningProgress to avoid repetition
dbwiddis Dec 23, 2024
99229d5
Migrate canDeleteWorkflowStateDoc to avoid repetition
dbwiddis Dec 23, 2024
19bdf9e
Migrate initial state document creation to metadata client
dbwiddis Dec 24, 2024
75f9abb
Migrate state document deletion to metadata client
dbwiddis Dec 24, 2024
5662f42
Add Tenant aware Rest Tests for Workflows
dbwiddis Dec 26, 2024
b05c763
Fix javadocs
dbwiddis Dec 27, 2024
a15cd6a
Add publishToMavenLocal for more CI
dbwiddis Dec 27, 2024
02aa702
Fix some CI
dbwiddis Dec 27, 2024
b2e0b19
Enable tenant aware search
dbwiddis Dec 28, 2024
e8fb48b
Refactor state index update method using multitenant client
dbwiddis Jan 7, 2025
0a70337
Get metadata client artifacts from Maven Snapshot
dbwiddis Jan 7, 2025
10c2797
Update tests for new update async code
dbwiddis Jan 8, 2025
9314dd8
Switch SdkClient to use default generic thread executor
dbwiddis Jan 15, 2025
cd33238
Migrate last updates to sdkClient
dbwiddis Jan 15, 2025
5bc549a
Revert (most) changes to unit tests based on async client changes
dbwiddis Jan 15, 2025
959c28f
Pass tenant id when updating state during provisioning
dbwiddis Jan 16, 2025
3028de0
Integrate tenantId with synchronous provisioning
dbwiddis Jan 17, 2025
a325ad4
Fix failing integ tests after rebase, code review updates
dbwiddis Jan 23, 2025
72e3076
Replace fakeTenantId placeholders with actual tenant id
dbwiddis Jan 23, 2025
bd85c69
Use version catalog for commons-lang3 and httpcore dependencies
dbwiddis Jan 23, 2025
ec010c0
Exclude transitive httpclient dependency from metadata and rest client
dbwiddis Jan 23, 2025
bbed331
Fix more test errors and tweak dependencies
dbwiddis Jan 24, 2025
3550786
More code review comments and refactoring
dbwiddis Jan 24, 2025
19 changes: 19 additions & 0 deletions .github/workflows/CI.yml
@@ -98,3 +98,22 @@ jobs:
     - name: Build and Run Tests
       run: |
         ./gradlew integTest -PnumNodes=3
+  integTenantAwareTest:
+    needs: [spotless, javadoc]
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ubuntu-latest, macos-latest, windows-latest]
+        java: [21]
+    name: Tenant Aware Integ Test JDK${{ matrix.java }}, ${{ matrix.os }}
+    runs-on: ${{ matrix.os }}
+    steps:
+    - uses: actions/checkout@v4
+    - name: Set up JDK ${{ matrix.java }}
+      uses: actions/setup-java@v4
+      with:
+        java-version: ${{ matrix.java }}
+        distribution: temurin
+    - name: Build and Run Tests
+      run: |
+        ./gradlew integTest "-Dtests.rest.tenantaware=true"
1 change: 0 additions & 1 deletion .github/workflows/test_security.yml
@@ -34,7 +34,6 @@ jobs:
     steps:
       - name: Run start commands
         run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
-
       - name: Checkout Flow Framework
         uses: actions/checkout@v4
       - name: Setup Java ${{ matrix.java }}
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

 ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.18...2.x)
 ### Features
+- Add multitenant remote metadata client ([#980](https://github.com/opensearch-project/flow-framework/pull/980))
 - Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))

 ### Enhancements
50 changes: 43 additions & 7 deletions build.gradle
@@ -167,17 +167,18 @@ configurations {

 dependencies {
     implementation "org.opensearch:opensearch:${opensearch_version}"
-    implementation 'org.junit.jupiter:junit-jupiter:5.11.4'
     api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
-    api group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
-    implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.17.0'
+    api(group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}") {
+        exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
+    }
+    implementation "org.apache.commons:commons-lang3:${versions.commonslang}"
     implementation "org.opensearch:common-utils:${common_utils_version}"
     implementation "com.amazonaws:aws-encryption-sdk-java:3.0.1"
     implementation "software.amazon.cryptography:aws-cryptographic-material-providers:1.8.0"
     implementation "org.dafny:DafnyRuntime:4.9.1"
     implementation "software.amazon.smithy.dafny:conversion:0.1.1"
     implementation 'org.bouncycastle:bcprov-jdk18on:1.80'
-    api "org.apache.httpcomponents.core5:httpcore5:5.3.2"
+    api "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
     implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
     implementation "org.glassfish:jakarta.json:2.0.1"
     implementation "org.eclipse:yasson:3.0.4"
@@ -188,7 +189,11 @@ dependencies {
     implementation "io.swagger.parser.v3:swagger-parser-core:${swaggerVersion}"
     implementation "io.swagger.parser.v3:swagger-parser:${swaggerVersion}"
     implementation "io.swagger.parser.v3:swagger-parser-v3:${swaggerVersion}"
-    // Declare Jackson dependencies for tests (from OpenSearch version catalog)
+    // Multi-tenant SDK Client
+    implementation ("org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}") {
+        exclude group: "org.apache.httpcomponents.client5", module: "httpclient5"
+    }
+    testImplementation 'org.junit.jupiter:junit-jupiter:5.11.4'
     testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
     testImplementation "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
     testImplementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson_databind}"
@@ -202,7 +207,6 @@ dependencies {
     configurations.all {
         resolutionStrategy {
             force("com.google.guava:guava:33.4.0-jre") // CVE for 31.1, keep to force transitive dependencies
-            force("org.apache.httpcomponents.core5:httpcore5:5.3.2") // Dependency Jar Hell
         }
     }
 }
@@ -262,10 +266,19 @@ integTest {
     systemProperty('user', user)
     systemProperty('password', password)

+    // Only tenant aware test if set
+    if (System.getProperty("tests.rest.tenantaware") == "true") {
+        filter {
+            includeTestsMatching "org.opensearch.flowframework.*TenantAwareIT"
+        }
+        systemProperty "plugins.flow_framework.multi_tenancy_enabled", "true"
+    }
+
     // Only rest case can run with remote cluster
-    if (System.getProperty("tests.rest.cluster") != null) {
+    if (System.getProperty("tests.rest.cluster") != null && System.getProperty("tests.rest.tenantaware") == null) {
         filter {
             includeTestsMatching "org.opensearch.flowframework.rest.*IT"
+            excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
         }
     }
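
The two property checks in this hunk can be restated as a small decision function. The sketch below is plain Java rather than Gradle, and the class and method names are hypothetical. It mirrors only the two `if` conditions in this hunk, using a `+` prefix for `includeTestsMatching` and a `-` prefix for `excludeTestsMatching`; other filters in the build file are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: mirrors the two filter conditions in integTest.
final class IntegTestFilterSketch {

    // tenantAware and restCluster stand for the values of the
    // tests.rest.tenantaware and tests.rest.cluster system properties (null when unset).
    static List<String> filters(String tenantAware, String restCluster) {
        List<String> patterns = new ArrayList<>();
        // Groovy's == on strings is an equals() comparison, so the value must be exactly "true".
        if ("true".equals(tenantAware)) {
            patterns.add("+org.opensearch.flowframework.*TenantAwareIT");
        }
        // The remote-cluster filter applies only when the tenant-aware property is unset.
        if (restCluster != null && tenantAware == null) {
            patterns.add("+org.opensearch.flowframework.rest.*IT");
            patterns.add("-org.opensearch.flowframework.rest.*TenantAwareIT");
        }
        return patterns;
    }

    public static void main(String[] args) {
        System.out.println(filters("true", null));
        System.out.println(filters(null, "localhost:9200"));
        System.out.println(filters("false", "localhost:9200"));
    }
}
```

Under this reading, setting `tests.rest.tenantaware` to any value other than `"true"` (for example `"false"`) together with `tests.rest.cluster` triggers neither branch, because the second condition requires the tenant-aware property to be absent rather than false.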

@@ -288,11 +301,34 @@ integTest {
         filter {
             includeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkSecureRestApiIT"
             excludeTestsMatching "org.opensearch.flowframework.rest.FlowFrameworkRestApiIT"
+            excludeTestsMatching "org.opensearch.flowframework.rest.*TenantAwareIT"
         }
     }

     // doFirst delays this block until execution time
     doFirst {
+        if (System.getProperty("tests.rest.tenantaware") == "true") {
+            def ymlFile = file("$buildDir/testclusters/integTest-0/config/opensearch.yml")
+            if (ymlFile.exists()) {
+                ymlFile.withWriterAppend {
+                    writer ->
+                    writer.write("\n# Set multitenancy\n")
+                    writer.write("plugins.flow_framework.multi_tenancy_enabled: true\n")
+                }
+                // TODO this properly uses the remote client factory but needs a remote cluster set up
+                // TODO get the endpoint from a system property
+                if (System.getProperty("tests.rest.cluster") != null) {
+                    ymlFile.withWriterAppend { writer ->
+                        writer.write("\n# Use a remote cluster\n")
+                        writer.write("plugins.flow_framework.remote_metadata_type: RemoteOpenSearch\n")
+                        writer.write("plugins.flow_framework.remote_metadata_endpoint: https://127.0.0.1:9200\n")
+                    }
+                }
+            } else {
+                throw new GradleException("opensearch.yml not found at: $ymlFile")
+            }
+        }
+
         // Tell the test JVM if the cluster JVM is running under a debugger so that tests can
         // use longer timeouts for requests.
         def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null
src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java
@@ -65,6 +65,8 @@
 import org.opensearch.plugins.ActionPlugin;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.plugins.SystemIndexPlugin;
+import org.opensearch.remote.metadata.client.SdkClient;
+import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.rest.RestController;
 import org.opensearch.rest.RestHandler;
@@ -75,22 +77,36 @@
 import org.opensearch.watcher.ResourceWatcherService;

 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;

 import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX;
 import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL;
 import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX;
 import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
 import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW_THREAD_POOL;
+import static org.opensearch.flowframework.common.CommonValue.TENANT_ID_FIELD;
 import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
 import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_THREAD_POOL;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_ENDPOINT;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_REGION;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_SERVICE_NAME;
+import static org.opensearch.flowframework.common.FlowFrameworkSettings.REMOTE_METADATA_TYPE;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.TASK_REQUEST_RETRY_DURATION;
 import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
+import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY;

 /**
  * An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
@@ -121,9 +137,28 @@
         Settings settings = environment.settings();
         flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
         MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
-        EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, xContentRegistry);
+        SdkClient sdkClient = SdkClientFactory.createSdkClient(
+            client,
+            xContentRegistry,
+            // Here we assume remote metadata client is only used with tenant awareness.
+            // This may change in the future allowing more options for this map
+            FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED.get(settings)
+                ? Map.ofEntries(
+                    Map.entry(REMOTE_METADATA_TYPE_KEY, REMOTE_METADATA_TYPE.get(settings)),
+                    Map.entry(REMOTE_METADATA_ENDPOINT_KEY, REMOTE_METADATA_ENDPOINT.get(settings)),
+                    Map.entry(REMOTE_METADATA_REGION_KEY, REMOTE_METADATA_REGION.get(settings)),
+                    Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, REMOTE_METADATA_SERVICE_NAME.get(settings)),
+                    Map.entry(TENANT_AWARE_KEY, "true"),
+                    Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)
+                )
+                : Collections.emptyMap(),
+            // TODO: Find a better thread pool or make one
+            client.threadPool().executor(ThreadPool.Names.GENERIC)
+        );
+        EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client, sdkClient, xContentRegistry);
         FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
             client,
+            sdkClient,
             clusterService,
             encryptorUtils,
             xContentRegistry
@@ -137,15 +172,22 @@
         );
         WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool, flowFrameworkSettings);

-        SearchHandler searchHandler = new SearchHandler(settings, clusterService, client, FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES);
+        SearchHandler searchHandler = new SearchHandler(
+            settings,
+            clusterService,
+            client,
+            sdkClient,
+            FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES
+        );

         return List.of(
             workflowStepFactory,
             workflowProcessSorter,
             encryptorUtils,
             flowFrameworkIndicesHandler,
             searchHandler,
-            flowFrameworkSettings
+            flowFrameworkSettings,
+            sdkClient
         );
     }
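
The conditional settings map passed to `SdkClientFactory.createSdkClient` in `createComponents` can be illustrated in isolation. This is a minimal sketch under stated assumptions: the class name and the key strings are hypothetical placeholders (the PR uses constants from the SDK's `CommonValue` class, whose values are not visible in this diff), and a `Map<String, String>` is assumed as the map type.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the map built in createComponents; not the PR's code.
final class MetadataSettingsSketch {
    // Placeholder key strings (assumptions); the real keys come from the SDK's constants.
    static final String TYPE_KEY = "type";
    static final String ENDPOINT_KEY = "endpoint";
    static final String REGION_KEY = "region";
    static final String SERVICE_NAME_KEY = "service_name";
    static final String TENANT_AWARE_KEY = "tenant_aware";
    static final String TENANT_ID_FIELD_KEY = "tenant_id_field";

    static Map<String, String> build(boolean multiTenancyEnabled, String type, String endpoint, String region, String serviceName) {
        // With multi-tenancy disabled the client receives no remote metadata settings at all.
        if (!multiTenancyEnabled) {
            return Collections.emptyMap();
        }
        // Map.entry rejects null keys and values, so each setting must resolve to a non-null string.
        return Map.ofEntries(
            Map.entry(TYPE_KEY, type),
            Map.entry(ENDPOINT_KEY, endpoint),
            Map.entry(REGION_KEY, region),
            Map.entry(SERVICE_NAME_KEY, serviceName),
            Map.entry(TENANT_AWARE_KEY, "true"),
            Map.entry(TENANT_ID_FIELD_KEY, "tenant_id")
        );
    }

    public static void main(String[] args) {
        System.out.println(build(false, "", "", "", "").isEmpty());
        System.out.println(build(true, "RemoteOpenSearch", "https://127.0.0.1:9200", "", "").size());
    }
}
```

One design consequence worth noting: because `Map.entry` throws `NullPointerException` on a null value, this pattern relies on each setting having a non-null default.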

@@ -196,7 +238,12 @@
             MAX_WORKFLOW_STEPS,
             WORKFLOW_REQUEST_TIMEOUT,
             TASK_REQUEST_RETRY_DURATION,
-            FILTER_BY_BACKEND_ROLES
+            FILTER_BY_BACKEND_ROLES,
+            FLOW_FRAMEWORK_MULTI_TENANCY_ENABLED,
+            REMOTE_METADATA_TYPE,
+            REMOTE_METADATA_ENDPOINT,
+            REMOTE_METADATA_REGION,
+            REMOTE_METADATA_SERVICE_NAME
         );
     }

@@ -206,21 +253,21 @@
             new ScalingExecutorBuilder(
                 WORKFLOW_THREAD_POOL,
                 1,
-                Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
+                Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
                 TimeValue.timeValueMinutes(1),
                 FLOW_FRAMEWORK_THREAD_POOL_PREFIX + WORKFLOW_THREAD_POOL
             ),
             new ScalingExecutorBuilder(
                 PROVISION_WORKFLOW_THREAD_POOL,
                 1,
-                Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
+                Math.max(8, OpenSearchExecutors.allocatedProcessors(settings) - 1),
                 TimeValue.timeValueMinutes(5),
                 FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_WORKFLOW_THREAD_POOL
             ),
             new ScalingExecutorBuilder(
                 DEPROVISION_WORKFLOW_THREAD_POOL,
                 1,
-                Math.max(2, OpenSearchExecutors.allocatedProcessors(settings) - 1),
+                Math.max(4, OpenSearchExecutors.allocatedProcessors(settings) - 1),
                 TimeValue.timeValueMinutes(1),
                 FLOW_FRAMEWORK_THREAD_POOL_PREFIX + DEPROVISION_WORKFLOW_THREAD_POOL
             )
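
To see what the raised floors in this hunk mean in practice, the maximum-thread expression can be evaluated for a few processor counts. The helper below is a hypothetical restatement of `Math.max(floor, allocatedProcessors - 1)` and does not call any OpenSearch API.

```java
// Hypothetical helper: restates the max-thread expression passed to each ScalingExecutorBuilder.
final class PoolSizeSketch {

    static int maxThreads(int floor, int allocatedProcessors) {
        // The pool maximum is one less than the processor count, but never below the floor.
        return Math.max(floor, allocatedProcessors - 1);
    }

    public static void main(String[] args) {
        for (int cpus : new int[] { 2, 4, 8, 16 }) {
            System.out.printf(
                "cpus=%d workflow/deprovision: %d -> %d, provision: %d -> %d%n",
                cpus,
                maxThreads(2, cpus), maxThreads(4, cpus),
                maxThreads(4, cpus), maxThreads(8, cpus)
            );
        }
    }
}
```

The higher floors matter only on small nodes. With 4 allocated processors the workflow and deprovision maximum goes from 3 to 4 and the provision maximum from 4 to 8, while with 16 allocated processors every expression yields 15 both before and after the change.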
@@ -8,6 +8,8 @@
  */
 package org.opensearch.flowframework.common;

+import org.opensearch.Version;
+
 /**
  * Representation of common values that are used across project
  */
@@ -82,6 +84,10 @@ private CommonValue() {}
     public static final String USE_CASE = "use_case";
     /** The param name for reprovisioning, used by the create workflow API */
     public static final String REPROVISION_WORKFLOW = "reprovision";
+    /** The REST header containing the tenant id */
+    public static final String TENANT_ID_HEADER = "x-tenant-id";
+    /** The field name containing the tenant id */
+    public static final String TENANT_ID_FIELD = "tenant_id";

     /*
      * Constants associated with plugin configuration
@@ -244,4 +250,9 @@ private CommonValue() {}
     public static final String ML_COMMONS_API_SPEC_YAML_URI =
         "https://raw.githubusercontent.com/opensearch-project/opensearch-api-specification/refs/heads/main/spec/namespaces/ml.yaml";

+    /*
+     * Constants associated with non-BWC features
+     */
+    /** Version 2.19.0 */
+    public static final Version VERSION_2_19_0 = Version.fromString("2.19.0");
 }
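
The commit list mentions extracting the tenant id from a REST header and validating that it exists. The REST handler code itself is not part of this portion of the diff, so the following is only a sketch of how the `TENANT_ID_HEADER` constant might be used. The class, the method, the exception message, and the assumption that headers arrive as a map keyed by lower-case name are all hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the PR's actual REST handling is not shown in this part of the diff.
final class TenantIdSketch {
    // Same value as CommonValue.TENANT_ID_HEADER in the diff above.
    static final String TENANT_ID_HEADER = "x-tenant-id";

    /** Returns the first tenant id header value, or null when multi-tenancy is disabled. */
    static String tenantId(boolean multiTenancyEnabled, Map<String, List<String>> headers) {
        if (!multiTenancyEnabled) {
            return null;
        }
        // Assumes header names are already normalized to lower case by the caller.
        List<String> values = headers.get(TENANT_ID_HEADER);
        if (values == null || values.isEmpty() || values.get(0) == null || values.get(0).isBlank()) {
            throw new IllegalArgumentException("Tenant ID header is missing or empty");
        }
        return values.get(0);
    }

    public static void main(String[] args) {
        System.out.println(tenantId(true, Map.of(TENANT_ID_HEADER, List.of("tenant-a"))));
        System.out.println(tenantId(false, Map.of()));
    }
}
```

Returning `null` when multi-tenancy is off keeps single-tenant requests unchanged, while a missing header in tenant-aware mode fails fast instead of silently reading or writing documents without a tenant id.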