diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index bed8fa4ba04c..41bddf7ea410 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -478,12 +478,12 @@ static KsqlRestApplication buildApplication( final String commandTopicName = KsqlInternalTopicUtils.getTopicName( ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX); - final Map consumerConfigs = restConfig.getCommandConsumerProperties(); - consumerConfigs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + final Map commandConsumerConfigs = restConfig.getCommandConsumerProperties(); + commandConsumerConfigs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final CommandStore commandStore = CommandStore.Factory.create( commandTopicName, - consumerConfigs); + commandConsumerConfigs); final StatementExecutor statementExecutor = new StatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator); @@ -530,7 +530,7 @@ static KsqlRestApplication buildApplication( commandTopicName, ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), commandRunner, - consumerConfigs, + commandConsumerConfigs, restConfig.getCommandProducerProperties() ); @@ -576,7 +576,7 @@ static KsqlRestApplication buildApplication( preconditions, configurables, rocksDBConfigSetterHandler, - transactionalProducerFactory + transactionalProducerFactory ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/TransactionalProducerImpl.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/TransactionalProducerImpl.java index 395a5916e9bd..e53d82edb7fd 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/TransactionalProducerImpl.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/TransactionalProducerImpl.java @@ -77,6 +77,8 @@ public TransactionalProducerImpl( ); this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); + + initialize(); } @VisibleForTesting @@ -96,12 +98,15 @@ public TransactionalProducerImpl( this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); } + private void initialize() { + commandConsumer.assign(Collections.singleton(commandTopicPartition)); + commandProducer.initTransactions(); + } /** begins transaction */ public void begin() { - commandConsumer.assign(Collections.singleton(commandTopicPartition)); - commandProducer.initTransactions(); commandProducer.beginTransaction(); + waitForConsumer(); } public void waitForConsumer() { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java index 05ba5c580a53..0e9222320c92 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java @@ -44,11 +44,6 @@ QueuedCommandStatus enqueueCommand( ConfiguredStatement statement, TransactionalProducer transactionalProducer ); - - /** - * Documentation on this - */ - // TransactionalProducer createTransactionalProducer(); /** * Polls the Queue for any commands that have been enqueued since the last diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java index 5861b6163046..a281c8831415 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java @@ -149,7 +149,7 @@ public void processPriorCommands() { void fetchAndRunCommands() { final List commands = commandStore.getNewCommands(NEW_CMDS_TIMEOUT); if (commands.isEmpty()) { - completeSatisfiedSequenceNumberFutures(); + completeOffsetProccessedFutures(); return; } final Optional terminateCmd = findTerminateCommand(commands); @@ -166,7 +166,7 @@ void fetchAndRunCommands() { executeStatement(command); } - completeSatisfiedSequenceNumberFutures(); + completeOffsetProccessedFutures(); } private void executeStatement(final QueuedCommand queuedCommand) { @@ -234,7 +234,7 @@ public void ensureProcessedPastOffset(final long seqNum, final Duration timeout) } } - private void completeSatisfiedSequenceNumberFutures() { + private void completeOffsetProccessedFutures() { offsetProcessedFutureStore.completeFuturesUpToAndIncludingSequenceNumber( commandStore.getConsumerPosition() - 1); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 7e1e23fd99c4..11afe7168043 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -16,17 +16,21 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.parser.KsqlParser; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.server.TransactionalProducer; +import io.confluent.ksql.rest.server.validation.RequestValidator; import io.confluent.ksql.security.KsqlAuthorizationValidator; +import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.statement.Injector; import io.confluent.ksql.util.KsqlServerException; import java.time.Duration; +import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -44,12 +48,14 @@ public class DistributingExecutor { private final Duration distributedCmdResponseTimeout; private final BiFunction injectorFactory; private final KsqlAuthorizationValidator authorizationValidator; + private final RequestValidator requestValidator; public DistributingExecutor( final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, final BiFunction injectorFactory, - final KsqlAuthorizationValidator authorizationValidator + final KsqlAuthorizationValidator authorizationValidator, + final RequestValidator requestValidator ) { this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); this.distributedCmdResponseTimeout = @@ -57,11 +63,15 @@ public DistributingExecutor( this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory"); this.authorizationValidator = Objects.requireNonNull(authorizationValidator, "authorizationValidator"); + this.requestValidator = + Objects.requireNonNull(requestValidator, "requestValidator"); } public Optional execute( final ConfiguredStatement statement, + final KsqlParser.ParsedStatement parsedStatement, final Map mutableScopedProperties, + final String sql, final KsqlExecutionContext executionContext, final ServiceContext serviceContext, final TransactionalProducer transactionalProducer @@ -73,9 +83,19 @@ public Optional execute( checkAuthorization(injected, serviceContext, executionContext); try { + transactionalProducer.begin(); + + requestValidator.validate( + SandboxedServiceContext.create(serviceContext), + Collections.singletonList(parsedStatement), + mutableScopedProperties, + sql + ); + final QueuedCommandStatus queuedCommandStatus = commandQueue.enqueueCommand(injected, transactionalProducer); + transactionalProducer.commit(); final CommandStatus commandStatus = queuedCommandStatus .tryWaitForFinalStatus(distributedCmdResponseTimeout); @@ -86,6 +106,7 @@ public Optional execute( queuedCommandStatus.getCommandSequenceNumber() )); } catch (final Exception e) { + transactionalProducer.abort(); throw new KsqlServerException(String.format( "Could not write the statement '%s' into the command topic: " + e.getMessage(), statement.getStatementText()), e); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java index 72c708fed308..cc9e5b84c02d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java @@ -75,6 +75,7 @@ public KsqlEntityList execute( final ServiceContext serviceContext, final List statements, final Map propertyOverrides, + final String sql, final TransactionalProducer transactionalProducer ) { final Map scopedPropertyOverrides = new HashMap<>(propertyOverrides); @@ -86,7 +87,7 @@ public KsqlEntityList execute( serviceContext, prepared, propertyOverrides, - transactionalProducer + transactionalProducer ); if (!result.isEmpty()) { // This is to maintain backwards compatibility until we deprecate @@ -100,7 +101,9 @@ public KsqlEntityList execute( executeStatement( serviceContext, configured, + parsed, scopedPropertyOverrides, + sql, entities, transactionalProducer ).ifPresent(entities::add); @@ -113,20 +116,22 @@ public KsqlEntityList execute( private Optional executeStatement( final ServiceContext serviceContext, final ConfiguredStatement configured, + final ParsedStatement parsed, final Map mutableScopedProperties, + final String sql, final KsqlEntityList entities, final TransactionalProducer transactionalProducer ) { final Class statementClass = configured.getStatement().getClass(); - // currently this needs to be commented out since this check will fail since the commandRunner consumer - // can't poll the non-comitted records. - // commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass); + // this could probably be removed since we waitForConsumer() + // before moving on to the next statement + commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass); final StatementExecutor executor = (StatementExecutor) customExecutors.getOrDefault(statementClass, (stmt, props, ctx, svcCtx) -> - distributor.execute(stmt, props, ctx, svcCtx, transactionalProducer)); + distributor.execute(stmt, parsed, props, sql, ctx, svcCtx, transactionalProducer)); return executor.execute( configured, @@ -154,6 +159,7 @@ private KsqlEntityList executeRunScript( serviceContext, ksqlEngine.parse(sql), propertyOverrides, + sql, transactionalProducer ); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index 314642384df3..43353f209394 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -31,8 +31,6 @@ import io.confluent.ksql.parser.tree.UnsetProperty; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.ClusterTerminateRequest; -import io.confluent.ksql.rest.entity.CommandStatusEntity; -import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlRequest; import io.confluent.ksql.rest.entity.Versions; @@ -167,7 +165,8 @@ public void configure(final KsqlConfig config) { commandQueue, distributedCmdResponseTimeout, injectorFactory, - authorizationValidator + authorizationValidator, + this.validator ), ksqlEngine, config, @@ -196,6 +195,7 @@ public Response terminateCluster( serviceContext, TERMINATE_CLUSTER, request.getStreamsProperties(), + request.toString(), transactionalProducerFactory.createProducerTransactionManager() ) ).build(); @@ -240,36 +240,26 @@ public Response handleKsqlStatements( private KsqlEntityList handleTransactionalProduce( final ServiceContext serviceContext, final KsqlRequest request - ) { + ) throws InterruptedException { final TransactionalProducer transactionalProducer = transactionalProducerFactory.createProducerTransactionManager(); - transactionalProducer.begin(); final List statements = ksqlEngine.parse(request.getKsql()); - final KsqlEntityList entities; - - try { - transactionalProducer.waitForConsumer(); - - validator.validate( - SandboxedServiceContext.create(serviceContext), - statements, - request.getStreamsProperties(), - request.getKsql() - ); - entities = handler.execute( - serviceContext, - statements, - request.getStreamsProperties(), - transactionalProducer - ); + validator.validate( + SandboxedServiceContext.create(serviceContext), + statements, + request.getStreamsProperties(), + request.getKsql() + ); - transactionalProducer.commit(); - } catch (Exception e) { - transactionalProducer.abort(); - LOG.error("Aborted transactional produce for: " + request); - throw e; - } + final KsqlEntityList entities = handler.execute( + serviceContext, + statements, + request.getStreamsProperties(), + request.getKsql(), + transactionalProducer + ); + transactionalProducer.close(); return entities; } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index f98c269d9ca8..333b31a47c06 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -30,6 +30,7 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.parser.KsqlParser; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.Statement; @@ -40,6 +41,7 @@ import io.confluent.ksql.rest.entity.CommandStatus.Status; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.server.TransactionalProducer; +import io.confluent.ksql.rest.server.validation.RequestValidator; import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -85,6 +87,8 @@ public class DistributingExecutorTest { @Mock KsqlAuthorizationValidator authorizationValidator; @Mock KsqlExecutionContext executionContext; @Mock MetaStore metaStore; + @Mock RequestValidator requestValidator; + @Mock KsqlParser.ParsedStatement parsedStatement; @Mock TransactionalProducer transactionalProducer; @@ -107,14 +111,15 @@ public void setUp() throws InterruptedException { queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), - authorizationValidator + authorizationValidator, + requestValidator ); } @Test public void shouldEnqueueSuccessfulCommand() throws InterruptedException { // When: - distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext, transactionalProducer); + distributor.execute(EMPTY_STATEMENT, parsedStatement, ImmutableMap.of(), "", executionContext, serviceContext, transactionalProducer); // Then: verify(queue, times(1)).enqueueCommand(eq(EMPTY_STATEMENT), any()); @@ -123,7 +128,7 @@ public void shouldEnqueueSuccessfulCommand() throws InterruptedException { @Test public void shouldInferSchemas() { // When: - distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext, transactionalProducer); + distributor.execute(EMPTY_STATEMENT, parsedStatement, ImmutableMap.of(), "", executionContext, serviceContext, transactionalProducer); // Then: verify(schemaInjector, times(1)).inject(eq(EMPTY_STATEMENT)); @@ -135,7 +140,9 @@ public void shouldReturnCommandStatus() { final CommandStatusEntity commandStatusEntity = (CommandStatusEntity) distributor.execute( EMPTY_STATEMENT, + parsedStatement, ImmutableMap.of(), + "", executionContext, serviceContext, transactionalProducer @@ -170,7 +177,7 @@ public void shouldThrowExceptionOnFailureToEnqueue() { expectedException.expectCause(is(cause)); // When: - distributor.execute(configured, ImmutableMap.of(), executionContext, serviceContext, transactionalProducer); + distributor.execute(configured, parsedStatement, ImmutableMap.of(),"", executionContext, serviceContext, transactionalProducer); } @Test @@ -187,7 +194,7 @@ public void shouldThrowFailureIfCannotInferSchema() { expectedException.expectMessage("Could not infer!"); // When: - distributor.execute(configured, ImmutableMap.of(), executionContext, serviceContext, transactionalProducer); + distributor.execute(configured, parsedStatement, ImmutableMap.of(), "", executionContext, serviceContext, transactionalProducer); } @Test @@ -205,7 +212,7 @@ public void shouldThrowExceptionIfUserServiceContextIsDeniedAuthorization() { expectedException.expect(KsqlTopicAuthorizationException.class); // When: - distributor.execute(configured, ImmutableMap.of(), executionContext, userServiceContext, transactionalProducer); + distributor.execute(configured, parsedStatement, ImmutableMap.of(), "", executionContext, userServiceContext, transactionalProducer); } @Test @@ -224,6 +231,6 @@ public void shouldThrowServerExceptionIfServerServiceContextIsDeniedAuthorizatio expectedException.expectCause(is(instanceOf(KsqlTopicAuthorizationException.class))); // When: - distributor.execute(configured, ImmutableMap.of(), executionContext, userServiceContext, transactionalProducer); + distributor.execute(configured, parsedStatement, ImmutableMap.of(), "", executionContext, userServiceContext, transactionalProducer); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java index 9c9a58c8b760..cea79885c31e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java @@ -65,7 +65,7 @@ public class RequestHandlerTest { private static final String SOME_STREAM_SQL = "CREATE STREAM x WITH (value_format='json', kafka_topic='x');"; - + @Mock KsqlEngine ksqlEngine; @Mock KsqlConfig ksqlConfig; @Mock ServiceContext serviceContext; @@ -86,7 +86,7 @@ public void setUp() { when(ksqlEngine.prepare(any())) .thenAnswer(invocation -> new DefaultKsqlParser().prepare(invocation.getArgument(0), metaStore)); - when(distributor.execute(any(), any(), any(), any(), any())).thenReturn(Optional.of(entity)); + when(distributor.execute(any(), any(), any(), any(), any(), any(), any())).thenReturn(Optional.of(entity)); doNothing().when(sync).waitFor(any(), any()); } @@ -101,7 +101,7 @@ public void shouldUseCustomExecutor() { // When final List statements = new DefaultKsqlParser().parse(SOME_STREAM_SQL); - final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of(), transactionalProducer); + final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL, transactionalProducer); // Then assertThat(entities, contains(entity)); @@ -124,7 +124,7 @@ public void shouldDefaultToDistributor() { // When final List statements = new DefaultKsqlParser().parse(SOME_STREAM_SQL); - final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of(), transactionalProducer); + final KsqlEntityList entities = handler.execute(serviceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL, transactionalProducer); // Then assertThat(entities, contains(entity)); @@ -133,7 +133,9 @@ public void shouldDefaultToDistributor() { preparedStatement(instanceOf(CreateStream.class)), ImmutableMap.of(), ksqlConfig))), + eq(statements.get(0)), eq(ImmutableMap.of()), + eq(SOME_STREAM_SQL), eq(ksqlEngine), eq(serviceContext), eq(transactionalProducer) @@ -152,7 +154,8 @@ public void shouldDistributeProperties() { serviceContext, statements, ImmutableMap.of("x", "y"), - transactionalProducer + "", + transactionalProducer ); // Then @@ -162,7 +165,9 @@ public void shouldDistributeProperties() { preparedStatement(instanceOf(CreateStream.class)), ImmutableMap.of("x", "y"), ksqlConfig))), + statements.get(0), any(), + eq(SOME_STREAM_SQL), eq(ksqlEngine), eq(serviceContext), eq(transactionalProducer) @@ -190,7 +195,7 @@ public void shouldWaitForDistributedStatements() { ); // When - handler.execute(serviceContext, statements, ImmutableMap.of(), transactionalProducer); + handler.execute(serviceContext, statements, ImmutableMap.of(), SOME_STREAM_SQL, transactionalProducer); // Then verify(sync).waitFor(argThat(hasItems(entity1, entity2)), any()); @@ -214,7 +219,7 @@ public void shouldInlineRunScriptStatements() { // When: final List statements = new DefaultKsqlParser() .parse("RUN SCRIPT '/some/script.sql';" ); - handler.execute(serviceContext, statements, props, transactionalProducer); + handler.execute(serviceContext, statements, props, SOME_STREAM_SQL, transactionalProducer); // Then: verify(customExecutor, times(1)) @@ -245,7 +250,7 @@ public void shouldOnlyReturnLastInRunScript() { .parse("RUN SCRIPT '/some/script.sql';" ); // When: - final KsqlEntityList result = handler.execute(serviceContext, statements, props, transactionalProducer); + final KsqlEntityList result = handler.execute(serviceContext, statements, props, SOME_STREAM_SQL, transactionalProducer); // Then: assertThat(result, contains(entity2));