Skip to content

Commit

Permalink
process statements one at a time in rest server
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang committed Nov 1, 2019
1 parent e6d4d70 commit 67467f5
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,12 @@ static KsqlRestApplication buildApplication(
final String commandTopicName = KsqlInternalTopicUtils.getTopicName(
ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX);

final Map<String, Object> consumerConfigs = restConfig.getCommandConsumerProperties();
consumerConfigs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final Map<String, Object> commandConsumerConfigs = restConfig.getCommandConsumerProperties();
commandConsumerConfigs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

final CommandStore commandStore = CommandStore.Factory.create(
commandTopicName,
consumerConfigs);
commandConsumerConfigs);

final StatementExecutor statementExecutor =
new StatementExecutor(serviceContext, ksqlEngine, hybridQueryIdGenerator);
Expand Down Expand Up @@ -530,7 +530,7 @@ static KsqlRestApplication buildApplication(
commandTopicName,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
commandRunner,
consumerConfigs,
commandConsumerConfigs,
restConfig.getCommandProducerProperties()
);

Expand Down Expand Up @@ -576,7 +576,7 @@ static KsqlRestApplication buildApplication(
preconditions,
configurables,
rocksDBConfigSetterHandler,
transactionalProducerFactory
transactionalProducerFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public TransactionalProducerImpl(
);
this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");

initialize();
}

@VisibleForTesting
Expand All @@ -96,12 +98,15 @@ public TransactionalProducerImpl(
this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner");
}

private void initialize() {
commandConsumer.assign(Collections.singleton(commandTopicPartition));
commandProducer.initTransactions();
}

/** begins transaction */
public void begin() {
commandConsumer.assign(Collections.singleton(commandTopicPartition));
commandProducer.initTransactions();
commandProducer.beginTransaction();
waitForConsumer();
}

public void waitForConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ QueuedCommandStatus enqueueCommand(
ConfiguredStatement<?> statement,
TransactionalProducer transactionalProducer
);

/**
* Documentation on this
*/
// TransactionalProducer createTransactionalProducer();

