Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java client): Add connector functions to java client #7222

Merged
merged 8 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,52 @@ public interface Client {
*/
CompletableFuture<ServerInfo> serverInfo();

/**
* Creates a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector
* @param isSource true if the connector is a source connector, false if it is a sink connector
* @param properties connector properties
* @return result of connector creation
*/
CompletableFuture<Void> createConnector(
String connectorName, boolean isSource, Map<String, String> properties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we use Map<String, Object> to pass properties elsewhere in the client interface. Is there a reason for using <String, String> here, or should we keep this consistent with the other interface methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Will update the PR.


/**
* Drops a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector to drop
* @return a future that completes once the server response is received
*/
CompletableFuture<Void> dropConnector(String connectorName);

/**
* Returns a list of connectors.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @return a list of connectors
*/
CompletableFuture<List<ConnectorInfo>> listConnectors();

/**
* Returns metadata about a connector.
*
* <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
* failed.
*
* @param connectorName name of the connector to describe
* @return metadata about connector
*/
CompletableFuture<ConnectorDescription> describeConnector(String connectorName);

/**
* Closes the underlying HTTP client.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

import java.util.List;

public interface ConnectorDescription extends ConnectorInfo {

/**
*
* @return a list of ksqlDB streams and tables that this connector reads/writes to
*/
List<String> sources();

/**
*
* @return a list of topics used by this connector
*/
List<String> topics();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

public interface ConnectorInfo {

/**
* @return name of this connector
*/
String name();

/**
* @return type of this connector
*/
ConnectorType type();

/**
* @return class of this connector
*/
String className();

/**
* @return state of this connector
*/
String state();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client;

public interface ConnectorType {
enum Type {
SOURCE,
SINK,
/**
* Denotes an unknown connector type. This is used when there were errors in connector creation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the only reason the connector type might be UNKNOWN or are there others as well, for example, when listing existing connectors? For some reason I thought there were other possible reasons for an UNKNOWN type as well but I could be wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any other references to UNKNOWN, but there could be a reason on the Connect side of things that I'm missing. Updated docs to be more accurate

*/
UNKNOWN
}

/**
* Returns the type.
*
* @return the type
*/
Type getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.confluent.ksql.api.client.ExecuteStatementResult;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.QueryInfo;
Expand Down Expand Up @@ -58,6 +60,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
Expand Down Expand Up @@ -316,6 +319,76 @@ public CompletableFuture<ServerInfo> serverInfo() {
return cf;
}

@Override
public CompletableFuture<Void> createConnector(
final String name,
final boolean isSource,
final Map<String, String> properties
) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
final String connectorConfigs = String.join(",", properties.entrySet()
.stream()
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.toList()));
final String type = isSource ? "SOURCE" : "SINK";

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql",
String.format("CREATE %s CONNECTOR %s WITH (%s);", type, name, connectorConfigs)),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleCreateConnectorResponse)
);

return cf;
}

@Override
public CompletableFuture<Void> dropConnector(final String name) {
final CompletableFuture<Void> cf = new CompletableFuture<>();

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "drop connector " + name + ";"),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDropConnectorResponse)
);

return cf;
}

@Override
public CompletableFuture<List<ConnectorInfo>> listConnectors() {
final CompletableFuture<List<ConnectorInfo>> cf = new CompletableFuture<>();

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "list connectors;"),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleListConnectorsResponse)
);

return cf;
}

@Override
public CompletableFuture<ConnectorDescription> describeConnector(final String name) {
final CompletableFuture<ConnectorDescription> cf = new CompletableFuture<>();

makePostRequest(
KSQL_ENDPOINT,
new JsonObject().put("ksql", "describe connector " + name + ";"),
cf,
response -> handleSingleEntityResponse(
response, cf, ConnectorCommandResponseHandler::handleDescribeConnectorsResponse)
);

return cf;
}

@Override
public void close() {
httpClient.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.ConnectorDescription;
import io.confluent.ksql.api.client.ConnectorInfo;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public final class ConnectorCommandResponseHandler {

private ConnectorCommandResponseHandler() {
}

static void handleCreateConnectorResponse(
final JsonObject connectorInfoEntity,
final CompletableFuture<Void> cf
) {
if (connectorInfoEntity.getString("@type").equals("connector_info")) {
cf.complete(null);
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + connectorInfoEntity
));
}
}

static void handleDropConnectorResponse(
final JsonObject dropConnectorResponseEntity,
final CompletableFuture<Void> cf
) {
if (dropConnectorResponseEntity.getString("@type").equals("drop_connector")) {
cf.complete(null);
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + dropConnectorResponseEntity
));
}
}

static void handleListConnectorsResponse(
final JsonObject connectorsListEntity,
final CompletableFuture<List<ConnectorInfo>> cf
) {
final Optional<List<ConnectorInfo>> connectors =
getListConnectorsResponse(connectorsListEntity);
if (connectors.isPresent()) {
cf.complete(connectors.get());
} else {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + connectorsListEntity));
}
}

static void handleDescribeConnectorsResponse(
final JsonObject connectorDescriptionEntity,
final CompletableFuture<ConnectorDescription> cf
) {
try {
final JsonObject status = connectorDescriptionEntity.getJsonObject("status");
cf.complete(new ConnectorDescriptionImpl(
status.getString("name"),
connectorDescriptionEntity.getString("connectorClass"),
connectorDescriptionEntity.getJsonArray("sources").getList(),
connectorDescriptionEntity.getJsonArray("topics").getList(),
new ConnectorTypeImpl(status.getString("type")),
status.getJsonObject("connector").getString("state")
));
} catch (Exception e) {
cf.completeExceptionally(new IllegalStateException(
"Unexpected server response format. Response: " + connectorDescriptionEntity));
}
}

/**
* Attempts to parse the provided response entity as a {@code List<ConnectorInfo>}.
*
* @param connectorsEntity response entity
* @return optional containing parsed result if successful, else empty
*/
private static Optional<List<ConnectorInfo>> getListConnectorsResponse(
final JsonObject connectorsEntity) {
try {
final JsonArray connectors = connectorsEntity.getJsonArray("connectors");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit late for this PR but we've been wanting to change the pattern of how the Java client response interfaces relate to the server response objects for a while now, in order to avoid custom JSON parsing like this. Here's the ticket with context: #6042

We should really do this before the next time we add additional interfaces, but it's outside the scope for this PR. (If you're keen on helping clean up the codebase and want to tackle it as a follow-up PR that'd be super, but no pressure 😁 )

return Optional.of(connectors.stream()
.map(o -> (JsonObject) o)
.map(o -> new ConnectorInfoImpl(
o.getString("name"),
new ConnectorTypeImpl(o.getString("type")),
o.getString("className"),
o.getString("state")))
.collect(Collectors.toList()));
} catch (Exception e) {
return Optional.empty();
}
}
}
Loading