Skip to content

Commit

Permalink
rename the transaction manager to TransactionalProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Oct 28, 2019
1 parent 0c95bb7 commit 8b6fef4
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public final class KsqlRestApplication extends Application<KsqlRestConfig> imple
private final List<KsqlServerPrecondition> preconditions;
private final List<KsqlConfigurable> configurables;
private final Consumer<KsqlConfig> rocksDBConfigSetterHandler;
private final ProducerTransactionManagerFactory producerTransactionManagerFactory;
private final TransactionalProducerFactory transactionalProducerFactory;

public static SourceName getCommandsStreamName() {
return COMMANDS_STREAM_NAME;
Expand Down Expand Up @@ -171,7 +171,7 @@ public static SourceName getCommandsStreamName() {
final List<KsqlServerPrecondition> preconditions,
final List<KsqlConfigurable> configurables,
final Consumer<KsqlConfig> rocksDBConfigSetterHandler,
final ProducerTransactionManagerFactory producerTransactionManagerFactory
final TransactionalProducerFactory transactionalProducerFactory
) {
super(config);

Expand All @@ -194,8 +194,8 @@ public static SourceName getCommandsStreamName() {
this.configurables = requireNonNull(configurables, "configurables");
this.rocksDBConfigSetterHandler =
requireNonNull(rocksDBConfigSetterHandler, "rocksDBConfigSetterHandler");
this.producerTransactionManagerFactory =
requireNonNull(producerTransactionManagerFactory, "producerTransactionManagerFactory");
this.transactionalProducerFactory =
requireNonNull(transactionalProducerFactory, "producerTransactionManagerFactory");
}

@Override
Expand Down Expand Up @@ -283,7 +283,7 @@ private void initialize() {
ksqlConfigNoPort,
ksqlEngine,
commandStore,
producerTransactionManagerFactory
transactionalProducerFactory
);

commandRunner.processPriorCommands();
Expand Down Expand Up @@ -522,8 +522,8 @@ static KsqlRestApplication buildApplication(
serverState
);

final ProducerTransactionManagerFactory producerTransactionManagerFactory =
new ProducerTransactionManagerFactory(
final TransactionalProducerFactory transactionalProducerFactory =
new TransactionalProducerFactory(
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
commandRunner,
Expand All @@ -537,7 +537,7 @@ static KsqlRestApplication buildApplication(
Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)),
versionChecker::updateLastRequestTime,
authorizationValidator,
producerTransactionManagerFactory
transactionalProducerFactory
);

final List<KsqlServerPrecondition> preconditions = restConfig.getConfiguredInstances(
Expand Down Expand Up @@ -573,7 +573,7 @@ static KsqlRestApplication buildApplication(
preconditions,
configurables,
rocksDBConfigSetterHandler,
producerTransactionManagerFactory
transactionalProducerFactory
);
}

Expand Down Expand Up @@ -641,17 +641,17 @@ private static void maybeCreateProcessingLogStream(
final KsqlConfig ksqlConfig,
final KsqlEngine ksqlEngine,
final CommandQueue commandQueue,
final ProducerTransactionManagerFactory producerTransactionManagerFactory
final TransactionalProducerFactory transactionalProducerFactory
) {
if (!config.getBoolean(ProcessingLogConfig.STREAM_AUTO_CREATE)
|| !commandQueue.isEmpty()) {
return;
}

final ProducerTransactionManager producerTransactionManager =
producerTransactionManagerFactory.createProducerTransactionManager();
final TransactionalProducer transactionalProducer =
transactionalProducerFactory.createProducerTransactionManager();

producerTransactionManager.begin();
transactionalProducer.begin();

final PreparedStatement<?> statement = ProcessingLogServerUtils
.processingLogStreamCreateStatement(config, ksqlConfig);
Expand All @@ -668,8 +668,8 @@ private static void maybeCreateProcessingLogStream(
return;
}

commandQueue.enqueueCommand(configured.get(), producerTransactionManager);
producerTransactionManager.commit();
commandQueue.enqueueCommand(configured.get(), transactionalProducer);
transactionalProducer.commit();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* Used to handle transactional writes to the command topic
*/
public class ProducerTransactionManager {
public class TransactionalProducer {

private final TopicPartition commandTopicPartition;
private final String commandTopicName;
Expand All @@ -45,7 +45,7 @@ public class ProducerTransactionManager {
private final Producer<CommandId, Command> commandProducer;
private final CommandRunner commandRunner;

public ProducerTransactionManager(
public TransactionalProducer(
final String commandTopicName,
final CommandRunner commandRunner,
final Map<String, Object> kafkaConsumerProperties,
Expand All @@ -72,7 +72,7 @@ public ProducerTransactionManager(
}

@VisibleForTesting
ProducerTransactionManager(
TransactionalProducer(
final String commandTopicName,
final CommandRunner commandRunner,
final Consumer<CommandId, Command> commandConsumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTransactionManagerFactory {
public class TransactionalProducerFactory {
private final String commandTopicName;
private final CommandRunner commandRunner;
private final Map<String, Object> kafkaConsumerProperties;
private final Map<String, Object> kafkaProducerProperties;

public ProducerTransactionManagerFactory(
public TransactionalProducerFactory(
final String commandTopicName,
final String transactionId,
final CommandRunner commandRunner,
Expand All @@ -46,8 +46,8 @@ public ProducerTransactionManagerFactory(
);
}

public ProducerTransactionManager createProducerTransactionManager() {
return new ProducerTransactionManager(
public TransactionalProducer createProducerTransactionManager() {
return new TransactionalProducer(
commandTopicName,
commandRunner,
kafkaConsumerProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

package io.confluent.ksql.rest.server.computation;

import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.statement.ConfiguredStatement;
import java.io.Closeable;
import java.time.Duration;
Expand All @@ -36,13 +36,13 @@ public interface CommandQueue extends Closeable {
* for the {@link io.confluent.ksql.rest.entity.CommandStatus CommandStatus}.
*
* @param statement The statement to be distributed
* @param producerTransactionManager The transaction manager for enqueueing command
* @param transactionalProducer The transaction manager for enqueueing command
* @return an asynchronous tracker that can be used to determine the current
* state of the command
*/
QueuedCommandStatus enqueueCommand(
ConfiguredStatement<?> statement,
ProducerTransactionManager producerTransactionManager
TransactionalProducer transactionalProducer
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.google.common.collect.Maps;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.CommandTopic;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlException;
import java.io.Closeable;
Expand Down Expand Up @@ -100,7 +100,7 @@ public void close() {
@Override
public QueuedCommandStatus enqueueCommand(
final ConfiguredStatement<?> statement,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement());

Expand Down Expand Up @@ -129,7 +129,7 @@ public QueuedCommandStatus enqueueCommand(
);
try {
final RecordMetadata recordMetadata =
producerTransactionManager.send(commandId, command);
transactionalProducer.send(commandId, command);
return new QueuedCommandStatus(recordMetadata.offset(), statusFuture);
} catch (final Exception e) {
commandStatusMap.remove(commandId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.rest.server.execution.StatementExecutor;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.ServiceContext;
Expand All @@ -46,7 +46,7 @@ public class DistributingExecutor implements StatementExecutor<Statement> {
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;

private ProducerTransactionManager producerTransactionManager;
private TransactionalProducer transactionalProducer;

public DistributingExecutor(
final CommandQueue commandQueue,
Expand Down Expand Up @@ -76,17 +76,17 @@ public Optional<KsqlEntity> execute(
checkAuthorization(injected, serviceContext, executionContext);

try {
if (producerTransactionManager == null) {
if (transactionalProducer == null) {
throw new RuntimeException("Transaction manager for distributing executor not set");
}

final QueuedCommandStatus queuedCommandStatus =
commandQueue.enqueueCommand(injected, producerTransactionManager);
commandQueue.enqueueCommand(injected, transactionalProducer);

final CommandStatus commandStatus = queuedCommandStatus
.tryWaitForFinalStatus(distributedCmdResponseTimeout);

producerTransactionManager = null;
transactionalProducer = null;
return Optional.of(new CommandStatusEntity(
injected.getStatementText(),
queuedCommandStatus.getCommandId(),
Expand All @@ -100,8 +100,8 @@ public Optional<KsqlEntity> execute(
}
}

public void setTransactionManager(final ProducerTransactionManager producerTransactionManager) {
this.producerTransactionManager = producerTransactionManager;
public void setTransactionManager(final TransactionalProducer transactionalProducer) {
this.transactionalProducer = transactionalProducer;
}

private void checkAuthorization(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.server.ProducerTransactionManager;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.rest.server.computation.DistributingExecutor;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
Expand Down Expand Up @@ -75,7 +75,7 @@ public KsqlEntityList execute(
final ServiceContext serviceContext,
final List<ParsedStatement> statements,
final Map<String, Object> propertyOverrides,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final Map<String, Object> scopedPropertyOverrides = new HashMap<>(propertyOverrides);
final KsqlEntityList entities = new KsqlEntityList();
Expand All @@ -86,7 +86,7 @@ public KsqlEntityList execute(
serviceContext,
prepared,
propertyOverrides,
producerTransactionManager
transactionalProducer
);
if (!result.isEmpty()) {
// This is to maintain backwards compatibility until we deprecate
Expand All @@ -102,7 +102,7 @@ public KsqlEntityList execute(
configured,
scopedPropertyOverrides,
entities,
producerTransactionManager
transactionalProducer
).ifPresent(entities::add);
}
}
Expand All @@ -115,7 +115,7 @@ private <T extends Statement> Optional<KsqlEntity> executeStatement(
final ConfiguredStatement<T> configured,
final Map<String, Object> mutableScopedProperties,
final KsqlEntityList entities,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final Class<? extends Statement> statementClass = configured.getStatement().getClass();
commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass);
Expand All @@ -124,7 +124,7 @@ private <T extends Statement> Optional<KsqlEntity> executeStatement(
customExecutors.getOrDefault(statementClass, distributor);

if (executor instanceof DistributingExecutor) {
((DistributingExecutor) executor).setTransactionManager(producerTransactionManager);
((DistributingExecutor) executor).setTransactionManager(transactionalProducer);
}

return executor.execute(
Expand All @@ -139,7 +139,7 @@ private KsqlEntityList executeRunScript(
final ServiceContext serviceContext,
final PreparedStatement<?> statement,
final Map<String, Object> propertyOverrides,
final ProducerTransactionManager producerTransactionManager
final TransactionalProducer transactionalProducer
) {
final String sql = (String) propertyOverrides
.get(KsqlConstants.LEGACY_RUN_SCRIPT_STATEMENTS_CONTENT);
Expand All @@ -153,7 +153,7 @@ private KsqlEntityList executeRunScript(
serviceContext,
ksqlEngine.parse(sql),
propertyOverrides,
producerTransactionManager
transactionalProducer
);
}
}
Loading

0 comments on commit 8b6fef4

Please sign in to comment.