-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(java client): Add connector functions to java client #7222
Changes from 5 commits
2b3a202
e05d133
c5e41d0
6288479
df28b44
ba0ee36
22db7fa
cfa7367
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2021 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
import java.util.List; | ||
|
||
public interface ConnectorDescription extends ConnectorInfo { | ||
|
||
/** | ||
* | ||
* @return a list of ksqlDB streams and tables that this connector reads/writes to | ||
*/ | ||
List<String> sources(); | ||
|
||
/** | ||
* | ||
* @return a list of topics used by this connector | ||
*/ | ||
List<String> topics(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Copyright 2021 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
public interface ConnectorInfo { | ||
|
||
/** | ||
* @return name of this connector | ||
*/ | ||
String name(); | ||
|
||
/** | ||
* @return type of this connector | ||
*/ | ||
ConnectorType type(); | ||
|
||
/** | ||
* @return class of this connector | ||
*/ | ||
String className(); | ||
|
||
/** | ||
* @return state of this connector | ||
*/ | ||
String state(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
* Copyright 2021 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client; | ||
|
||
public interface ConnectorType { | ||
enum Type { | ||
SOURCE, | ||
SINK, | ||
/** | ||
* Denotes an unknown connector type. This is used when there were errors in connector creation. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the only reason the connector type might be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I couldn't find any other references to |
||
*/ | ||
UNKNOWN | ||
} | ||
|
||
/** | ||
* Returns the type. | ||
* | ||
* @return the type | ||
*/ | ||
Type getType(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* Copyright 2021 Confluent Inc. | ||
* | ||
* Licensed under the Confluent Community License (the "License"); you may not use | ||
* this file except in compliance with the License. You may obtain a copy of the | ||
* License at | ||
* | ||
* http://www.confluent.io/confluent-community-license | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.confluent.ksql.api.client.impl; | ||
|
||
import io.confluent.ksql.api.client.ConnectorDescription; | ||
import io.confluent.ksql.api.client.ConnectorInfo; | ||
import io.vertx.core.json.JsonArray; | ||
import io.vertx.core.json.JsonObject; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.stream.Collectors; | ||
|
||
public final class ConnectorCommandResponseHandler { | ||
|
||
private ConnectorCommandResponseHandler() { | ||
} | ||
|
||
static void handleCreateConnectorResponse( | ||
final JsonObject connectorInfoEntity, | ||
final CompletableFuture<Void> cf | ||
) { | ||
if (connectorInfoEntity.getString("@type").equals("connector_info")) { | ||
cf.complete(null); | ||
} else { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + connectorInfoEntity | ||
)); | ||
} | ||
} | ||
|
||
static void handleDropConnectorResponse( | ||
final JsonObject dropConnectorResponseEntity, | ||
final CompletableFuture<Void> cf | ||
) { | ||
if (dropConnectorResponseEntity.getString("@type").equals("drop_connector")) { | ||
cf.complete(null); | ||
} else { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + dropConnectorResponseEntity | ||
)); | ||
} | ||
} | ||
|
||
static void handleListConnectorsResponse( | ||
final JsonObject connectorsListEntity, | ||
final CompletableFuture<List<ConnectorInfo>> cf | ||
) { | ||
final Optional<List<ConnectorInfo>> connectors = | ||
getListConnectorsResponse(connectorsListEntity); | ||
if (connectors.isPresent()) { | ||
cf.complete(connectors.get()); | ||
} else { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + connectorsListEntity)); | ||
} | ||
} | ||
|
||
static void handleDescribeConnectorsResponse( | ||
final JsonObject connectorDescriptionEntity, | ||
final CompletableFuture<ConnectorDescription> cf | ||
) { | ||
try { | ||
final JsonObject status = connectorDescriptionEntity.getJsonObject("status"); | ||
cf.complete(new ConnectorDescriptionImpl( | ||
status.getString("name"), | ||
connectorDescriptionEntity.getString("connectorClass"), | ||
connectorDescriptionEntity.getJsonArray("sources").getList(), | ||
connectorDescriptionEntity.getJsonArray("topics").getList(), | ||
new ConnectorTypeImpl(status.getString("type")), | ||
status.getJsonObject("connector").getString("state") | ||
)); | ||
} catch (Exception e) { | ||
cf.completeExceptionally(new IllegalStateException( | ||
"Unexpected server response format. Response: " + connectorDescriptionEntity)); | ||
} | ||
} | ||
|
||
/** | ||
* Attempts to parse the provided response entity as a {@code List<ConnectorInfo>}. | ||
* | ||
* @param connectorsEntity response entity | ||
* @return optional containing parsed result if successful, else empty | ||
*/ | ||
private static Optional<List<ConnectorInfo>> getListConnectorsResponse( | ||
final JsonObject connectorsEntity) { | ||
try { | ||
final JsonArray connectors = connectorsEntity.getJsonArray("connectors"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit late for this PR but we've been wanting to change the pattern of how the Java client response interfaces relate to the server response objects for a while now, in order to avoid custom JSON parsing like this. Here's the ticket with context: #6042 We should really do this before the next time we add additional interfaces, but it's outside the scope for this PR. (If you're keen on helping clean up the codebase and want to tackle it as a follow-up PR that'd be super, but no pressure 😁 ) |
||
return Optional.of(connectors.stream() | ||
.map(o -> (JsonObject) o) | ||
.map(o -> new ConnectorInfoImpl( | ||
o.getString("name"), | ||
new ConnectorTypeImpl(o.getString("type")), | ||
o.getString("className"), | ||
o.getString("state"))) | ||
.collect(Collectors.toList())); | ||
} catch (Exception e) { | ||
return Optional.empty(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we use
Map<String, Object>
to pass properties elsewhere in the client interface. Is there a reason for using<String, String>
here, or should we keep this consistent with the other interface methods?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Will update the PR.