From 219852bb5a26797b8fe2856c6285cb9a1450caed Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Tue, 19 Nov 2019 14:29:48 -0800 Subject: [PATCH] Revert "feat: Transactional Produces to Command Topic (#3660)" This reverts commit cba2877eba335030c2ca14f407d4aafed1ae647f. --- config/ksql-production-server.properties | 6 +- docs/installation/server-config/security.rst | 3 - .../io/confluent/ksql/util/KsqlConfig.java | 11 +- .../ksql/rest/server/CommandTopic.java | 40 +++++- .../ksql/rest/server/KsqlRestApplication.java | 77 ++++------- .../rest/server/computation/CommandQueue.java | 27 +--- .../rest/server/computation/CommandStore.java | 125 ++---------------- .../computation/DistributingExecutor.java | 68 +--------- .../rest/server/execution/RequestHandler.java | 30 +---- .../rest/server/resources/KsqlResource.java | 12 +- .../rest/util/KsqlInternalTopicUtils.java | 13 +- .../ksql/rest/integration/RestApiTest.java | 20 +-- .../ksql/rest/server/CommandTopicTest.java | 61 ++++++++- .../rest/server/KsqlRestApplicationTest.java | 49 ++----- .../server/computation/CommandStoreTest.java | 83 ++++-------- .../computation/DistributingExecutorTest.java | 57 ++------ .../rest/server/computation/RecoveryTest.java | 29 +--- .../server/execution/RequestHandlerTest.java | 17 +-- .../server/resources/KsqlResourceTest.java | 63 +++------ .../rest/util/KsqlInternalTopicUtilsTest.java | 13 +- .../util/EmbeddedSingleNodeKafkaCluster.java | 4 - 21 files changed, 244 insertions(+), 564 deletions(-) diff --git a/config/ksql-production-server.properties b/config/ksql-production-server.properties index 1c3943aa67d1..8436544dd0dd 100644 --- a/config/ksql-production-server.properties +++ b/config/ksql-production-server.properties @@ -74,11 +74,9 @@ ksql.streams.producer.delivery.timeout.ms=2147483647 # Kafka cluster is unavailable. ksql.streams.producer.max.block.ms=9223372036854775807 -# For better fault tolerance and durability, set the replication factor and minimum insync replicas -# for the KSQL Server's internal topics. -# Note: the value 3 requires at least 3 brokers in your Kafka cluster. +# For better fault tolerance and durability, set the replication factor for the KSQL +# Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster. ksql.internal.topic.replicas=3 -ksql.internal.topic.min.insync.replicas=2 # Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and # durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications. diff --git a/docs/installation/server-config/security.rst b/docs/installation/server-config/security.rst index 90f6363df763..e5f2c0e487a5 100644 --- a/docs/installation/server-config/security.rst +++ b/docs/installation/server-config/security.rst @@ -404,9 +404,6 @@ The ACLs required are the same for both :ref:`Interactive and non-interactive (h KSQL always requires the following ACLs for its internal operations and data management: - The ``DESCRIBE_CONFIGS`` operation on the ``CLUSTER`` resource type. -- The ``DESCRIBE`` operation on the ``TOPIC`` with ``LITERAL`` name ``__consumer_offsets``. -- The ``DESCRIBE`` operation on the ``TOPIC`` with ``LITERAL`` name ``__transaction_state``. -- The ``DESCRIBE`` and ``WRITE`` operations on the ``TRANSACTIONAL_ID`` with ``LITERAL`` name ````. - The ``ALL`` operation on all internal ``TOPICS`` that are ``PREFIXED`` with ``_confluent-ksql-``. 
- The ``ALL`` operation on all internal ``GROUPS`` that are ``PREFIXED`` with ``_confluent-ksql-``. diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 1549061aacd1..0b5fe5666d10 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -65,9 +65,6 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas"; - public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY = - "ksql.internal.topic.min.insync.replicas"; - public static final String KSQL_SCHEMA_REGISTRY_PREFIX = "ksql.schema.registry."; public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url"; @@ -518,14 +515,8 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY, Type.SHORT, (short) 1, - ConfigDef.Importance.MEDIUM, + ConfigDef.Importance.LOW, "The replication factor for the internal topics of KSQL server." - ).define( - KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY, - Type.SHORT, - (short) 1, - ConfigDef.Importance.MEDIUM, - "The minimum number of insync replicas for the internal topics of KSQL server." ).define( KSQL_UDF_SECURITY_MANAGER_ENABLED, ConfigDef.Type.BOOLEAN, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index 42e9fca96082..869791b7bc3f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -26,10 +26,15 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,11 +45,13 @@ public class CommandTopic { private final TopicPartition commandTopicPartition; private Consumer commandConsumer = null; + private Producer commandProducer = null; private final String commandTopicName; public CommandTopic( final String commandTopicName, - final Map kafkaConsumerProperties + final Map kafkaConsumerProperties, + final Map kafkaProducerProperties ) { this( commandTopicName, @@ -52,16 +59,22 @@ public CommandTopic( Objects.requireNonNull(kafkaConsumerProperties, "kafkaClientProperties"), InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true), InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false) - ) - ); + ), + new KafkaProducer<>( + Objects.requireNonNull(kafkaProducerProperties, "kafkaClientProperties"), + InternalTopicJsonSerdeUtil.getJsonSerializer(true), + InternalTopicJsonSerdeUtil.getJsonSerializer(false) + )); } CommandTopic( final String commandTopicName, - final Consumer commandConsumer + final Consumer commandConsumer, + final Producer commandProducer ) 
{ this.commandTopicPartition = new TopicPartition(commandTopicName, 0); this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer"); + this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer"); this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); } @@ -73,6 +86,24 @@ public void start() { commandConsumer.assign(Collections.singleton(commandTopicPartition)); } + public RecordMetadata send(final CommandId commandId, final Command command) { + final ProducerRecord producerRecord = new ProducerRecord<>( + commandTopicName, + 0, + Objects.requireNonNull(commandId, "commandId"), + Objects.requireNonNull(command, "command")); + try { + return commandProducer.send(producerRecord).get(); + } catch (final ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException)e.getCause(); + } + throw new RuntimeException(e.getCause()); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + public Iterable> getNewCommands(final Duration timeout) { return commandConsumer.poll(timeout); } @@ -119,5 +150,6 @@ public void wakeup() { public void close() { commandConsumer.close(); + commandProducer.close(); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 31604704d844..6e93b41de237 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -39,9 +39,7 @@ import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.query.id.HybridQueryIdGenerator; -import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlErrorMessage; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; @@ -74,6 +72,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.RetryUtil; import io.confluent.ksql.util.Version; import io.confluent.ksql.util.WelcomeMsgUtils; @@ -109,10 +108,6 @@ import javax.websocket.server.ServerEndpointConfig.Configurator; import javax.ws.rs.core.Configurable; import org.apache.commons.lang3.StringUtils; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.errors.AuthorizationException; -import org.apache.kafka.common.errors.OutOfOrderSequenceException; -import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.streams.StreamsConfig; import org.apache.log4j.LogManager; import org.eclipse.jetty.server.ServerConnector; @@ -472,16 +467,13 @@ static KsqlRestApplication buildApplication( UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load(); - final String commandTopicName = KsqlInternalTopicUtils.getTopicName( + final String commandTopic = KsqlInternalTopicUtils.getTopicName( ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX); final CommandStore commandStore = CommandStore.Factory.create( - commandTopicName, - 
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
-        Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
+        commandTopic,
         restConfig.getCommandConsumerProperties(),
-        restConfig.getCommandProducerProperties()
-    );
+        restConfig.getCommandProducerProperties());
 
     final InteractiveStatementExecutor statementExecutor =
         new InteractiveStatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator);
@@ -509,12 +501,19 @@ static KsqlRestApplication buildApplication(
         authorizationValidator
     );
 
+    final KsqlResource ksqlResource = new KsqlResource(
+        ksqlEngine,
+        commandStore,
+        Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
+        versionChecker::updateLastRequestTime,
+        authorizationValidator
+    );
+
     final List<String> managedTopics = new LinkedList<>();
-    managedTopics.add(commandTopicName);
+    managedTopics.add(commandTopic);
     if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) {
       managedTopics.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig));
     }
-
     final CommandRunner commandRunner = new CommandRunner(
         statementExecutor,
         commandStore,
@@ -523,14 +522,6 @@ static KsqlRestApplication buildApplication(
         serverState
     );
 
-    final KsqlResource ksqlResource = new KsqlResource(
-        ksqlEngine,
-        commandStore,
-        Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
-        versionChecker::updateLastRequestTime,
-        authorizationValidator
-    );
-
     final List<KsqlServerPrecondition> preconditions = restConfig.getConfiguredInstances(
         KsqlRestConfig.KSQL_SERVER_PRECONDITIONS,
         KsqlServerPrecondition.class
@@ -632,47 +623,27 @@ private static void maybeCreateProcessingLogStream(
       final KsqlEngine ksqlEngine,
       final CommandQueue commandQueue
   ) {
-    if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)) {
+    if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)
+        || !commandQueue.isEmpty()) {
       return;
     }
 
-    final Producer<CommandId, Command> transactionalProducer =
-        commandQueue.createTransactionalProducer();
-    try {
-      transactionalProducer.initTransactions();
-      transactionalProducer.beginTransaction();
-
-      if (!commandQueue.isEmpty()) {
-        return;
-      }
+    final PreparedStatement<?> statement = ProcessingLogServerUtils
+        .processingLogStreamCreateStatement(config, ksqlConfig);
+    final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
+        statement, Collections.emptyMap(), ksqlConfig);
 
-      // We don't wait for the commandRunner in this case since it hasn't been started yet.
-
-      final PreparedStatement<?> statement = ProcessingLogServerUtils
-          .processingLogStreamCreateStatement(config, ksqlConfig);
-      final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
-          statement, Collections.emptyMap(), ksqlConfig);
-
+    try {
       ksqlEngine.createSandbox(ksqlEngine.getServiceContext()).execute(
           ksqlEngine.getServiceContext(),
          configured.get()
      );
-
-      commandQueue.enqueueCommand(configured.get(), transactionalProducer);
-      transactionalProducer.commitTransaction();
-    } catch (final ProducerFencedException
-        | OutOfOrderSequenceException
-        | AuthorizationException e
-    ) {
-      // We can't recover from these exceptions, so our only option is to close the producer and exit.
-      // This catch doesn't abortTransaction() since doing that would throw another exception.
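The hunk above strips the transactional write path out of maybeCreateProcessingLogStream: initTransactions(), beginTransaction(), an emptiness check performed inside the transaction, enqueue, commitTransaction(), with the fatal fencing-class exceptions handled by closing the producer rather than aborting. As a stand-alone reference, that is the standard Kafka transactions pattern; a minimal sketch follows, in which the bootstrap address, topic name, and transactional.id are illustrative placeholders, not values from this patch:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public final class TransactionalSendSketch {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "sketch-txn-id");   // placeholder
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    final Producer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    try {
      // Registers the transactional.id and fences any older producer using the same id.
      producer.initTransactions();
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("sketch-topic", "key", "value")); // placeholder topic
      // Only after commit does the record become visible to read_committed consumers.
      producer.commitTransaction();
    } catch (final ProducerFencedException
        | OutOfOrderSequenceException
        | AuthorizationException e) {
      // Fatal: abortTransaction() would itself throw here, so fall through and close.
    } catch (final Exception e) {
      // Recoverable: roll the transaction back; the producer remains usable.
      producer.abortTransaction();
    } finally {
      producer.close();
    }
  }
}
```

The same fatal-versus-recoverable split reappears further down in the removed DistributingExecutor.execute() path.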
+    } catch (final KsqlException e) {
       log.warn("Failed to create processing log stream", e);
-    } catch (final Exception e) {
-      transactionalProducer.abortTransaction();
-      log.warn("Failed to create processing log stream", e);
-    } finally {
-      transactionalProducer.close();
+      return;
     }
+
+    commandQueue.enqueueCommand(configured.get());
   }
 
   /**
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java
index 01bb5b753a1d..54aa6c74ba7b 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandQueue.java
@@ -15,13 +15,11 @@
 package io.confluent.ksql.rest.server.computation;
 
-import io.confluent.ksql.rest.entity.CommandId;
 import io.confluent.ksql.statement.ConfiguredStatement;
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
-import org.apache.kafka.clients.producer.Producer;
 
 /**
  * Represents a queue of {@link Command}s that must be distributed to all
@@ -36,16 +34,13 @@ public interface CommandQueue extends Closeable {
    * it is guaranteed that the command has been persisted, without regard
    * for the {@link io.confluent.ksql.rest.entity.CommandStatus CommandStatus}.
    *
-   * @param statement The statement to be distributed
-   * @param transactionalProducer The transactional producer used to enqueue the command
+   * @param statement  The statement to be distributed
+   *
    * @return an asynchronous tracker that can be used to determine the current
    *         state of the command
    */
-  QueuedCommandStatus enqueueCommand(
-      ConfiguredStatement<?> statement,
-      Producer<CommandId, Command> transactionalProducer
-  );
-
+  QueuedCommandStatus enqueueCommand(ConfiguredStatement<?> statement);
+
   /**
    * Polls the Queue for any commands that have been enqueued since the last
    * invocation to this method.
@@ -79,20 +74,6 @@ QueuedCommandStatus enqueueCommand(
   void ensureConsumedPast(long seqNum, Duration timeout)
       throws InterruptedException, TimeoutException;
 
-  /**
-   * Creates a transactional producer for producing to the command topic.
-   *
-   * @return a TransactionalProducer
-   */
-  Producer<CommandId, Command> createTransactionalProducer();
-
-  /**
-   * Blocks until the command topic consumer has processed all records up to
-   * the current offset when this method is called.
- * - */ - void waitForCommandConsumer(); - /** * @return whether or not there are any enqueued commands */ diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index b79b952b8eca..1bc1dab1eda0 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -17,19 +17,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.CommandId; -import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.server.CommandTopic; -import io.confluent.ksql.rest.server.resources.KsqlRestException; -import io.confluent.ksql.rest.util.InternalTopicJsonSerdeUtil; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlException; import java.io.Closeable; import java.time.Duration; -import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -37,37 +31,21 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.IsolationLevel; -import org.apache.kafka.common.TopicPartition; /** * Wrapper class for the command topic. Used for reading from the topic (either all messages from * the beginning until now, or any new messages since then), and writing to it. 
*/ -// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class CommandStore implements CommandQueue, Closeable { private static final Duration POLLING_TIMEOUT_FOR_COMMAND_TOPIC = Duration.ofMillis(5000); - private static final int COMMAND_TOPIC_PARTITION = 0; private final CommandTopic commandTopic; private final CommandIdAssigner commandIdAssigner; private final Map commandStatusMap; private final SequenceNumberFutureStore sequenceNumberFutureStore; - private final String commandTopicName; - private final Duration commandQueueCatchupTimeout; - private final Map kafkaConsumerProperties; - private final Map kafkaProducerProperties; - public static final class Factory { private Factory() { @@ -75,57 +53,27 @@ private Factory() { public static CommandStore create( final String commandTopicName, - final String transactionId, - final Duration commandQueueCatchupTimeout, final Map kafkaConsumerProperties, final Map kafkaProducerProperties ) { - kafkaConsumerProperties.put( - ConsumerConfig.ISOLATION_LEVEL_CONFIG, - IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT) - ); - kafkaProducerProperties.put( - ProducerConfig.TRANSACTIONAL_ID_CONFIG, - transactionId - ); - kafkaProducerProperties.put( - ProducerConfig.ACKS_CONFIG, - "all" - ); - return new CommandStore( - commandTopicName, - new CommandTopic(commandTopicName, kafkaConsumerProperties), + new CommandTopic(commandTopicName, kafkaConsumerProperties, kafkaProducerProperties), new CommandIdAssigner(), - new SequenceNumberFutureStore(), - kafkaConsumerProperties, - kafkaProducerProperties, - commandQueueCatchupTimeout + new SequenceNumberFutureStore() ); } } CommandStore( - final String commandTopicName, final CommandTopic commandTopic, final CommandIdAssigner commandIdAssigner, - final SequenceNumberFutureStore sequenceNumberFutureStore, - final Map kafkaConsumerProperties, - final Map kafkaProducerProperties, - final Duration commandQueueCatchupTimeout + final SequenceNumberFutureStore sequenceNumberFutureStore ) { this.commandTopic = Objects.requireNonNull(commandTopic, "commandTopic"); this.commandIdAssigner = Objects.requireNonNull(commandIdAssigner, "commandIdAssigner"); this.commandStatusMap = Maps.newConcurrentMap(); this.sequenceNumberFutureStore = Objects.requireNonNull(sequenceNumberFutureStore, "sequenceNumberFutureStore"); - this.commandQueueCatchupTimeout = - Objects.requireNonNull(commandQueueCatchupTimeout, "commandQueueCatchupTimeout"); - this.kafkaConsumerProperties = - Objects.requireNonNull(kafkaConsumerProperties, "kafkaConsumerProperties"); - this.kafkaProducerProperties = - Objects.requireNonNull(kafkaProducerProperties, "kafkaProducerProperties"); - this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); } @Override @@ -142,7 +90,7 @@ public void start() { } /** - * Close the store, rendering it unable to read commands + * Close the store, rendering it unable to read or write commands */ @Override public void close() { @@ -150,10 +98,7 @@ public void close() { } @Override - public QueuedCommandStatus enqueueCommand( - final ConfiguredStatement statement, - final Producer transactionalProducer - ) { + public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement) { final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement()); // new commands that generate queries will use the new query id generation method from now on @@ -180,13 +125,8 @@ public QueuedCommandStatus enqueueCommand( } ); try { - final ProducerRecord producerRecord = 
new ProducerRecord<>( - commandTopicName, - COMMAND_TOPIC_PARTITION, - commandId, - command); final RecordMetadata recordMetadata = - transactionalProducer.send(producerRecord).get(); + commandTopic.send(commandId, command); return new QueuedCommandStatus(recordMetadata.offset(), statusFuture); } catch (final Exception e) { commandStatusMap.remove(commandId); @@ -242,58 +182,9 @@ public void ensureConsumedPast(final long seqNum, final Duration timeout) } catch (final TimeoutException e) { throw new TimeoutException( String.format( - "Timeout reached while waiting for command sequence number of %d." - + " Caused by: %s " - + "(Timeout: %d ms)", + "Timeout reached while waiting for command sequence number of %d. (Timeout: %d ms)", seqNum, - e.getMessage(), - timeout.toMillis() - )); - } - } - - @Override - public Producer createTransactionalProducer() { - return new KafkaProducer<>( - kafkaProducerProperties, - InternalTopicJsonSerdeUtil.getJsonSerializer(true), - InternalTopicJsonSerdeUtil.getJsonSerializer(false) - ); - } - - @Override - public void waitForCommandConsumer() { - try { - final long endOffset = getCommandTopicOffset(); - ensureConsumedPast(endOffset - 1, commandQueueCatchupTimeout); - } catch (final InterruptedException e) { - final String errorMsg = - "Interrupted while waiting for command topic consumer to process command topic"; - throw new KsqlRestException( - Errors.serverErrorForStatement(e, errorMsg, new KsqlEntityList())); - } catch (final TimeoutException e) { - final String errorMsg = - "Timeout while waiting for command topic consumer to process command topic"; - throw new KsqlRestException( - Errors.serverErrorForStatement(e, errorMsg, new KsqlEntityList())); - } - } - - // Must create a new consumer because consumers are not threadsafe - private long getCommandTopicOffset() { - final TopicPartition commandTopicPartition = new TopicPartition( - commandTopicName, - COMMAND_TOPIC_PARTITION - ); - - try (Consumer commandConsumer = new KafkaConsumer<>( - kafkaConsumerProperties, - InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true), - InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false) - )) { - commandConsumer.assign(Collections.singleton(commandTopicPartition)); - return commandConsumer.endOffsets(Collections.singletonList(commandTopicPartition)) - .get(commandTopicPartition); + timeout.toMillis())); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 1aa548f5834a..8c3553051b8f 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -16,30 +16,21 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatusEntity; import io.confluent.ksql.rest.entity.KsqlEntity; -import io.confluent.ksql.rest.server.validation.RequestValidator; -import io.confluent.ksql.rest.util.TerminateCluster; +import io.confluent.ksql.rest.server.execution.StatementExecutor; import io.confluent.ksql.security.KsqlAuthorizationValidator; -import 
io.confluent.ksql.services.SandboxedServiceContext;
 import io.confluent.ksql.services.ServiceContext;
 import io.confluent.ksql.statement.ConfiguredStatement;
 import io.confluent.ksql.statement.Injector;
 import io.confluent.ksql.util.KsqlServerException;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.BiFunction;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.errors.AuthorizationException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
-import org.apache.kafka.common.errors.ProducerFencedException;
 
 /**
  * A {@code StatementExecutor} that encapsulates a command queue and will
@@ -47,20 +38,18 @@
  * duration for the command to be executed remotely if configured with a
  * {@code distributedCmdResponseTimeout}.
  */
-public class DistributingExecutor {
+public class DistributingExecutor implements StatementExecutor<Statement> {
 
   private final CommandQueue commandQueue;
   private final Duration distributedCmdResponseTimeout;
   private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
   private final Optional<KsqlAuthorizationValidator> authorizationValidator;
-  private final RequestValidator requestValidator;
 
   public DistributingExecutor(
       final CommandQueue commandQueue,
       final Duration distributedCmdResponseTimeout,
       final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
-      final Optional<KsqlAuthorizationValidator> authorizationValidator,
-      final RequestValidator requestValidator
+      final Optional<KsqlAuthorizationValidator> authorizationValidator
   ) {
     this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
     this.distributedCmdResponseTimeout =
@@ -68,55 +57,22 @@ public DistributingExecutor(
     this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory");
     this.authorizationValidator =
         Objects.requireNonNull(authorizationValidator, "authorizationValidator");
-    this.requestValidator =
-        Objects.requireNonNull(requestValidator, "requestValidator");
   }
 
-  /**
-   * The transactional protocol for sending a command to the command topic is to
-   * initTransactions(), beginTransaction(), wait for commandRunner to finish processing all previous
-   * commands that were present at the start of the transaction, validate the current command,
-   * enqueue the command in the command topic, and commit the transaction.
-   * Only successfully committed commands can be read by the command topic consumer.
-   * If any exceptions are thrown during this protocol, the transaction is aborted.
-   * If a new transactional producer is initialized while the current transaction is incomplete,
-   * the old producer will be fenced off and unable to continue with its transaction.
-   */
+  @Override
   public Optional<KsqlEntity> execute(
       final ConfiguredStatement<Statement> statement,
-      final ParsedStatement parsedStatement,
       final Map<String, Object> mutableScopedProperties,
       final KsqlExecutionContext executionContext,
-      final ServiceContext serviceContext
-  ) {
+      final ServiceContext serviceContext) {
     final ConfiguredStatement<?> injected = injectorFactory
         .apply(executionContext, serviceContext)
         .inject(statement);
 
     checkAuthorization(injected, serviceContext, executionContext);
 
-    final Producer<CommandId, Command> transactionalProducer =
-        commandQueue.createTransactionalProducer();
     try {
-      transactionalProducer.initTransactions();
-      transactionalProducer.beginTransaction();
-      commandQueue.waitForCommandConsumer();
-
-      // Don't perform validation on Terminate Cluster statements
-      if (!parsedStatement.getStatementText()
-          .equals(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT)) {
-        requestValidator.validate(
-            SandboxedServiceContext.create(serviceContext),
-            Collections.singletonList(parsedStatement),
-            mutableScopedProperties,
-            parsedStatement.getStatementText()
-        );
-      }
-
-      final QueuedCommandStatus queuedCommandStatus =
-          commandQueue.enqueueCommand(injected, transactionalProducer);
-
-      transactionalProducer.commitTransaction();
+      final QueuedCommandStatus queuedCommandStatus = commandQueue.enqueueCommand(injected);
 
       final CommandStatus commandStatus = queuedCommandStatus
           .tryWaitForFinalStatus(distributedCmdResponseTimeout);
@@ -126,22 +82,10 @@ public Optional<KsqlEntity> execute(
           commandStatus,
           queuedCommandStatus.getCommandSequenceNumber()
       ));
-    } catch (final ProducerFencedException
-        | OutOfOrderSequenceException
-        | AuthorizationException e
-    ) {
-      // We can't recover from these exceptions, so our only option is to close the producer and exit.
-      // This catch doesn't abortTransaction() since doing that would throw another exception.
- throw new KsqlServerException(String.format( - "Could not write the statement '%s' into the command topic: " + e.getMessage(), - statement.getStatementText()), e); } catch (final Exception e) { - transactionalProducer.abortTransaction(); throw new KsqlServerException(String.format( "Could not write the statement '%s' into the command topic: " + e.getMessage(), statement.getStatementText()), e); - } finally { - transactionalProducer.close(); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java index 7b52c488a9b3..008a8c73f403 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java @@ -80,11 +80,7 @@ public KsqlEntityList execute( for (ParsedStatement parsed : statements) { final PreparedStatement prepared = ksqlEngine.prepare(parsed); if (prepared.getStatement() instanceof RunScript) { - final KsqlEntityList result = executeRunScript( - serviceContext, - prepared, - propertyOverrides - ); + final KsqlEntityList result = executeRunScript(serviceContext, prepared, propertyOverrides); if (!result.isEmpty()) { // This is to maintain backwards compatibility until we deprecate // RunScript in the next major release - the expected behavior was @@ -94,13 +90,8 @@ public KsqlEntityList execute( } else { final ConfiguredStatement configured = ConfiguredStatement.of( prepared, scopedPropertyOverrides, ksqlConfig); - executeStatement( - serviceContext, - configured, - parsed, - scopedPropertyOverrides, - entities - ).ifPresent(entities::add); + executeStatement(serviceContext, configured, scopedPropertyOverrides, entities) + .ifPresent(entities::add); } } return entities; @@ -110,18 +101,14 @@ public KsqlEntityList execute( private Optional executeStatement( final ServiceContext serviceContext, final ConfiguredStatement configured, - final ParsedStatement parsed, final Map mutableScopedProperties, final KsqlEntityList entities ) { final Class statementClass = configured.getStatement().getClass(); - commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass); final StatementExecutor executor = (StatementExecutor) - customExecutors.getOrDefault(statementClass, - (stmt, props, ctx, svcCtx) -> - distributor.execute(stmt, parsed, props, ctx, svcCtx)); + customExecutors.getOrDefault(statementClass, distributor); return executor.execute( configured, @@ -134,8 +121,7 @@ private Optional executeStatement( private KsqlEntityList executeRunScript( final ServiceContext serviceContext, final PreparedStatement statement, - final Map propertyOverrides - ) { + final Map propertyOverrides) { final String sql = (String) propertyOverrides .get(KsqlConstants.LEGACY_RUN_SCRIPT_STATEMENTS_CONTENT); @@ -144,10 +130,6 @@ private KsqlEntityList executeRunScript( "Request is missing script content", statement.getStatementText()); } - return execute( - serviceContext, - ksqlEngine.parse(sql), - propertyOverrides - ); + return execute(serviceContext, ksqlEngine.parse(sql), propertyOverrides); } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index f828fbf14371..9c31a47aef78 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ 
b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -158,8 +158,7 @@ public void configure(final KsqlConfig config) { commandQueue, distributedCmdResponseTimeout, injectorFactory, - authorizationValidator, - this.validator + authorizationValidator ), ksqlEngine, config, @@ -183,12 +182,9 @@ public Response terminateCluster( ensureValidPatterns(request.getDeleteTopicList()); try { - final KsqlEntityList entities = handler.execute( - serviceContext, - TERMINATE_CLUSTER, - request.getStreamsProperties() - ); - return Response.ok(entities).build(); + return Response.ok( + handler.execute(serviceContext, TERMINATE_CLUSTER, request.getStreamsProperties()) + ).build(); } catch (final Exception e) { return Errors.serverErrorForStatement( e, TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT, new KsqlEntityList()); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java index d949e531fcb6..b1776a30f52d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java @@ -68,16 +68,14 @@ public static String getTopicName(final KsqlConfig ksqlConfig, final String topi public static void ensureTopic(final String name, final KsqlConfig ksqlConfig, final KafkaTopicClient topicClient) { - final short replicationFactor = - ksqlConfig.getShort(KsqlConfig.KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY); + final short replicationFactor = + ksqlConfig.originals().containsKey(KsqlConfig.KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY) + ? ksqlConfig.getShort(KsqlConfig.KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY) : 1; if (replicationFactor < 2) { log.warn("Creating topic {} with replication factor of {} which is less than 2. " + "This is not advisable in a production environment. 
", name, replicationFactor); } - - final short minInsyncReplica = - ksqlConfig.getShort(KsqlConfig.KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY); final long requiredTopicRetention = Long.MAX_VALUE; if (topicClient.isTopicExists(name)) { @@ -113,10 +111,7 @@ public static void ensureTopic(final String name, replicationFactor, ImmutableMap.of( TopicConfig.RETENTION_MS_CONFIG, requiredTopicRetention, - TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE, - TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInsyncReplica, - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, false - ) + TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) ); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java index 2a0675503ba9..a8d0f6a78ec3 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/integration/RestApiTest.java @@ -26,11 +26,9 @@ import static org.apache.kafka.common.acl.AclOperation.CREATE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; -import static org.apache.kafka.common.acl.AclOperation.WRITE; import static org.apache.kafka.common.resource.ResourceType.CLUSTER; import static org.apache.kafka.common.resource.ResourceType.GROUP; import static org.apache.kafka.common.resource.ResourceType.TOPIC; -import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; @@ -131,22 +129,9 @@ public class RestApiTest { ops(ALL) ) .withAcl( - NORMAL_USER, - resource(TRANSACTIONAL_ID, "default_"), - ops(WRITE) - ) - .withAcl( - NORMAL_USER, - resource(TRANSACTIONAL_ID, "default_"), - ops(DESCRIBE) - ).withAcl( NORMAL_USER, resource(TOPIC, "__consumer_offsets"), ops(DESCRIBE) - ).withAcl( - NORMAL_USER, - resource(TOPIC, "__transaction_state"), - ops(DESCRIBE) ) ) .build(); @@ -359,9 +344,8 @@ public void shouldPrintTopicOverWebSocket() { @Test public void shouldDeleteTopic() { // Given: - makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"); - final String query = REST_APP.getPersistentQueries().iterator().next(); - makeKsqlRequest("TERMINATE QUERY " + query + ";"); + makeKsqlRequest("CREATE STREAM X AS SELECT * FROM " + PAGE_VIEW_STREAM + ";" + + "TERMINATE QUERY CSAS_X_2; "); assertThat("Expected topic X to be created", topicExists("X")); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java index 312ea1993cff..49924445d084 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java @@ -34,14 +34,19 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import 
org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -54,6 +59,8 @@ public class CommandTopicTest { private static final String COMMAND_TOPIC_NAME = "foo"; @Mock private Consumer commandConsumer; + @Mock + private Producer commandProducer; private CommandTopic commandTopic; @@ -78,12 +85,17 @@ public class CommandTopicTest { @Captor private ArgumentCaptor> topicPartitionsCaptor; + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private final static TopicPartition TOPIC_PARTITION = new TopicPartition(COMMAND_TOPIC_NAME, 0); + @Before @SuppressWarnings("unchecked") public void setup() { - commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer); + commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandProducer); + when(commandProducer.send(any(ProducerRecord.class))).thenReturn(future); } @Test @@ -96,6 +108,52 @@ public void shouldAssignCorrectPartitionToConsumer() { .assign(eq(Collections.singleton(new TopicPartition(COMMAND_TOPIC_NAME, 0)))); } + @Test + public void shouldSendCommandCorrectly() throws Exception { + // When + commandTopic.send(commandId1, command1); + + // Then + verify(commandProducer).send(new ProducerRecord<>(COMMAND_TOPIC_NAME, 0, commandId1, command1)); + verify(future).get(); + } + + @Test + public void shouldThrowExceptionIfSendIsNotSuccessful() throws Exception { + // Given: + when(future.get()) + .thenThrow(new ExecutionException(new RuntimeException("Send was unsuccessful!"))); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("Send was unsuccessful!"); + + // When + commandTopic.send(commandId1, command1); + } + + @Test + public void shouldThrowRuntimeExceptionIfSendCausesNonRuntimeException() throws Exception { + // Given: + when(future.get()).thenThrow(new ExecutionException( + new Exception("Send was unsuccessful because of non RunTime exception!"))); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage( + "java.lang.Exception: Send was unsuccessful because of non RunTime exception!"); + + // When + commandTopic.send(commandId1, command1); + } + + @Test + public void shouldThrowRuntimeExceptionIfSendThrowsInterruptedException() throws Exception { + // Given: + when(future.get()).thenThrow(new InterruptedException("InterruptedException")); + expectedException.expect(RuntimeException.class); + expectedException.expectMessage("InterruptedException"); + + // When + commandTopic.send(commandId1, command1); + } + @Test public void shouldGetNewCommandsIteratorCorrectly() { // Given: @@ -221,6 +279,7 @@ public void shouldCloseAllResources() { //Then: verify(commandConsumer).close(); + verify(commandProducer).close(); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 12f69549948e..a7842bfa03a4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -35,12 +35,9 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.logging.processing.ProcessingLogConfig; import 
io.confluent.ksql.logging.processing.ProcessingLogContext; -import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; -import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.entity.KsqlErrorMessage; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.CommandRunner; import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; @@ -66,7 +63,6 @@ import java.util.Queue; import java.util.function.Consumer; import javax.ws.rs.core.Configurable; -import org.apache.kafka.clients.producer.Producer; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -91,8 +87,6 @@ public class KsqlRestApplicationTest { @Mock private KsqlEngine ksqlEngine; @Mock - private MetaStore metaStore; - @Mock private KsqlExecutionContext sandBox; @Mock private KsqlConfig ksqlConfig; @@ -132,8 +126,6 @@ public class KsqlRestApplicationTest { private PreparedStatement preparedStatement; @Mock private Consumer rocksDBConfigSetterHandler; - @Mock - private Producer transactionalProducer; private PreparedStatement logCreateStatement; private KsqlRestApplication app; @@ -155,15 +147,13 @@ public void setUp() { when(ksqlEngine.prepare(any())).thenReturn((PreparedStatement)preparedStatement); when(commandQueue.isEmpty()).thenReturn(true); - when(commandQueue.enqueueCommand(any(), any(Producer.class))) + when(commandQueue.enqueueCommand(any())) .thenReturn(queuedCommandStatus); when(commandQueue.getCommandTopicName()).thenReturn(CMD_TOPIC_NAME); when(serviceContext.getTopicClient()).thenReturn(topicClient); when(topicClient.isTopicExists(CMD_TOPIC_NAME)).thenReturn(false); when(precondition1.checkPrecondition(any(), any())).thenReturn(Optional.empty()); when(precondition2.checkPrecondition(any(), any())).thenReturn(Optional.empty()); - when(commandQueue.createTransactionalProducer()). 
- thenReturn(transactionalProducer); logCreateStatement = ProcessingLogServerUtils.processingLogStreamCreateStatement( processingLogConfig, @@ -237,23 +227,18 @@ public void shouldRegisterAuthorizationFilterWithAuthorizationProvider() { } @Test - public void shouldCreateLogStreamTransactionally() { + public void shouldCreateLogStream() { // When: app.startKsql(); // Then: - final InOrder inOrder = Mockito.inOrder(sandBox, commandQueue, transactionalProducer); - inOrder.verify(transactionalProducer, times(1)).initTransactions(); - inOrder.verify(transactionalProducer, times(1)).beginTransaction(); - inOrder.verify(sandBox).execute( + verify(commandQueue).isEmpty(); + verify(sandBox).execute( argThat(equalTo(ksqlEngine.getServiceContext())), argThat(configured(equalTo(logCreateStatement))) ); - inOrder.verify(commandQueue).enqueueCommand( - argThat(configured(equalTo(logCreateStatement), Collections.emptyMap(), ksqlConfig)), - any()); - inOrder.verify(transactionalProducer, times(1)).commitTransaction(); - inOrder.verify(transactionalProducer, times(1)).close(); + verify(commandQueue).enqueueCommand( + argThat(configured(equalTo(logCreateStatement), Collections.emptyMap(), ksqlConfig))); } @Test @@ -266,19 +251,19 @@ public void shouldNotCreateLogStreamIfAutoCreateNotConfigured() { app.startKsql(); // Then: - verify(commandQueue, never()).enqueueCommand(any(), any()); + verify(commandQueue, never()).enqueueCommand(any()); } @Test - public void shouldOnlyCreateLogStreamIfCommandQueueEmpty() { + public void shouldOnlyCreateLogStreamIfCommandTopicEmpty() { // Given: when(commandQueue.isEmpty()).thenReturn(false); - + // When: app.startKsql(); // Then: - verify(commandQueue, never()).enqueueCommand(any(), any()); + verify(commandQueue, never()).enqueueCommand(any()); } @Test @@ -290,7 +275,7 @@ public void shouldNotCreateLogStreamIfValidationFails() { app.startKsql(); // Then: - verify(commandQueue, never()).enqueueCommand(any(), any()); + verify(commandQueue, never()).enqueueCommand(any()); } @Test @@ -301,9 +286,7 @@ public void shouldStartCommandStoreBeforeEnqueuingLogStream() { // Then: final InOrder inOrder = Mockito.inOrder(commandQueue); inOrder.verify(commandQueue).start(); - inOrder.verify(commandQueue).enqueueCommand( - argThat(configured(equalTo(logCreateStatement))), - any()); + inOrder.verify(commandQueue).enqueueCommand(argThat(configured(equalTo(logCreateStatement)))); } @Test @@ -314,9 +297,7 @@ public void shouldCreateLogTopicBeforeEnqueuingLogStream() { // Then: final InOrder inOrder = Mockito.inOrder(topicClient, commandQueue); inOrder.verify(topicClient).createTopic(eq(LOG_TOPIC_NAME), anyInt(), anyShort()); - inOrder.verify(commandQueue).enqueueCommand( - argThat(configured(equalTo(logCreateStatement))), - any()); + inOrder.verify(commandQueue).enqueueCommand(argThat(configured(equalTo(logCreateStatement)))); } @Test @@ -349,9 +330,7 @@ public void shouldEnqueueLogStreamBeforeSettingReady() { // Then: final InOrder inOrder = Mockito.inOrder(commandQueue, serverState); - inOrder.verify(commandQueue).enqueueCommand( - argThat(configured(equalTo(logCreateStatement))), - any()); + inOrder.verify(commandQueue).enqueueCommand(argThat(configured(equalTo(logCreateStatement)))); inOrder.verify(serverState).setReady(); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java index 0721b03008b5..699368508b9a 100644 --- 
a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/CommandStoreTest.java @@ -43,15 +43,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.RecordBatch; @@ -69,9 +66,9 @@ @RunWith(MockitoJUnitRunner.class) public class CommandStoreTest { - private static final String COMMAND_TOPIC_NAME = "command"; + private static final String COMMAND_TOPIC = "command"; private static final TopicPartition COMMAND_TOPIC_PARTITION = - new TopicPartition(COMMAND_TOPIC_NAME, 0); + new TopicPartition(COMMAND_TOPIC, 0); private static final KsqlConfig KSQL_CONFIG = new KsqlConfig( Collections.singletonMap(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG, "foo")); private static final Map OVERRIDE_PROPERTIES = @@ -94,9 +91,6 @@ public class CommandStoreTest { private Statement statement; @Mock private CommandIdAssigner commandIdAssigner; - @Mock - private Producer transactionalProducer; - private ConfiguredStatement configured; private final CommandId commandId = @@ -106,33 +100,6 @@ public class CommandStoreTest { private final RecordMetadata recordMetadata = new RecordMetadata( COMMAND_TOPIC_PARTITION, 0, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0); - private final Future testFuture = new Future() { - @Override - public boolean cancel(final boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return false; - } - - @Override - public RecordMetadata get() { - return recordMetadata; - } - - @Override - public RecordMetadata get(final long timeout, final TimeUnit unit) { - return null; - } - }; - private CommandStore commandStore; @Before @@ -141,11 +108,11 @@ public void setUp() { .thenAnswer(invocation -> new CommandId( CommandId.Type.STREAM, "foo" + COUNTER.getAndIncrement(), CommandId.Action.CREATE)); - when(transactionalProducer.send(any(ProducerRecord.class))).thenReturn(testFuture); + when(commandTopic.send(any(), any())).thenReturn(recordMetadata); when(commandTopic.getNewCommands(any())).thenReturn(buildRecords(commandId, command)); - when(commandTopic.getCommandTopicName()).thenReturn(COMMAND_TOPIC_NAME); + when(commandTopic.getCommandTopicName()).thenReturn(COMMAND_TOPIC); when(sequenceNumberFutureStore.getFutureForSequenceNumber(anyLong())).thenReturn(future); @@ -153,13 +120,9 @@ public void setUp() { PreparedStatement.of(statementText, statement), OVERRIDE_PROPERTIES, KSQL_CONFIG); commandStore = new CommandStore( - COMMAND_TOPIC_NAME, commandTopic, commandIdAssigner, - sequenceNumberFutureStore, - Collections.emptyMap(), - Collections.emptyMap(), - TIMEOUT + sequenceNumberFutureStore ); } @@ -167,44 +130,44 @@ public void setUp() { public void shouldFailEnqueueIfCommandWithSameIdRegistered() { // Given: 
when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); expectedException.expect(IllegalStateException.class); // When: - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); } @Test public void shouldCleanupCommandStatusOnProduceError() { // Given: - when(transactionalProducer.send(any(ProducerRecord.class))) + when(commandTopic.send(any(), any())) .thenThrow(new RuntimeException("oops")) - .thenReturn(testFuture); + .thenReturn(recordMetadata); expectedException.expect(KsqlException.class); expectedException.expectMessage("Could not write the statement 'test-statement' into the command topic."); - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); // When: - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); } @Test public void shouldEnqueueNewAfterHandlingExistingCommand() { // Given: when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); commandStore.getNewCommands(NEW_CMDS_TIMEOUT); // Should: - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); } @Test public void shouldRegisterBeforeDistributeAndReturnStatusOnGetNewCommands() { // Given: when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - when(transactionalProducer.send(any(ProducerRecord.class))).thenAnswer( + when(commandTopic.send(any(), any())).thenAnswer( invocation -> { final QueuedCommand queuedCommand = commandStore.getNewCommands(NEW_CMDS_TIMEOUT).get(0); assertThat(queuedCommand.getCommandId(), equalTo(commandId)); @@ -213,15 +176,15 @@ public void shouldRegisterBeforeDistributeAndReturnStatusOnGetNewCommands() { queuedCommand.getStatus().get().getStatus().getStatus(), equalTo(CommandStatus.Status.QUEUED)); assertThat(queuedCommand.getOffset(), equalTo(0L)); - return testFuture; + return recordMetadata; } ); // When: - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); // Then: - verify(transactionalProducer).send(any(ProducerRecord.class)); + verify(commandTopic).send(any(), any()); } @Test @@ -245,10 +208,10 @@ public void shouldFilterNullCommands() { @Test public void shouldDistributeCommand() { when(commandIdAssigner.getCommandId(any())).thenReturn(commandId); - when(transactionalProducer.send(any(ProducerRecord.class))).thenReturn(testFuture); + when(commandTopic.send(any(), any())).thenReturn(recordMetadata); // When: - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); // Then: //verify(transactionalProducer).send(same(commandId), any()); @@ -258,7 +221,7 @@ public void shouldDistributeCommand() { public void shouldIncludeCommandSequenceNumberInSuccessfulQueuedCommandStatus() { // When: final QueuedCommandStatus commandStatus = - commandStore.enqueueCommand(configured, transactionalProducer); + commandStore.enqueueCommand(configured); // Then: assertThat(commandStatus.getCommandSequenceNumber(), equalTo(recordMetadata.offset())); @@ -280,7 +243,7 @@ public void shouldThrowExceptionOnTimeout() throws Exception { expectedException.expect(TimeoutException.class); expectedException.expectMessage( - "Timeout reached while waiting for 
command sequence number of 2. Caused by: null (Timeout: 1000 ms)" + "Timeout reached while waiting for command sequence number of 2. (Timeout: 1000 ms)" ); // When: @@ -340,7 +303,7 @@ public void shouldClose() { @Test public void shouldGetCommandTopicName() { - assertThat(commandStore.getCommandTopicName(), equalTo(COMMAND_TOPIC_NAME)); + assertThat(commandStore.getCommandTopicName(), equalTo(COMMAND_TOPIC)); } @Test @@ -359,7 +322,7 @@ private static ConsumerRecords buildRecords(final Object... assertThat(args[i], instanceOf(CommandId.class)); assertThat(args[i + 1], anyOf(is(nullValue()), instanceOf(Command.class))); records.add( - new ConsumerRecord<>(COMMAND_TOPIC_NAME, 0, 0, (CommandId) args[i], (Command) args[i + 1])); + new ConsumerRecord<>(COMMAND_TOPIC, 0, 0, (CommandId) args[i], (Command) args[i + 1])); } return new ConsumerRecords<>(Collections.singletonMap(COMMAND_TOPIC_PARTITION, records)); } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java index 83abe615b4f1..4067e3fa344c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java @@ -30,7 +30,6 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.ListProperties; import io.confluent.ksql.parser.tree.Statement; @@ -40,11 +39,8 @@ import io.confluent.ksql.rest.entity.CommandStatus; import io.confluent.ksql.rest.entity.CommandStatus.Status; import io.confluent.ksql.rest.entity.CommandStatusEntity; -import io.confluent.ksql.rest.server.validation.RequestValidator; import io.confluent.ksql.security.KsqlAuthorizationValidator; -import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; -import io.confluent.ksql.services.TestServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.statement.Injector; import io.confluent.ksql.statement.InjectorChain; @@ -52,25 +48,20 @@ import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; -import org.apache.kafka.clients.producer.Producer; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.InOrder; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class DistributingExecutorTest { - private static final String SQL_STRING = "some ksql statement;"; private static final Duration DURATION_10_MS = Duration.ofMillis(10); private static final CommandId CS_COMMAND = new CommandId(Type.STREAM, "stream", Action.CREATE); private static final CommandStatus SUCCESS_STATUS = new CommandStatus(Status.SUCCESS, ""); @@ -93,10 +84,6 @@ public class DistributingExecutorTest { @Mock KsqlAuthorizationValidator authorizationValidator; @Mock 
KsqlExecutionContext executionContext; @Mock MetaStore metaStore; - @Mock RequestValidator requestValidator; - @Mock ParsedStatement parsedStatement; - @Mock - Producer transactionalProducer; private DistributingExecutor distributor; private AtomicLong scnCounter; @@ -106,51 +93,34 @@ public void setUp() throws InterruptedException { scnCounter = new AtomicLong(); when(schemaInjector.inject(any())).thenAnswer(inv -> inv.getArgument(0)); when(topicInjector.inject(any())).thenAnswer(inv -> inv.getArgument(0)); - when(queue.enqueueCommand(EMPTY_STATEMENT, transactionalProducer)).thenReturn(status); + when(queue.enqueueCommand(any())).thenReturn(status); when(status.tryWaitForFinalStatus(any())).thenReturn(SUCCESS_STATUS); when(status.getCommandId()).thenReturn(CS_COMMAND); when(status.getCommandSequenceNumber()).thenAnswer(inv -> scnCounter.incrementAndGet()); when(executionContext.getMetaStore()).thenReturn(metaStore); - serviceContext = SandboxedServiceContext.create(TestServiceContext.create()); when(executionContext.getServiceContext()).thenReturn(serviceContext); - when(requestValidator.validate( - serviceContext, Collections.singletonList(parsedStatement), ImmutableMap.of(), SQL_STRING)).thenReturn(1); - when(parsedStatement.getStatementText()).thenReturn(SQL_STRING); - when(queue.createTransactionalProducer()).thenReturn(transactionalProducer); distributor = new DistributingExecutor( queue, DURATION_10_MS, (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector), - Optional.of(authorizationValidator), - requestValidator + Optional.of(authorizationValidator) ); } @Test - public void shouldEnqueueSuccessfulCommandTransactionally() { + public void shouldEnqueueSuccessfulCommand() throws InterruptedException { // When: - distributor.execute(EMPTY_STATEMENT, parsedStatement, ImmutableMap.of(), executionContext, serviceContext); + distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext); // Then: - InOrder inOrder = Mockito.inOrder(transactionalProducer, queue, requestValidator); - inOrder.verify(transactionalProducer, times(1)).initTransactions(); - inOrder.verify(transactionalProducer, times(1)).beginTransaction(); - inOrder.verify(queue, times(1)).waitForCommandConsumer(); - inOrder.verify(requestValidator).validate( - serviceContext, - Collections.singletonList(parsedStatement), - ImmutableMap.of(), - SQL_STRING); - inOrder.verify(queue, times(1)).enqueueCommand(EMPTY_STATEMENT, transactionalProducer); - inOrder.verify(transactionalProducer, times(1)).commitTransaction(); - inOrder.verify(transactionalProducer, times(1)).close(); + verify(queue, times(1)).enqueueCommand(eq(EMPTY_STATEMENT)); } @Test public void shouldInferSchemas() { // When: - distributor.execute(EMPTY_STATEMENT, parsedStatement, ImmutableMap.of(), executionContext, serviceContext); + distributor.execute(EMPTY_STATEMENT, ImmutableMap.of(), executionContext, serviceContext); // Then: verify(schemaInjector, times(1)).inject(eq(EMPTY_STATEMENT)); @@ -162,7 +132,6 @@ public void shouldReturnCommandStatus() { final CommandStatusEntity commandStatusEntity = (CommandStatusEntity) distributor.execute( EMPTY_STATEMENT, - parsedStatement, ImmutableMap.of(), executionContext, serviceContext @@ -172,12 +141,14 @@ public void shouldReturnCommandStatus() { // Then: assertThat(commandStatusEntity, equalTo(new CommandStatusEntity("", CS_COMMAND, SUCCESS_STATUS, 1L))); + } @Test public void shouldThrowExceptionOnFailureToEnqueue() { // Given: final KsqlException cause = new KsqlException("fail"); + 
when(queue.enqueueCommand(any())).thenThrow(cause); final PreparedStatement preparedStatement = PreparedStatement.of("x", new ListProperties(Optional.empty())); @@ -188,7 +159,6 @@ public void shouldThrowExceptionOnFailureToEnqueue() { ImmutableMap.of(), KSQL_CONFIG); - when(queue.enqueueCommand(configured, transactionalProducer)).thenThrow(cause); // Expect: expectedException.expect(KsqlServerException.class); expectedException.expectMessage( @@ -196,8 +166,7 @@ public void shouldThrowExceptionOnFailureToEnqueue() { expectedException.expectCause(is(cause)); // When: - distributor.execute(configured, parsedStatement, ImmutableMap.of(), executionContext, serviceContext); - verify(transactionalProducer, times(1)).abortTransaction(); + distributor.execute(configured, ImmutableMap.of(), executionContext, serviceContext); } @Test @@ -214,7 +183,7 @@ public void shouldThrowFailureIfCannotInferSchema() { expectedException.expectMessage("Could not infer!"); // When: - distributor.execute(configured, parsedStatement, ImmutableMap.of(), executionContext, serviceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, serviceContext); } @Test @@ -232,13 +201,13 @@ public void shouldThrowExceptionIfUserServiceContextIsDeniedAuthorization() { expectedException.expect(KsqlTopicAuthorizationException.class); // When: - distributor.execute(configured, parsedStatement, ImmutableMap.of(), executionContext, userServiceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, userServiceContext); } @Test public void shouldThrowServerExceptionIfServerServiceContextIsDeniedAuthorization() { // Given: - final ServiceContext userServiceContext = SandboxedServiceContext.create(TestServiceContext.create()); + final ServiceContext userServiceContext = mock(ServiceContext.class); final PreparedStatement preparedStatement = PreparedStatement.of("", new ListProperties(Optional.empty())); final ConfiguredStatement configured = @@ -251,6 +220,6 @@ public void shouldThrowServerExceptionIfServerServiceContextIsDeniedAuthorizatio expectedException.expectCause(is(instanceOf(KsqlTopicAuthorizationException.class))); // When: - distributor.execute(configured, parsedStatement, ImmutableMap.of(), executionContext, userServiceContext); + distributor.execute(configured, ImmutableMap.of(), executionContext, userServiceContext); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index d414585f83f0..6edbcaa81274 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -65,7 +65,6 @@ import java.util.Set; import java.util.stream.Collectors; import javax.ws.rs.core.Response; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.streams.StreamsConfig; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -73,7 +72,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; public class RecoveryTest { @@ -87,18 +85,9 @@ public class RecoveryTest { private final HybridQueryIdGenerator hybridQueryIdGenerator = new HybridQueryIdGenerator(); private final ServiceContext serviceContext = TestServiceContext.create(topicClient); - - @Mock - @SuppressWarnings("unchecked") - private Producer transactionalProducer = (Producer) 
mock(Producer.class); - private final KsqlServer server1 = new KsqlServer(commands); private final KsqlServer server2 = new KsqlServer(commands); - - @Before - public void setup() { } - @After public void tearDown() { server1.close(); @@ -119,16 +108,15 @@ private static class FakeCommandQueue implements CommandQueue { private final List commandLog; private final CommandIdAssigner commandIdAssigner; private int offset; - private Producer transactionalProducer; - FakeCommandQueue(final List commandLog, final Producer transactionalProducer) { + FakeCommandQueue( + final List commandLog) { this.commandIdAssigner = new CommandIdAssigner(); this.commandLog = commandLog; - this.transactionalProducer = transactionalProducer; } @Override - public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement, final Producer transactionalProducer) { + public QueuedCommandStatus enqueueCommand(final ConfiguredStatement statement) { final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement()); final long commandSequenceNumber = commandLog.size(); commandLog.add( @@ -162,11 +150,6 @@ public List getRestoreCommands() { public void ensureConsumedPast(final long seqNum, final Duration timeout) { } - @Override - public Producer createTransactionalProducer() { - return transactionalProducer; - } - @Override public boolean isEmpty() { return commandLog.isEmpty(); @@ -176,10 +159,6 @@ public boolean isEmpty() { public void wakeup() { } - @Override - public void waitForCommandConsumer() { - } - @Override public void close() { } @@ -195,7 +174,7 @@ private class KsqlServer { KsqlServer(final List commandLog) { this.ksqlEngine = createKsqlEngine(); - this.fakeCommandQueue = new FakeCommandQueue(commandLog, transactionalProducer); + this.fakeCommandQueue = new FakeCommandQueue(commandLog); serverState = new ServerState(); serverState.setReady(); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java index 0bcb4e104a72..3e54152011c8 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/RequestHandlerTest.java @@ -64,7 +64,7 @@ public class RequestHandlerTest { private static final String SOME_STREAM_SQL = "CREATE STREAM x WITH (value_format='json', kafka_topic='x');"; - + @Mock KsqlEngine ksqlEngine; @Mock KsqlConfig ksqlConfig; @Mock ServiceContext serviceContext; @@ -83,7 +83,7 @@ public void setUp() { when(ksqlEngine.prepare(any())) .thenAnswer(invocation -> new DefaultKsqlParser().prepare(invocation.getArgument(0), metaStore)); - when(distributor.execute(any(), any(), any(), any(), any())).thenReturn(Optional.of(entity)); + when(distributor.execute(any(), any(), any(), any())).thenReturn(Optional.of(entity)); doNothing().when(sync).waitFor(any(), any()); } @@ -130,7 +130,6 @@ public void shouldDefaultToDistributor() { preparedStatement(instanceOf(CreateStream.class)), ImmutableMap.of(), ksqlConfig))), - eq(statements.get(0)), eq(ImmutableMap.of()), eq(ksqlEngine), eq(serviceContext) @@ -154,13 +153,11 @@ public void shouldDistributeProperties() { // Then assertThat(entities, contains(entity)); verify(distributor, times(1)) - .execute( - argThat(is(configured( - preparedStatement(instanceOf(CreateStream.class)), - ImmutableMap.of("x", "y"), - ksqlConfig))), - eq(statements.get(0)), - eq(ImmutableMap.of("x", "y")), 
+ .execute(argThat(is(configured( + preparedStatement(instanceOf(CreateStream.class)), + ImmutableMap.of("x", "y"), + ksqlConfig))), + any(), eq(ksqlEngine), eq(serviceContext) ); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index 904afa9f4238..ce22611d4965 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -53,7 +53,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; @@ -115,7 +114,6 @@ import io.confluent.ksql.rest.entity.StreamsList; import io.confluent.ksql.rest.entity.TablesList; import io.confluent.ksql.rest.server.KsqlRestConfig; -import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.CommandStatusFuture; import io.confluent.ksql.rest.server.computation.CommandStore; import io.confluent.ksql.rest.server.computation.QueuedCommandStatus; @@ -163,7 +161,6 @@ import javax.ws.rs.core.Response; import org.apache.avro.Schema.Type; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.streams.StreamsConfig; @@ -265,8 +262,6 @@ public class KsqlResourceTest { private Injector sandboxTopicInjector; @Mock private KsqlAuthorizationValidator authorizationValidator; - @Mock - private Producer transactionalProducer; private KsqlResource ksqlResource; private SchemaRegistryClient schemaRegistryClient; @@ -302,15 +297,12 @@ public void setUp() throws IOException, RestClientException { metaStore ); - when(commandStore.createTransactionalProducer()) - .thenReturn(transactionalProducer); - ksqlEngine = realEngine; when(sandbox.getMetaStore()).thenAnswer(inv -> metaStore.copy()); addTestTopicAndSources(); - when(commandStore.enqueueCommand(any(), any(Producer.class))) + when(commandStore.enqueueCommand(any())) .thenReturn(commandStatus) .thenReturn(commandStatus1) .thenReturn(commandStatus2); @@ -701,9 +693,7 @@ public void shouldDistributePersistentQuery() { configured( preparedStatement( "CREATE STREAM S AS SELECT * FROM test_stream;", - CreateStreamAsSelect.class))) - ), any(Producer.class) - ); + CreateStreamAsSelect.class))))); } @Test @@ -713,7 +703,7 @@ public void shouldDistributeWithConfig() { // Then: verify(commandStore).enqueueCommand( - argThat(configured(VALID_EXECUTABLE_REQUEST.getStreamsProperties(), ksqlConfig)), any(Producer.class)); + argThat(configured(VALID_EXECUTABLE_REQUEST.getStreamsProperties(), ksqlConfig))); } @Test @@ -822,8 +812,7 @@ public void shouldDistributeAvoCreateStatementWithColumns() { argThat(is(configured(preparedStatement( "CREATE STREAM S (foo INT) WITH(VALUE_FORMAT='AVRO', KAFKA_TOPIC='orders-topic');", CreateStream.class) - ))), any(Producer.class) - ); + )))); } @Test @@ -847,8 +836,8 @@ public void shouldSupportTopicInferenceInVerification() { makeRequest(sql); // Then: - verify(sandbox, times(2)).execute(any(SandboxedServiceContext.class), eq(configuredStatement)); - 
verify(commandStore).enqueueCommand(argThat(configured(preparedStatementText(sql))), any(Producer.class)); + verify(sandbox).execute(any(SandboxedServiceContext.class), eq(configuredStatement)); + verify(commandStore).enqueueCommand(argThat(configured(preparedStatementText(sql)))); } @Test @@ -872,7 +861,7 @@ public void shouldSupportTopicInferenceInExecution() { makeRequest(sql); // Then: - verify(commandStore).enqueueCommand(eq(configured), any(Producer.class)); + verify(commandStore).enqueueCommand(eq(configured)); } @Test @@ -929,8 +918,8 @@ public void shouldSupportSchemaInference() { makeRequest(sql); // Then: - verify(sandbox, times(2)).execute(any(SandboxedServiceContext.class), eq(CFG_0_WITH_SCHEMA)); - verify(commandStore).enqueueCommand(eq(CFG_1_WITH_SCHEMA), any(Producer.class)); + verify(sandbox).execute(any(SandboxedServiceContext.class), eq(CFG_0_WITH_SCHEMA)); + verify(commandStore).enqueueCommand(eq(CFG_1_WITH_SCHEMA)); } @Test @@ -1135,7 +1124,7 @@ public void shouldFailMultipleStatementsAtomically() { ); // Then: - verify(commandStore, never()).enqueueCommand(any(), any(Producer.class)); + verify(commandStore, never()).enqueueCommand(any()); } @Test @@ -1153,8 +1142,7 @@ public void shouldDistributeTerminateQuery() { // Then: verify(commandStore) .enqueueCommand( - argThat(is(configured(preparedStatement(terminateSql, TerminateQuery.class)))), - any(Producer.class)); + argThat(is(configured(preparedStatement(terminateSql, TerminateQuery.class))))); assertThat(result.getStatementText(), is(terminateSql)); } @@ -1313,8 +1301,7 @@ public void shouldSetProperty() { argThat(is(configured( preparedStatementText(csas), ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), - ksqlConfig))), - any(Producer.class)); + ksqlConfig)))); assertThat(results, hasSize(1)); assertThat(results.get(0).getStatementText(), is(csas)); @@ -1336,8 +1323,7 @@ public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() { argThat(is(configured( preparedStatementText(csas), ImmutableMap.of(), - ksqlConfig))), - any(Producer.class)); + ksqlConfig)))); } @Test @@ -1384,8 +1370,7 @@ public void shouldUnsetProperty() { // Then: verify(commandStore).enqueueCommand( - argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig))), - any(Producer.class)); + argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig)))); assertThat(result.getStatementText(), is(csas)); } @@ -1416,8 +1401,7 @@ public void shouldScopeSetPropertyToSingleRequest() { // Then: verify(commandStore).enqueueCommand( - argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig))), - any(Producer.class)); + argThat(is(configured(preparedStatementText(csas), emptyMap(), ksqlConfig)))); } @Test @@ -1463,7 +1447,7 @@ public void shouldFailAllCommandsIfWouldReachActivePersistentQueriesLimit() { containsString("would cause the number of active, persistent queries " + "to exceed the configured limit")); - verify(commandStore, never()).enqueueCommand(any(), any(Producer.class)); + verify(commandStore, never()).enqueueCommand(any()); } @Test @@ -1688,21 +1672,18 @@ public void shouldHandleTerminateRequestCorrectly() { (CommandStatusEntity) ((KsqlEntityList) response.getEntity()).get(0); assertThat(commandStatusEntity.getCommandStatus().getStatus(), equalTo(CommandStatus.Status.QUEUED)); - verify(transactionalProducer, times(1)).initTransactions(); verify(commandStore).enqueueCommand( argThat(is(configured( 
preparedStatementText(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT),
            Collections.singletonMap(
                ClusterTerminateRequest.DELETE_TOPIC_LIST_PROP,
                ImmutableList.of("Foo")),
-            ksqlConfig))),
-        any(Producer.class));
+            ksqlConfig))));
   }
 
   @Test
   public void shouldFailIfCannotWriteTerminateCommand() {
     // Given:
-    when(commandStore.enqueueCommand(any(), any(Producer.class)))
-        .thenThrow(new KsqlException(""));
+    when(commandStore.enqueueCommand(any())).thenThrow(new KsqlException(""));
 
     // When:
     final Response response = ksqlResource.terminateCluster(
@@ -1745,7 +1726,7 @@ public void shouldNeverEnqueueIfErrorIsThrown() {
         Code.BAD_REQUEST);
 
     // Then:
-    verify(commandStore, never()).enqueueCommand(any(), any(Producer.class));
+    verify(commandStore, never()).enqueueCommand(any());
   }
 
   @Test
@@ -1835,8 +1816,7 @@ public void shouldInlineRunScriptStatements() {
 
     // Then:
     verify(commandStore).enqueueCommand(
-        argThat(is(configured(preparedStatement(instanceOf(CreateStreamAsSelect.class))))),
-        any(Producer.class));
+        argThat(is(configured(preparedStatement(instanceOf(CreateStreamAsSelect.class))))));
   }
 
   @Test
@@ -1854,8 +1834,7 @@ public void shouldThrowOnRunScriptStatementMissingScriptContent() {
 
   @Test
   public void shouldThrowServerErrorOnFailedToDistribute() {
     // Given:
-    when(commandStore.enqueueCommand(any(), any(Producer.class)))
-        .thenThrow(new KsqlException("blah"));
+    when(commandStore.enqueueCommand(any())).thenThrow(new KsqlException("blah"));
     final String statement = "CREATE STREAM " + streamName + " AS SELECT * FROM test_stream;";
 
     // Expect:
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtilsTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtilsTest.java
index 568df4941350..cc9fa4a39c99 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtilsTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtilsTest.java
@@ -51,14 +51,10 @@ public class KsqlInternalTopicUtilsTest {
 
   private static final String TOPIC_NAME = "topic";
   private static final short NREPLICAS = 2;
-  private static final short INSYNC_REPLICAS = 1;
-  private static final boolean ENABLE_UNCLEAN_ELECTION = false;
 
   private final Map commandTopicConfig = ImmutableMap.of(
       TopicConfig.RETENTION_MS_CONFIG, Long.MAX_VALUE,
-      TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE,
-      TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, INSYNC_REPLICAS,
-      TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, ENABLE_UNCLEAN_ELECTION);
+      TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
 
   @Mock
   private KafkaTopicClient topicClient;
@@ -70,9 +66,10 @@ public class KsqlInternalTopicUtilsTest {
 
   @Before
   public void setUp() {
+    when(ksqlConfig.originals()).thenReturn(
+        ImmutableMap.of(KsqlConfig.KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY, NREPLICAS)
+    );
     when(ksqlConfig.getShort(KsqlConfig.KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY)).thenReturn(NREPLICAS);
-    when(ksqlConfig.getShort(
-        KsqlConfig.KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY)).thenReturn(INSYNC_REPLICAS);
     when(topicClient.isTopicExists(TOPIC_NAME)).thenReturn(false);
   }
 
@@ -174,7 +171,7 @@ public void shouldFailIfTopicExistsWithInvalidNReplicas() {
   }
 
   @Test
-  public void shouldNotFailIfTopicIsOverreplicated() {
+  public void shouldNotFailIfTopicIsOverreplicated() {
     // Given:
     whenTopicExistsWith(1, NREPLICAS + 1);
 
diff --git a/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java
b/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java index 10f8f451f7bd..cb60edf80129 100644 --- a/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java +++ b/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java @@ -470,10 +470,6 @@ private Properties buildBrokerConfig(final String logDir) { config.put(KafkaConfig.LogRetentionTimeMillisProp(), -1); // Stop logs marked for deletion from being deleted config.put(KafkaConfig.LogDeleteDelayMsProp(), Long.MAX_VALUE); - // Set to 1 because only 1 broker - config.put(KafkaConfig.TransactionsTopicReplicationFactorProp(), (short) 1); - // Set to 1 because only 1 broker - config.put(KafkaConfig.TransactionsTopicMinISRProp(), 1); return config; }
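
For reviewers tracing what this revert removes: the deleted DistributingExecutor path wrapped each enqueue in a Kafka producer transaction, and the removed shouldEnqueueSuccessfulCommandTransactionally test (see the DistributingExecutorTest hunks above) pinned the ordering initTransactions -> beginTransaction -> enqueueCommand -> commitTransaction, with abortTransaction on failure. A minimal, self-contained sketch of that ordering, assuming a broker at localhost:9092; the transactional.id and topic name here are hypothetical placeholders (the real values are derived from the KSQL service id), and a production implementation would handle fatal exceptions such as ProducerFencedException separately rather than aborting:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public final class TransactionalEnqueueSketch {

      public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-command-producer"); // hypothetical id
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // implied by transactions; shown for clarity

        final Producer<String, String> producer =
            new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        try {
          producer.initTransactions();  // registers the transactional.id, fences older instances
          producer.beginTransaction();
          try {
            // No need to block on the Future here: commitTransaction() flushes and
            // throws if any send within the transaction failed.
            producer.send(new ProducerRecord<>("example_command_topic", "commandId", "command"));
            producer.commitTransaction(); // record becomes visible to read_committed consumers
          } catch (final RuntimeException e) {
            producer.abortTransaction();  // the abort the removed test verified on failure
            throw e;
          }
        } finally {
          producer.close();
        }
      }
    }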
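
By contrast, the restored path is a plain synchronous produce: CommandStore.enqueueCommand hands the record to CommandTopic.send, blocks until the broker acks the write, and the returned offset becomes the command sequence number (see the recordMetadata.offset() assertion in the CommandStoreTest hunks above). A rough sketch of that pattern under the same assumed broker; sendSync and the topic name are hypothetical stand-ins for CommandTopic, not the actual API:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public final class BlockingSendSketch {

      // Hypothetical stand-in for CommandTopic.send(commandId, command): block until the
      // broker acks the write and return the RecordMetadata so the caller can read the offset.
      static RecordMetadata sendSync(
          final Producer<String, String> producer,
          final String topic,
          final String key,
          final String value
      ) throws InterruptedException, ExecutionException {
        return producer.send(new ProducerRecord<>(topic, key, value)).get();
      }

      public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        try (Producer<String, String> producer =
            new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
          final RecordMetadata metadata =
              sendSync(producer, "example_command_topic", "commandId", "command");
          // The record's offset in the single-partition command topic doubles as the
          // command sequence number handed back to the client.
          System.out.println("enqueued at sequence number " + metadata.offset());
        }
      }
    }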
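
Finally, the restored KsqlInternalTopicUtilsTest expects the command topic to carry only infinite retention and a delete cleanup policy, with the min.insync.replicas and unclean-leader-election overrides gone. A sketch of creating a topic with that config through the Kafka admin client, assuming a local broker; the topic name and replica count are illustrative only (the replication factor normally comes from ksql.internal.topic.replicas):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public final class CommandTopicCreationSketch {

      public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker

        // Infinite retention with delete cleanup: commands are never aged out, matching
        // the commandTopicConfig expected by the test after this revert.
        final Map<String, String> configs = new HashMap<>();
        configs.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
        configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);

        final short replicas = 2; // illustrative; matches NREPLICAS in the test
        final NewTopic commandTopic =
            new NewTopic("example_command_topic", 1, replicas).configs(configs);

        try (AdminClient admin = AdminClient.create(props)) {
          admin.createTopics(Collections.singleton(commandTopic)).all().get();
        }
      }
    }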