From 2b3a202e2735af3a80730d5c8501d4cde124d03e Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Fri, 12 Mar 2021 14:51:38 -0800 Subject: [PATCH 1/8] feat(java client): Add createConnector, dropConnector, listConnectors and describeConnector to java client --- .../io/confluent/ksql/api/client/Client.java | 46 +++++++ .../ksql/api/client/ConnectorDescription.java | 48 +++++++ .../ksql/api/client/ConnectorInfo.java | 39 ++++++ .../ksql/api/client/ConnectorType.java | 27 ++++ .../api/client/CreateConnectorResult.java | 36 ++++++ .../ksql/api/client/impl/ClientImpl.java | 74 +++++++++++ .../impl/ConnectorCommandResponseHandler.java | 117 +++++++++++++++++ .../client/impl/ConnectorDescriptionImpl.java | 103 +++++++++++++++ .../api/client/impl/ConnectorInfoImpl.java | 90 +++++++++++++ .../api/client/impl/ConnectorTypeImpl.java | 61 +++++++++ .../impl/CreateConnectorResultImpl.java | 81 ++++++++++++ .../client/impl/DdlDmlResponseHandlers.java | 14 +- .../confluent/ksql/api/client/ClientTest.java | 89 ++++++++++++- .../impl/ConnectorDescriptionImplTest.java | 48 +++++++ .../client/impl/ConnectorInfoImplTest.java | 43 +++++++ .../client/impl/ConnectorTypeImplTest.java | 37 ++++++ .../impl/CreateConnectorResultImplTest.java | 41 ++++++ .../integration/ClientIntegrationTest.java | 120 +++++++++++++++++- 18 files changed, 1097 insertions(+), 17 deletions(-) create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorInfo.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorTypeImpl.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorInfoImplTest.java create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorTypeImplTest.java create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 0106251a6aa4..efda042075ba 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -234,6 +234,52 @@ public interface Client { */ CompletableFuture serverInfo(); + /** + * Creates a connector. + * + *

If a non-200 response is received from the server, the {@code CompletableFuture} will be + * failed. + * + * @param connectorName name of the connector + * @param isSource true if the connector is a source connector, false if it is a sink connector + * @param properties connector properties + * @return result of connector creation + */ + CompletableFuture createConnector( + String connectorName, boolean isSource, Map properties); + + /** + * Drops a connector. + * + *

If a non-200 response is received from the server, the {@code CompletableFuture} will be + * failed. + * + * @param connectorName name of the connector to drop + * @return a future that completes once the server response is received + */ + CompletableFuture dropConnector(String connectorName); + + /** + * Returns a list of connectors. + * + *

If a non-200 response is received from the server, the {@code CompletableFuture} will be + * failed. + * + * @return a list of connectors + */ + CompletableFuture> listConnectors(); + + /** + * Returns metadata about a connector . + * + *

If a non-200 response is received from the server, the {@code CompletableFuture} will be + * failed. + * + * @param connectorName name of the connector to describe + * @return metadata about connector + */ + CompletableFuture describeConnector(String connectorName); + /** * Closes the underlying HTTP client. */ diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java new file mode 100644 index 000000000000..8c2074c5913b --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +import java.util.List; + +public interface ConnectorDescription { + /** + * @return class of this connector + */ + String connectorClass(); + + /** + * + * @return a list of sources that this connector reads/writes to + */ + List sources(); + + /** + * + * @return a list of topics consumed by this connector + */ + List topics(); + + /** + * @return type of this connector + */ + ConnectorType type(); + + /** + * + * @return state of this connector + */ + String state(); +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorInfo.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorInfo.java new file mode 100644 index 000000000000..1d9ae5f427c8 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorInfo.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +public interface ConnectorInfo { + + /** + * @return name of this connector + */ + String name(); + + /** + * @return type of this connector + */ + ConnectorType type(); + + /** + * @return class of this connector + */ + String className(); + + /** + * @return state of this connector + */ + String state(); +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java new file mode 100644 index 000000000000..05a8e0007653 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +public interface ConnectorType { + enum Type { SOURCE, SINK, UNKNOWN } + + /** + * Returns the type. + * + * @return the type + */ + Type getType(); +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java new file mode 100644 index 000000000000..ee4ce6496bc5 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +import java.util.Map; + +public interface CreateConnectorResult { + + /** + * @return name of this connector + */ + String name(); + + /** + * @return type of this connector + */ + ConnectorType type(); + + /** + * @return this connector's properties + */ + Map properties(); +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 7bf2dc77fe07..8535e50f3851 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -23,6 +23,9 @@ import io.confluent.ksql.api.client.BatchedQueryResult; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; +import io.confluent.ksql.api.client.ConnectorDescription; +import io.confluent.ksql.api.client.ConnectorInfo; +import io.confluent.ksql.api.client.CreateConnectorResult; import io.confluent.ksql.api.client.ExecuteStatementResult; import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.api.client.QueryInfo; @@ -58,6 +61,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; // CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling @@ -316,6 +320,76 @@ public CompletableFuture serverInfo() { return cf; } + @Override + public CompletableFuture createConnector( + final String name, + final boolean isSource, + final Map properties + ) { + final CompletableFuture cf = new CompletableFuture<>(); + final String connectorConfigs = String.join(",", properties.entrySet() + .stream() + .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) + .collect(Collectors.toList())); + final String type = isSource ? "SOURCE" : "SINK"; + + makePostRequest( + KSQL_ENDPOINT, + new JsonObject().put("ksql", + String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs)), + cf, + response -> handleSingleEntityResponse( + response, cf, ConnectorCommandResponseHandler::handlerCreateConnectorResponse) + ); + + return cf; + } + + @Override + public CompletableFuture dropConnector(String name) { + final CompletableFuture cf = new CompletableFuture<>(); + + makePostRequest( + KSQL_ENDPOINT, + new JsonObject().put("ksql", "drop connector " + name + ";"), + cf, + response -> handleSingleEntityResponse( + response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse) + ); + + return cf; + } + + @Override + public CompletableFuture> listConnectors() { + final CompletableFuture> cf = new CompletableFuture<>(); + + makePostRequest( + KSQL_ENDPOINT, + new JsonObject().put("ksql", "list connectors;"), + cf, + response -> handleSingleEntityResponse( + response, cf, ConnectorCommandResponseHandler::handleListConnectorsResponse) + ); + + return cf; + } + + @Override + public CompletableFuture describeConnector(final String name) { + final CompletableFuture cf = new CompletableFuture<>(); + + makePostRequest( + KSQL_ENDPOINT, + new JsonObject().put("ksql", "describe connector " + name + ";"), + cf, + response -> handleSingleEntityResponse( + response, cf, ConnectorCommandResponseHandler::handleDescribeConnectorsResponse) + ); + + return cf; + } + @Override public void close() { httpClient.close(); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java new file mode 100644 index 000000000000..dbe34bc181d1 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java @@ -0,0 +1,117 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.ConnectorDescription; +import io.confluent.ksql.api.client.ConnectorInfo; +import io.confluent.ksql.api.client.CreateConnectorResult; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class ConnectorCommandResponseHandler { + + static void handlerCreateConnectorResponse( + final JsonObject connectorInfoEntity, + final CompletableFuture cf + ) { + try { + final JsonObject connectorInfoJson = connectorInfoEntity.getJsonObject("info"); + cf.complete(new CreateConnectorResultImpl( + connectorInfoJson.getString("name"), + new ConnectorTypeImpl(connectorInfoJson.getString("type")), + connectorInfoJson.getJsonObject("config").mapTo(Map.class) + )); + } catch (Exception e) { + cf.completeExceptionally(new IllegalStateException( + "Unexpected server response format. Response: " + connectorInfoEntity + )); + } + } + + static void handleDropConnectorResponse( + final JsonObject dropConnectorResponseEntity, + final CompletableFuture cf + ) { + if (dropConnectorResponseEntity.getString("@type").equals("drop_connector")) { + cf.complete(null); + } else { + cf.completeExceptionally(new IllegalStateException( + "Unexpected server response format. Response: " + dropConnectorResponseEntity + )); + } + } + + static void handleListConnectorsResponse( + final JsonObject connectorsListEntity, + final CompletableFuture> cf + ) { + final Optional> connectors = + getListConnectorsResponse(connectorsListEntity); + if (connectors.isPresent()) { + cf.complete(connectors.get()); + } else { + cf.completeExceptionally(new IllegalStateException( + "Unexpected server response format. Response: " + connectorsListEntity)); + } + } + + static void handleDescribeConnectorsResponse( + final JsonObject connectorDescriptionEntity, + final CompletableFuture cf + ) { + try { + final JsonObject status = connectorDescriptionEntity.getJsonObject("status"); + cf.complete(new ConnectorDescriptionImpl( + connectorDescriptionEntity.getString("connectorClass"), + connectorDescriptionEntity.getJsonArray("sources").getList(), + connectorDescriptionEntity.getJsonArray("topics").getList(), + new ConnectorTypeImpl(status.getString("type")), + status.getJsonObject("connector").getString("state") + )); + } catch (Exception e) { + cf.completeExceptionally(new IllegalStateException( + "Unexpected server response format. Response: " + connectorDescriptionEntity)); + } + } + + /** + * Attempts to parse the provided response entity as a {@code List}. + * + * @param connectorsEntity response entity + * @return optional containing parsed result if successful, else empty + */ + private static Optional> getListConnectorsResponse( + final JsonObject connectorsEntity) { + try { + final JsonArray connectors = connectorsEntity.getJsonArray("connectors"); + return Optional.of(connectors.stream() + .map(o -> (JsonObject) o) + .map(o -> new ConnectorInfoImpl( + o.getString("name"), + new ConnectorTypeImpl(o.getString("type")), + o.getString("className"), + o.getString("state"))) + .collect(Collectors.toList())); + } catch (Exception e) { + return Optional.empty(); + } + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java new file mode 100644 index 000000000000..bec0ebd00642 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java @@ -0,0 +1,103 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.ConnectorDescription; +import io.confluent.ksql.api.client.ConnectorType; +import java.util.List; +import java.util.Objects; + +public class ConnectorDescriptionImpl implements ConnectorDescription { + + private final String connectorClass; + private final List sources; + private final List topics; + private final ConnectorType type; + private final String state; + + public ConnectorDescriptionImpl( + final String connectorClass, + final List sources, + final List topics, + final ConnectorType type, + final String state + + ) { + this.connectorClass = Objects.requireNonNull(connectorClass); + this.sources = Objects.requireNonNull(sources); + this.topics = Objects.requireNonNull(topics); + this.type = Objects.requireNonNull(type); + this.state = Objects.requireNonNull(state); + } + + + @Override + public String connectorClass() { + return connectorClass; + } + + @Override + public List sources() { + return sources; + } + + @Override + public List topics() { + return topics; + } + + @Override + public ConnectorType type() { + return type; + } + + @Override + public String state() { + return state; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConnectorDescriptionImpl connectorDescription = (ConnectorDescriptionImpl) o; + return connectorClass.equals(connectorDescription.connectorClass) + && sources.equals(connectorDescription.sources) + && topics.equals(connectorDescription.topics) + && type.equals(connectorDescription.type) + && state.equals(connectorDescription.state); + } + + @Override + public int hashCode() { + return Objects.hash(connectorClass, sources, topics, type, state); + } + + @Override + public String toString() { + return "ConnectorDescription{" + + "connectorClass='" + connectorClass + '\'' + + ", sources=" + sources + + ", topics=" + topics + + ", type=" + type + + ", state=" + state + + '}'; + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java new file mode 100644 index 000000000000..feef7f722d89 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java @@ -0,0 +1,90 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.ConnectorInfo; +import io.confluent.ksql.api.client.ConnectorType; +import java.util.Objects; + +public class ConnectorInfoImpl implements ConnectorInfo { + + private String name; + private ConnectorType type; + private String className; + private String state; + + public ConnectorInfoImpl( + final String name, + final ConnectorType type, + final String className, + final String state + ) { + this.name = Objects.requireNonNull(name); + this.type = Objects.requireNonNull(type); + this.className = Objects.requireNonNull(className); + this.state = Objects.requireNonNull(state); + } + + @Override + public String name() { + return name; + } + + @Override + public ConnectorType type() { + return type; + } + + @Override + public String className() { + return className; + } + + @Override + public String state() { + return state; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConnectorInfoImpl connectorInfo = (ConnectorInfoImpl) o; + return name.equals(connectorInfo.name) + && type.equals(connectorInfo.type) + && className.equals(connectorInfo.className) + && state.equals(connectorInfo.state); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, className, state); + } + + @Override + public String toString() { + return "ConnectorInfo{" + + "name='" + name + '\'' + + ", type=" + type + + ", className=" + className + + ", state=" + state + + '}'; + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorTypeImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorTypeImpl.java new file mode 100644 index 000000000000..998f89c50def --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorTypeImpl.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.ConnectorType; +import java.util.Objects; + +public class ConnectorTypeImpl implements ConnectorType { + + private final Type type; + + public ConnectorTypeImpl(final String type) { + this(Type.valueOf(type.trim().toUpperCase())); + } + + private ConnectorTypeImpl(final Type type) { + this.type = type; + } + + @Override + public Type getType() { + return type; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ConnectorTypeImpl that = (ConnectorTypeImpl) o; + return type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(type); + } + + @Override + public String toString() { + return "ConnectorType{" + + "type=" + type + + '}'; + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java new file mode 100644 index 000000000000..d23521be8a44 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.ConnectorType; +import io.confluent.ksql.api.client.CreateConnectorResult; +import java.util.Map; +import java.util.Objects; + +public class CreateConnectorResultImpl implements CreateConnectorResult { + + private final String name; + private final ConnectorType type; + private final Map properties; + + public CreateConnectorResultImpl( + final String name, + final ConnectorType type, + final Map properties + ) { + this.name = Objects.requireNonNull(name); + this.type = Objects.requireNonNull(type); + this.properties = Objects.requireNonNull(properties); + } + + @Override + public String name() { + return name; + } + + @Override + public ConnectorType type() { + return type; + } + + @Override + public Map properties() { + return properties; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final CreateConnectorResultImpl connectorInfo = (CreateConnectorResultImpl) o; + return name.equals(connectorInfo.name) + && type.equals(connectorInfo.type) + && properties.equals(connectorInfo.properties); + } + + @Override + public int hashCode() { + return Objects.hash(name, type, properties); + } + + @Override + public String toString() { + return "CreateConnectorResult{" + + "name='" + name + '\'' + + ", type=" + type + + ", properties=" + properties + + '}'; + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java index b5515f32253b..21432b8cf9b0 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java @@ -107,24 +107,22 @@ private static void handleUnexpectedEntity( + "listing custom types.")); } else if (AdminResponseHandlers.isListConnectorsResponse(ksqlEntity)) { cf.completeExceptionally(new KsqlClientException( - EXECUTE_STATEMENT_USAGE_DOC + "The client does not currently support " - + "listing connectors.")); + EXECUTE_STATEMENT_USAGE_DOC + "Use the listConnectors() method instead.")); } else if (AdminResponseHandlers.isDescribeConnectorResponse(ksqlEntity)) { cf.completeExceptionally(new KsqlClientException( - EXECUTE_STATEMENT_USAGE_DOC + "The client does not currently support " - + "'DESCRIBE ' statements.")); + EXECUTE_STATEMENT_USAGE_DOC + "Use the describeConnector() method instead.")); } else if (AdminResponseHandlers.isCreateConnectorResponse(ksqlEntity)) { cf.completeExceptionally(new KsqlClientException( EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC + EXECUTE_STATEMENT_USAGE_DOC - + "The client does not currently support 'CREATE CONNECTOR' statements.")); + + "Use the createConnector() method instead.")); } else if (AdminResponseHandlers.isDropConnectorResponse(ksqlEntity)) { cf.completeExceptionally(new KsqlClientException( EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC + EXECUTE_STATEMENT_USAGE_DOC - + "The client does not currently support 'DROP CONNECTOR' statements.")); + + "Use the dropConnector() method instead.")); } else if (AdminResponseHandlers.isConnectErrorResponse(ksqlEntity)) { cf.completeExceptionally(new KsqlClientException( - EXECUTE_STATEMENT_USAGE_DOC + "The client does not currently support " - + "statements for creating, dropping, listing, or describing connectors.")); + EXECUTE_STATEMENT_USAGE_DOC + "Use createConnector, dropConnector, describeConnector or " + + "listConnectors methods instead.")); } else { cf.completeExceptionally(new IllegalStateException( "Unexpected server response type. Response: " + ksqlEntity diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index a5ce9925f86f..db9f10f75ed1 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -30,6 +30,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; @@ -40,6 +41,7 @@ import io.confluent.ksql.api.client.QueryInfo.QueryType; import io.confluent.ksql.api.client.exception.KsqlClientException; import io.confluent.ksql.api.client.exception.KsqlException; +import io.confluent.ksql.api.client.impl.ConnectorTypeImpl; import io.confluent.ksql.api.client.impl.StreamedQueryResultImpl; import io.confluent.ksql.api.client.util.ClientTestUtil; import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber; @@ -73,6 +75,7 @@ import io.confluent.ksql.rest.entity.QueryStatusCount; import io.confluent.ksql.rest.entity.RunningQuery; import io.confluent.ksql.rest.entity.SchemaInfo; +import io.confluent.ksql.rest.entity.SimpleConnectorInfo; import io.confluent.ksql.rest.entity.SourceDescriptionEntity; import io.confluent.ksql.rest.entity.SourceInfo; import io.confluent.ksql.rest.entity.StreamsList; @@ -94,13 +97,13 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo.ConnectorState; -import org.apache.kafka.connect.runtime.rest.entities.ConnectorType; import org.junit.Test; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -129,6 +132,8 @@ public class ClientTest extends BaseApiTest { protected static final String EXECUTE_STATEMENT_USAGE_DOC = "The executeStatement() method is only " + "for 'CREATE', 'CREATE ... AS SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS " + "SELECT' statements. "; + protected static final org.apache.kafka.connect.runtime.rest.entities.ConnectorType SOURCE_TYPE = + org.apache.kafka.connect.runtime.rest.entities.ConnectorType.SOURCE; protected Client javaClient; @@ -1063,7 +1068,7 @@ public void shouldFailToListConnectorsViaExecuteStatement() { assertThat(e.getCause(), instanceOf(KsqlClientException.class)); assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_USAGE_DOC)); assertThat(e.getCause().getMessage(), - containsString("does not currently support listing connectors")); + containsString("Use the listConnectors() method instead")); } @Test @@ -1075,7 +1080,7 @@ public void shouldFailToDescribeConnectorViaExecuteStatement() { "name", new ConnectorState("state", "worker", "msg"), Collections.emptyList(), - ConnectorType.SOURCE), + SOURCE_TYPE), Collections.emptyList(), Collections.singletonList("topic"), Collections.emptyList()); testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); @@ -1089,14 +1094,14 @@ public void shouldFailToDescribeConnectorViaExecuteStatement() { assertThat(e.getCause(), instanceOf(KsqlClientException.class)); assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_USAGE_DOC)); assertThat(e.getCause().getMessage(), - containsString("does not currently support 'DESCRIBE ' statements")); + containsString("Use the describeConnector() method instead")); } @Test public void shouldFailToCreateConnectorViaExecuteStatement() { // Given final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;", - new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE)); + new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE)); testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); // When @@ -1110,7 +1115,7 @@ public void shouldFailToCreateConnectorViaExecuteStatement() { assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC)); assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_USAGE_DOC)); assertThat(e.getCause().getMessage(), - containsString("does not currently support 'CREATE CONNECTOR' statements")); + containsString("Use the createConnector() method instead")); } @Test @@ -1130,7 +1135,7 @@ public void shouldFailToDropConnectorViaExecuteStatement() { assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC)); assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_USAGE_DOC)); assertThat(e.getCause().getMessage(), - containsString("does not currently support 'DROP CONNECTOR' statements")); + containsString("Use the dropConnector() method instead")); } @Test @@ -1399,6 +1404,76 @@ public void shouldGetServerInfo() throws Exception { assertThat(serverInfo.getKafkaClusterId(), is("kafka-cluster-id")); } + @Test + public void shouldListConnectors() throws Exception { + // Given: + final ConnectorList entity = new ConnectorList( + "list connectors;", Collections.emptyList(), Collections.singletonList(new SimpleConnectorInfo("name", SOURCE_TYPE, "class", "state"))); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + final List connectors = javaClient.listConnectors().get(); + // Then: + assertThat(connectors.size(), is(1)); + assertThat(connectors.get(0).state(), is("state")); + assertThat(connectors.get(0).name(), is("name")); + assertThat(connectors.get(0).type(), is(new ConnectorTypeImpl("SOURCE"))); + } + + @Test + public void shouldDescribeConnector() throws Exception { + // Given: + final ConnectorDescription entity = new ConnectorDescription("describe connector;", + "connectorClass", + new ConnectorStateInfo( + "name", + new ConnectorState("state", "worker", "msg"), + Collections.emptyList(), + SOURCE_TYPE), + Collections.emptyList(), Collections.singletonList("topic"), Collections.emptyList()); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + final io.confluent.ksql.api.client.ConnectorDescription connector = javaClient.describeConnector("name").get(); + + // Then: + assertThat(connector.state(), is("state")); + assertThat(connector.connectorClass(), is("connectorClass")); + assertThat(connector.type(), is(new ConnectorTypeImpl("SOURCE"))); + assertThat(connector.sources().size(), is(0)); + assertThat(connector.topics().size(), is(1)); + assertThat(connector.topics().get(0), is("topic")); + } + + @Test + public void shouldCreateConnector() throws Exception { + // Given + final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;", + new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE)); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + final CreateConnectorResult result = (CreateConnectorResult) javaClient.createConnector("name", true, Collections.EMPTY_MAP).get(); + + // Then: + assertThat(result.name(), is("name")); + assertThat(result.properties().size(), is(0)); + assertThat(result.type(), is(new ConnectorTypeImpl("SOURCE"))); + } + + @Test + public void shouldDropConnector() throws Exception { + // Given + final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name"); + testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); + + // When: + final CompletableFuture result = javaClient.dropConnector("name"); + + // Then: + assertTrue(result.complete(null)); + } + protected Client createJavaClient() { return Client.create(createJavaClientOptions(), vertx); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java new file mode 100644 index 000000000000..ff6a1ae68319 --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import java.util.Collections; +import org.junit.Test; + +public class ConnectorDescriptionImplTest { + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state"), + new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") + ) + .addEqualityGroup( + new ConnectorDescriptionImpl("class2", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") + ) + .addEqualityGroup( + new ConnectorDescriptionImpl("class", Collections.EMPTY_LIST, Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") + ) + .addEqualityGroup( + new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.EMPTY_LIST, new ConnectorTypeImpl("SOURCE"), "state") + ) + .addEqualityGroup( + new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SINK"), "state") + ) + .addEqualityGroup( + new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state2") + ) + .testEquals(); + } +} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorInfoImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorInfoImplTest.java new file mode 100644 index 000000000000..a765c2559fe8 --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorInfoImplTest.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import com.google.common.testing.EqualsTester; +import org.junit.Test; + +public class ConnectorInfoImplTest { + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new ConnectorInfoImpl("c1", new ConnectorTypeImpl("SOURCE"), "class", "state"), + new ConnectorInfoImpl("c1", new ConnectorTypeImpl("SOURCE"), "class", "state") + ) + .addEqualityGroup( + new ConnectorInfoImpl("c2", new ConnectorTypeImpl("SOURCE"), "class", "state") + ) + .addEqualityGroup( + new ConnectorInfoImpl("c1", new ConnectorTypeImpl("SINK"), "class", "state") + ) + .addEqualityGroup( + new ConnectorInfoImpl("c1", new ConnectorTypeImpl("SOURCE"), "pass", "state") + ) + .addEqualityGroup( + new ConnectorInfoImpl("c1", new ConnectorTypeImpl("SOURCE"), "class", "state2") + ) + .testEquals(); + } +} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorTypeImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorTypeImplTest.java new file mode 100644 index 000000000000..53cacbfae36d --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorTypeImplTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import com.google.common.testing.EqualsTester; +import org.junit.Test; + +public class ConnectorTypeImplTest { + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new ConnectorTypeImpl("SOURCE"), + new ConnectorTypeImpl("SOuRCE") + ) + .addEqualityGroup( + new ConnectorTypeImpl("SINK") + ) + .addEqualityGroup( + new ConnectorTypeImpl("UNKNOWN") + ) + .testEquals(); + } +} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java new file mode 100644 index 000000000000..904a15fd1cb5 --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import com.google.common.collect.ImmutableMap; +import com.google.common.testing.EqualsTester; +import org.junit.Test; + +public class CreateConnectorResultImplTest { + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SOURCE"), ImmutableMap.of("a", "b")), + new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SOURCE"), ImmutableMap.of("a", "b")) + ) + .addEqualityGroup( + new CreateConnectorResultImpl("c2", new ConnectorTypeImpl("SOURCE"), ImmutableMap.of("a", "b")) + ) + .addEqualityGroup( + new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SINK"), ImmutableMap.of("a", "b")) + ) + .addEqualityGroup( + new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SINK"), ImmutableMap.of("a", "bb")) + ) + .testEquals(); + } +} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 558665b031ce..2c340cb53bb7 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.GenericKey; @@ -43,6 +44,10 @@ import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.ColumnType; +import io.confluent.ksql.api.client.ConnectorDescription; +import io.confluent.ksql.api.client.ConnectorInfo; +import io.confluent.ksql.api.client.ConnectorType; +import io.confluent.ksql.api.client.CreateConnectorResult; import io.confluent.ksql.api.client.ExecuteStatementResult; import io.confluent.ksql.api.client.InsertAck; import io.confluent.ksql.api.client.InsertsPublisher; @@ -58,6 +63,7 @@ import io.confluent.ksql.api.client.TableInfo; import io.confluent.ksql.api.client.TopicInfo; import io.confluent.ksql.api.client.exception.KsqlClientException; +import io.confluent.ksql.api.client.impl.ConnectorTypeImpl; import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber; import io.confluent.ksql.api.client.util.RowUtil; import io.confluent.ksql.engine.KsqlEngine; @@ -65,6 +71,7 @@ import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.ConnectExecutable; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; @@ -79,7 +86,13 @@ import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -87,13 +100,17 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.test.TestUtils; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -179,6 +196,10 @@ public class ClientIntegrationTest { + "for 'CREATE', 'CREATE ... AS SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS " + "SELECT' statements. "; + private static final String TEST_CONNECTOR = "TEST_CONNECTOR"; + private static final String MOCK_SOURCE_CLASS = "org.apache.kafka.connect.tools.MockSourceConnector"; + private static final ConnectorType SOURCE_TYPE = new ConnectorTypeImpl("SOURCE"); + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); private static final TestKsqlRestApp REST_APP = TestKsqlRestApp @@ -193,8 +214,10 @@ public class ClientIntegrationTest { .around(TEST_HARNESS) .around(REST_APP); + private static ConnectExecutable CONNECT; + @BeforeClass - public static void setUpClass() { + public static void setUpClass() throws Exception { TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2); TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, KEY_FORMAT, VALUE_FORMAT); RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); @@ -212,10 +235,42 @@ public static void setUpClass() { VALUE_FORMAT, AGG_SCHEMA ); + + final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "client_integ_test").toString(); + final String connectFilePath = Paths.get(testDir, "connect.properties").toString(); + Files.createDirectories(Paths.get(testDir)); + + writeConnectConfigs(connectFilePath, ImmutableMap.builder() + .put("bootstrap.servers", TEST_HARNESS.kafkaBootstrapServers()) + .put("group.id", UUID.randomUUID().toString()) + .put("key.converter", StringConverter.class.getName()) + .put("value.converter", JsonConverter.class.getName()) + .put("offset.storage.topic", "connect-offsets") + .put("status.storage.topic", "connect-status") + .put("config.storage.topic", "connect-config") + .put("offset.storage.replication.factor", "1") + .put("status.storage.replication.factor", "1") + .put("config.storage.replication.factor", "1") + .put("value.converter.schemas.enable", "false") + .build() + ); + + CONNECT = ConnectExecutable.of(connectFilePath); + CONNECT.startAsync(); + } + + private static void writeConnectConfigs(final String path, final Map configs) throws Exception { + try (PrintWriter out = new PrintWriter(new OutputStreamWriter( + new FileOutputStream(path, true), StandardCharsets.UTF_8))) { + for (Map.Entry entry : configs.entrySet()) { + out.println(entry.getKey() + "=" + entry.getValue()); + } + } } @AfterClass public static void classTearDown() { + CONNECT.shutdown(); REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); } @@ -870,7 +925,8 @@ public void shouldListTopics() throws Exception { topicInfo(TEST_TOPIC), topicInfo(EMPTY_TEST_TOPIC), topicInfo(EMPTY_TEST_TOPIC_2), - topicInfo(AGG_TABLE) + topicInfo(AGG_TABLE), + topicInfo("connect-config") )); } @@ -972,6 +1028,66 @@ public void shouldGetServerInfo() throws Exception { assertThat(serverInfo.getKafkaClusterId(), is(expectedClusterId)); } + @Test + public void shouldListConnectors() throws Exception { + // Given: + makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); + + // When: + final List connectors = client.listConnectors().get(); + + // Then: + assertThat(connectors.size(), is(1)); + assertThat(connectors.get(0).name(), is(TEST_CONNECTOR)); + assertThat(connectors.get(0).className(), is(MOCK_SOURCE_CLASS)); + assertThat(connectors.get(0).state(), is("RUNNING (1/1 tasks RUNNING)")); + assertThat(connectors.get(0).type(), is(SOURCE_TYPE)); + } + + @Test + public void shouldDescribeConnector() throws Exception { + // Given: + makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); + + // When: + final ConnectorDescription connector = client.describeConnector(TEST_CONNECTOR).get(); + + // Then: + assertThat(connector.type(), is(SOURCE_TYPE)); + assertThat(connector.state(), is("RUNNING")); + assertThat(connector.topics().size(), is(0)); + assertThat(connector.sources().size(), is(0)); + assertThat(connector.connectorClass(), is(MOCK_SOURCE_CLASS)); + } + + @Test + public void shouldDropConnector() throws Exception { + // Given: + makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); + + // When: + client.dropConnector(TEST_CONNECTOR).get(); + + // Then: + final List connectors = client.listConnectors().get(); + assertThat(connectors.size(), is(0)); + } + + @Test + public void shouldCreateConnector() throws Exception { + // When: + final CreateConnectorResult response = + client.createConnector("FOO", true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS)).get(); + final ConnectorDescription connector = client.describeConnector("FOO").get(); + + // Then: + assertThat(response.name(), is("FOO")); + assertThat(response.type(), is(new ConnectorTypeImpl("SOURCE"))); + assertThat(response.properties().size(), is(3)); // the other two are the name and default key converter + assertThat(response.properties().get("connector.class"), is(MOCK_SOURCE_CLASS)); + assertThat(connector.state(), is("RUNNING")); + } + private Client createClient() { final ClientOptions clientOptions = ClientOptions.create() .setHost("localhost") From e05d1334f286fbd99cf75a4054fbfca5aa763ab8 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Mon, 15 Mar 2021 12:36:37 -0700 Subject: [PATCH 2/8] checkstyle --- .../java/io/confluent/ksql/api/client/impl/ClientImpl.java | 2 +- .../ksql/api/client/impl/ConnectorCommandResponseHandler.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 8535e50f3851..fa010fb555c7 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -346,7 +346,7 @@ public CompletableFuture createConnector( } @Override - public CompletableFuture dropConnector(String name) { + public CompletableFuture dropConnector(final String name) { final CompletableFuture cf = new CompletableFuture<>(); makePostRequest( diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java index dbe34bc181d1..1ebf8d8a6ad5 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java @@ -28,6 +28,9 @@ public class ConnectorCommandResponseHandler { + private ConnectorCommandResponseHandler() { + } + static void handlerCreateConnectorResponse( final JsonObject connectorInfoEntity, final CompletableFuture cf From c5e41d0c08c2fe5722c02a123a7a4ed0768dc89b Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 16 Mar 2021 12:13:08 -0700 Subject: [PATCH 3/8] address review comments --- .../io/confluent/ksql/api/client/Client.java | 2 +- .../ksql/api/client/ConnectorDescription.java | 20 ++------- .../ksql/api/client/ConnectorType.java | 9 +++- .../impl/ConnectorCommandResponseHandler.java | 1 + .../client/impl/ConnectorDescriptionImpl.java | 41 +++++++++++-------- .../api/client/impl/ConnectorInfoImpl.java | 8 ++-- .../impl/CreateConnectorResultImpl.java | 6 +-- .../client/impl/DdlDmlResponseHandlers.java | 4 +- .../confluent/ksql/api/client/ClientTest.java | 5 +-- .../impl/ConnectorDescriptionImplTest.java | 17 ++++---- .../integration/ClientIntegrationTest.java | 27 ++++++------ 11 files changed, 74 insertions(+), 66 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index efda042075ba..6638a983cb80 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -270,7 +270,7 @@ CompletableFuture createConnector( CompletableFuture> listConnectors(); /** - * Returns metadata about a connector . + * Returns metadata about a connector. * *

If a non-200 response is received from the server, the {@code CompletableFuture} will be * failed. diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java index 8c2074c5913b..69c498722cf5 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorDescription.java @@ -17,32 +17,18 @@ import java.util.List; -public interface ConnectorDescription { - /** - * @return class of this connector - */ - String connectorClass(); +public interface ConnectorDescription extends ConnectorInfo { /** * - * @return a list of sources that this connector reads/writes to + * @return a list of ksqlDB streams and tables that this connector reads/writes to */ List sources(); /** * - * @return a list of topics consumed by this connector + * @return a list of topics used by this connector */ List topics(); - /** - * @return type of this connector - */ - ConnectorType type(); - - /** - * - * @return state of this connector - */ - String state(); } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java index 05a8e0007653..3cbd720e6c2b 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java @@ -16,7 +16,14 @@ package io.confluent.ksql.api.client; public interface ConnectorType { - enum Type { SOURCE, SINK, UNKNOWN } + enum Type { + SOURCE, + SINK, + /** + * Denotes an unknown connector type. This is used when there were errors in connector creation. + */ + UNKNOWN + } /** * Returns the type. diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java index 1ebf8d8a6ad5..a01954a05a90 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java @@ -83,6 +83,7 @@ static void handleDescribeConnectorsResponse( try { final JsonObject status = connectorDescriptionEntity.getJsonObject("status"); cf.complete(new ConnectorDescriptionImpl( + status.getString("name"), connectorDescriptionEntity.getString("connectorClass"), connectorDescriptionEntity.getJsonArray("sources").getList(), connectorDescriptionEntity.getJsonArray("topics").getList(), diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java index bec0ebd00642..b57ee88c49d6 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImpl.java @@ -22,31 +22,28 @@ public class ConnectorDescriptionImpl implements ConnectorDescription { - private final String connectorClass; + private final String name; + private final String className; private final List sources; private final List topics; private final ConnectorType type; private final String state; public ConnectorDescriptionImpl( - final String connectorClass, + final String name, + final String className, final List sources, final List topics, final ConnectorType type, final String state ) { - this.connectorClass = Objects.requireNonNull(connectorClass); - this.sources = Objects.requireNonNull(sources); - this.topics = Objects.requireNonNull(topics); - this.type = Objects.requireNonNull(type); - this.state = Objects.requireNonNull(state); - } - - - @Override - public String connectorClass() { - return connectorClass; + this.name = Objects.requireNonNull(name, "name"); + this.className = Objects.requireNonNull(className, "className"); + this.sources = Objects.requireNonNull(sources, "sources"); + this.topics = Objects.requireNonNull(topics, "topics"); + this.type = Objects.requireNonNull(type, "type"); + this.state = Objects.requireNonNull(state, "state"); } @Override @@ -59,11 +56,21 @@ public List topics() { return topics; } + @Override + public String name() { + return name; + } + @Override public ConnectorType type() { return type; } + @Override + public String className() { + return className; + } + @Override public String state() { return state; @@ -78,7 +85,8 @@ public boolean equals(final Object o) { return false; } final ConnectorDescriptionImpl connectorDescription = (ConnectorDescriptionImpl) o; - return connectorClass.equals(connectorDescription.connectorClass) + return name.equals(connectorDescription.name) + && className.equals(connectorDescription.className) && sources.equals(connectorDescription.sources) && topics.equals(connectorDescription.topics) && type.equals(connectorDescription.type) @@ -87,13 +95,14 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(connectorClass, sources, topics, type, state); + return Objects.hash(className, sources, topics, type, state); } @Override public String toString() { return "ConnectorDescription{" - + "connectorClass='" + connectorClass + '\'' + + "name='" + name + '\'' + + "className='" + className + '\'' + ", sources=" + sources + ", topics=" + topics + ", type=" + type diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java index feef7f722d89..1e8a76fbca23 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorInfoImpl.java @@ -32,10 +32,10 @@ public ConnectorInfoImpl( final String className, final String state ) { - this.name = Objects.requireNonNull(name); - this.type = Objects.requireNonNull(type); - this.className = Objects.requireNonNull(className); - this.state = Objects.requireNonNull(state); + this.name = Objects.requireNonNull(name, "name"); + this.type = Objects.requireNonNull(type, "type"); + this.className = Objects.requireNonNull(className, "className"); + this.state = Objects.requireNonNull(state, "state"); } @Override diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java index d23521be8a44..691fde49a779 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java @@ -31,9 +31,9 @@ public CreateConnectorResultImpl( final ConnectorType type, final Map properties ) { - this.name = Objects.requireNonNull(name); - this.type = Objects.requireNonNull(type); - this.properties = Objects.requireNonNull(properties); + this.name = Objects.requireNonNull(name, "name"); + this.type = Objects.requireNonNull(type, "type"); + this.properties = Objects.requireNonNull(properties, "properties"); } @Override diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java index 21432b8cf9b0..b1f6b93a523c 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/DdlDmlResponseHandlers.java @@ -121,8 +121,8 @@ private static void handleUnexpectedEntity( + "Use the dropConnector() method instead.")); } else if (AdminResponseHandlers.isConnectErrorResponse(ksqlEntity)) { cf.completeExceptionally(new KsqlClientException( - EXECUTE_STATEMENT_USAGE_DOC + "Use createConnector, dropConnector, describeConnector or " - + "listConnectors methods instead.")); + EXECUTE_STATEMENT_USAGE_DOC + "Use the createConnector, dropConnector, describeConnector " + + "or listConnectors methods instead.")); } else { cf.completeExceptionally(new IllegalStateException( "Unexpected server response type. Response: " + ksqlEntity diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index db9f10f75ed1..39e39ffe9dc9 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -1154,8 +1154,7 @@ public void shouldFailOnErrorEntityFromExecuteStatement() { assertThat(e.getCause(), instanceOf(KsqlClientException.class)); assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_USAGE_DOC)); assertThat(e.getCause().getMessage(), - containsString("does not currently support statements for creating, dropping, " - + "listing, or describing connectors")); + containsString("Use the createConnector, dropConnector, describeConnector or listConnectors methods instead")); } @Test @@ -1438,7 +1437,7 @@ public void shouldDescribeConnector() throws Exception { // Then: assertThat(connector.state(), is("state")); - assertThat(connector.connectorClass(), is("connectorClass")); + assertThat(connector.className(), is("connectorClass")); assertThat(connector.type(), is(new ConnectorTypeImpl("SOURCE"))); assertThat(connector.sources().size(), is(0)); assertThat(connector.topics().size(), is(1)); diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java index ff6a1ae68319..0f601eba4639 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/ConnectorDescriptionImplTest.java @@ -25,23 +25,26 @@ public class ConnectorDescriptionImplTest { public void shouldImplementHashCodeAndEquals() { new EqualsTester() .addEqualityGroup( - new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state"), - new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") + new ConnectorDescriptionImpl("name", "class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state"), + new ConnectorDescriptionImpl("name", "class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") ) .addEqualityGroup( - new ConnectorDescriptionImpl("class2", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") + new ConnectorDescriptionImpl("name2", "class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") ) .addEqualityGroup( - new ConnectorDescriptionImpl("class", Collections.EMPTY_LIST, Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") + new ConnectorDescriptionImpl("name", "class2", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") ) .addEqualityGroup( - new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.EMPTY_LIST, new ConnectorTypeImpl("SOURCE"), "state") + new ConnectorDescriptionImpl("name", "class", Collections.EMPTY_LIST, Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state") ) .addEqualityGroup( - new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SINK"), "state") + new ConnectorDescriptionImpl("name", "class", Collections.singletonList("source"), Collections.EMPTY_LIST, new ConnectorTypeImpl("SOURCE"), "state") ) .addEqualityGroup( - new ConnectorDescriptionImpl("class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state2") + new ConnectorDescriptionImpl("name", "class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SINK"), "state") + ) + .addEqualityGroup( + new ConnectorDescriptionImpl("name", "class", Collections.singletonList("source"), Collections.singletonList("topic"), new ConnectorTypeImpl("SOURCE"), "state2") ) .testEquals(); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 2c340cb53bb7..f022e7358601 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -70,6 +70,8 @@ import io.confluent.ksql.integration.IntegrationTestHarness; import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; +import io.confluent.ksql.rest.entity.ConnectorList; +import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.ConnectExecutable; import io.confluent.ksql.rest.server.TestKsqlRestApp; @@ -113,6 +115,7 @@ import org.apache.kafka.test.TestUtils; import org.hamcrest.Description; import org.hamcrest.Matcher; +import org.hamcrest.Matchers; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; import org.junit.AfterClass; @@ -281,6 +284,12 @@ public static void classTearDown() { public void setUp() { vertx = Vertx.vertx(); client = createClient(); + makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); + assertThatEventually( + () -> ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;") + .get(0)).getConnectors().size(), + is(1) + ); } @After @@ -292,6 +301,9 @@ public void tearDown() { vertx.close(); } REST_APP.getServiceContext().close(); + + ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors() + .forEach(c -> makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";")); } @Test @@ -1030,9 +1042,6 @@ public void shouldGetServerInfo() throws Exception { @Test public void shouldListConnectors() throws Exception { - // Given: - makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); - // When: final List connectors = client.listConnectors().get(); @@ -1046,9 +1055,6 @@ public void shouldListConnectors() throws Exception { @Test public void shouldDescribeConnector() throws Exception { - // Given: - makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); - // When: final ConnectorDescription connector = client.describeConnector(TEST_CONNECTOR).get(); @@ -1057,14 +1063,11 @@ public void shouldDescribeConnector() throws Exception { assertThat(connector.state(), is("RUNNING")); assertThat(connector.topics().size(), is(0)); assertThat(connector.sources().size(), is(0)); - assertThat(connector.connectorClass(), is(MOCK_SOURCE_CLASS)); + assertThat(connector.className(), is(MOCK_SOURCE_CLASS)); } @Test public void shouldDropConnector() throws Exception { - // Given: - makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); - // When: client.dropConnector(TEST_CONNECTOR).get(); @@ -1124,8 +1127,8 @@ private String findQueryIdForSink(final String sinkName) throws Exception { return queryIds.get(0); } - private static void makeKsqlRequest(final String sql) { - RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); + private static List makeKsqlRequest(final String sql) { + return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); } private static void verifyNumActiveQueries(final int numQueries) { From 628847943fc8f51d083c8c9a425310fc7bf88119 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 16 Mar 2021 12:25:59 -0700 Subject: [PATCH 4/8] remove CreateConnectorResult --- .../io/confluent/ksql/api/client/Client.java | 2 +- .../api/client/CreateConnectorResult.java | 36 --------- .../ksql/api/client/impl/ClientImpl.java | 7 +- .../impl/ConnectorCommandResponseHandler.java | 16 ++-- .../impl/CreateConnectorResultImpl.java | 81 ------------------- .../confluent/ksql/api/client/ClientTest.java | 10 +-- .../impl/CreateConnectorResultImplTest.java | 41 ---------- .../integration/ClientIntegrationTest.java | 8 +- 8 files changed, 14 insertions(+), 187 deletions(-) delete mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java delete mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java delete mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 6638a983cb80..42df19ce7f57 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -245,7 +245,7 @@ public interface Client { * @param properties connector properties * @return result of connector creation */ - CompletableFuture createConnector( + CompletableFuture createConnector( String connectorName, boolean isSource, Map properties); /** diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java deleted file mode 100644 index ee4ce6496bc5..000000000000 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/CreateConnectorResult.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.api.client; - -import java.util.Map; - -public interface CreateConnectorResult { - - /** - * @return name of this connector - */ - String name(); - - /** - * @return type of this connector - */ - ConnectorType type(); - - /** - * @return this connector's properties - */ - Map properties(); -} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index fa010fb555c7..172dda429922 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -25,7 +25,6 @@ import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.ConnectorDescription; import io.confluent.ksql.api.client.ConnectorInfo; -import io.confluent.ksql.api.client.CreateConnectorResult; import io.confluent.ksql.api.client.ExecuteStatementResult; import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.api.client.QueryInfo; @@ -321,12 +320,12 @@ public CompletableFuture serverInfo() { } @Override - public CompletableFuture createConnector( + public CompletableFuture createConnector( final String name, final boolean isSource, final Map properties ) { - final CompletableFuture cf = new CompletableFuture<>(); + final CompletableFuture cf = new CompletableFuture<>(); final String connectorConfigs = String.join(",", properties.entrySet() .stream() .map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue())) @@ -339,7 +338,7 @@ public CompletableFuture createConnector( String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs)), cf, response -> handleSingleEntityResponse( - response, cf, ConnectorCommandResponseHandler::handlerCreateConnectorResponse) + response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse) ); return cf; diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java index a01954a05a90..bb34a78bcb77 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java @@ -17,7 +17,6 @@ import io.confluent.ksql.api.client.ConnectorDescription; import io.confluent.ksql.api.client.ConnectorInfo; -import io.confluent.ksql.api.client.CreateConnectorResult; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.util.List; @@ -31,18 +30,13 @@ public class ConnectorCommandResponseHandler { private ConnectorCommandResponseHandler() { } - static void handlerCreateConnectorResponse( + static void handleCreateConnectorResponse( final JsonObject connectorInfoEntity, - final CompletableFuture cf + final CompletableFuture cf ) { - try { - final JsonObject connectorInfoJson = connectorInfoEntity.getJsonObject("info"); - cf.complete(new CreateConnectorResultImpl( - connectorInfoJson.getString("name"), - new ConnectorTypeImpl(connectorInfoJson.getString("type")), - connectorInfoJson.getJsonObject("config").mapTo(Map.class) - )); - } catch (Exception e) { + if (connectorInfoEntity.getString("@type").equals("connector_info")) { + cf.complete(null); + } else { cf.completeExceptionally(new IllegalStateException( "Unexpected server response format. Response: " + connectorInfoEntity )); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java deleted file mode 100644 index 691fde49a779..000000000000 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImpl.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.api.client.impl; - -import io.confluent.ksql.api.client.ConnectorType; -import io.confluent.ksql.api.client.CreateConnectorResult; -import java.util.Map; -import java.util.Objects; - -public class CreateConnectorResultImpl implements CreateConnectorResult { - - private final String name; - private final ConnectorType type; - private final Map properties; - - public CreateConnectorResultImpl( - final String name, - final ConnectorType type, - final Map properties - ) { - this.name = Objects.requireNonNull(name, "name"); - this.type = Objects.requireNonNull(type, "type"); - this.properties = Objects.requireNonNull(properties, "properties"); - } - - @Override - public String name() { - return name; - } - - @Override - public ConnectorType type() { - return type; - } - - @Override - public Map properties() { - return properties; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final CreateConnectorResultImpl connectorInfo = (CreateConnectorResultImpl) o; - return name.equals(connectorInfo.name) - && type.equals(connectorInfo.type) - && properties.equals(connectorInfo.properties); - } - - @Override - public int hashCode() { - return Objects.hash(name, type, properties); - } - - @Override - public String toString() { - return "CreateConnectorResult{" - + "name='" + name + '\'' - + ", type=" + type - + ", properties=" + properties - + '}'; - } -} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 39e39ffe9dc9..f730742f1747 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -1445,23 +1445,21 @@ public void shouldDescribeConnector() throws Exception { } @Test - public void shouldCreateConnector() throws Exception { + public void shouldCreateConnector() { // Given final CreateConnectorEntity entity = new CreateConnectorEntity("create connector;", new ConnectorInfo("name", Collections.emptyMap(), Collections.emptyList(), SOURCE_TYPE)); testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); // When: - final CreateConnectorResult result = (CreateConnectorResult) javaClient.createConnector("name", true, Collections.EMPTY_MAP).get(); + final CompletableFuture result = javaClient.createConnector("name", true, Collections.EMPTY_MAP); // Then: - assertThat(result.name(), is("name")); - assertThat(result.properties().size(), is(0)); - assertThat(result.type(), is(new ConnectorTypeImpl("SOURCE"))); + assertTrue(result.complete(null)); } @Test - public void shouldDropConnector() throws Exception { + public void shouldDropConnector() { // Given final DropConnectorEntity entity = new DropConnectorEntity("drop connector;", "name"); testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity)); diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java deleted file mode 100644 index 904a15fd1cb5..000000000000 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/CreateConnectorResultImplTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2021 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.api.client.impl; - -import com.google.common.collect.ImmutableMap; -import com.google.common.testing.EqualsTester; -import org.junit.Test; - -public class CreateConnectorResultImplTest { - @Test - public void shouldImplementHashCodeAndEquals() { - new EqualsTester() - .addEqualityGroup( - new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SOURCE"), ImmutableMap.of("a", "b")), - new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SOURCE"), ImmutableMap.of("a", "b")) - ) - .addEqualityGroup( - new CreateConnectorResultImpl("c2", new ConnectorTypeImpl("SOURCE"), ImmutableMap.of("a", "b")) - ) - .addEqualityGroup( - new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SINK"), ImmutableMap.of("a", "b")) - ) - .addEqualityGroup( - new CreateConnectorResultImpl("c1", new ConnectorTypeImpl("SINK"), ImmutableMap.of("a", "bb")) - ) - .testEquals(); - } -} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index f022e7358601..47ca19e85cff 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -47,7 +47,6 @@ import io.confluent.ksql.api.client.ConnectorDescription; import io.confluent.ksql.api.client.ConnectorInfo; import io.confluent.ksql.api.client.ConnectorType; -import io.confluent.ksql.api.client.CreateConnectorResult; import io.confluent.ksql.api.client.ExecuteStatementResult; import io.confluent.ksql.api.client.InsertAck; import io.confluent.ksql.api.client.InsertsPublisher; @@ -1079,15 +1078,10 @@ public void shouldDropConnector() throws Exception { @Test public void shouldCreateConnector() throws Exception { // When: - final CreateConnectorResult response = - client.createConnector("FOO", true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS)).get(); + client.createConnector("FOO", true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS)).get(); final ConnectorDescription connector = client.describeConnector("FOO").get(); // Then: - assertThat(response.name(), is("FOO")); - assertThat(response.type(), is(new ConnectorTypeImpl("SOURCE"))); - assertThat(response.properties().size(), is(3)); // the other two are the name and default key converter - assertThat(response.properties().get("connector.class"), is(MOCK_SOURCE_CLASS)); assertThat(connector.state(), is("RUNNING")); } From df28b44aebc4d296343400cb0114ef51f9b2de92 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Tue, 16 Mar 2021 13:41:58 -0700 Subject: [PATCH 5/8] checkstyle --- .../ksql/api/client/impl/ConnectorCommandResponseHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java index bb34a78bcb77..e3c8275114f8 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ConnectorCommandResponseHandler.java @@ -20,12 +20,11 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -public class ConnectorCommandResponseHandler { +public final class ConnectorCommandResponseHandler { private ConnectorCommandResponseHandler() { } From ba0ee36ff146c2ff9ab6f8304dce0fe66980735b Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 17 Mar 2021 15:18:19 -0700 Subject: [PATCH 6/8] address review comments --- .../ksql/api/client/ConnectorType.java | 3 +- .../integration/ClientIntegrationTest.java | 41 ++++++++++++++----- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java index 3cbd720e6c2b..83882e45a8db 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ConnectorType.java @@ -20,7 +20,8 @@ enum Type { SOURCE, SINK, /** - * Denotes an unknown connector type. This is used when there were errors in connector creation. + * Denotes an unknown connector type. This is used when the connector type cannot be determined, + * such as when there were errors in connector creation. */ UNKNOWN } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 47ca19e85cff..0b0b89e198ac 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -272,6 +272,8 @@ private static void writeConnectConfigs(final String path, final Map makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";")); CONNECT.shutdown(); REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); } @@ -283,12 +285,6 @@ public static void classTearDown() { public void setUp() { vertx = Vertx.vertx(); client = createClient(); - makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); - assertThatEventually( - () -> ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;") - .get(0)).getConnectors().size(), - is(1) - ); } @After @@ -300,9 +296,6 @@ public void tearDown() { vertx.close(); } REST_APP.getServiceContext().close(); - - ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors() - .forEach(c -> makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";")); } @Test @@ -1041,6 +1034,9 @@ public void shouldGetServerInfo() throws Exception { @Test public void shouldListConnectors() throws Exception { + // Given: + givenConnectorExists(); + // When: final List connectors = client.listConnectors().get(); @@ -1054,6 +1050,9 @@ public void shouldListConnectors() throws Exception { @Test public void shouldDescribeConnector() throws Exception { + // Given: + givenConnectorExists(); + // When: final ConnectorDescription connector = client.describeConnector(TEST_CONNECTOR).get(); @@ -1067,6 +1066,9 @@ public void shouldDescribeConnector() throws Exception { @Test public void shouldDropConnector() throws Exception { + // Given: + givenConnectorExists(); + // When: client.dropConnector(TEST_CONNECTOR).get(); @@ -1079,10 +1081,18 @@ public void shouldDropConnector() throws Exception { public void shouldCreateConnector() throws Exception { // When: client.createConnector("FOO", true, ImmutableMap.of("connector.class", MOCK_SOURCE_CLASS)).get(); - final ConnectorDescription connector = client.describeConnector("FOO").get(); // Then: - assertThat(connector.state(), is("RUNNING")); + assertThatEventually( + () -> { + try { + return (client.describeConnector("FOO").get()).state(); + } catch (InterruptedException | ExecutionException e) { + return null; + } + }, + is("RUNNING") + ); } private Client createClient() { @@ -1112,6 +1122,15 @@ private void verifyNumQueries(final int numQueries) { }, is(numQueries)); } + private void givenConnectorExists() { + makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); + assertThatEventually( + () -> ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;") + .get(0)).getConnectors().size(), + is(1) + ); + } + private String findQueryIdForSink(final String sinkName) throws Exception { final List queryIds = client.listQueries().get().stream() .filter(q -> q.getSink().equals(Optional.of(sinkName))) From 22db7fa6b9f226487e351652468882e3a0959e7f Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Thu, 18 Mar 2021 13:23:10 -0700 Subject: [PATCH 7/8] ensure connectors are cleaned up between tests --- .../integration/ClientIntegrationTest.java | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 0b0b89e198ac..96a35a0b7ab7 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -70,6 +70,7 @@ import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.rest.entity.ConnectorList; +import io.confluent.ksql.rest.entity.CreateConnectorEntity; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.ConnectExecutable; @@ -114,7 +115,6 @@ import org.apache.kafka.test.TestUtils; import org.hamcrest.Description; import org.hamcrest.Matcher; -import org.hamcrest.Matchers; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; import org.junit.AfterClass; @@ -272,8 +272,7 @@ private static void writeConnectConfigs(final String path, final Map makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";")); + cleanupConnectors(); CONNECT.shutdown(); REST_APP.getPersistentQueries().forEach(str -> makeKsqlRequest("TERMINATE " + str + ";")); } @@ -1073,8 +1072,13 @@ public void shouldDropConnector() throws Exception { client.dropConnector(TEST_CONNECTOR).get(); // Then: - final List connectors = client.listConnectors().get(); - assertThat(connectors.size(), is(0)); + assertThatEventually(() -> { + try { + return client.listConnectors().get().size(); + } catch (InterruptedException | ExecutionException e) { + return null; + } + }, is(0)); } @Test @@ -1123,11 +1127,29 @@ private void verifyNumQueries(final int numQueries) { } private void givenConnectorExists() { + // Make sure we are starting from a clean slate before creating a new connector. + cleanupConnectors(); + makeKsqlRequest("CREATE SOURCE CONNECTOR " + TEST_CONNECTOR + " WITH ('connector.class'='" + MOCK_SOURCE_CLASS + "');"); + + assertThatEventually( + () -> { + try { + return ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors().size(); + } catch (AssertionError e) { + return 0; + }}, + is(1) + ); + } + + private static void cleanupConnectors() { + ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;").get(0)).getConnectors() + .forEach(c -> makeKsqlRequest("DROP CONNECTOR " + c.getName() + ";")); assertThatEventually( () -> ((ConnectorList) makeKsqlRequest("SHOW CONNECTORS;") .get(0)).getConnectors().size(), - is(1) + is(0) ); } From cfa7367929e71d35098627efe8a1f644396295e7 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Fri, 19 Mar 2021 10:35:45 -0700 Subject: [PATCH 8/8] change properties param map value to object --- .../src/main/java/io/confluent/ksql/api/client/Client.java | 2 +- .../main/java/io/confluent/ksql/api/client/impl/ClientImpl.java | 2 +- .../ksql/api/client/integration/ClientIntegrationTest.java | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 42df19ce7f57..9f8e7e73fc49 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -246,7 +246,7 @@ public interface Client { * @return result of connector creation */ CompletableFuture createConnector( - String connectorName, boolean isSource, Map properties); + String connectorName, boolean isSource, Map properties); /** * Drops a connector. diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 172dda429922..b1e990a8f8eb 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -323,7 +323,7 @@ public CompletableFuture serverInfo() { public CompletableFuture createConnector( final String name, final boolean isSource, - final Map properties + final Map properties ) { final CompletableFuture cf = new CompletableFuture<>(); final String connectorConfigs = String.join(",", properties.entrySet() diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 96a35a0b7ab7..11a12491a241 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -70,7 +70,6 @@ import io.confluent.ksql.integration.Retry; import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.rest.entity.ConnectorList; -import io.confluent.ksql.rest.entity.CreateConnectorEntity; import io.confluent.ksql.rest.entity.KsqlEntity; import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.ConnectExecutable;