Skip to content

Commit

Permalink
[improve] cassandra connector options
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Feb 5, 2025
1 parent 36b3dd2 commit 894c36a
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 184 deletions.
26 changes: 13 additions & 13 deletions docs/en/connector-v2/sink/Cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ Write data to Apache Cassandra.

## Options

| name | type | required | default value |
|-------------------|---------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
| table | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |
| fields | String | No | LOCAL_ONE |
| batch_size | int | No | 5000 |
| batch_type | String | No | UNLOGGED |
| async_write | boolean | No | true |
| name | type | required | default value |
|-------------------|--------------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
| table | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |
| fields | String Array | No | - |
| batch_size | int | No | 5000 |
| batch_type | String | No | UNLOGGED |
| async_write | boolean | No | true |

### host [string]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class CassandraConfig {
public class CassandraBaseOptions {

public static final Integer DEFAULT_BATCH_SIZE = 5000;

Expand All @@ -42,25 +42,4 @@ public class CassandraConfig {
.stringType()
.defaultValue("LOCAL_ONE")
.withDescription("");

public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("");

public static final Option<String> FIELDS =
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(DEFAULT_BATCH_SIZE)
.withDescription("");

public static final Option<String> BATCH_TYPE =
Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");

public static final Option<Boolean> ASYNC_WRITE =
Options.key("async_write").booleanType().defaultValue(true).withDescription("");

public static final Option<String> CQL =
Options.key("cql").stringType().noDefaultValue().withDescription("");
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.cassandra.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
Expand All @@ -44,54 +44,19 @@ public class CassandraParameters implements Serializable {
private DefaultBatchType batchType;
private Boolean asyncWrite;

public void buildWithConfig(Config config) {
this.host = config.getString(CassandraConfig.HOST.key());
this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());

if (config.hasPath(CassandraConfig.USERNAME.key())) {
this.username = config.getString(CassandraConfig.USERNAME.key());
}
if (config.hasPath(CassandraConfig.PASSWORD.key())) {
this.password = config.getString(CassandraConfig.PASSWORD.key());
}
if (config.hasPath(CassandraConfig.DATACENTER.key())) {
this.datacenter = config.getString(CassandraConfig.DATACENTER.key());
} else {
this.datacenter = CassandraConfig.DATACENTER.defaultValue();
}
if (config.hasPath(CassandraConfig.TABLE.key())) {
this.table = config.getString(CassandraConfig.TABLE.key());
}
if (config.hasPath(CassandraConfig.CQL.key())) {
this.cql = config.getString(CassandraConfig.CQL.key());
}
if (config.hasPath(CassandraConfig.FIELDS.key())) {
this.fields = config.getStringList(CassandraConfig.FIELDS.key());
}
if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
this.consistencyLevel =
DefaultConsistencyLevel.valueOf(
config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
} else {
this.consistencyLevel =
DefaultConsistencyLevel.valueOf(
CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
}
if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
} else {
this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
}
if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
this.batchType =
DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
} else {
this.batchType = DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
}
if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
this.asyncWrite = config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
} else {
this.asyncWrite = true;
}
public void buildWithConfig(ReadonlyConfig config) {
this.host = config.get(CassandraBaseOptions.HOST);
this.keyspace = config.get(CassandraBaseOptions.KEYSPACE);
this.username = config.get(CassandraBaseOptions.USERNAME);
this.password = config.get(CassandraBaseOptions.PASSWORD);
this.datacenter = config.get(CassandraBaseOptions.DATACENTER);
this.table = config.get(CassandraSinkOptions.TABLE);
this.cql = config.get(CassandraSourceOptions.CQL);
this.fields = config.get(CassandraSinkOptions.FIELDS);
this.consistencyLevel =
DefaultConsistencyLevel.valueOf(config.get(CassandraBaseOptions.CONSISTENCY_LEVEL));
this.batchSize = config.get(CassandraSinkOptions.BATCH_SIZE);
this.batchType = DefaultBatchType.valueOf(config.get(CassandraSinkOptions.BATCH_TYPE));
this.asyncWrite = config.get(CassandraSinkOptions.ASYNC_WRITE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cassandra.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.Collections;
import java.util.List;

public class CassandraSinkOptions extends CassandraBaseOptions {

public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("");

public static final Option<List<String>> FIELDS =
Options.key("fields")
.listType()
.defaultValue(Collections.singletonList("LOCAL_ONE"))
.withDescription("");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(DEFAULT_BATCH_SIZE)
.withDescription("");

public static final Option<String> BATCH_TYPE =
Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");

public static final Option<Boolean> ASYNC_WRITE =
Options.key("async_write").booleanType().defaultValue(true).withDescription("");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cassandra.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class CassandraSourceOptions extends CassandraBaseOptions {

public static final Option<String> CQL =
Options.key("cql").stringType().noDefaultValue().withDescription("");
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
Expand All @@ -39,43 +33,26 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.TABLE;

@AutoService(SeaTunnelSink.class)
public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private final CassandraParameters cassandraParameters = new CassandraParameters();
private SeaTunnelRowType seaTunnelRowType;

private ColumnDefinitions tableSchema;

@Override
public String getPluginName() {
return "Cassandra";
}
private final CassandraParameters cassandraParameters;
private final CatalogTable catalogTable;
private final ColumnDefinitions tableSchema;

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult checkResult =
CheckConfigUtil.checkAllExists(
pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
if (!checkResult.isSuccess()) {
throw new CassandraConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, checkResult.getMsg()));
}
this.cassandraParameters.buildWithConfig(pluginConfig);
public CassandraSink(
CassandraParameters cassandraParameters,
CatalogTable catalogTable,
ReadonlyConfig pluginConfig) {
this.cassandraParameters = cassandraParameters;
this.catalogTable = catalogTable;
try (CqlSession session =
CassandraClient.getCqlSessionBuilder(
cassandraParameters.getHost(),
Expand All @@ -85,8 +62,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
cassandraParameters.getDatacenter())
.build()) {
List<String> fields = cassandraParameters.getFields();
this.tableSchema =
CassandraClient.getTableSchema(session, pluginConfig.getString(TABLE.key()));
this.tableSchema = CassandraClient.getTableSchema(session, pluginConfig.get(TABLE));
if (fields == null || fields.isEmpty()) {
List<String> newFields = new ArrayList<>();
for (int i = 0; i < tableSchema.size(); i++) {
Expand All @@ -101,7 +77,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"Field "
+ field
+ " does not exist in table "
+ pluginConfig.getString(TABLE.key()));
+ pluginConfig.get(TABLE));
}
}
}
Expand All @@ -115,18 +91,19 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public String getPluginName() {
return "Cassandra";
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, tableSchema);
return new CassandraSinkWriter(
cassandraParameters, catalogTable.getSeaTunnelRowType(), tableSchema);
}

@Override
public Optional<CatalogTable> getWriteCatalogTable() {
return super.getWriteCatalogTable();
return Optional.of(catalogTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,26 @@
package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;

import com.google.auto.service.AutoService;

import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.ASYNC_WRITE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.BATCH_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.CONSISTENCY_LEVEL;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.DATACENTER;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.HOST;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.KEYSPACE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.TABLE;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.USERNAME;

@AutoService(Factory.class)
public class CassandraSinkFactory implements TableSinkFactory {
@Override
Expand All @@ -34,15 +48,19 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, CassandraConfig.TABLE)
.bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
.required(HOST, KEYSPACE, TABLE)
.bundled(USERNAME, PASSWORD)
.optional(
CassandraConfig.DATACENTER,
CassandraConfig.CONSISTENCY_LEVEL,
CassandraConfig.FIELDS,
CassandraConfig.BATCH_SIZE,
CassandraConfig.BATCH_TYPE,
CassandraConfig.ASYNC_WRITE)
DATACENTER, CONSISTENCY_LEVEL, FIELDS, BATCH_SIZE, BATCH_TYPE, ASYNC_WRITE)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
CassandraParameters cassandraParameters = new CassandraParameters();
cassandraParameters.buildWithConfig(context.getOptions());
return () ->
new CassandraSink(
cassandraParameters, context.getCatalogTable(), context.getOptions());
}
}
Loading

0 comments on commit 894c36a

Please sign in to comment.