Skip to content

Commit

Permalink
refactor: rename queued query metadata to transient query metadata (#…
Browse files Browse the repository at this point in the history
…3053)

As this name better reflects what it is.
  • Loading branch information
big-andy-coates authored Jul 4, 2019
1 parent c06e65e commit 8c6d36f
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryIdGenerator;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -203,7 +203,7 @@ private QueryMetadata buildPlanForBareQuery(

final SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);

return new QueuedQueryMetadata(
return new TransientQueryMetadata(
statement,
streams,
bareOutputNode.getSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
/**
* Metadata of a transient query, e.g. {@code SELECT * FROM FOO;}.
*/
public class QueuedQueryMetadata extends QueryMetadata {
public class TransientQueryMetadata extends QueryMetadata {

private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final Consumer<LimitHandler> limitHandlerSetter;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public QueuedQueryMetadata(
public TransientQueryMetadata(
final String statementString,
final KafkaStreams kafkaStreams,
final LogicalSchema logicalSchema,
Expand Down Expand Up @@ -81,11 +81,11 @@ public BlockingQueue<KeyValue<String, GenericRow>> getRowQueue() {

@Override
public boolean equals(final Object o) {
if (!(o instanceof QueuedQueryMetadata)) {
if (!(o instanceof TransientQueryMetadata)) {
return false;
}

final QueuedQueryMetadata that = (QueuedQueryMetadata) o;
final TransientQueryMetadata that = (TransientQueryMetadata) o;

return Objects.equals(this.rowQueue, that.rowQueue) && super.equals(o);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -108,7 +108,7 @@ public class KsqlContextTest {
@Mock
private PersistentQueryMetadata persistentQuery;
@Mock
private QueuedQueryMetadata transientQuery;
private TransientQueryMetadata transientQuery;
@Mock
private Injector schemaInjector;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.confluent.ksql.util.PageViewDataProvider;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.UserDataProvider;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -155,7 +155,7 @@ public void after() {

@Test
public void shouldSelectAllFromUsers() throws Exception {
final QueuedQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeQuery(
"SELECT * from %s;", USER_TABLE);

final Set<String> expectedUsers = ImmutableSet
Expand All @@ -175,7 +175,7 @@ public void shouldSelectAllFromUsers() throws Exception {

@Test
public void shouldSelectFromPageViewsWithSpecificColumn() throws Exception {
final QueuedQueryMetadata queryMetadata =
final TransientQueryMetadata queryMetadata =
executeQuery("SELECT pageid from %s;", PAGE_VIEW_STREAM);

final List<String> expectedPages =
Expand Down Expand Up @@ -204,7 +204,7 @@ public void shouldSelectAllFromDerivedStream() throws Exception {
USER_TABLE, PAGE_VIEW_STREAM, USER_TABLE, PAGE_VIEW_STREAM,
USER_TABLE);

final QueuedQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeQuery(
"SELECT * from pageviews_female;");

final List<KeyValue<String, GenericRow>> results = new ArrayList<>();
Expand Down Expand Up @@ -266,7 +266,7 @@ public void shouldCreateStreamUsingLikeClause() throws Exception {
+ " WHERE pageId LIKE '%%_5';",
PAGE_VIEW_STREAM);

final QueuedQueryMetadata queryMetadata =
final TransientQueryMetadata queryMetadata =
executeQuery("SELECT userid, pageid from pageviews_like_p5;");

final List<Object> columns = waitForFirstRow(queryMetadata);
Expand All @@ -284,7 +284,7 @@ public void shouldRetainSelectedColumnsInPartitionBy() throws Exception {
+ "partition by viewtime;",
PAGE_VIEW_STREAM);

final QueuedQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeQuery(
"SELECT * from pageviews_by_viewtime;");

final List<Object> columns = waitForFirstRow(queryMetadata);
Expand All @@ -311,7 +311,7 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception {

executeStatement(createStreamStatement);

final QueuedQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeQuery(
"SELECT * from cart_event_product;");

final List<Object> columns = waitForFirstRow(queryMetadata);
Expand Down Expand Up @@ -347,7 +347,7 @@ public void shouldCleanUpAvroSchemaOnDropSource() throws Exception {
@Test
public void shouldSupportConfigurableUdfs() throws Exception {
// When:
final QueuedQueryMetadata queryMetadata = executeQuery(
final TransientQueryMetadata queryMetadata = executeQuery(
"SELECT E2EConfigurableUdf(registertime) AS x from %s;", USER_TABLE);

// Then:
Expand Down Expand Up @@ -377,21 +377,21 @@ private QueryMetadata executeStatement(final String statement,
return queries.isEmpty() ? null : queries.get(0);
}

private QueuedQueryMetadata executeQuery(final String statement,
private TransientQueryMetadata executeQuery(final String statement,
final String... args) {
final QueryMetadata queryMetadata = executeStatement(statement, args);
assertThat(queryMetadata, instanceOf(QueuedQueryMetadata.class));
assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class));
toClose = queryMetadata;
return (QueuedQueryMetadata) queryMetadata;
return (TransientQueryMetadata) queryMetadata;
}

private static List<Object> waitForFirstRow(
final QueuedQueryMetadata queryMetadata) throws Exception {
final TransientQueryMetadata queryMetadata) throws Exception {
return verifyAvailableRows(queryMetadata, 1).get(0).getColumns();
}

private static List<GenericRow> verifyAvailableRows(
final QueuedQueryMetadata queryMetadata,
final TransientQueryMetadata queryMetadata,
final int expectedRows
) throws Exception {
final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = queryMetadata.getRowQueue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryIdGenerator;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -246,7 +246,7 @@ public void shouldHaveKStreamDataSource() {
@Test
public void shouldMakeBareQuery() {
final QueryMetadata queryMetadata = buildPhysicalPlan(simpleSelectFilter);
assertThat(queryMetadata, instanceOf(QueuedQueryMetadata.class));
assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -37,14 +37,14 @@ class QueryStreamWriter implements StreamingOutput {

private static final Logger log = LoggerFactory.getLogger(QueryStreamWriter.class);

private final QueuedQueryMetadata queryMetadata;
private final TransientQueryMetadata queryMetadata;
private final long disconnectCheckInterval;
private final ObjectMapper objectMapper;
private volatile Exception streamsException;
private volatile boolean limitReached = false;

QueryStreamWriter(
final QueuedQueryMetadata queryMetadata,
final TransientQueryMetadata queryMetadata,
final long disconnectCheckInterval,
final ObjectMapper objectMapper
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -57,8 +57,8 @@ class StreamPublisher implements Flow.Publisher<Collection<StreamedRow>> {
@SuppressWarnings("ConstantConditions")
@Override
public synchronized void subscribe(final Flow.Subscriber<Collection<StreamedRow>> subscriber) {
final QueuedQueryMetadata queryMetadata =
(QueuedQueryMetadata) ksqlEngine.execute(serviceContext, query)
final TransientQueryMetadata queryMetadata =
(TransientQueryMetadata) ksqlEngine.execute(serviceContext, query)
.getQuery()
.get();

Expand All @@ -72,12 +72,12 @@ public synchronized void subscribe(final Flow.Subscriber<Collection<StreamedRow>

class StreamSubscription extends PollingSubscription<Collection<StreamedRow>> {

private final QueuedQueryMetadata queryMetadata;
private final TransientQueryMetadata queryMetadata;
private boolean closed = false;

StreamSubscription(
final Subscriber<Collection<StreamedRow>> subscriber,
final QueuedQueryMetadata queryMetadata
final TransientQueryMetadata queryMetadata
) {
super(exec, subscriber, queryMetadata.getLogicalSchema());
this.queryMetadata = queryMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -167,15 +167,15 @@ private Response handleQuery(
.getQuery()
.get();

if (!(query instanceof QueuedQueryMetadata)) {
if (!(query instanceof TransientQueryMetadata)) {
throw new Exception(String.format(
"Unexpected metadata type: expected QueuedQueryMetadata, found %s instead",
"Unexpected metadata type: expected TransientQueryMetadata, found %s instead",
query.getClass()
));
}

final QueryStreamWriter queryStreamWriter = new QueryStreamWriter(
(QueuedQueryMetadata) query,
(TransientQueryMetadata) query,
disconnectCheckInterval.toMillis(),
objectMapper);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void setUp() {
@Test
public void shouldSetFieldsCorrectlyForQueryMetadata() {
// Given:
final QueryMetadata queryMetadata = new QueuedQueryMetadata(
final QueryMetadata queryMetadata = new TransientQueryMetadata(
"test statement",
queryStreams,
SCHEMA,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import io.confluent.ksql.version.metrics.ActivenessRegistrar;
import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -278,8 +278,8 @@ public void shouldStreamRowsCorrectly() throws Throwable {

reset(mockKsqlEngine);

final QueuedQueryMetadata queuedQueryMetadata =
new QueuedQueryMetadata(
final TransientQueryMetadata transientQueryMetadata =
new TransientQueryMetadata(
queryString,
mockKafkaStreams,
SOME_SCHEMA,
Expand All @@ -296,7 +296,7 @@ public void shouldStreamRowsCorrectly() throws Throwable {
reset(mockOutputNode);
expect(mockKsqlEngine.execute(serviceContext,
ConfiguredStatement.of(statement, requestStreamsProperties, ksqlConfig)))
.andReturn(ExecuteResult.of(queuedQueryMetadata));
.andReturn(ExecuteResult.of(transientQueryMetadata));

expect(mockKsqlEngine.isAcceptingStatements()).andReturn(true);
replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.confluent.ksql.physical.LimitHandler;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
Expand Down Expand Up @@ -72,7 +72,7 @@ public class QueryStreamWriterTest {
@Mock(MockType.NICE)
private KsqlEngine ksqlEngine;
@Mock(MockType.NICE)
private QueuedQueryMetadata queryMetadata;
private TransientQueryMetadata queryMetadata;
@Mock(MockType.NICE)
private BlockingQueue<KeyValue<String, GenericRow>> rowQueue;
private Capture<Thread.UncaughtExceptionHandler> ehCapture;
Expand Down

0 comments on commit 8c6d36f

Please sign in to comment.