Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: perform topic permission checks for KSQL service principal #3224

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.Statement;
Expand All @@ -44,15 +45,17 @@ public void validate(
final Statement statement
) {
if (statement instanceof Query) {
validateQueryTopicSources(serviceContext, metaStore, (Query)statement);
validateQuery(serviceContext, metaStore, (Query)statement);
} else if (statement instanceof InsertInto) {
validateInsertInto(serviceContext, metaStore, (InsertInto)statement);
} else if (statement instanceof CreateAsSelect) {
validateCreateAsSelect(serviceContext, metaStore, (CreateAsSelect)statement);
} else if (statement instanceof CreateSource) {
validateCreateSource(serviceContext, (CreateSource)statement);
}
}

private void validateQueryTopicSources(
private void validateQuery(
final ServiceContext serviceContext,
final MetaStore metaStore,
final Query query
Expand All @@ -78,11 +81,15 @@ private void validateCreateAsSelect(
* the target topic using the same ServiceContext used for validation.
*/

validateQueryTopicSources(serviceContext, metaStore, createAsSelect.getQuery());
validateQuery(serviceContext, metaStore, createAsSelect.getQuery());
}

// At this point, the topic should have been created by the TopicCreateInjector
final String kafkaTopic = getCreateAsSelectSinkTopic(metaStore, createAsSelect);
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
private void validateCreateSource(
final ServiceContext serviceContext,
final CreateSource createSource
) {
final String sourceTopic = createSource.getProperties().getKafkaTopic();
checkAccess(serviceContext, sourceTopic, AclOperation.READ);
}

private void validateInsertInto(
Expand All @@ -96,7 +103,7 @@ private void validateInsertInto(
* Validates Write on the target topic, and Read on the query sources topics.
*/

validateQueryTopicSources(serviceContext, metaStore, insertInto.getQuery());
validateQuery(serviceContext, metaStore, insertInto.getQuery());

final String kafkaTopic = getSourceTopicName(metaStore, insertInto.getTarget().getSuffix());
checkAccess(serviceContext, kafkaTopic, AclOperation.WRITE);
Expand Down Expand Up @@ -131,12 +138,4 @@ private void checkAccess(
throw new KsqlTopicAuthorizationException(operation, Collections.singleton(topicName));
}
}

private String getCreateAsSelectSinkTopic(
final MetaStore metaStore,
final CreateAsSelect createAsSelect
) {
return createAsSelect.getProperties().getKafkaTopic()
.orElseGet(() -> getSourceTopicName(metaStore, createAsSelect.getName().getSuffix()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
Expand Down Expand Up @@ -60,10 +61,14 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class InsertValuesExecutor {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);

Expand Down Expand Up @@ -163,6 +168,17 @@ public void execute(
);

producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
} catch (final TopicAuthorizationException e) {
// TopicAuthorizationException does not give much detailed information about why it failed,
// except which topics are denied. Here we just add the ACL to make the error message
// consistent with other authorization error messages.
final Exception rootCause = new KsqlTopicAuthorizationException(
AclOperation.WRITE,
e.unauthorizedTopics()
);

throw new KsqlException("Failed to insert values into stream/table: "
+ insertValues.getTarget().getSuffix(), rootCause);
} catch (final Exception e) {
throw new KsqlException("Failed to insert values into stream/table: "
+ insertValues.getTarget().getSuffix(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,26 +306,6 @@ public void shouldCreateAsSelectExistingTopicWithWritePermissionsAllowed() {
// Above command should not throw any exception
}

@Test
public void shouldCreateAsSelectExistingStreamWithoutWritePermissionsDenied() {
// Given:
givenTopicPermissions(TOPIC_1, Collections.singleton(AclOperation.READ));
givenTopicPermissions(TOPIC_2, Collections.singleton(AclOperation.READ));
final Statement statement = givenStatement(String.format(
"CREATE STREAM %s AS SELECT * FROM %s;", STREAM_TOPIC_2, STREAM_TOPIC_1)
);

// Then:
expectedException.expect(KsqlTopicAuthorizationException.class);
expectedException.expectMessage(String.format(
"Authorization denied to Write on topic(s): [%s]", TOPIC_2.name()
));


// When:
accessValidator.validate(serviceContext, metaStore, statement);
}

@Test
public void shouldCreateAsSelectWithTopicAndWritePermissionsAllowed() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -74,6 +76,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -515,6 +518,30 @@ public void shouldThrowOnSerializingValueError() {
executor.execute(statement, engine, serviceContext);
}

@Test
public void shouldThrowOnTopicAuthorizationException() {
// Given:
final ConfiguredStatement<InsertValues> statement = givenInsertValues(
allFieldNames(SCHEMA),
ImmutableList.of(
new LongLiteral(1L),
new StringLiteral("str"),
new StringLiteral("str"),
new LongLiteral(2L))
);
doThrow(new TopicAuthorizationException(Collections.singleton("t1")))
.when(producer).send(any());

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectCause(hasMessage(
containsString("Authorization denied to Write on topic(s): [t1]"))
);

// When:
executor.execute(statement, engine, serviceContext);
}

@Test
public void shouldThrowIfRowKeyAndKeyDoNotMatch() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public KsqlResource(
injectorFactory,
ksqlEngine::createSandbox,
ksqlConfig,
topicAccessValidator);
topicAccessValidator,
SandboxedServiceContext.create(ksqlEngine.getServiceContext()));
this.handler = new RequestHandler(
CustomExecutors.EXECUTOR_MAP,
new DistributingExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.TopicAccessValidator;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.CreateAsSelect;
Expand Down Expand Up @@ -56,6 +58,7 @@ public class RequestValidator {
private final Function<ServiceContext, KsqlExecutionContext> snapshotSupplier;
private final KsqlConfig ksqlConfig;
private final TopicAccessValidator topicAccessValidator;
private final ServiceContext serverServiceContext;

/**
* @param customValidators a map describing how to validate each statement of type
Expand All @@ -70,13 +73,17 @@ public RequestValidator(
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final Function<ServiceContext, KsqlExecutionContext> snapshotSupplier,
final KsqlConfig ksqlConfig,
final TopicAccessValidator topicAccessValidator
final TopicAccessValidator topicAccessValidator,
final ServiceContext serverServiceContext
) {
this.customValidators = requireNonNull(customValidators, "customValidators");
this.injectorFactory = requireNonNull(injectorFactory, "injectorFactory");
this.snapshotSupplier = requireNonNull(snapshotSupplier, "snapshotSupplier");
this.ksqlConfig = requireNonNull(ksqlConfig, "ksqlConfig");
this.topicAccessValidator = topicAccessValidator;
this.topicAccessValidator = requireNonNull(topicAccessValidator, "topicAccessValidator");
this.serverServiceContext = requireSandbox(
requireNonNull(serverServiceContext, "serverServiceContext")
);
}

/**
Expand Down Expand Up @@ -144,11 +151,7 @@ private <T extends Statement> int validate(
} else if (KsqlEngine.isExecutableStatement(configured.getStatement())) {
final ConfiguredStatement<?> statementInjected = injector.inject(configured);

topicAccessValidator.validate(
serviceContext,
executionContext.getMetaStore(),
statementInjected.getStatement()
);
validateTopicPermissions(serviceContext, executionContext.getMetaStore(), statementInjected);

executionContext.execute(serviceContext, statementInjected);
} else {
Expand Down Expand Up @@ -180,4 +183,46 @@ private int validateRunScript(
return validate(serviceContext, executionContext.parse(sql), statement.getOverrides(), sql);
}

/**
* Performs permissions checks on the statement topics.
* </p>
* This check verifies the User and the KSQL server principal have the right ACLs permissions
* to access the statement topics.
*
* @param userServiceContext The context of the user executing this command.
* @param executionContext The execution context which contains the KSQL service context
* and the KSQL metastore.
* @param statement The statement that needs to be checked.
*/
private void validateTopicPermissions(
final ServiceContext userServiceContext,
final MetaStore metaStore,
final ConfiguredStatement<?> configuredStatement
) {
final Statement statement = configuredStatement.getStatement();

topicAccessValidator.validate(userServiceContext, metaStore, statement);

// If these service contexts are different, then KSQL is running in a secured environment
// with authentication and impersonation enabled.
if (userServiceContext != serverServiceContext) {
try {
// Perform a permission check for the KSQL server
topicAccessValidator.validate(serverServiceContext, metaStore, statement);
} catch (final KsqlTopicAuthorizationException e) {
throw new KsqlStatementException(
"The KSQL service principal is not authorized to execute the command: "
+ e.getMessage(),
configuredStatement.getStatementText()
);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to append the e.getMessage() here because KSQL does not print the root cause message when a KsqlStatementException is detected. I didn't want to fix that because I don't know the reason of not including the whole stack. Better fix it later.

} catch (final Exception e) {
throw new KsqlStatementException(
"The KSQL service principal failed to validate the command: "
+ e.getMessage(),
configuredStatement.getStatementText(),
e
);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,7 @@ private void givenMockEngine() {
.thenAnswer(invocation -> realEngine.createSandbox(serviceContext).prepare(invocation.getArgument(0)));
when(ksqlEngine.createSandbox(any())).thenReturn(sandbox);
when(ksqlEngine.getMetaStore()).thenReturn(metaStore);
when(ksqlEngine.getServiceContext()).thenReturn(serviceContext);
when(topicInjectorFactory.apply(ksqlEngine)).thenReturn(topicInjector);
setUpKsqlResource();
}
Expand Down
Loading