diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java index 0f7cfffaad23..806fcfe3c439 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidator.java @@ -19,6 +19,7 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.parser.tree.CreateAsSelect; +import io.confluent.ksql.parser.tree.CreateSource; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; @@ -44,15 +45,17 @@ public void validate( final Statement statement ) { if (statement instanceof Query) { - validateQueryTopicSources(serviceContext, metaStore, (Query)statement); + validateQuery(serviceContext, metaStore, (Query)statement); } else if (statement instanceof InsertInto) { validateInsertInto(serviceContext, metaStore, (InsertInto)statement); } else if (statement instanceof CreateAsSelect) { validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement); + } else if (statement instanceof CreateSource) { + validateCreateSource(serviceContext, (CreateSource)statement); } } - private void validateQueryTopicSources( + private void validateQuery( final ServiceContext serviceContext, final MetaStore metaStore, final Query query @@ -78,11 +81,15 @@ private void validateCreateAsSelect( * the target topic using the same ServiceContext used for validation. */ - validateQueryTopicSources(serviceContext, metaStore, createAsSelect.getQuery()); + validateQuery(serviceContext, metaStore, createAsSelect.getQuery()); + } - // At this point, the topic should have been created by the TopicCreateInjector - final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect); - checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE); + private void validateCreateSource( + final ServiceContext serviceContext, + final CreateSource createSource + ) { + final String sourceTopic = createSource.getProperties().getKafkaTopic(); + checkAccess(serviceContext, sourceTopic, AclOperation.READ); } private void validateInsertInto( @@ -96,7 +103,7 @@ private void validateInsertInto( * Validates Write on the target topic, and Read on the query sources topics. */ - validateQueryTopicSources(serviceContext, metaStore, insertInto.getQuery()); + validateQuery(serviceContext, metaStore, insertInto.getQuery()); final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget().getSuffix()); checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE); @@ -131,12 +138,4 @@ private void checkAccess( throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName)); } } - - private String getCreateAsSelectSinkTopic( - final MetaStore metaStore, - final CreateAsSelect createAsSelect - ) { - return createAsSelect.getProperties().getKafkaTopic() - .orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName().getSuffix())); - } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 39c4456633bd..20b635d545a5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.Literal; import io.confluent.ksql.execution.expression.tree.NullLiteral; @@ -60,10 +61,14 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Struct; +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class InsertValuesExecutor { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5); @@ -163,6 +168,17 @@ public void execute( ); producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps()); + } catch (final TopicAuthorizationException e) { + // TopicAuthorizationException does not give much detailed information about why it failed, + // except which topics are denied. Here we just add the ACL to make the error message + // consistent with other authorization error messages. + final Exception rootCause = new KsqlTopicAuthorizationException( + AclOperation.WRITE, + e.unauthorizedTopics() + ); + + throw new KsqlException("Failed to insert values into stream/table: " + + insertValues.getTarget().getSuffix(), rootCause); } catch (final Exception e) { throw new KsqlException("Failed to insert values into stream/table: " + insertValues.getTarget().getSuffix(), e); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java index 7be242c4ca7d..7cc2ee944641 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/AuthorizationTopicAccessValidatorTest.java @@ -306,26 +306,6 @@ public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() { // Above command should not throw any exception } - @Test - public void shouldCreateAsSelectExistingStreamWithoutWritePermissionsDenied() { - // Given: - givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ)); - givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ)); - final Statement statement = givenStatement(String.format( - "CREATE STREAM %s AS SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1) - ); - - // Then: - expectedException.expect(KsqlTopicAuthorizationException.class); - expectedException.expectMessage(String.format( - "Authorization denied to Write on topic(s): [%s]", TOPIC_2.name() - )); - - - // When: - accessValidator.validate(serviceContext, metaStore, statement); - } - @Test public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() { // Given: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index 733bb810af47..d538211ac52f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -19,6 +19,7 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -63,6 +64,7 @@ import java.math.BigDecimal; import java.math.MathContext; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -74,6 +76,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.data.Schema; @@ -515,6 +518,30 @@ public void shouldThrowOnSerializingValueError() { executor.execute(statement, engine, serviceContext); } + @Test + public void shouldThrowOnTopicAuthorizationException() { + // Given: + final ConfiguredStatement statement = givenInsertValues( + allFieldNames(SCHEMA), + ImmutableList.of( + new LongLiteral(1L), + new StringLiteral("str"), + new StringLiteral("str"), + new LongLiteral(2L)) + ); + doThrow(new TopicAuthorizationException(Collections.singleton("t1"))) + .when(producer).send(any()); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectCause(hasMessage( + containsString("Authorization denied to Write on topic(s): [t1]")) + ); + + // When: + executor.execute(statement, engine, serviceContext); + } + @Test public void shouldThrowIfRowKeyAndKeyDoNotMatch() { // Given: diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 0f936152b911..ca4080758193 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -113,7 +113,8 @@ public KsqlResource( injectorFactory, ksqlEngine::createSandbox, ksqlConfig, - topicAccessValidator); + topicAccessValidator, + SandboxedServiceContext.create(ksqlEngine.getServiceContext())); this.handler = new RequestHandler( CustomExecutors.EXECUTOR_MAP, new DistributingExecutor( diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java index 89489bef3e81..247d0aed414a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/validation/RequestValidator.java @@ -21,6 +21,8 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.TopicAccessValidator; +import io.confluent.ksql.exception.KsqlTopicAuthorizationException; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.CreateAsSelect; @@ -56,6 +58,7 @@ public class RequestValidator { private final Function snapshotSupplier; private final KsqlConfig ksqlConfig; private final TopicAccessValidator topicAccessValidator; + private final ServiceContext serverServiceContext; /** * @param customValidators a map describing how to validate each statement of type @@ -70,13 +73,17 @@ public RequestValidator( final BiFunction injectorFactory, final Function snapshotSupplier, final KsqlConfig ksqlConfig, - final TopicAccessValidator topicAccessValidator + final TopicAccessValidator topicAccessValidator, + final ServiceContext serverServiceContext ) { this.customValidators = requireNonNull(customValidators, "customValidators"); this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory"); this.snapshotSupplier = requireNonNull(snapshotSupplier, "snapshotSupplier"); this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig"); - this.topicAccessValidator = topicAccessValidator; + this.topicAccessValidator = requireNonNull(topicAccessValidator, "topicAccessValidator"); + this.serverServiceContext = requireSandbox( + requireNonNull(serverServiceContext, "serverServiceContext") + ); } /** @@ -144,11 +151,7 @@ private int validate( } else if (KsqlEngine.isExecutableStatement(configured.getStatement())) { final ConfiguredStatement statementInjected = injector.inject(configured); - topicAccessValidator.validate( - serviceContext, - executionContext.getMetaStore(), - statementInjected.getStatement() - ); + validateTopicPermissions(serviceContext, executionContext.getMetaStore(), statementInjected); executionContext.execute(serviceContext, statementInjected); } else { @@ -180,4 +183,46 @@ private int validateRunScript( return validate(serviceContext, executionContext.parse(sql), statement.getOverrides(), sql); } + /** + * Performs permissions checks on the statement topics. + *

+ * This check verifies the User and the KSQL server principal have the right ACLs permissions + * to access the statement topics. + * + * @param userServiceContext The context of the user executing this command. + * @param executionContext The execution context which contains the KSQL service context + * and the KSQL metastore. + * @param statement The statement that needs to be checked. + */ + private void validateTopicPermissions( + final ServiceContext userServiceContext, + final MetaStore metaStore, + final ConfiguredStatement configuredStatement + ) { + final Statement statement = configuredStatement.getStatement(); + + topicAccessValidator.validate(userServiceContext, metaStore, statement); + + // If these service contexts are different, then KSQL is running in a secured environment + // with authentication and impersonation enabled. + if (userServiceContext != serverServiceContext) { + try { + // Perform a permission check for the KSQL server + topicAccessValidator.validate(serverServiceContext, metaStore, statement); + } catch (final KsqlTopicAuthorizationException e) { + throw new KsqlStatementException( + "The KSQL service principal is not authorized to execute the command: " + + e.getMessage(), + configuredStatement.getStatementText() + ); + } catch (final Exception e) { + throw new KsqlStatementException( + "The KSQL service principal failed to validate the command: " + + e.getMessage(), + configuredStatement.getStatementText(), + e + ); + } + } + } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 5cb0b2e2ce14..bf90f6cf2443 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -1725,6 +1725,7 @@ private void givenMockEngine() { .thenAnswer(invocation -> realEngine.createSandbox(serviceContext).prepare(invocation.getArgument(0))); when(ksqlEngine.createSandbox(any())).thenReturn(sandbox); when(ksqlEngine.getMetaStore()).thenReturn(metaStore); + when(ksqlEngine.getServiceContext()).thenReturn(serviceContext); when(topicInjectorFactory.apply(ksqlEngine)).thenReturn(topicInjector); setUpKsqlResource(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java index 620b4f9c4faf..ef7d3e046a1b 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/validation/RequestValidatorTest.java @@ -17,10 +17,13 @@ import static io.confluent.ksql.parser.ParserMatchers.configured; import static io.confluent.ksql.parser.ParserMatchers.preparedStatement; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -32,6 +35,7 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.TopicAccessValidator; +import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MetaStoreImpl; import io.confluent.ksql.metastore.MutableMetaStore; @@ -51,8 +55,10 @@ import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.Sandbox; +import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.kafka.common.acl.AclOperation; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -255,15 +261,14 @@ public void shouldValidateRunScript() { @Test public void shouldThrowIfServiceContextIsNotSandbox() { // Given: - serviceContext = mock(ServiceContext.class); - givenRequestValidator(ImmutableMap.of()); + final ServiceContext otherServiceContext = mock(ServiceContext.class); // Expect: expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Expected sandbox"); // When: - validator.validate(serviceContext, ImmutableList.of(), ImmutableMap.of(), "sql"); + validator.validate(otherServiceContext, ImmutableList.of(), ImmutableMap.of(), "sql"); } @Test @@ -297,6 +302,50 @@ public void shouldExecuteWithSpecifiedServiceContext() { ); } + @Test + public void shouldThrowIfUserServiceContextIsNotAuthorizedToAccessTopics() { + // Given: + final List statements = givenParsed(SOME_STREAM_SQL); + final Statement statement = ksqlEngine.prepare(statements.get(0)).getStatement(); + doThrow(new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton("t1"))) + .when(topicAccessValidator) + .validate( + eq(serviceContext), + any(), + eq(statement) + ); + + // Expect: + expectedException.expect(KsqlTopicAuthorizationException.class); + expectedException.expectMessage("Authorization denied to Read on topic(s): [t1]"); + + // When: + validator.validate(serviceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL); + } + + @Test + public void shouldThrowIfKsqlExecutionContextIsNotAuthorizedToAccessTopics() { + // Given: + final List statements = givenParsed(SOME_STREAM_SQL); + final Statement statement = ksqlEngine.prepare(statements.get(0)).getStatement(); + final ServiceContext userServiceContext = + SandboxedServiceContext.create(TestServiceContext.create()); + + doNothing().when(topicAccessValidator) + .validate(eq(userServiceContext), any(), eq(statement)); + doThrow(new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton("t1"))) + .when(topicAccessValidator) + .validate(eq(serviceContext), any(), eq(statement)); + + // Expect: + expectedException.expect(KsqlStatementException.class); + expectedException.expectMessage("The KSQL service principal is not authorized to " + + "execute the command: Authorization denied to Read on topic(s): [t1]"); + + // When: + validator.validate(userServiceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL); + } + @Test public void shouldCallTopicAccessValidator() { // Given: @@ -327,7 +376,8 @@ private void givenRequestValidator( (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), (sc) -> executionContext, ksqlConfig, - topicAccessValidator + topicAccessValidator, + serviceContext ); }