diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java index 0cb1066dafec..261f8c353074 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/SchemaUtilTest.java @@ -31,7 +31,6 @@ import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.Assert; import org.junit.Test; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java index 18e50ad404cd..671ff00748e6 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/function/udf/math/AbsTest.java @@ -19,7 +19,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; -import io.confluent.ksql.function.udf.KudfTester; import java.math.BigDecimal; import org.junit.Before; import org.junit.Test; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java index 2be09621e46e..15c7d28f12cd 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java @@ -40,26 +40,26 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; -import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.Field; import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.KeySerde; import io.confluent.ksql.serde.WindowInfo; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.structured.SchemaKTable; import io.confluent.ksql.testutils.AnalysisTestUtil; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.LimitedProxyBuilder; import io.confluent.ksql.util.MetaStoreFixture; -import io.confluent.ksql.execution.context.QueryLoggerUtil; import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java index 1fbaf67bbdc8..bbd62a593e45 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/DataSourceNodeTest.java @@ -35,6 +35,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.execution.plan.StreamSource; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.DataSource; @@ -43,7 +46,6 @@ import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.model.WindowType; -import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -54,11 +56,9 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.serde.WindowInfo; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.structured.SchemaKTable; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.timestamp.LongColumnTimestampExtractionPolicy; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/FilterNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/FilterNodeTest.java index 9cdb660d78b7..f6ab35b47af0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/FilterNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/FilterNodeTest.java @@ -20,10 +20,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.confluent.ksql.logging.processing.ProcessingLogContext; -import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.execution.context.QueryContext.Stacker; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.structured.SchemaKStream; import org.junit.Before; import org.junit.Rule; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index 3f763b2b9461..98e15a0d5e71 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -33,6 +33,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.metastore.MetaStore; @@ -41,7 +43,6 @@ import io.confluent.ksql.metastore.model.KeyField.LegacyField; import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.parser.tree.WithinExpression; -import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.planner.plan.JoinNode.JoinType; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.Field; @@ -56,7 +57,6 @@ import io.confluent.ksql.serde.ValueFormat; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.structured.SchemaKTable; import io.confluent.ksql.testutils.AnalysisTestUtil; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java index 1ab751ddee93..987f6ac5fee3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java @@ -25,17 +25,17 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; -import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.Field; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; import io.confluent.ksql.serde.KeySerde; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.testutils.AnalysisTestUtil; import io.confluent.ksql.util.KsqlConfig; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java index 00d85f8d60f9..9786c0012010 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNodeTest.java @@ -26,15 +26,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; import io.confluent.ksql.metastore.model.KsqlTopic; -import io.confluent.ksql.execution.builder.KsqlQueryBuilder; -import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode.SinkFactory; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -42,11 +42,9 @@ import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.serde.ValueFormat; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.QueryIdGenerator; -import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.util.timestamp.LongColumnTimestampExtractionPolicy; import java.util.Optional; import java.util.OptionalInt; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java index 5d2c5bf790a9..cf0b5763b9f1 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/ProjectNodeTest.java @@ -26,18 +26,18 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import io.confluent.ksql.execution.builder.KsqlQueryBuilder; +import io.confluent.ksql.execution.context.QueryContext.Stacker; +import io.confluent.ksql.execution.expression.tree.BooleanLiteral; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; -import io.confluent.ksql.execution.expression.tree.BooleanLiteral; -import io.confluent.ksql.execution.builder.KsqlQueryBuilder; import io.confluent.ksql.schema.ksql.Field; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; -import io.confluent.ksql.execution.context.QueryContext.Stacker; import io.confluent.ksql.structured.SchemaKStream; import io.confluent.ksql.util.KsqlException; -import io.confluent.ksql.execution.plan.SelectExpression; import java.util.Arrays; import java.util.Optional; import org.apache.kafka.connect.data.Schema; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java index 9d67bc8f901d..e294c5e971c2 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedStreamTest.java @@ -29,8 +29,8 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.function.KsqlAggregateFunction; import io.confluent.ksql.metastore.model.KeyField; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java index 11344d3d09be..531e87cfad6a 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java @@ -27,9 +27,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.expression.tree.DereferenceExpression; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.QualifiedName; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 17f0b3acbb9f..3213fab36ee4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -34,9 +34,15 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.expression.tree.DereferenceExpression; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.FunctionCall; +import io.confluent.ksql.execution.expression.tree.QualifiedName; +import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; @@ -47,11 +53,6 @@ import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers; import io.confluent.ksql.metastore.model.MetaStoreMatchers.OptionalMatchers; -import io.confluent.ksql.execution.expression.tree.DereferenceExpression; -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.execution.expression.tree.FunctionCall; -import io.confluent.ksql.execution.expression.tree.QualifiedName; -import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; import io.confluent.ksql.planner.plan.FilterNode; import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.ProjectNode; @@ -75,7 +76,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.SchemaUtil; -import io.confluent.ksql.execution.plan.SelectExpression; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index a6c716a8d585..83e6746a7b47 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -35,9 +35,14 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import io.confluent.ksql.execution.context.QueryContext; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.expression.tree.DereferenceExpression; +import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.execution.expression.tree.QualifiedName; +import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; +import io.confluent.ksql.execution.plan.SelectExpression; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.metastore.MetaStore; @@ -46,10 +51,6 @@ import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.metastore.model.MetaStoreMatchers.KeyFieldMatchers; -import io.confluent.ksql.execution.expression.tree.DereferenceExpression; -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.execution.expression.tree.QualifiedName; -import io.confluent.ksql.execution.expression.tree.QualifiedNameReference; import io.confluent.ksql.planner.plan.FilterNode; import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.ProjectNode; @@ -72,7 +73,6 @@ import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; import io.confluent.ksql.util.SchemaUtil; -import io.confluent.ksql.execution.plan.SelectExpression; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/QueryLoggerUtilTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/QueryLoggerUtilTest.java index 7ab4ce3a957a..0a3a9b8b2ea7 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/QueryLoggerUtilTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/QueryLoggerUtilTest.java @@ -18,9 +18,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; +import io.confluent.ksql.execution.context.QueryContext; import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.query.QueryId; -import io.confluent.ksql.execution.context.QueryContext; import org.junit.Test; public class QueryLoggerUtilTest { diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java index 45b1ee0b8998..8470a28ce28c 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/builder/KsqlQueryBuilderTest.java @@ -25,6 +25,9 @@ import com.google.common.testing.NullPointerTester; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.execution.context.QueryContext; +import io.confluent.ksql.execution.context.QueryContext.Stacker; +import io.confluent.ksql.execution.context.QueryLoggerUtil; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.ProcessingLogContext; import io.confluent.ksql.model.WindowType; @@ -39,10 +42,7 @@ import io.confluent.ksql.serde.ValueSerdeFactory; import io.confluent.ksql.serde.WindowInfo; import io.confluent.ksql.services.ServiceContext; -import io.confluent.ksql.execution.context.QueryContext; -import io.confluent.ksql.execution.context.QueryContext.Stacker; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.execution.context.QueryLoggerUtil; import java.time.Duration; import java.util.Optional; import java.util.function.Supplier; diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSourceTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSourceTest.java index df664e641ef9..6075dd941cfe 100644 --- a/ksql-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSourceTest.java +++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/plan/StreamSourceTest.java @@ -56,7 +56,6 @@ import org.apache.kafka.streams.Topology.AutoOffsetReset; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KsqlConsumed; import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 1d7d0f810764..8ea2eb5e6abe 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server; import static io.confluent.ksql.rest.server.KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG; +import static java.util.Objects.requireNonNull; import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper; import com.google.common.annotations.VisibleForTesting; @@ -44,6 +45,7 @@ import io.confluent.ksql.rest.server.computation.StatementExecutor; import io.confluent.ksql.rest.server.context.KsqlRestServiceContextBinder; import io.confluent.ksql.rest.server.filters.KsqlAuthorizationFilter; +import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlExceptionMapper; import io.confluent.ksql.rest.server.resources.KsqlResource; import io.confluent.ksql.rest.server.resources.RootDocument; @@ -64,7 +66,6 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.ServiceContextFactory; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.statement.Injectors; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.RetryUtil; @@ -86,7 +87,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.concurrent.Executors; @@ -100,6 +100,7 @@ import javax.websocket.server.ServerEndpointConfig; import javax.websocket.server.ServerEndpointConfig.Configurator; import javax.ws.rs.core.Configurable; +import org.apache.kafka.streams.StreamsConfig; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.websocket.jsr356.server.ServerContainer; import org.glassfish.hk2.utilities.Binder; @@ -115,7 +116,7 @@ public final class KsqlRestApplication extends Application imple public static final String COMMANDS_STREAM_NAME = "KSQL_COMMANDS"; - private final KsqlConfig ksqlConfig; + private final KsqlConfig ksqlConfigNoPort; private final KsqlEngine ksqlEngine; private final CommandRunner commandRunner; private final CommandStore commandStore; @@ -131,6 +132,7 @@ public final class KsqlRestApplication extends Application imple private final ProcessingLogContext processingLogContext; private final List preconditions; private final KsqlConnect ksqlConnect; + private final List configurables; public static String getCommandsStreamName() { return COMMANDS_STREAM_NAME; @@ -155,39 +157,34 @@ public static String getCommandsStreamName() { final ServerState serverState, final ProcessingLogContext processingLogContext, final List preconditions, - final KsqlConnect ksqlConnect + final KsqlConnect ksqlConnect, + final List configurables ) { super(config); - this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext"); - this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); - this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); - this.commandRunner = Objects.requireNonNull(commandRunner, "commandRunner"); - this.rootDocument = Objects.requireNonNull(rootDocument, "rootDocument"); - this.statusResource = Objects.requireNonNull(statusResource, "statusResource"); - this.streamedQueryResource = - Objects.requireNonNull(streamedQueryResource, "streamedQueryResource"); - this.ksqlResource = Objects.requireNonNull(ksqlResource, "ksqlResource"); - this.commandStore = Objects.requireNonNull(commandStore, "commandStore"); - this.serverState = Objects.requireNonNull(serverState, "serverState"); - this.processingLogContext = Objects.requireNonNull( - processingLogContext, - "processingLogContext"); - this.preconditions = Objects.requireNonNull(preconditions, "preconditions"); - this.versionCheckerAgent = - Objects.requireNonNull(versionCheckerAgent, "versionCheckerAgent"); - this.serviceContextBinderFactory = Objects.requireNonNull( - serviceContextBinderFactory, "serviceContextBinderFactory"); - this.securityExtension = Objects.requireNonNull( - securityExtension, "securityExtension" - ); - this.ksqlConnect = Objects - .requireNonNull(ksqlConnect, "ksqlConnect"); + this.serviceContext = requireNonNull(serviceContext, "serviceContext"); + this.ksqlConfigNoPort = requireNonNull(ksqlConfig, "ksqlConfig"); + this.ksqlEngine = requireNonNull(ksqlEngine, "ksqlEngine"); + this.commandRunner = requireNonNull(commandRunner, "commandRunner"); + this.rootDocument = requireNonNull(rootDocument, "rootDocument"); + this.statusResource = requireNonNull(statusResource, "statusResource"); + this.streamedQueryResource = requireNonNull(streamedQueryResource, "streamedQueryResource"); + this.ksqlResource = requireNonNull(ksqlResource, "ksqlResource"); + this.commandStore = requireNonNull(commandStore, "commandStore"); + this.serverState = requireNonNull(serverState, "serverState"); + this.processingLogContext = requireNonNull(processingLogContext, "processingLogContext"); + this.preconditions = requireNonNull(preconditions, "preconditions"); + this.versionCheckerAgent = requireNonNull(versionCheckerAgent, "versionCheckerAgent"); + this.serviceContextBinderFactory = + requireNonNull(serviceContextBinderFactory, "serviceContextBinderFactory"); + this.securityExtension = requireNonNull(securityExtension, "securityExtension"); + this.ksqlConnect = requireNonNull(ksqlConnect, "ksqlConnect"); + this.configurables = requireNonNull(configurables, "configurables"); } @Override public void setupResources(final Configurable config, final KsqlRestConfig appConfig) { config.register(rootDocument); - config.register(new ServerInfoResource(serviceContext, ksqlConfig)); + config.register(new ServerInfoResource(serviceContext, ksqlConfigNoPort)); config.register(statusResource); config.register(ksqlResource); config.register(streamedQueryResource); @@ -198,6 +195,8 @@ public void setupResources(final Configurable config, final KsqlRestConfig ap @Override public void start() throws Exception { super.start(); + final KsqlConfig ksqlConfigWithPort = buildConfigWithPort(); + configurables.forEach(c -> c.configure(ksqlConfigWithPort)); startKsql(); commandRunner.start(); ksqlConnect.startAsync(); @@ -256,11 +255,11 @@ private void initialize() { ProcessingLogServerUtils.maybeCreateProcessingLogTopic( serviceContext.getTopicClient(), processingLogContext.getConfig(), - ksqlConfig + ksqlConfigNoPort ); maybeCreateProcessingLogStream( processingLogContext.getConfig(), - ksqlConfig, + ksqlConfigNoPort, ksqlEngine, commandStore ); @@ -341,7 +340,7 @@ public void configureBaseApplication( new JacksonMessageBodyProvider(JsonMapper.INSTANCE.mapper); config.register(jsonProvider); config.register(JsonParseExceptionMapper.class); - config.register(serviceContextBinderFactory.apply(ksqlConfig, securityExtension)); + config.register(serviceContextBinderFactory.apply(ksqlConfigNoPort, securityExtension)); // Don't want to buffer rows when streaming JSON in a request to the query resource config.property(ServerProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 0); @@ -369,7 +368,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { final StatementParser statementParser = new StatementParser(ksqlEngine); final KsqlAuthorizationValidator authorizationValidator = - KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); + KsqlAuthorizationValidatorFactory.create(ksqlConfigNoPort, serviceContext); container.addEndpoint( ServerEndpointConfig.Builder @@ -382,7 +381,7 @@ protected void registerWebSocketEndpoints(final ServerContainer container) { @SuppressWarnings("unchecked") public T getEndpointInstance(final Class endpointClass) { return (T) new WSQueryEndpoint( - ksqlConfig, + buildConfigWithPort(), JsonMapper.INSTANCE.mapper, statementParser, ksqlEngine, @@ -452,18 +451,12 @@ static KsqlRestApplication buildApplication( final String commandTopic = KsqlInternalTopicUtils.getTopicName( ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX); - final StatementParser statementParser = new StatementParser(ksqlEngine); - final CommandStore commandStore = CommandStore.Factory.create( commandTopic, restConfig.getCommandConsumerProperties(), restConfig.getCommandProducerProperties()); - final StatementExecutor statementExecutor = new StatementExecutor( - ksqlConfig, - ksqlEngine, - statementParser - ); + final StatementExecutor statementExecutor = new StatementExecutor(ksqlEngine); final RootDocument rootDocument = new RootDocument(); @@ -479,9 +472,7 @@ static KsqlRestApplication buildApplication( KsqlAuthorizationValidatorFactory.create(ksqlConfig, serviceContext); final StreamedQueryResource streamedQueryResource = new StreamedQueryResource( - ksqlConfig, ksqlEngine, - statementParser, commandStore, Duration.ofMillis( restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)), @@ -491,13 +482,12 @@ static KsqlRestApplication buildApplication( ); final KsqlResource ksqlResource = new KsqlResource( - ksqlConfig, ksqlEngine, commandStore, Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), versionChecker::updateLastRequestTime, - Injectors.DEFAULT, - authorizationValidator); + authorizationValidator + ); final List managedTopics = new LinkedList<>(); managedTopics.add(commandTopic); @@ -508,7 +498,7 @@ static KsqlRestApplication buildApplication( statementExecutor, commandStore, maxStatementRetries, - new ClusterTerminator(ksqlConfig, ksqlEngine, serviceContext, managedTopics), + new ClusterTerminator(ksqlEngine, serviceContext, managedTopics), serverState ); @@ -526,6 +516,12 @@ static KsqlRestApplication buildApplication( ) ); + final List configurables = ImmutableList.of( + ksqlResource, + streamedQueryResource, + statementExecutor + ); + return new KsqlRestApplication( serviceContext, ksqlEngine, @@ -543,7 +539,8 @@ static KsqlRestApplication buildApplication( serverState, processingLogContext, preconditions, - ksqlConnect + ksqlConnect, + configurables ); } @@ -551,7 +548,11 @@ private void registerCommandTopic() { final String commandTopic = commandStore.getCommandTopicName(); - KsqlInternalTopicUtils.ensureTopic(commandTopic, ksqlConfig, serviceContext.getTopicClient()); + KsqlInternalTopicUtils.ensureTopic( + commandTopic, + ksqlConfigNoPort, + serviceContext.getTopicClient() + ); final String createCmd = "CREATE STREAM " + COMMANDS_STREAM_NAME + " (STATEMENT STRING)" @@ -559,7 +560,7 @@ private void registerCommandTopic() { final ParsedStatement parsed = ksqlEngine.parse(createCmd).get(0); final PreparedStatement prepared = ksqlEngine.prepare(parsed); - ksqlEngine.execute(ConfiguredStatement.of(prepared, ImmutableMap.of(), ksqlConfig)); + ksqlEngine.execute(ConfiguredStatement.of(prepared, ImmutableMap.of(), ksqlConfigNoPort)); } private static KsqlSecurityExtension loadSecurityExtension(final KsqlConfig ksqlConfig) { @@ -624,4 +625,24 @@ private static void maybeCreateProcessingLogStream( commandQueue.enqueueCommand(configured.get()); } + + /** + * Build a complete config with the KS IQ application.server set. + * + * @return true server config. + */ + private KsqlConfig buildConfigWithPort() { + final Map props = ksqlConfigNoPort.originals(); + + // Wire up KS IQ endpoint discovery to the FIRST listener: + final URL firstListener = getListeners().get(0); + props.put( + KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.APPLICATION_SERVER_CONFIG, + firstListener.toString() + ); + + log.info("Using first listener URL for intra-node communication: {}", firstListener); + + return new KsqlConfig(props); + } } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java index f900472e91fb..8eb7e84e286e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/StatementExecutor.java @@ -15,6 +15,7 @@ package io.confluent.ksql.rest.server.computation; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.exception.ExceptionUtil; @@ -32,6 +33,7 @@ import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandId.Action; import io.confluent.ksql.rest.server.computation.CommandId.Type; +import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.util.QueryCapacityUtil; import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; @@ -47,6 +49,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,35 +57,49 @@ * Handles the actual execution (or delegation to KSQL core) of all distributed statements, as well * as tracking their statuses as things move along. */ -public class StatementExecutor { +public class StatementExecutor implements KsqlConfigurable { private static final Logger log = LoggerFactory.getLogger(StatementExecutor.class); - private final KsqlConfig ksqlConfig; private final KsqlEngine ksqlEngine; private final StatementParser statementParser; private final Map statusStore; + private KsqlConfig ksqlConfig; private enum Mode { RESTORE, EXECUTE } - public StatementExecutor( - final KsqlConfig ksqlConfig, + public StatementExecutor(final KsqlEngine ksqlEngine) { + this( + ksqlEngine, + new StatementParser(ksqlEngine) + ); + } + + @VisibleForTesting + StatementExecutor( final KsqlEngine ksqlEngine, final StatementParser statementParser ) { - Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null."); Objects.requireNonNull(ksqlEngine, "ksqlEngine cannot be null."); - this.ksqlConfig = ksqlConfig; this.ksqlEngine = ksqlEngine; this.statementParser = statementParser; this.statusStore = new ConcurrentHashMap<>(); } - protected KsqlEngine getKsqlEngine() { + @Override + public void configure(final KsqlConfig config) { + if (!config.getKsqlStreamConfigProps().containsKey(StreamsConfig.APPLICATION_SERVER_CONFIG)) { + throw new IllegalArgumentException("Need KS application server set"); + } + + ksqlConfig = config; + } + + KsqlEngine getKsqlEngine() { return ksqlEngine; } @@ -92,6 +109,8 @@ protected KsqlEngine getKsqlEngine() { * @param queuedCommand The command to be executed */ void handleStatement(final QueuedCommand queuedCommand) { + throwIfNotConfigured(); + handleStatementWithTerminatedQueries( queuedCommand.getCommand(), queuedCommand.getCommandId(), @@ -100,6 +119,8 @@ void handleStatement(final QueuedCommand queuedCommand) { } void handleRestore(final QueuedCommand queuedCommand) { + throwIfNotConfigured(); + handleStatementWithTerminatedQueries( queuedCommand.getCommand(), queuedCommand.getCommandId(), @@ -126,6 +147,12 @@ public Optional getStatus(final CommandId statementId) { return Optional.ofNullable(statusStore.get(statementId)); } + private void throwIfNotConfigured() { + if (ksqlConfig == null) { + throw new IllegalStateException("No initialized"); + } + } + private void putStatus(final CommandId commandId, final Optional commandStatusFuture, final CommandStatus status) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java index 54055095ea19..0c8603ed43a1 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/Errors.java @@ -25,13 +25,14 @@ import io.confluent.ksql.rest.entity.KsqlEntityList; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlStatementErrorMessage; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; public final class Errors { private static final int HTTP_TO_ERROR_CODE_MULTIPLIER = 100; static final int ERROR_CODE_BAD_REQUEST = toErrorCode(BAD_REQUEST.getStatusCode()); - static final int ERROR_CODE_BAD_STATEMENT = toErrorCode(BAD_REQUEST.getStatusCode()) + 1; + public static final int ERROR_CODE_BAD_STATEMENT = toErrorCode(BAD_REQUEST.getStatusCode()) + 1; private static final int ERROR_CODE_QUERY_ENDPOINT = toErrorCode(BAD_REQUEST.getStatusCode()) + 2; public static final int ERROR_CODE_UNAUTHORIZED = toErrorCode(UNAUTHORIZED.getStatusCode()); @@ -65,6 +66,14 @@ public static int toErrorCode(final int statusCode) { return statusCode * HTTP_TO_ERROR_CODE_MULTIPLIER; } + public static Response notReady() { + return Response + .status(SERVICE_UNAVAILABLE) + .header(HttpHeaders.RETRY_AFTER, 10) + .entity(new KsqlErrorMessage(ERROR_CODE_SERVER_NOT_READY, "Server initializing")) + .build(); + } + public static Response accessDenied(final String msg) { return Response .status(FORBIDDEN) diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlConfigurable.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlConfigurable.java new file mode 100644 index 000000000000..2819bb004aa1 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlConfigurable.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server.resources; + +import io.confluent.ksql.util.KsqlConfig; + +public interface KsqlConfigurable { + + /** + * Called with the server config. + * + * @param config server config + */ + void configure(KsqlConfig config); +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java index c2ac2a51b237..257356aeb80a 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java @@ -47,11 +47,11 @@ import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.Injector; +import io.confluent.ksql.statement.Injectors; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.version.metrics.ActivenessRegistrar; - import java.time.Duration; import java.util.List; import java.util.Objects; @@ -65,12 +65,13 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.kafka.streams.StreamsConfig; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling @Path("/ksql") @Consumes({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) @Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) -public class KsqlResource { +public class KsqlResource implements KsqlConfigurable { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private static final List TERMINATE_CLUSTER = @@ -90,12 +91,30 @@ public class KsqlResource { private final CommandQueue commandQueue; private final Duration distributedCmdResponseTimeout; private final ActivenessRegistrar activenessRegistrar; - private final RequestValidator validator; - private final RequestHandler handler; + private final BiFunction injectorFactory; + private final KsqlAuthorizationValidator authorizationValidator; + private RequestValidator validator; + private RequestHandler handler; public KsqlResource( - final KsqlConfig ksqlConfig, + final KsqlEngine ksqlEngine, + final CommandQueue commandQueue, + final Duration distributedCmdResponseTimeout, + final ActivenessRegistrar activenessRegistrar, + final KsqlAuthorizationValidator authorizationValidator + ) { + this( + ksqlEngine, + commandQueue, + distributedCmdResponseTimeout, + activenessRegistrar, + Injectors.DEFAULT, + authorizationValidator + ); + } + + KsqlResource( final KsqlEngine ksqlEngine, final CommandQueue commandQueue, final Duration distributedCmdResponseTimeout, @@ -109,13 +128,24 @@ public KsqlResource( Objects.requireNonNull(distributedCmdResponseTimeout, "distributedCmdResponseTimeout"); this.activenessRegistrar = Objects.requireNonNull(activenessRegistrar, "activenessRegistrar"); + this.injectorFactory = Objects.requireNonNull(injectorFactory, "injectorFactory"); + this.authorizationValidator = Objects + .requireNonNull(authorizationValidator, "authorizationValidator"); + } + + @Override + public void configure(final KsqlConfig config) { + if (!config.getKsqlStreamConfigProps().containsKey(StreamsConfig.APPLICATION_SERVER_CONFIG)) { + throw new IllegalArgumentException("Need KS application server set"); + } this.validator = new RequestValidator( CustomValidators.VALIDATOR_MAP, injectorFactory, ksqlEngine::createSandbox, - ksqlConfig + config ); + this.handler = new RequestHandler( CustomExecutors.EXECUTOR_MAP, new DistributingExecutor( @@ -125,12 +155,13 @@ public KsqlResource( authorizationValidator ), ksqlEngine, - ksqlConfig, + config, new DefaultCommandQueueSync( commandQueue, KsqlResource::shouldSynchronize, - distributedCmdResponseTimeout) - ); + distributedCmdResponseTimeout + ) + ); } @POST @@ -139,6 +170,8 @@ public Response terminateCluster( @Context final ServiceContext serviceContext, final ClusterTerminateRequest request ) { + throwIfNotConfigured(); + ensureValidPatterns(request.getDeleteTopicList()); try { return Response.ok( @@ -155,6 +188,8 @@ public Response handleKsqlStatements( @Context final ServiceContext serviceContext, final KsqlRequest request ) { + throwIfNotConfigured(); + activenessRegistrar.updateLastRequestTime(); try { @@ -190,6 +225,12 @@ public Response handleKsqlStatements( } } + private void throwIfNotConfigured() { + if (validator == null || handler == null) { + throw new KsqlRestException(Errors.notReady()); + } + } + private static boolean shouldSynchronize(final Class statementClass) { return !SYNC_BLACKLIST.contains(statementClass) // we never need to synchronize distributed statements diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index a0487c1379e7..5c5d11c74472 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -16,6 +16,7 @@ package io.confluent.ksql.rest.server.resources.streaming; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -26,6 +27,7 @@ import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; import io.confluent.ksql.rest.server.resources.Errors; +import io.confluent.ksql.rest.server.resources.KsqlConfigurable; import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.rest.util.CommandStoreUtil; import io.confluent.ksql.rest.util.ErrorResponseUtil; @@ -49,17 +51,17 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Path("/query") @Produces({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) @Consumes({Versions.KSQL_V1_JSON, MediaType.APPLICATION_JSON}) -public class StreamedQueryResource { +public class StreamedQueryResource implements KsqlConfigurable { private static final Logger log = LoggerFactory.getLogger(StreamedQueryResource.class); - private final KsqlConfig ksqlConfig; private final KsqlEngine ksqlEngine; private final StatementParser statementParser; private final CommandQueue commandQueue; @@ -68,9 +70,29 @@ public class StreamedQueryResource { private final ObjectMapper objectMapper; private final ActivenessRegistrar activenessRegistrar; private final KsqlAuthorizationValidator authorizationValidator; + private KsqlConfig ksqlConfig; public StreamedQueryResource( - final KsqlConfig ksqlConfig, + final KsqlEngine ksqlEngine, + final CommandQueue commandQueue, + final Duration disconnectCheckInterval, + final Duration commandQueueCatchupTimeout, + final ActivenessRegistrar activenessRegistrar, + final KsqlAuthorizationValidator authorizationValidator + ) { + this( + ksqlEngine, + new StatementParser(ksqlEngine), + commandQueue, + disconnectCheckInterval, + commandQueueCatchupTimeout, + activenessRegistrar, + authorizationValidator + ); + } + + @VisibleForTesting + StreamedQueryResource( final KsqlEngine ksqlEngine, final StatementParser statementParser, final CommandQueue commandQueue, @@ -79,7 +101,6 @@ public StreamedQueryResource( final ActivenessRegistrar activenessRegistrar, final KsqlAuthorizationValidator authorizationValidator ) { - this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig"); this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine"); this.statementParser = Objects.requireNonNull(statementParser, "statementParser"); this.commandQueue = Objects.requireNonNull(commandQueue, "commandQueue"); @@ -93,11 +114,22 @@ public StreamedQueryResource( this.authorizationValidator = authorizationValidator; } + @Override + public void configure(final KsqlConfig config) { + if (!config.getKsqlStreamConfigProps().containsKey(StreamsConfig.APPLICATION_SERVER_CONFIG)) { + throw new IllegalArgumentException("Need KS application server set"); + } + + ksqlConfig = config; + } + @POST public Response streamQuery( @Context final ServiceContext serviceContext, final KsqlRequest request - ) throws Exception { + ) { + throwIfNotConfigured(); + activenessRegistrar.updateLastRequestTime(); final PreparedStatement statement = parseStatement(request); @@ -108,6 +140,12 @@ public Response streamQuery( return handleStatement(serviceContext, request, statement); } + private void throwIfNotConfigured() { + if (ksqlConfig == null) { + throw new KsqlRestException(Errors.notReady()); + } + } + private PreparedStatement parseStatement(final KsqlRequest request) { final String ksql = request.getKsql(); if (ksql.trim().isEmpty()) { @@ -126,7 +164,7 @@ private Response handleStatement( final ServiceContext serviceContext, final KsqlRequest request, final PreparedStatement statement - ) throws Exception { + ) { try { authorizationValidator.checkAuthorization( serviceContext, @@ -164,7 +202,7 @@ private Response handleQuery( final ServiceContext serviceContext, final PreparedStatement statement, final Map streamsProperties - ) throws Exception { + ) { final ConfiguredStatement configured = ConfiguredStatement.of(statement, streamsProperties, ksqlConfig); @@ -173,7 +211,7 @@ private Response handleQuery( .get(); if (!(query instanceof TransientQueryMetadata)) { - throw new Exception(String.format( + throw new IllegalStateException(String.format( "Unexpected metadata type: expected TransientQueryMetadata, found %s instead", query.getClass() )); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java index de34bbdf0958..44b0a6fc313e 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ClusterTerminator.java @@ -24,7 +24,6 @@ import io.confluent.ksql.serde.Format; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.ExecutorUtil; -import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.QueryMetadata; @@ -49,12 +48,10 @@ public class ClusterTerminator { private final List managedTopics; public ClusterTerminator( - final KsqlConfig ksqlConfig, final KsqlEngine ksqlEngine, final ServiceContext serviceContext, final List managedTopics ) { - Objects.requireNonNull(ksqlConfig, "ksqlConfig is null."); Objects.requireNonNull(ksqlEngine, "ksqlEngine is null."); this.ksqlEngine = ksqlEngine; this.serviceContext = Objects.requireNonNull(serviceContext); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java index 74fd0a14ee53..bba56b21d49f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/QueryDescriptionTest.java @@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableSet; import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.physical.LimitHandler; -import io.confluent.ksql.util.QuerySchemas; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -33,6 +32,7 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.QuerySchemas; import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Collections; import java.util.LinkedHashMap; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java index 474444f427c2..4b2ac43b65aa 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/KsqlRestApplicationTest.java @@ -177,7 +177,9 @@ public void setUp() { serverState, processingLogContext, ImmutableList.of(precondition1, precondition2), - ksqlConnect); + ksqlConnect, + ImmutableList.of(ksqlResource, streamedQueryResource) + ); } @Test diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java index 0a7926949293..b4037caa4f6c 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/RecoveryTest.java @@ -35,7 +35,6 @@ import io.confluent.ksql.metastore.model.KsqlTopic; import io.confluent.ksql.query.QueryId; import io.confluent.ksql.rest.entity.KsqlRequest; -import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandId.Action; import io.confluent.ksql.rest.server.computation.CommandId.Type; import io.confluent.ksql.rest.server.resources.KsqlResource; @@ -47,7 +46,6 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.services.TestServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; -import io.confluent.ksql.statement.Injectors; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; @@ -63,6 +61,7 @@ import java.util.Set; import java.util.stream.Collectors; import javax.ws.rs.core.Response; +import org.apache.kafka.streams.StreamsConfig; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -72,7 +71,11 @@ public class RecoveryTest { - private final KsqlConfig ksqlConfig = KsqlConfigTestUtil.create("0.0.0.0"); + private final KsqlConfig ksqlConfig = KsqlConfigTestUtil.create( + "0.0.0.0", + ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "http://localhost:23") + ); + private final List commands = new LinkedList<>(); private final FakeKafkaTopicClient topicClient = new FakeKafkaTopicClient(); private final ServiceContext serviceContext = TestServiceContext.create(topicClient); @@ -166,19 +169,16 @@ private class KsqlServer { serverState = new ServerState(); serverState.setReady(); this.ksqlResource = new KsqlResource( - ksqlConfig, ksqlEngine, fakeCommandQueue, Duration.ofMillis(0), ()->{}, - Injectors.DEFAULT, (sc, metastore, statement) -> { - return; - }); - this.statementExecutor = new StatementExecutor( - ksqlConfig, - ksqlEngine, - new StatementParser(ksqlEngine)); + } + ); + + this.statementExecutor = new StatementExecutor(ksqlEngine); + this.commandRunner = new CommandRunner( statementExecutor, fakeCommandQueue, @@ -186,6 +186,9 @@ private class KsqlServer { mock(ClusterTerminator.class), serverState ); + + this.statementExecutor.configure(ksqlConfig); + this.ksqlResource.configure(ksqlConfig); } void recover() { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java index 2abee6d89ff8..1e82b0fb2790 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/StatementExecutorTest.java @@ -31,11 +31,13 @@ import static org.hamcrest.Matchers.not; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.confluent.ksql.KsqlConfigTestUtil; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.KsqlEngineTestUtil; +import io.confluent.ksql.execution.expression.tree.QualifiedName; import io.confluent.ksql.function.InternalFunctionRegistry; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.metastore.MetaStore; @@ -47,7 +49,6 @@ import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties; import io.confluent.ksql.parser.tree.CreateStreamAsSelect; import io.confluent.ksql.parser.tree.DropStream; -import io.confluent.ksql.execution.expression.tree.QualifiedName; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.RunScript; import io.confluent.ksql.parser.tree.Statement; @@ -73,15 +74,19 @@ import java.util.OptionalInt; import java.util.concurrent.TimeUnit; import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.streams.StreamsConfig; import org.easymock.EasyMockSupport; import org.easymock.IArgumentMatcher; +import org.easymock.Mock; import org.hamcrest.CoreMatchers; import org.hamcrest.integration.EasyMock2Adapter; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.rules.RuleChain; public class StatementExecutorTest extends EasyMockSupport { @@ -100,9 +105,19 @@ public class StatementExecutorTest extends EasyMockSupport { private StatementExecutor statementExecutorWithMocks; private ServiceContext serviceContext; + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + private QueuedCommand queuedCommand; + @Before public void setUp() { - ksqlConfig = KsqlConfigTestUtil.create(CLUSTER); + ksqlConfig = KsqlConfigTestUtil.create( + CLUSTER, + ImmutableMap.of(StreamsConfig.APPLICATION_SERVER_CONFIG, "host:1234") + ); + final FakeKafkaTopicClient fakeKafkaTopicClient = new FakeKafkaTopicClient(); fakeKafkaTopicClient.createTopic("pageview_topic", 1, (short) 1, emptyMap()); fakeKafkaTopicClient.createTopic("foo", 1, (short) 1, emptyMap()); @@ -115,9 +130,11 @@ public void setUp() { final StatementParser statementParser = new StatementParser(ksqlEngine); - statementExecutor = new StatementExecutor(ksqlConfig, ksqlEngine, statementParser); - statementExecutorWithMocks - = new StatementExecutor(ksqlConfig, mockEngine, mockParser); + statementExecutor = new StatementExecutor(ksqlEngine, statementParser); + statementExecutorWithMocks = new StatementExecutor(mockEngine, mockParser); + + statementExecutor.configure(ksqlConfig); + statementExecutorWithMocks.configure(ksqlConfig); } @After @@ -133,32 +150,40 @@ public void tearDown() { .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) .around(CLUSTER); - private void handleStatement( - final Command command, - final CommandId commandId, - final Optional commandStatus) { - handleStatement(statementExecutor, command, commandId, commandStatus); + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnConfigureIfAppServerNotSet() { + // Given: + final KsqlConfig configNoAppServer = new KsqlConfig(ImmutableMap.of()); + + // When: + statementExecutorWithMocks.configure(configNoAppServer); } - private static void handleStatement( - final StatementExecutor statementExecutor, - final Command command, - final CommandId commandId, - final Optional commandStatus) { - statementExecutor.handleStatement(new QueuedCommand(commandId, command, commandStatus)); + @Test(expected = IllegalStateException.class) + public void shouldThrowOnHandleStatementIfNotConfigured() { + // Given: + statementExecutor = new StatementExecutor(mockEngine, mockParser); + + // When: + statementExecutor.handleStatement(queuedCommand); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowOnHandleRestoreIfNotConfigured() { + // Given: + statementExecutor = new StatementExecutor(mockEngine, mockParser); + + // When: + statementExecutor.handleRestore(queuedCommand); } @Test public void shouldThrowOnUnexpectedException() { // Given: final String statementText = "mama said knock you out"; - final StatementParser statementParser = mock(StatementParser.class); - final KsqlEngine mockEngine = mock(KsqlEngine.class); - final KsqlConfig ksqlConfig = new KsqlConfig(emptyMap()); - final StatementExecutor statementExecutor = new StatementExecutor( - ksqlConfig, mockEngine, statementParser); + final RuntimeException exception = new RuntimeException("i'm gonna knock you out"); - expect(statementParser.parseSingleStatement(statementText)).andThrow( + expect(mockParser.parseSingleStatement(statementText)).andThrow( exception); final Command command = new Command( statementText, @@ -166,11 +191,11 @@ public void shouldThrowOnUnexpectedException() { emptyMap()); final CommandId commandId = new CommandId( CommandId.Type.STREAM, "_CSASGen", CommandId.Action.CREATE); - replay(statementParser); + replay(mockParser); // When: try { - handleStatement(statementExecutor, command, commandId, Optional.empty()); + handleStatement(statementExecutorWithMocks, command, commandId, Optional.empty()); Assert.fail("handleStatement should throw"); } catch (final RuntimeException caughtException) { // Then: @@ -201,13 +226,9 @@ public void shouldBuildQueriesWithPersistedConfig() { expect(mockQueryMetadata.getQueryId()).andStubReturn(mock(QueryId.class)); - final KsqlConfig ksqlConfig = new KsqlConfig(emptyMap()); final KsqlConfig expectedConfig = ksqlConfig.overrideBreakingConfigsWithOriginalValues( originalConfig.getAllConfigPropsWithSecretsObfuscated()); - final StatementExecutor statementExecutor = new StatementExecutor( - ksqlConfig, mockEngine, mockParser); - final Command csasCommand = new Command( statementText, emptyMap(), @@ -229,7 +250,7 @@ public void shouldBuildQueriesWithPersistedConfig() { replay(mockParser, mockEngine, mockMetaStore, mockQueryMetadata); - handleStatement(statementExecutor, csasCommand, csasCommandId, Optional.empty()); + handleStatement(statementExecutorWithMocks, csasCommand, csasCommandId, Optional.empty()); verify(mockParser, mockEngine, mockMetaStore, mockQueryMetadata); } @@ -788,8 +809,21 @@ private void tryDropThatViolatesReferentialIntegrity() { assertThat( dropTableCommandStatus1.get().getMessage(), containsString("You need to terminate them before dropping TABLE1.")); + } + private void handleStatement( + final Command command, + final CommandId commandId, + final Optional commandStatus) { + handleStatement(statementExecutor, command, commandId, commandStatus); + } + private static void handleStatement( + final StatementExecutor statementExecutor, + final Command command, + final CommandId commandId, + final Optional commandStatus) { + statementExecutor.handleStatement(new QueuedCommand(commandId, command, commandStatus)); } private void terminateQueries() { diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java index a908b055c6da..dc772f169cfe 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/KsqlResourceTest.java @@ -66,7 +66,6 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.engine.KsqlEngineTestUtil; -import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.execution.expression.tree.QualifiedName; import io.confluent.ksql.execution.expression.tree.StringLiteral; @@ -119,6 +118,7 @@ import io.confluent.ksql.rest.util.TerminateCluster; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlTypes; +import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.serde.Format; import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; @@ -165,6 +165,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.eclipse.jetty.http.HttpStatus.Code; import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -330,6 +331,71 @@ public void tearDown() { serviceContext.close(); } + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnConfigureIfAppServerNotSet() { + // Given: + final KsqlConfig configNoAppServer = new KsqlConfig(ImmutableMap.of()); + + // When: + ksqlResource.configure(configNoAppServer); + } + + @Test + public void shouldThrowOnHandleStatementIfNotConfigured() { + // Given: + ksqlResource = new KsqlResource( + ksqlEngine, + commandStore, + DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT, + activenessRegistrar, + (ec, sc) -> InjectorChain.of( + schemaInjectorFactory.apply(sc), + topicInjectorFactory.apply(ec), + new TopicDeleteInjector(ec, sc)), + authorizationValidator + ); + + // Then: + expectedException.expect(KsqlRestException.class); + expectedException.expect(exceptionStatusCode(CoreMatchers.is(Code.SERVICE_UNAVAILABLE))); + expectedException + .expect(exceptionErrorMessage(errorMessage(Matchers.is("Server initializing")))); + + // When: + ksqlResource.handleKsqlStatements( + serviceContext, + new KsqlRequest("query", Collections.emptyMap(), null) + ); + } + + @Test + public void shouldThrowOnHandleTerminateIfNotConfigured() { + // Given: + ksqlResource = new KsqlResource( + ksqlEngine, + commandStore, + DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT, + activenessRegistrar, + (ec, sc) -> InjectorChain.of( + schemaInjectorFactory.apply(sc), + topicInjectorFactory.apply(ec), + new TopicDeleteInjector(ec, sc)), + authorizationValidator + ); + + // Then: + expectedException.expect(KsqlRestException.class); + expectedException.expect(exceptionStatusCode(CoreMatchers.is(Code.SERVICE_UNAVAILABLE))); + expectedException + .expect(exceptionErrorMessage(errorMessage(Matchers.is("Server initializing")))); + + // When: + ksqlResource.terminateCluster( + serviceContext, + new ClusterTerminateRequest(ImmutableList.of("")) + ); + } + @Test public void shouldShowNoQueries() { // When: @@ -1914,7 +1980,6 @@ private static void validateQueryDescription( private void setUpKsqlResource() { ksqlResource = new KsqlResource( - ksqlConfig, ksqlEngine, commandStore, DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT, @@ -1925,6 +1990,8 @@ private void setUpKsqlResource() { new TopicDeleteInjector(ec, sc)), authorizationValidator ); + + ksqlResource.configure(ksqlConfig); } private void givenKsqlConfigWith(final Map additionalConfig) { @@ -1999,6 +2066,7 @@ private static Properties getDefaultKsqlConfig() { configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configMap.put("ksql.command.topic.suffix", "commands"); configMap.put(RestConfig.LISTENERS_CONFIG, "http://localhost:8088"); + configMap.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "http://localhost:9099"); final Properties properties = new Properties(); properties.putAll(configMap); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java similarity index 91% rename from ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java rename to ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 1a2854a67528..2949c36f0258 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2019 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -13,7 +13,7 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.rest.server.resources; +package io.confluent.ksql.rest.server.resources.streaming; import static io.confluent.ksql.metastore.model.DataSource.DataSourceType; import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorCode; @@ -36,10 +36,10 @@ import static org.junit.Assert.assertEquals; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; @@ -53,8 +53,10 @@ import io.confluent.ksql.rest.entity.StreamedRow; import io.confluent.ksql.rest.server.StatementParser; import io.confluent.ksql.rest.server.computation.CommandQueue; -import io.confluent.ksql.rest.server.resources.streaming.StreamedQueryResource; +import io.confluent.ksql.rest.server.resources.Errors; +import io.confluent.ksql.rest.server.resources.KsqlRestException; import io.confluent.ksql.schema.ksql.LogicalSchema; +import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; @@ -82,12 +84,14 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; import org.eclipse.jetty.http.HttpStatus.Code; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -103,11 +107,14 @@ public class StreamedQueryResourceTest { .field("f1", SchemaBuilder.OPTIONAL_INT32_SCHEMA) .build()); + private static final KsqlConfig VALID_CONFIG = new KsqlConfig(ImmutableMap.of( + StreamsConfig.APPLICATION_SERVER_CONFIG, "something:1" + )); + @Rule public final ExpectedException expectedException = ExpectedException.none(); - @Mock(MockType.NICE) - private KsqlConfig ksqlConfig; + @Mock(MockType.NICE) private KsqlEngine mockKsqlEngine; @Mock(MockType.NICE) @@ -141,7 +148,6 @@ public void setup() { replay(mockKsqlEngine, mockStatementParser); testResource = new StreamedQueryResource( - ksqlConfig, mockKsqlEngine, mockStatementParser, commandQueue, @@ -150,10 +156,47 @@ public void setup() { activenessRegistrar, authorizationValidator ); + + testResource.configure(VALID_CONFIG); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowOnConfigureIfAppServerNotSet() { + // Given: + final KsqlConfig configNoAppServer = new KsqlConfig(ImmutableMap.of()); + + // When: + testResource.configure(configNoAppServer); + } + + @Test + public void shouldThrowOnHandleStatementIfNotConfigured() { + // Given: + testResource = new StreamedQueryResource( + mockKsqlEngine, + mockStatementParser, + commandQueue, + DISCONNECT_CHECK_INTERVAL, + COMMAND_QUEUE_CATCHUP_TIMOEUT, + activenessRegistrar, + authorizationValidator + ); + + // Then: + expectedException.expect(KsqlRestException.class); + expectedException.expect(exceptionStatusCode(is(Code.SERVICE_UNAVAILABLE))); + expectedException + .expect(exceptionErrorMessage(errorMessage(Matchers.is("Server initializing")))); + + // When: + testResource.streamQuery( + serviceContext, + new KsqlRequest("query", Collections.emptyMap(), null) + ); } @Test - public void shouldReturn400OnBadStatement() throws Exception { + public void shouldReturn400OnBadStatement() { // Given: reset(mockStatementParser); expect(mockStatementParser.parseSingleStatement(anyString())) @@ -176,7 +219,7 @@ public void shouldReturn400OnBadStatement() throws Exception { } @Test - public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { + public void shouldNotWaitIfCommandSequenceNumberSpecified() { // Given: replay(commandQueue); @@ -303,7 +346,7 @@ public void shouldStreamRowsCorrectly() throws Throwable { queryCloseCallback); reset(mockOutputNode); expect(mockKsqlEngine.execute(serviceContext, - ConfiguredStatement.of(statement, requestStreamsProperties, ksqlConfig))) + ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) .andReturn(ExecuteResult.of(transientQueryMetadata)); replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode); @@ -426,7 +469,7 @@ public void write(final int b) throws IOException { } @Test - public void shouldUpdateTheLastRequestTime() throws Exception { + public void shouldUpdateTheLastRequestTime() { // Given: activenessRegistrar.updateLastRequestTime(); EasyMock.expectLastCall(); @@ -444,7 +487,7 @@ public void shouldUpdateTheLastRequestTime() throws Exception { } @Test - public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() throws Exception { + public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: reset(mockStatementParser, authorizationValidator); @@ -473,7 +516,7 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() } @Test - public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() throws Exception { + public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { // Given: reset(mockStatementParser, authorizationValidator); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java index fcfefbfd7cf5..b202a7d63a6f 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpointTest.java @@ -35,7 +35,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.engine.KsqlEngine; -import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.json.JsonMapper; import io.confluent.ksql.parser.KsqlParser.PreparedStatement; import io.confluent.ksql.parser.tree.Query; @@ -52,6 +51,7 @@ import io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint.UserServiceContextFactory; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.security.KsqlAuthorizationProvider; +import io.confluent.ksql.security.KsqlAuthorizationValidator; import io.confluent.ksql.security.KsqlSecurityExtension; import io.confluent.ksql.security.KsqlUserContextProvider; import io.confluent.ksql.services.ConfiguredKafkaClientSupplier; diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java index e8dad9a4ab4c..bf06dec0d6ce 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/util/ClusterTerminatorTest.java @@ -109,7 +109,6 @@ public void setup() { when(serviceContext.getTopicClient()).thenReturn(kafkaTopicClient); when(serviceContext.getSchemaRegistryClient()).thenReturn(schemaRegistryClient); clusterTerminator = new ClusterTerminator( - ksqlConfig, ksqlEngine, serviceContext, MANAGED_TOPICS);