feat: Transactional Produces to Command Topic #3660

Merged
6 changes: 4 additions & 2 deletions config/ksql-production-server.properties
@@ -74,9 +74,11 @@ ksql.streams.producer.delivery.timeout.ms=2147483647
 # Kafka cluster is unavailable.
 ksql.streams.producer.max.block.ms=9223372036854775807
 
-# For better fault tolerance and durability, set the replication factor for the KSQL
-# Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster.
+# For better fault tolerance and durability, set the replication factor and minimum insync replicas
+# for the KSQL Server's internal topics.
+# Note: the value 3 requires at least 3 brokers in your Kafka cluster.
 ksql.internal.topic.replicas=3
+ksql.internal.topic.min.insync.replicas=2
 
 # Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and
 # durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications.
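With ksql.internal.topic.replicas=3 and ksql.internal.topic.min.insync.replicas=2, an acks=all produce to the command topic succeeds as long as two of the three replicas acknowledge it, so the server keeps accepting statements while one broker is down. A minimal sketch (not part of this PR) for verifying the effective setting with the Kafka AdminClient; the topic name assumes the default ksql.service.id of "default_":

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public final class CheckCommandTopicConfig {

  public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      final ConfigResource topic = new ConfigResource(
          ConfigResource.Type.TOPIC, "_confluent-ksql-default__command_topic");
      final Config config = admin.describeConfigs(Collections.singleton(topic))
          .all().get().get(topic);
      // With replicas=3 and min.insync.replicas=2, acks=all writes survive one broker failure.
      System.out.println(config.get("min.insync.replicas"));
    }
  }
}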
3 changes: 3 additions & 0 deletions docs/installation/server-config/security.rst
@@ -404,6 +404,9 @@ The ACLs required are the same for both :ref:`Interactive and non-interactive (h
 KSQL always requires the following ACLs for its internal operations and data management:
 
 - The ``DESCRIBE_CONFIGS`` operation on the ``CLUSTER`` resource type.
+- The ``DESCRIBE`` operation on the ``TOPIC`` with ``LITERAL`` name ``__consumer_offsets``.
+- The ``DESCRIBE`` operation on the ``TOPIC`` with ``LITERAL`` name ``__transaction_state``.
+- The ``DESCRIBE`` and ``WRITE`` operations on the ``TRANSACTIONAL_ID`` with ``LITERAL`` name ``<ksql.service.id>``.
 - The ``ALL`` operation on all internal ``TOPICS`` that are ``PREFIXED`` with ``_confluent-ksql-<ksql.service.id>``.
 - The ``ALL`` operation on all internal ``GROUPS`` that are ``PREFIXED`` with ``_confluent-ksql-<ksql.service.id>``.
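For reference, a sketch of granting the new transaction-related ACLs programmatically with the Kafka AdminClient; the principal User:ksql and the service id "default_" are illustrative, not mandated by the PR:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public final class GrantKsqlTransactionAcls {

  public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      admin.createAcls(Arrays.asList(
          // DESCRIBE on the two internal topics the transaction machinery touches.
          binding(ResourceType.TOPIC, "__consumer_offsets", AclOperation.DESCRIBE),
          binding(ResourceType.TOPIC, "__transaction_state", AclOperation.DESCRIBE),
          // DESCRIBE and WRITE on the TRANSACTIONAL_ID matching ksql.service.id.
          binding(ResourceType.TRANSACTIONAL_ID, "default_", AclOperation.DESCRIBE),
          binding(ResourceType.TRANSACTIONAL_ID, "default_", AclOperation.WRITE)
      )).all().get();
    }
  }

  private static AclBinding binding(
      final ResourceType type,
      final String name,
      final AclOperation operation
  ) {
    return new AclBinding(
        new ResourcePattern(type, name, PatternType.LITERAL),
        new AccessControlEntry("User:ksql", "*", operation, AclPermissionType.ALLOW));
  }
}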
KsqlConfig.java
@@ -65,6 +65,9 @@ public class KsqlConfig extends AbstractConfig {
 
   public static final String KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY = "ksql.internal.topic.replicas";
 
+  public static final String KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY =
+      "ksql.internal.topic.min.insync.replicas";
+
   public static final String KSQL_SCHEMA_REGISTRY_PREFIX = "ksql.schema.registry.";
 
   public static final String SCHEMA_REGISTRY_URL_PROPERTY = "ksql.schema.registry.url";
@@ -496,8 +499,14 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
         KSQL_INTERNAL_TOPIC_REPLICAS_PROPERTY,
         Type.SHORT,
         (short) 1,
-        ConfigDef.Importance.LOW,
+        ConfigDef.Importance.MEDIUM,
         "The replication factor for the internal topics of KSQL server."
+    ).define(
+        KSQL_INTERNAL_TOPIC_MIN_INSYNC_REPLICAS_PROPERTY,
+        Type.SHORT,
+        (short) 1,
+        ConfigDef.Importance.MEDIUM,
+        "The minimum number of insync replicas for the internal topics of KSQL server."
     ).define(
         KSQL_UDF_SECURITY_MANAGER_ENABLED,
         ConfigDef.Type.BOOLEAN,
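The new setting only takes effect when KSQL creates its internal topics. A rough sketch (the helper below is hypothetical, not the PR's code) of how the two values could be applied through the Kafka AdminClient:

import java.util.Collections;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

final class InternalTopicSketch {

  // Hypothetical helper: create a single-partition internal topic using the
  // replication factor and min.insync.replicas read from the two configs above.
  static void createInternalTopic(
      final AdminClient adminClient,
      final String topicName,
      final short replicas,
      final short minInsyncReplicas
  ) throws Exception {
    final NewTopic topic = new NewTopic(topicName, 1, replicas)
        .configs(Collections.singletonMap(
            TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
            String.valueOf(minInsyncReplicas)));
    adminClient.createTopics(Collections.singleton(topic)).all().get();
  }
}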
CommandTopic.java
@@ -26,15 +26,10 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,36 +40,28 @@ public class CommandTopic {
   private final TopicPartition commandTopicPartition;
 
   private Consumer<CommandId, Command> commandConsumer = null;
-  private Producer<CommandId, Command> commandProducer = null;
   private final String commandTopicName;
 
   public CommandTopic(
       final String commandTopicName,
-      final Map<String, Object> kafkaConsumerProperties,
-      final Map<String, Object> kafkaProducerProperties
+      final Map<String, Object> kafkaConsumerProperties
   ) {
     this(
         commandTopicName,
         new KafkaConsumer<>(
             Objects.requireNonNull(kafkaConsumerProperties, "kafkaClientProperties"),
             InternalTopicJsonSerdeUtil.getJsonDeserializer(CommandId.class, true),
             InternalTopicJsonSerdeUtil.getJsonDeserializer(Command.class, false)
-        ),
-        new KafkaProducer<>(
-            Objects.requireNonNull(kafkaProducerProperties, "kafkaClientProperties"),
-            InternalTopicJsonSerdeUtil.getJsonSerializer(true),
-            InternalTopicJsonSerdeUtil.getJsonSerializer(false)
-        ));
+        )
+    );
   }
 
   CommandTopic(
       final String commandTopicName,
-      final Consumer<CommandId, Command> commandConsumer,
-      final Producer<CommandId, Command> commandProducer
+      final Consumer<CommandId, Command> commandConsumer
   ) {
     this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
     this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
-    this.commandProducer = Objects.requireNonNull(commandProducer, "commandProducer");
     this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
   }

@@ -86,24 +73,6 @@ public void start() {
     commandConsumer.assign(Collections.singleton(commandTopicPartition));
   }
 
-  public RecordMetadata send(final CommandId commandId, final Command command) {
-    final ProducerRecord<CommandId, Command> producerRecord = new ProducerRecord<>(
-        commandTopicName,
-        0,
-        Objects.requireNonNull(commandId, "commandId"),
-        Objects.requireNonNull(command, "command"));
-    try {
-      return commandProducer.send(producerRecord).get();
-    } catch (final ExecutionException e) {
-      if (e.getCause() instanceof RuntimeException) {
-        throw (RuntimeException)e.getCause();
-      }
-      throw new RuntimeException(e.getCause());
-    } catch (final InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
     return commandConsumer.poll(timeout);
   }
@@ -150,6 +119,5 @@ public void wakeup() {
 
   public void close() {
     commandConsumer.close();
-    commandProducer.close();
   }
 }
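With the producer removed, CommandTopic becomes consume-only and producing moves behind CommandQueue. A sketch of what the sending side looks like once it runs inside a transaction (the method shape is illustrative, not CommandStore's exact code):

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

final class TransactionalSendSketch {

  // Illustrative: the caller has already called initTransactions() and
  // beginTransaction() on this producer.
  static RecordMetadata send(
      final Producer<CommandId, Command> transactionalProducer,
      final String commandTopicName,
      final CommandId commandId,
      final Command command
  ) {
    final ProducerRecord<CommandId, Command> record = new ProducerRecord<>(
        commandTopicName,
        0, // the command topic has a single partition
        Objects.requireNonNull(commandId, "commandId"),
        Objects.requireNonNull(command, "command"));
    try {
      // The record only becomes visible to read_committed consumers once the
      // surrounding transaction commits.
      return transactionalProducer.send(record).get();
    } catch (final ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}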
KsqlRestApplication.java
@@ -39,7 +39,9 @@
 import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
 import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
 import io.confluent.ksql.query.id.HybridQueryIdGenerator;
+import io.confluent.ksql.rest.entity.CommandId;
 import io.confluent.ksql.rest.entity.KsqlErrorMessage;
+import io.confluent.ksql.rest.server.computation.Command;
 import io.confluent.ksql.rest.server.computation.CommandQueue;
 import io.confluent.ksql.rest.server.computation.CommandRunner;
 import io.confluent.ksql.rest.server.computation.CommandStore;
@@ -72,7 +74,6 @@
 import io.confluent.ksql.services.ServiceContext;
 import io.confluent.ksql.statement.ConfiguredStatement;
 import io.confluent.ksql.util.KsqlConfig;
-import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.RetryUtil;
 import io.confluent.ksql.util.Version;
 import io.confluent.ksql.util.WelcomeMsgUtils;
@@ -108,6 +109,10 @@
 import javax.websocket.server.ServerEndpointConfig.Configurator;
 import javax.ws.rs.core.Configurable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.log4j.LogManager;
 import org.eclipse.jetty.server.ServerConnector;
@@ -467,13 +472,16 @@ static KsqlRestApplication buildApplication(
 
     UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();
 
-    final String commandTopic = KsqlInternalTopicUtils.getTopicName(
+    final String commandTopicName = KsqlInternalTopicUtils.getTopicName(
         ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX);
 
     final CommandStore commandStore = CommandStore.Factory.create(
-        commandTopic,
+        commandTopicName,
+        ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
+        Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
         restConfig.getCommandConsumerProperties(),
-        restConfig.getCommandProducerProperties());
+        restConfig.getCommandProducerProperties()
+    );
 
     final InteractiveStatementExecutor statementExecutor =
         new InteractiveStatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator);
@@ -501,19 +509,12 @@ static KsqlRestApplication buildApplication(
         authorizationValidator
     );
 
-    final KsqlResource ksqlResource = new KsqlResource(
-        ksqlEngine,
-        commandStore,
-        Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
-        versionChecker::updateLastRequestTime,
-        authorizationValidator
-    );
-
     final List<String> managedTopics = new LinkedList<>();
-    managedTopics.add(commandTopic);
+    managedTopics.add(commandTopicName);
     if (processingLogConfig.getBoolean(ProcessingLogConfig.TOPIC_AUTO_CREATE)) {
       managedTopics.add(ProcessingLogServerUtils.getTopicName(processingLogConfig, ksqlConfig));
     }
 
     final CommandRunner commandRunner = new CommandRunner(
         statementExecutor,
         commandStore,
@@ -522,6 +523,14 @@
         serverState
     );
 
+    final KsqlResource ksqlResource = new KsqlResource(
+        ksqlEngine,
+        commandStore,
+        Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
+        versionChecker::updateLastRequestTime,
+        authorizationValidator
+    );
+
     final List<KsqlServerPrecondition> preconditions = restConfig.getConfiguredInstances(
         KsqlRestConfig.KSQL_SERVER_PRECONDITIONS,
         KsqlServerPrecondition.class
@@ -623,27 +632,47 @@ private static void maybeCreateProcessingLogStream(
       final KsqlEngine ksqlEngine,
       final CommandQueue commandQueue
   ) {
-    if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)
-        || !commandQueue.isEmpty()) {
+    if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)) {
       return;
     }
 
-    final PreparedStatement<?> statement = ProcessingLogServerUtils
-        .processingLogStreamCreateStatement(config, ksqlConfig);
-    final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
-        statement, Collections.emptyMap(), ksqlConfig);
-
+    final Producer<CommandId, Command> transactionalProducer =
+        commandQueue.createTransactionalProducer();
     try {
+      transactionalProducer.initTransactions();
+      transactionalProducer.beginTransaction();
+
+      if (!commandQueue.isEmpty()) {
+        return;
+      }
+
+      // We don't wait for the commandRunner in this case since it hasn't been started yet.
+
+      final PreparedStatement<?> statement = ProcessingLogServerUtils
+          .processingLogStreamCreateStatement(config, ksqlConfig);
+      final Supplier<ConfiguredStatement<?>> configured = () -> ConfiguredStatement.of(
+          statement, Collections.emptyMap(), ksqlConfig);
+
       ksqlEngine.createSandbox(ksqlEngine.getServiceContext()).execute(
           ksqlEngine.getServiceContext(),
          configured.get()
       );
-    } catch (final KsqlException e) {
+
+      commandQueue.enqueueCommand(configured.get(), transactionalProducer);
+      transactionalProducer.commitTransaction();
+    } catch (final ProducerFencedException
+        | OutOfOrderSequenceException
+        | AuthorizationException e
+    ) {
+      // We can't recover from these exceptions, so our only option is to close the producer and exit.
+      // This catch doesn't abortTransaction() since doing that would throw another exception.
       log.warn("Failed to create processing log stream", e);
-      return;
+    } catch (final Exception e) {
+      transactionalProducer.abortTransaction();
+      log.warn("Failed to create processing log stream", e);
+    } finally {
+      transactionalProducer.close();
     }
-
-    commandQueue.enqueueCommand(configured.get());
   }
 
   /**
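The try/catch shape above is the standard Kafka transactional-producer lifecycle: ProducerFencedException, OutOfOrderSequenceException, and AuthorizationException are fatal and must not be followed by abortTransaction(), while any other failure aborts the open transaction. A standalone sketch with illustrative topic and id names:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public final class TransactionalLifecycle {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-service-id");
    final KafkaProducer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    try {
      producer.initTransactions();  // fences older producers with the same id
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("my-topic", "key", "value"));
      producer.commitTransaction(); // atomically makes the record visible
    } catch (final ProducerFencedException
        | OutOfOrderSequenceException
        | AuthorizationException e) {
      // Fatal: the producer must simply be closed; abortTransaction() would itself throw.
    } catch (final Exception e) {
      producer.abortTransaction();  // recoverable: discard the open transaction
    } finally {
      producer.close();
    }
  }
}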
CommandQueue.java
@@ -15,11 +15,13 @@
 
 package io.confluent.ksql.rest.server.computation;
 
+import io.confluent.ksql.rest.entity.CommandId;
 import io.confluent.ksql.statement.ConfiguredStatement;
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.producer.Producer;
 
 /**
  * Represents a queue of {@link Command}s that must be distributed to all
@@ -34,13 +36,16 @@ public interface CommandQueue extends Closeable {
    * it is guaranteed that the command has been persisted, without regard
    * for the {@link io.confluent.ksql.rest.entity.CommandStatus CommandStatus}.
    *
-   * @param statement The statement to be distributed
-   *
+   * @param statement             The statement to be distributed
+   * @param transactionalProducer The transactional producer used to enqueue the command
    * @return an asynchronous tracker that can be used to determine the current
    *         state of the command
    */
-  QueuedCommandStatus enqueueCommand(ConfiguredStatement<?> statement);
-
+  QueuedCommandStatus enqueueCommand(
+      ConfiguredStatement<?> statement,
+      Producer<CommandId, Command> transactionalProducer
+  );
+
   /**
    * Polls the Queue for any commands that have been enqueued since the last
    * invocation to this method.
@@ -74,6 +79,20 @@ public interface CommandQueue extends Closeable {
   void ensureConsumedPast(long seqNum, Duration timeout)
       throws InterruptedException, TimeoutException;
 
+  /**
+   * Creates a transactional producer for producing to the command topic.
+   *
+   * @return a TransactionalProducer
+   */
+  Producer<CommandId, Command> createTransactionalProducer();
+
+  /**
+   * Blocks until the command topic consumer has processed all records up to
+   * the current offset when this method is called.
+   *
+   */
+  void waitForCommandConsumer();
+
   /**
    * @return whether or not there are any enqueued commands
    */
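Taken together, a caller drives distribution roughly like this (a sketch: the names are illustrative, and a real caller would skip abortTransaction() for the fatal exception types shown earlier):

import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.statement.ConfiguredStatement;
import org.apache.kafka.clients.producer.Producer;

final class EnqueueSketch {

  // Assumes this class lives in io.confluent.ksql.rest.server.computation, so
  // CommandQueue, Command, and QueuedCommandStatus need no imports.
  static QueuedCommandStatus distribute(
      final CommandQueue commandQueue,
      final ConfiguredStatement<?> statement
  ) {
    final Producer<CommandId, Command> producer =
        commandQueue.createTransactionalProducer();
    try {
      producer.initTransactions();
      producer.beginTransaction();
      // Catch up with everything already on the command topic so the
      // statement is validated against the latest metadata.
      commandQueue.waitForCommandConsumer();
      final QueuedCommandStatus status = commandQueue.enqueueCommand(statement, producer);
      producer.commitTransaction();
      return status;
    } catch (final RuntimeException e) {
      // Simplified: fatal producer exceptions should close without aborting.
      producer.abortTransaction();
      throw e;
    } finally {
      producer.close();
    }
  }
}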