diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java
index 0f7cfffaad23..806fcfe3c439 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java
@@ -19,6 +19,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.CreateAsSelect;
+import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
@@ -44,15 +45,17 @@ public void validate(
final Statement statement
) {
if (statement instanceof Query) {
- validateQueryTopicSources(serviceContext, metaStore, (Query)statement);
+ validateQuery(serviceContext, metaStore, (Query)statement);
} else if (statement instanceof InsertInto) {
validateInsertInto(serviceContext, metaStore, (InsertInto)statement);
} else if (statement instanceof CreateAsSelect) {
validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement);
+ } else if (statement instanceof CreateSource) {
+ validateCreateSource(serviceContext, (CreateSource)statement);
}
}
- private void validateQueryTopicSources(
+ private void validateQuery(
final ServiceContext serviceContext,
final MetaStore metaStore,
final Query query
@@ -78,11 +81,15 @@ private void validateCreateAsSelect(
* the target topic using the same ServiceContext used for validation.
*/
- validateQueryTopicSources(serviceContext, metaStore, createAsSelect.getQuery());
+ validateQuery(serviceContext, metaStore, createAsSelect.getQuery());
+ }
- // At this point, the topic should have been created by the TopicCreateInjector
- final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
- checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
+ private void validateCreateSource(
+ final ServiceContext serviceContext,
+ final CreateSource createSource
+ ) {
+ final String sourceTopic = createSource.getProperties().getKafkaTopic();
+ checkAccess(serviceContext, sourceTopic, AclOperation.READ);
}
private void validateInsertInto(
@@ -96,7 +103,7 @@ private void validateInsertInto(
* Validates Write on the target topic, and Read on the query sources topics.
*/
- validateQueryTopicSources(serviceContext, metaStore, insertInto.getQuery());
+ validateQuery(serviceContext, metaStore, insertInto.getQuery());
final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget().getSuffix());
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
@@ -131,12 +138,4 @@ private void checkAccess(
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName));
}
}
-
- private String getCreateAsSelectSinkTopic(
- final MetaStore metaStore,
- final CreateAsSelect createAsSelect
- ) {
- return createAsSelect.getProperties().getKafkaTopic()
- .orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName().getSuffix()));
- }
}
diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
index 39c4456633bd..20b635d545a5 100644
--- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
+++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java
@@ -20,6 +20,7 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
+import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
@@ -60,10 +61,14 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;
+// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class InsertValuesExecutor {
+ // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling
private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);
@@ -163,6 +168,17 @@ public void execute(
);
producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
+ } catch (final TopicAuthorizationException e) {
+ // TopicAuthorizationException does not give much detailed information about why it failed,
+ // except which topics are denied. Here we just add the ACL to make the error message
+ // consistent with other authorization error messages.
+ final Exception rootCause = new KsqlTopicAuthorizationException(
+ AclOperation.WRITE,
+ e.unauthorizedTopics()
+ );
+
+ throw new KsqlException("Failed to insert values into stream/table: "
+ + insertValues.getTarget().getSuffix(), rootCause);
} catch (final Exception e) {
throw new KsqlException("Failed to insert values into stream/table: "
+ insertValues.getTarget().getSuffix(), e);
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java
index 7be242c4ca7d..7cc2ee944641 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java
@@ -306,26 +306,6 @@ public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() {
// Above command should not throw any exception
}
- @Test
- public void shouldCreateAsSelectExistingStreamWithoutWritePermissionsDenied() {
- // Given:
- givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
- givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ));
- final Statement statement = givenStatement(String.format(
- "CREATE STREAM %s AS SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1)
- );
-
- // Then:
- expectedException.expect(KsqlTopicAuthorizationException.class);
- expectedException.expectMessage(String.format(
- "Authorization denied to Write on topic(s): [%s]", TOPIC_2.name()
- ));
-
-
- // When:
- accessValidator.validate(serviceContext, metaStore, statement);
- }
-
@Test
public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() {
// Given:
diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java
index 733bb810af47..d538211ac52f 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java
@@ -19,6 +19,7 @@
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -63,6 +64,7 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -74,6 +76,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Schema;
@@ -515,6 +518,30 @@ public void shouldThrowOnSerializingValueError() {
executor.execute(statement, engine, serviceContext);
}
+ @Test
+ public void shouldThrowOnTopicAuthorizationException() {
+ // Given:
+ final ConfiguredStatement statement = givenInsertValues(
+ allFieldNames(SCHEMA),
+ ImmutableList.of(
+ new LongLiteral(1L),
+ new StringLiteral("str"),
+ new StringLiteral("str"),
+ new LongLiteral(2L))
+ );
+ doThrow(new TopicAuthorizationException(Collections.singleton("t1")))
+ .when(producer).send(any());
+
+ // Expect:
+ expectedException.expect(KsqlException.class);
+ expectedException.expectCause(hasMessage(
+ containsString("Authorization denied to Write on topic(s): [t1]"))
+ );
+
+ // When:
+ executor.execute(statement, engine, serviceContext);
+ }
+
@Test
public void shouldThrowIfRowKeyAndKeyDoNotMatch() {
// Given:
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
index 0f936152b911..ca4080758193 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
@@ -113,7 +113,8 @@ public KsqlResource(
injectorFactory,
ksqlEngine::createSandbox,
ksqlConfig,
- topicAccessValidator);
+ topicAccessValidator,
+ SandboxedServiceContext.create(ksqlEngine.getServiceContext()));
this.handler = new RequestHandler(
CustomExecutors.EXECUTOR_MAP,
new DistributingExecutor(
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java
index 89489bef3e81..247d0aed414a 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java
@@ -21,6 +21,8 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.TopicAccessValidator;
+import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
+import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.CreateAsSelect;
@@ -56,6 +58,7 @@ public class RequestValidator {
private final Function snapshotSupplier;
private final KsqlConfig ksqlConfig;
private final TopicAccessValidator topicAccessValidator;
+ private final ServiceContext serverServiceContext;
/**
* @param customValidators a map describing how to validate each statement of type
@@ -70,13 +73,17 @@ public RequestValidator(
final BiFunction injectorFactory,
final Function snapshotSupplier,
final KsqlConfig ksqlConfig,
- final TopicAccessValidator topicAccessValidator
+ final TopicAccessValidator topicAccessValidator,
+ final ServiceContext serverServiceContext
) {
this.customValidators = requireNonNull(customValidators, "customValidators");
this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory");
this.snapshotSupplier = requireNonNull(snapshotSupplier, "snapshotSupplier");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
- this.topicAccessValidator = topicAccessValidator;
+ this.topicAccessValidator = requireNonNull(topicAccessValidator, "topicAccessValidator");
+ this.serverServiceContext = requireSandbox(
+ requireNonNull(serverServiceContext, "serverServiceContext")
+ );
}
/**
@@ -144,11 +151,7 @@ private int validate(
} else if (KsqlEngine.isExecutableStatement(configured.getStatement())) {
final ConfiguredStatement> statementInjected = injector.inject(configured);
- topicAccessValidator.validate(
- serviceContext,
- executionContext.getMetaStore(),
- statementInjected.getStatement()
- );
+ validateTopicPermissions(serviceContext, executionContext.getMetaStore(), statementInjected);
executionContext.execute(serviceContext, statementInjected);
} else {
@@ -180,4 +183,46 @@ private int validateRunScript(
return validate(serviceContext, executionContext.parse(sql), statement.getOverrides(), sql);
}
+ /**
+ * Performs permissions checks on the statement topics.
+ *
+ * This check verifies the User and the KSQL server principal have the right ACLs permissions
+ * to access the statement topics.
+ *
+ * @param userServiceContext The context of the user executing this command.
+ * @param executionContext The execution context which contains the KSQL service context
+ * and the KSQL metastore.
+ * @param statement The statement that needs to be checked.
+ */
+ private void validateTopicPermissions(
+ final ServiceContext userServiceContext,
+ final MetaStore metaStore,
+ final ConfiguredStatement> configuredStatement
+ ) {
+ final Statement statement = configuredStatement.getStatement();
+
+ topicAccessValidator.validate(userServiceContext, metaStore, statement);
+
+ // If these service contexts are different, then KSQL is running in a secured environment
+ // with authentication and impersonation enabled.
+ if (userServiceContext != serverServiceContext) {
+ try {
+ // Perform a permission check for the KSQL server
+ topicAccessValidator.validate(serverServiceContext, metaStore, statement);
+ } catch (final KsqlTopicAuthorizationException e) {
+ throw new KsqlStatementException(
+ "The KSQL service principal is not authorized to execute the command: "
+ + e.getMessage(),
+ configuredStatement.getStatementText()
+ );
+ } catch (final Exception e) {
+ throw new KsqlStatementException(
+ "The KSQL service principal failed to validate the command: "
+ + e.getMessage(),
+ configuredStatement.getStatementText(),
+ e
+ );
+ }
+ }
+ }
}
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java
index 5cb0b2e2ce14..bf90f6cf2443 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java
@@ -1725,6 +1725,7 @@ private void givenMockEngine() {
.thenAnswer(invocation -> realEngine.createSandbox(serviceContext).prepare(invocation.getArgument(0)));
when(ksqlEngine.createSandbox(any())).thenReturn(sandbox);
when(ksqlEngine.getMetaStore()).thenReturn(metaStore);
+ when(ksqlEngine.getServiceContext()).thenReturn(serviceContext);
when(topicInjectorFactory.apply(ksqlEngine)).thenReturn(topicInjector);
setUpKsqlResource();
}
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java
index 620b4f9c4faf..ef7d3e046a1b 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java
@@ -17,10 +17,13 @@
import static io.confluent.ksql.parser.ParserMatchers.configured;
import static io.confluent.ksql.parser.ParserMatchers.preparedStatement;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -32,6 +35,7 @@
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.TopicAccessValidator;
+import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.function.InternalFunctionRegistry;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
@@ -51,8 +55,10 @@
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.Sandbox;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.kafka.common.acl.AclOperation;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -255,15 +261,14 @@ public void shouldValidateRunScript() {
@Test
public void shouldThrowIfServiceContextIsNotSandbox() {
// Given:
- serviceContext = mock(ServiceContext.class);
- givenRequestValidator(ImmutableMap.of());
+ final ServiceContext otherServiceContext = mock(ServiceContext.class);
// Expect:
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Expected sandbox");
// When:
- validator.validate(serviceContext, ImmutableList.of(), ImmutableMap.of(), "sql");
+ validator.validate(otherServiceContext, ImmutableList.of(), ImmutableMap.of(), "sql");
}
@Test
@@ -297,6 +302,50 @@ public void shouldExecuteWithSpecifiedServiceContext() {
);
}
+ @Test
+ public void shouldThrowIfUserServiceContextIsNotAuthorizedToAccessTopics() {
+ // Given:
+ final List statements = givenParsed(SOME_STREAM_SQL);
+ final Statement statement = ksqlEngine.prepare(statements.get(0)).getStatement();
+ doThrow(new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton("t1")))
+ .when(topicAccessValidator)
+ .validate(
+ eq(serviceContext),
+ any(),
+ eq(statement)
+ );
+
+ // Expect:
+ expectedException.expect(KsqlTopicAuthorizationException.class);
+ expectedException.expectMessage("Authorization denied to Read on topic(s): [t1]");
+
+ // When:
+ validator.validate(serviceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL);
+ }
+
+ @Test
+ public void shouldThrowIfKsqlExecutionContextIsNotAuthorizedToAccessTopics() {
+ // Given:
+ final List statements = givenParsed(SOME_STREAM_SQL);
+ final Statement statement = ksqlEngine.prepare(statements.get(0)).getStatement();
+ final ServiceContext userServiceContext =
+ SandboxedServiceContext.create(TestServiceContext.create());
+
+ doNothing().when(topicAccessValidator)
+ .validate(eq(userServiceContext), any(), eq(statement));
+ doThrow(new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton("t1")))
+ .when(topicAccessValidator)
+ .validate(eq(serviceContext), any(), eq(statement));
+
+ // Expect:
+ expectedException.expect(KsqlStatementException.class);
+ expectedException.expectMessage("The KSQL service principal is not authorized to "
+ + "execute the command: Authorization denied to Read on topic(s): [t1]");
+
+ // When:
+ validator.validate(userServiceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL);
+ }
+
@Test
public void shouldCallTopicAccessValidator() {
// Given:
@@ -327,7 +376,8 @@ private void givenRequestValidator(
(ec, sc) -> InjectorChain.of(schemaInjector, topicInjector),
(sc) -> executionContext,
ksqlConfig,
- topicAccessValidator
+ topicAccessValidator,
+ serviceContext
);
}