Skip to content

Commit

Permalink
Add missing tags and MV support (#2336)
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar authored Oct 23, 2023
1 parent b30d3c9 commit 877c9c3
Show file tree
Hide file tree
Showing 16 changed files with 460 additions and 187 deletions.
2 changes: 1 addition & 1 deletion common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ repositories {
dependencies {
api "org.antlr:antlr4-runtime:4.7.1"
api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.21.0'
api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
Expand Down
2 changes: 1 addition & 1 deletion integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ dependencies {
testImplementation group: 'org.opensearch.client', name: 'opensearch-rest-client', version: "${opensearch_version}"
testImplementation group: 'org.opensearch.driver', name: 'opensearch-sql-jdbc', version: System.getProperty("jdbcDriverVersion", '1.2.0.0')
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: '2.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.21.0'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
testImplementation project(':opensearch-sql-plugin')
testImplementation project(':legacy')
testImplementation('org.junit.jupiter:junit-jupiter-api:5.6.2')
Expand Down
2 changes: 1 addition & 1 deletion ppl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dependencies {
implementation "org.antlr:antlr4-runtime:4.7.1"
implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
api group: 'org.json', name: 'json', version: '20231013'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.21.0'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
api project(':common')
api project(':core')
api project(':protocol')
Expand Down
34 changes: 34 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ singleStatement
statement
: skippingIndexStatement
| coveringIndexStatement
| materializedViewStatement
;

skippingIndexStatement
Expand Down Expand Up @@ -76,6 +77,39 @@ dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
;

createMaterializedViewStatement
: CREATE MATERIALIZED VIEW (IF NOT EXISTS)? mvName=multipartIdentifier
AS query=materializedViewQuery
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;

describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;

/*
* Match all remaining tokens in non-greedy way
* so WITH clause won't be captured by this rule.
*/
materializedViewQuery
: .+?
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
Expand Down
5 changes: 5 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,26 @@ COMMA: ',';
DOT: '.';


AS: 'AS';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
IN: 'IN';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
MATERIALIZED: 'MATERIALIZED';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WITH: 'WITH';


Expand Down
1 change: 1 addition & 0 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ PIPE: '|';
CONCAT_PIPE: '||';
HAT: '^';
COLON: ':';
DOUBLE_COLON: '::';
ARROW: '->';
FAT_ARROW : '=>';
HENT_START: '/*+';
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,7 @@ primaryExpression
| CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
| CASE value=expression whenClause+ (ELSE elseExpression=expression)? END #simpleCase
| name=(CAST | TRY_CAST) LEFT_PAREN expression AS dataType RIGHT_PAREN #cast
| primaryExpression DOUBLE_COLON dataType #castByColon
| STRUCT LEFT_PAREN (argument+=namedExpression (COMMA argument+=namedExpression)*)? RIGHT_PAREN #struct
| FIRST LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #first
| ANY_VALUE LEFT_PAREN expression (IGNORE NULLS)? RIGHT_PAREN #any_value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ public class SparkConstants {
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
// TODO should be replaced with mvn jar.
public static final String FLINT_CATALOG_JAR =
"s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.FullyQualifiedTableName;
import org.opensearch.sql.spark.dispatcher.model.IndexDetails;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.session.CreateSessionRequest;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionId;
Expand All @@ -59,9 +59,8 @@ public class SparkQueryDispatcher {

public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String SCHEMA_TAG_KEY = "schema";
public static final String TABLE_TAG_KEY = "table";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
public static final String JOB_TYPE_TAG_KEY = "job_type";

private EMRServerlessClient emrServerlessClient;

Expand Down Expand Up @@ -111,6 +110,8 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
IndexDetails indexDetails =
SQLQueryUtils.extractIndexDetails(dispatchQueryRequest.getQuery());
fillMissingDetails(dispatchQueryRequest, indexDetails);

if (indexDetails.isDropIndex()) {
return handleDropIndexQuery(dispatchQueryRequest, indexDetails);
} else {
Expand All @@ -121,17 +122,29 @@ private DispatchQueryResponse handleSQLQuery(DispatchQueryRequest dispatchQueryR
}
}

// TODO: Revisit this logic.
// Currently, Spark if datasource is not provided in query.
// Spark Assumes the datasource to be catalog.
// This is required to handle drop index case properly when datasource name is not provided.
private static void fillMissingDetails(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
if (indexDetails.getFullyQualifiedTableName() != null
&& indexDetails.getFullyQualifiedTableName().getDatasourceName() == null) {
indexDetails
.getFullyQualifiedTableName()
.setDatasourceName(dispatchQueryRequest.getDatasource());
}
}

private DispatchQueryResponse handleIndexQuery(
DispatchQueryRequest dispatchQueryRequest, IndexDetails indexDetails) {
FullyQualifiedTableName fullyQualifiedTableName = indexDetails.getFullyQualifiedTableName();
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource());
dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
Map<String, String> tags = getDefaultTagsForJobSubmission(dispatchQueryRequest);
tags.put(INDEX_TAG_KEY, indexDetails.getIndexName());
tags.put(TABLE_TAG_KEY, fullyQualifiedTableName.getTableName());
tags.put(SCHEMA_TAG_KEY, fullyQualifiedTableName.getSchemaName());
tags.put(INDEX_TAG_KEY, indexDetails.openSearchIndexName());
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
Expand All @@ -142,12 +155,12 @@ private DispatchQueryResponse handleIndexQuery(
.dataSource(
dataSourceService.getRawDataSourceMetadata(
dispatchQueryRequest.getDatasource()))
.structuredStreaming(indexDetails.getAutoRefresh())
.structuredStreaming(indexDetails.isAutoRefresh())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
tags,
indexDetails.getAutoRefresh(),
indexDetails.isAutoRefresh(),
dataSourceMetadata.getResultIndex());
String jobId = emrServerlessClient.startJobRun(startJobRequest);
return new DispatchQueryResponse(
Expand Down Expand Up @@ -178,6 +191,7 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ
session = createdSession.get();
} else {
// create session if not exist
tags.put(JOB_TYPE_TAG_KEY, JobType.INTERACTIVE.getText());
session =
sessionManager.createSession(
new CreateSessionRequest(
Expand All @@ -204,6 +218,7 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ
dataSourceMetadata.getResultIndex(),
session.getSessionId().getSessionId());
} else {
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,129 @@

package org.opensearch.sql.spark.dispatcher.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.spark.flint.FlintIndexType;

/** Index details in an async query. */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@EqualsAndHashCode
public class IndexDetails {

public static final String STRIP_CHARS = "`";

private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
private Boolean autoRefresh = false;
private boolean autoRefresh;
private boolean isDropIndex;
// materialized view special case where
// table name and mv name are combined.
private String mvName;
private FlintIndexType indexType;

private IndexDetails() {}

public static IndexDetailsBuilder builder() {
return new IndexDetailsBuilder();
}

// Builder class
public static class IndexDetailsBuilder {
private final IndexDetails indexDetails;

public IndexDetailsBuilder() {
indexDetails = new IndexDetails();
}

public IndexDetailsBuilder indexName(String indexName) {
indexDetails.indexName = indexName;
return this;
}

public IndexDetailsBuilder fullyQualifiedTableName(FullyQualifiedTableName tableName) {
indexDetails.fullyQualifiedTableName = tableName;
return this;
}

public IndexDetailsBuilder autoRefresh(Boolean autoRefresh) {
indexDetails.autoRefresh = autoRefresh;
return this;
}

public IndexDetailsBuilder isDropIndex(boolean isDropIndex) {
indexDetails.isDropIndex = isDropIndex;
return this;
}

public IndexDetailsBuilder mvName(String mvName) {
indexDetails.mvName = mvName;
return this;
}

public IndexDetailsBuilder indexType(FlintIndexType indexType) {
indexDetails.indexType = indexType;
return this;
}

public IndexDetails build() {
Preconditions.checkNotNull(indexDetails.indexType, "Index Type can't be null");
switch (indexDetails.indexType) {
case COVERING:
Preconditions.checkNotNull(
indexDetails.indexName, "IndexName can't be null for Covering Index.");
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Covering Index.");
break;
case SKIPPING:
Preconditions.checkNotNull(
indexDetails.fullyQualifiedTableName, "TableName can't be null for Skipping Index.");
break;
case MATERIALIZED_VIEW:
Preconditions.checkNotNull(indexDetails.mvName, "Materialized view name can't be null");
break;
}

return indexDetails;
}
}

public String openSearchIndexName() {
FullyQualifiedTableName fullyQualifiedTableName = getFullyQualifiedTableName();
if (FlintIndexType.SKIPPING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else if (FlintIndexType.COVERING.equals(getIndexType())) {
String indexName =
"flint"
+ "_"
+ fullyQualifiedTableName.getDatasourceName()
+ "_"
+ fullyQualifiedTableName.getSchemaName()
+ "_"
+ fullyQualifiedTableName.getTableName()
+ "_"
+ getIndexName()
+ "_"
+ getIndexType().getSuffix();
return indexName.toLowerCase();
} else {
throw new UnsupportedOperationException(
String.format("Unsupported Index Type : %s", getIndexType()));
String indexName = StringUtils.EMPTY;
switch (getIndexType()) {
case COVERING:
indexName =
"flint"
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(getIndexName(), STRIP_CHARS)
+ "_"
+ getIndexType().getSuffix();
break;
case SKIPPING:
indexName =
"flint"
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
+ "_"
+ getIndexType().getSuffix();
break;
case MATERIALIZED_VIEW:
indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase();
break;
}
return indexName.toLowerCase();
}
}
Loading

0 comments on commit 877c9c3

Please sign in to comment.