Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flint Index Purging Logic #2372

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public enum Key {
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"),
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit");
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"),
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import java.util.function.Function;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -25,11 +25,24 @@

@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
@JsonIgnoreProperties(ignoreUnknown = true)
public class DataSourceMetadata {

public static final String DEFAULT_RESULT_INDEX = "query_execution_result";
public static final int MAX_RESULT_INDEX_NAME_SIZE = 255;
// OS doesn’t allow uppercase: https://tinyurl.com/yse2xdbx
public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+";
public static String INVALID_RESULT_INDEX_NAME_SIZE =
"Result index name size must contains less than "
+ MAX_RESULT_INDEX_NAME_SIZE
+ " characters";
public static String INVALID_CHAR_IN_RESULT_INDEX_NAME =
"Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and"
+ " _(underscore)";
public static String INVALID_RESULT_INDEX_PREFIX =
"Result index must start with " + DEFAULT_RESULT_INDEX;

@JsonProperty private String name;

@JsonProperty private String description;
Expand All @@ -44,18 +57,31 @@ public class DataSourceMetadata {

@JsonProperty private String resultIndex;

public static Function<String, String> DATASOURCE_TO_RESULT_INDEX =
datasourceName -> String.format("%s_%s", DEFAULT_RESULT_INDEX, datasourceName);

public DataSourceMetadata(
String name,
String description,
DataSourceType connector,
List<String> allowedRoles,
Map<String, String> properties,
String resultIndex) {
this.name = name;
String errorMessage = validateCustomResultIndex(resultIndex);
if (errorMessage != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor Nit: can we move this up, In case there is a new revision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will do

throw new IllegalArgumentException(errorMessage);
}
if (resultIndex == null) {
this.resultIndex = fromNameToCustomResultIndex();
} else {
this.resultIndex = resultIndex;
penghuo marked this conversation as resolved.
Show resolved Hide resolved
}

this.connector = connector;
this.description = StringUtils.EMPTY;
this.description = description;
this.properties = properties;
this.allowedRoles = allowedRoles;
this.resultIndex = resultIndex;
}

public DataSourceMetadata() {
Expand All @@ -71,9 +97,56 @@ public DataSourceMetadata() {
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
return new DataSourceMetadata(
DEFAULT_DATASOURCE_NAME,
StringUtils.EMPTY,
DataSourceType.OPENSEARCH,
Collections.emptyList(),
ImmutableMap.of(),
null);
}

public String validateCustomResultIndex(String resultIndex) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
if (resultIndex == null) {
return null;
}
if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
return INVALID_RESULT_INDEX_NAME_SIZE;
}
if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) {
return INVALID_CHAR_IN_RESULT_INDEX_NAME;
}
if (resultIndex != null && !resultIndex.startsWith(DEFAULT_RESULT_INDEX)) {
return INVALID_RESULT_INDEX_PREFIX;
}
return null;
}

/**
* Since we are using datasource name to create result index, we need to make sure that the final
* name is valid
*
* @param resultIndex result index name
* @return valid result index name
*/
private String convertToValidResultIndex(String resultIndex) {
// Limit Length
if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE);
}

// Pattern Matching: Remove characters that don't match the pattern
StringBuilder validChars = new StringBuilder();
for (char c : resultIndex.toCharArray()) {
if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) {
validChars.append(c);
}
}
return validChars.toString();
}

public String fromNameToCustomResultIndex() {
if (name == null) {
throw new IllegalArgumentException("Datasource name cannot be null");
}
return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
Expand Down Expand Up @@ -197,6 +198,7 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
ds ->
new DataSourceMetadata(
ds.getName(),
StringUtils.EMPTY,
ds.getConnectorType(),
Collections.emptyList(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -62,6 +63,7 @@ void testIterator() {
dataSource ->
new DataSourceMetadata(
dataSource.getName(),
StringUtils.EMPTY,
dataSource.getConnectorType(),
Collections.emptyList(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -382,6 +383,7 @@ void testRemovalOfAuthorizationInfo() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand All @@ -407,6 +409,7 @@ void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand Down Expand Up @@ -434,6 +437,7 @@ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testGlue",
StringUtils.EMPTY,
DataSourceType.S3GLUE,
Collections.singletonList("glue_access"),
properties,
Expand Down Expand Up @@ -498,6 +502,7 @@ void testGetRawDataSourceMetadata() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testToDataSourceMetadataFromJson() {
dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS);
dataSourceMetadata.setAllowedRoles(List.of("prometheus_access"));
dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090"));
dataSourceMetadata.setResultIndex("query_execution_result2");
Gson gson = new Gson();
String json = gson.toJson(dataSourceMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -103,6 +104,7 @@ public void updateDataSourceAPITest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"update_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand All @@ -116,6 +118,7 @@ public void updateDataSourceAPITest() {
DataSourceMetadata updateDSM =
new DataSourceMetadata(
"update_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090"),
Expand Down Expand Up @@ -175,6 +178,7 @@ public void deleteDataSourceTest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"delete_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand Down Expand Up @@ -214,6 +218,7 @@ public void getAllDataSourceTest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"get_all_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.opensearch.setting;

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.MemorySizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.LegacySettings;
import org.opensearch.sql.common.setting.Settings;

Expand Down Expand Up @@ -149,6 +151,27 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> SESSION_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.SESSION_INDEX_TTL.getKeyValue(),
timeValueDays(14),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> RESULT_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.RESULT_INDEX_TTL.getKeyValue(),
timeValueDays(60),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> AUTO_INDEX_MANAGEMENT_ENABLED_SETTING =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for both the indices?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Setting.boolSetting(
Key.AUTO_INDEX_MANAGEMENT_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -231,6 +254,24 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_SESSION_LIMIT,
SPARK_EXECUTION_SESSION_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT));
register(
settingBuilder,
clusterSettings,
Key.SESSION_INDEX_TTL,
SESSION_INDEX_TTL_SETTING,
new Updater(Key.SESSION_INDEX_TTL));
register(
settingBuilder,
clusterSettings,
Key.RESULT_INDEX_TTL,
RESULT_INDEX_TTL_SETTING,
new Updater(Key.RESULT_INDEX_TTL));
register(
settingBuilder,
clusterSettings,
Key.AUTO_INDEX_MANAGEMENT_ENABLED,
AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -298,6 +339,9 @@ public static List<Setting<?>> pluginSettings() {
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.build();
}

Expand Down
Loading