/**
* Polls the Queue for any commands that have been enqueued since the last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void processPriorCommands() {
void fetchAndRunCommands() {
final List<QueuedCommand> commands = commandStore.getNewCommands(NEW_CMDS_TIMEOUT);
if (commands.isEmpty()) {
completeSatisfiedSequenceNumberFutures();
completeOffsetProccessedFutures();
return;
}
final Optional<QueuedCommand> terminateCmd = findTerminateCommand(commands);
Expand All @@ -166,7 +166,7 @@ void fetchAndRunCommands() {

executeStatement(command);
}
completeSatisfiedSequenceNumberFutures();
completeOffsetProccessedFutures();
}

private void executeStatement(final QueuedCommand queuedCommand) {
Expand Down Expand Up @@ -234,7 +234,7 @@ public void ensureProcessedPastOffset(final long seqNum, final Duration timeout)
}
}

private void completeSatisfiedSequenceNumberFutures() {
private void completeOffsetProccessedFutures() {
offsetProcessedFutureStore.completeFuturesUpToAndIncludingSequenceNumber(
commandStore.getConsumerPosition() - 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@

import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.server.TransactionalProducer;
import io.confluent.ksql.rest.server.validation.RequestValidator;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.services.SandboxedServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.KsqlServerException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -44,24 +48,30 @@ public class DistributingExecutor {
private final Duration distributedCmdResponseTimeout;
private final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory;
private final KsqlAuthorizationValidator authorizationValidator;
private final RequestValidator requestValidator;

public DistributingExecutor(
final CommandQueue commandQueue,
final Duration distributedCmdResponseTimeout,
final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
final KsqlAuthorizationValidator authorizationValidator
final KsqlAuthorizationValidator authorizationValidator,
final RequestValidator requestValidator
) {
this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue");
this.distributedCmdResponseTimeout =
Objects.requireNonNull(distributedCmdResponseTimeout, "distributedCmdResponseTimeout");
this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory");
this.authorizationValidator =
Objects.requireNonNull(authorizationValidator, "authorizationValidator");
this.requestValidator =
Objects.requireNonNull(requestValidator, "requestValidator");
}

public Optional<KsqlEntity> execute(
final ConfiguredStatement<Statement> statement,
final KsqlParser.ParsedStatement parsedStatement,
final Map<String, Object> mutableScopedProperties,
final String sql,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext,
final TransactionalProducer transactionalProducer
Expand All @@ -73,9 +83,19 @@ public Optional<KsqlEntity> execute(
checkAuthorization(injected, serviceContext, executionContext);

try {
transactionalProducer.begin();

requestValidator.validate(
SandboxedServiceContext.create(serviceContext),
Collections.singletonList(parsedStatement),
mutableScopedProperties,
sql
);

final QueuedCommandStatus queuedCommandStatus =
commandQueue.enqueueCommand(injected, transactionalProducer);

transactionalProducer.commit();
final CommandStatus commandStatus = queuedCommandStatus
.tryWaitForFinalStatus(distributedCmdResponseTimeout);

Expand All @@ -86,6 +106,7 @@ public Optional<KsqlEntity> execute(
queuedCommandStatus.getCommandSequenceNumber()
));
} catch (final Exception e) {
transactionalProducer.abort();
throw new KsqlServerException(String.format(
"Could not write the statement '%s' into the command topic: " + e.getMessage(),
statement.getStatementText()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public KsqlEntityList execute(
final ServiceContext serviceContext,
final List<ParsedStatement> statements,
final Map<String, Object> propertyOverrides,
final String sql,
final TransactionalProducer transactionalProducer
) {
final Map<String, Object> scopedPropertyOverrides = new HashMap<>(propertyOverrides);
Expand All @@ -86,7 +87,7 @@ public KsqlEntityList execute(
serviceContext,
prepared,
propertyOverrides,
transactionalProducer
transactionalProducer
);
if (!result.isEmpty()) {
// This is to maintain backwards compatibility until we deprecate
Expand All @@ -100,7 +101,9 @@ public KsqlEntityList execute(
executeStatement(
serviceContext,
configured,
parsed,
scopedPropertyOverrides,
sql,
entities,
transactionalProducer
).ifPresent(entities::add);
Expand All @@ -113,20 +116,22 @@ public KsqlEntityList execute(
private <T extends Statement> Optional<KsqlEntity> executeStatement(
final ServiceContext serviceContext,
final ConfiguredStatement<T> configured,
final ParsedStatement parsed,
final Map<String, Object> mutableScopedProperties,
final String sql,
final KsqlEntityList entities,
final TransactionalProducer transactionalProducer
) {
final Class<? extends Statement> statementClass = configured.getStatement().getClass();

// currently this needs to be commented out since this check will fail since the commandRunner consumer
// can't poll the non-comitted records.
// commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass);
// this could probably be removed since we waitForConsumer()
// before moving on to the next statement
commandQueueSync.waitFor(new KsqlEntityList(entities), statementClass);

final StatementExecutor<T> executor = (StatementExecutor<T>)
customExecutors.getOrDefault(statementClass,
(stmt, props, ctx, svcCtx) ->
distributor.execute(stmt, props, ctx, svcCtx, transactionalProducer));
distributor.execute(stmt, parsed, props, sql, ctx, svcCtx, transactionalProducer));

return executor.execute(
configured,
Expand Down Expand Up @@ -154,6 +159,7 @@ private KsqlEntityList executeRunScript(
serviceContext,
ksqlEngine.parse(sql),
propertyOverrides,
sql,
transactionalProducer
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.ClusterTerminateRequest;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlRequest;
import io.confluent.ksql.rest.entity.Versions;
Expand Down Expand Up @@ -167,7 +165,8 @@ public void configure(final KsqlConfig config) {
commandQueue,
distributedCmdResponseTimeout,
injectorFactory,
authorizationValidator
authorizationValidator,
this.validator
),
ksqlEngine,
config,
Expand Down Expand Up @@ -196,6 +195,7 @@ public Response terminateCluster(
serviceContext,
TERMINATE_CLUSTER,
request.getStreamsProperties(),
request.toString(),
transactionalProducerFactory.createProducerTransactionManager()
)
).build();
Expand Down Expand Up @@ -240,36 +240,26 @@ public Response handleKsqlStatements(
private KsqlEntityList handleTransactionalProduce(
final ServiceContext serviceContext,
final KsqlRequest request
) {
) throws InterruptedException {
final TransactionalProducer transactionalProducer =
transactionalProducerFactory.createProducerTransactionManager();
transactionalProducer.begin();
final List<ParsedStatement> statements = ksqlEngine.parse(request.getKsql());
final KsqlEntityList entities;

try {
transactionalProducer.waitForConsumer();

validator.validate(
SandboxedServiceContext.create(serviceContext),
statements,
request.getStreamsProperties(),
request.getKsql()
);

entities = handler.execute(
serviceContext,
statements,
request.getStreamsProperties(),
transactionalProducer
);
validator.validate(
SandboxedServiceContext.create(serviceContext),
statements,
request.getStreamsProperties(),
request.getKsql()
);

transactionalProducer.commit();
} catch (Exception e) {
transactionalProducer.abort();
LOG.error("Aborted transactional produce for: " + request);
throw e;
}
final KsqlEntityList entities = handler.execute(
serviceContext,
statements,
request.getStreamsProperties(),
request.getKsql(),
transactionalProducer
);

transactionalProducer.close();
return entities;
}
Expand Down
Loading

0 comments on commit 67467f5

Please sign in to comment.