From 8c6d36fbba5bfeb66b38e14cf2e061c03d28149e Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Thu, 4 Jul 2019 13:56:59 +0100 Subject: [PATCH] refactor: rename queued query metadata to transient query metadata (#3053) As this name better reflects what it is. --- .../ksql/physical/PhysicalPlanBuilder.java | 4 +-- ...adata.java => TransientQueryMetadata.java} | 8 +++--- .../io/confluent/ksql/KsqlContextTest.java | 4 +-- .../integration/EndToEndIntegrationTest.java | 26 +++++++++---------- .../physical/PhysicalPlanBuilderTest.java | 4 +-- .../streaming/QueryStreamWriter.java | 6 ++--- .../resources/streaming/StreamPublisher.java | 10 +++---- .../streaming/StreamedQueryResource.java | 8 +++--- .../rest/entity/QueryDescriptionTest.java | 4 +-- .../resources/StreamedQueryResourceTest.java | 8 +++--- .../streaming/QueryStreamWriterTest.java | 4 +-- 11 files changed, 43 insertions(+), 43 deletions(-) rename ksql-engine/src/main/java/io/confluent/ksql/util/{QueuedQueryMetadata.java => TransientQueryMetadata.java} (93%) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java index c39a68e4d06f..314bc13ce5aa 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java @@ -46,7 +46,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryIdGenerator; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; @@ -203,7 +203,7 @@ private QueryMetadata buildPlanForBareQuery( final SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0); - return new QueuedQueryMetadata( + return new TransientQueryMetadata( statement, streams, bareOutputNode.getSchema(), diff --git a/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java b/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java similarity index 93% rename from ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java rename to ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java index 9b931781f0c0..084e9a46c2d5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/util/QueuedQueryMetadata.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/util/TransientQueryMetadata.java @@ -32,14 +32,14 @@ /** * Metadata of a transient query, e.g. {@code SELECT * FROM FOO;}. */ -public class QueuedQueryMetadata extends QueryMetadata { +public class TransientQueryMetadata extends QueryMetadata { private final BlockingQueue> rowQueue; private final AtomicBoolean isRunning = new AtomicBoolean(true); private final Consumer limitHandlerSetter; // CHECKSTYLE_RULES.OFF: ParameterNumberCheck - public QueuedQueryMetadata( + public TransientQueryMetadata( final String statementString, final KafkaStreams kafkaStreams, final LogicalSchema logicalSchema, @@ -81,11 +81,11 @@ public BlockingQueue> getRowQueue() { @Override public boolean equals(final Object o) { - if (!(o instanceof QueuedQueryMetadata)) { + if (!(o instanceof TransientQueryMetadata)) { return false; } - final QueuedQueryMetadata that = (QueuedQueryMetadata) o; + final TransientQueryMetadata that = (TransientQueryMetadata) o; return Objects.equals(this.rowQueue, that.rowQueue) && super.equals(o); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java index 3db8e799d1df..33aa6b29a928 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/KsqlContextTest.java @@ -44,7 +44,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -108,7 +108,7 @@ public class KsqlContextTest { @Mock private PersistentQueryMetadata persistentQuery; @Mock - private QueuedQueryMetadata transientQuery; + private TransientQueryMetadata transientQuery; @Mock private Injector schemaInjector; @Mock diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java index 0cbd6134c6b1..2fda0f29c467 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/EndToEndIntegrationTest.java @@ -37,7 +37,7 @@ import io.confluent.ksql.util.PageViewDataProvider; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.util.UserDataProvider; import java.util.ArrayList; import java.util.Arrays; @@ -155,7 +155,7 @@ public void after() { @Test public void shouldSelectAllFromUsers() throws Exception { - final QueuedQueryMetadata queryMetadata = executeQuery( + final TransientQueryMetadata queryMetadata = executeQuery( "SELECT * from %s;", USER_TABLE); final Set expectedUsers = ImmutableSet @@ -175,7 +175,7 @@ public void shouldSelectAllFromUsers() throws Exception { @Test public void shouldSelectFromPageViewsWithSpecificColumn() throws Exception { - final QueuedQueryMetadata queryMetadata = + final TransientQueryMetadata queryMetadata = executeQuery("SELECT pageid from %s;", PAGE_VIEW_STREAM); final List expectedPages = @@ -204,7 +204,7 @@ public void shouldSelectAllFromDerivedStream() throws Exception { USER_TABLE, PAGE_VIEW_STREAM, USER_TABLE, PAGE_VIEW_STREAM, USER_TABLE); - final QueuedQueryMetadata queryMetadata = executeQuery( + final TransientQueryMetadata queryMetadata = executeQuery( "SELECT * from pageviews_female;"); final List> results = new ArrayList<>(); @@ -266,7 +266,7 @@ public void shouldCreateStreamUsingLikeClause() throws Exception { + " WHERE pageId LIKE '%%_5';", PAGE_VIEW_STREAM); - final QueuedQueryMetadata queryMetadata = + final TransientQueryMetadata queryMetadata = executeQuery("SELECT userid, pageid from pageviews_like_p5;"); final List columns = waitForFirstRow(queryMetadata); @@ -284,7 +284,7 @@ public void shouldRetainSelectedColumnsInPartitionBy() throws Exception { + "partition by viewtime;", PAGE_VIEW_STREAM); - final QueuedQueryMetadata queryMetadata = executeQuery( + final TransientQueryMetadata queryMetadata = executeQuery( "SELECT * from pageviews_by_viewtime;"); final List columns = waitForFirstRow(queryMetadata); @@ -311,7 +311,7 @@ public void shouldSupportDroppingAndRecreatingJoinQuery() throws Exception { executeStatement(createStreamStatement); - final QueuedQueryMetadata queryMetadata = executeQuery( + final TransientQueryMetadata queryMetadata = executeQuery( "SELECT * from cart_event_product;"); final List columns = waitForFirstRow(queryMetadata); @@ -347,7 +347,7 @@ public void shouldCleanUpAvroSchemaOnDropSource() throws Exception { @Test public void shouldSupportConfigurableUdfs() throws Exception { // When: - final QueuedQueryMetadata queryMetadata = executeQuery( + final TransientQueryMetadata queryMetadata = executeQuery( "SELECT E2EConfigurableUdf(registertime) AS x from %s;", USER_TABLE); // Then: @@ -377,21 +377,21 @@ private QueryMetadata executeStatement(final String statement, return queries.isEmpty() ? null : queries.get(0); } - private QueuedQueryMetadata executeQuery(final String statement, + private TransientQueryMetadata executeQuery(final String statement, final String... args) { final QueryMetadata queryMetadata = executeStatement(statement, args); - assertThat(queryMetadata, instanceOf(QueuedQueryMetadata.class)); + assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class)); toClose = queryMetadata; - return (QueuedQueryMetadata) queryMetadata; + return (TransientQueryMetadata) queryMetadata; } private static List waitForFirstRow( - final QueuedQueryMetadata queryMetadata) throws Exception { + final TransientQueryMetadata queryMetadata) throws Exception { return verifyAvailableRows(queryMetadata, 1).get(0).getColumns(); } private static List verifyAvailableRows( - final QueuedQueryMetadata queryMetadata, + final TransientQueryMetadata queryMetadata, final int expectedRows ) throws Exception { final BlockingQueue> rowQueue = queryMetadata.getRowQueue(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index 10edb01016e6..b509f5f9c51e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -70,7 +70,7 @@ import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryIdGenerator; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -246,7 +246,7 @@ public void shouldHaveKStreamDataSource() { @Test public void shouldMakeBareQuery() { final QueryMetadata queryMetadata = buildPhysicalPlan(simpleSelectFilter); - assertThat(queryMetadata, instanceOf(QueuedQueryMetadata.class)); + assertThat(queryMetadata, instanceOf(TransientQueryMetadata.class)); } @Test diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java index f0ef3733f23d..aab9a7f8bb23 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java @@ -20,7 +20,7 @@ import io.confluent.ksql.GenericRow; import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; @@ -37,14 +37,14 @@ class QueryStreamWriter implements StreamingOutput { private static final Logger log = LoggerFactory.getLogger(QueryStreamWriter.class); - private final QueuedQueryMetadata queryMetadata; + private final TransientQueryMetadata queryMetadata; private final long disconnectCheckInterval; private final ObjectMapper objectMapper; private volatile Exception streamsException; private volatile boolean limitReached = false; QueryStreamWriter( - final QueuedQueryMetadata queryMetadata, + final TransientQueryMetadata queryMetadata, final long disconnectCheckInterval, final ObjectMapper objectMapper ) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamPublisher.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamPublisher.java index a072102076d9..aa9a557abd0c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamPublisher.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamPublisher.java @@ -24,7 +24,7 @@ import io.confluent.ksql.rest.server.resources.streaming.Flow.Subscriber; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -57,8 +57,8 @@ class StreamPublisher implements Flow.Publisher> { @SuppressWarnings("ConstantConditions") @Override public synchronized void subscribe(final Flow.Subscriber> subscriber) { - final QueuedQueryMetadata queryMetadata = - (QueuedQueryMetadata) ksqlEngine.execute(serviceContext, query) + final TransientQueryMetadata queryMetadata = + (TransientQueryMetadata) ksqlEngine.execute(serviceContext, query) .getQuery() .get(); @@ -72,12 +72,12 @@ public synchronized void subscribe(final Flow.Subscriber class StreamSubscription extends PollingSubscription> { - private final QueuedQueryMetadata queryMetadata; + private final TransientQueryMetadata queryMetadata; private boolean closed = false; StreamSubscription( final Subscriber> subscriber, - final QueuedQueryMetadata queryMetadata + final TransientQueryMetadata queryMetadata ) { super(exec, subscriber, queryMetadata.getLogicalSchema()); this.queryMetadata = queryMetadata; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 6d48c973b819..9acc7f87c31d 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -34,7 +34,7 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.time.Duration; import java.util.HashMap; @@ -167,15 +167,15 @@ private Response handleQuery( .getQuery() .get(); - if (!(query instanceof QueuedQueryMetadata)) { + if (!(query instanceof TransientQueryMetadata)) { throw new Exception(String.format( - "Unexpected metadata type: expected QueuedQueryMetadata, found %s instead", + "Unexpected metadata type: expected TransientQueryMetadata, found %s instead", query.getClass() )); } final QueryStreamWriter queryStreamWriter = new QueryStreamWriter( - (QueuedQueryMetadata) query, + (TransientQueryMetadata) query, disconnectCheckInterval.toMillis(), objectMapper); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java index 1e17124d48dd..e49222c5be3f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java @@ -35,7 +35,7 @@ import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.util.timestamp.MetadataTimestampExtractionPolicy; import java.util.Arrays; import java.util.Collections; @@ -94,7 +94,7 @@ public void setUp() { @Test public void shouldSetFieldsCorrectlyForQueryMetadata() { // Given: - final QueryMetadata queryMetadata = new QueuedQueryMetadata( + final QueryMetadata queryMetadata = new TransientQueryMetadata( "test statement", queryStreams, SCHEMA, diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java index b2e783da6828..ac9cc02d81c9 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java @@ -56,7 +56,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.QueryMetadata; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.io.EOFException; import java.io.IOException; @@ -278,8 +278,8 @@ public void shouldStreamRowsCorrectly() throws Throwable { reset(mockKsqlEngine); - final QueuedQueryMetadata queuedQueryMetadata = - new QueuedQueryMetadata( + final TransientQueryMetadata transientQueryMetadata = + new TransientQueryMetadata( queryString, mockKafkaStreams, SOME_SCHEMA, @@ -296,7 +296,7 @@ public void shouldStreamRowsCorrectly() throws Throwable { reset(mockOutputNode); expect(mockKsqlEngine.execute(serviceContext, ConfiguredStatement.of(statement, requestStreamsProperties, ksqlConfig))) - .andReturn(ExecuteResult.of(queuedQueryMetadata)); + .andReturn(ExecuteResult.of(transientQueryMetadata)); expect(mockKsqlEngine.isAcceptingStatements()).andReturn(true); replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java index 74e80ec9aaf8..d32245758cf4 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriterTest.java @@ -34,7 +34,7 @@ import io.confluent.ksql.physical.LimitHandler; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.util.QueuedQueryMetadata; +import io.confluent.ksql.util.TransientQueryMetadata; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -72,7 +72,7 @@ public class QueryStreamWriterTest { @Mock(MockType.NICE) private KsqlEngine ksqlEngine; @Mock(MockType.NICE) - private QueuedQueryMetadata queryMetadata; + private TransientQueryMetadata queryMetadata; @Mock(MockType.NICE) private BlockingQueue> rowQueue; private Capture ehCapture;