Skip to content

Commit

Permalink
fix some problem
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas-HuWei committed Jul 11, 2024
1 parent 839437e commit 22f5dc1
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 43 deletions.
1 change: 1 addition & 0 deletions config/plugin_config
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ connector-paimon
connector-rocketmq
connector-tdengine
connector-web3j
connector-milvus
--end--
2 changes: 2 additions & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,5 @@ seatunnel.source.Oracle-CDC = connector-cdc-oracle
seatunnel.sink.Pulsar = connector-pulsar
seatunnel.source.ObsFile = connector-file-obs
seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.milvus.convert.MilvusConvertUtils;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;

Expand Down Expand Up @@ -257,7 +257,9 @@ private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) {
FieldType.Builder build =
FieldType.newBuilder()
.withName(column.getName())
.withDataType(convertToDataType(seaTunnelDataType.getSqlType()));
.withDataType(
MilvusConvertUtils.convertSqlTypeToDataType(
seaTunnelDataType.getSqlType()));
switch (seaTunnelDataType.getSqlType()) {
case ROW:
build.withMaxLength(65535);
Expand Down Expand Up @@ -296,7 +298,8 @@ private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) {
case ARRAY:
ArrayType arrayType = (ArrayType) column.getDataType();
SeaTunnelDataType elementType = arrayType.getElementType();
build.withElementType(convertToDataType(elementType.getSqlType()));
build.withElementType(
MilvusConvertUtils.convertSqlTypeToDataType(elementType.getSqlType()));
build.withMaxCapacity(4095);
switch (elementType.getSqlType()) {
case STRING:
Expand Down Expand Up @@ -328,45 +331,6 @@ private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) {
return build.build();
}

private static DataType convertToDataType(SqlType sqlType) {
switch (sqlType) {
case BOOLEAN:
return DataType.Bool;
case TINYINT:
return DataType.Int8;
case SMALLINT:
return DataType.Int16;
case INT:
return DataType.Int32;
case BIGINT:
return DataType.Int64;
case FLOAT:
return DataType.Float;
case DOUBLE:
return DataType.Double;
case STRING:
return DataType.VarChar;
case ARRAY:
return DataType.Array;
case FLOAT_VECTOR:
return DataType.FloatVector;
case BINARY_VECTOR:
return DataType.BinaryVector;
case FLOAT16_VECTOR:
return DataType.Float16Vector;
case BFLOAT16_VECTOR:
return DataType.BFloat16Vector;
case SPARSE_FLOAT_VECTOR:
return DataType.SparseFloatVector;
case DATE:
return DataType.VarChar;
case ROW:
return DataType.VarChar;
}
throw new CatalogException(
String.format("Not support convert to milvus type, sqlType is %s", sqlType));
}

@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.VectorIndex;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand Down Expand Up @@ -346,4 +347,43 @@ public static Object convertBySeaTunnelType(SeaTunnelDataType<?> fieldType, Obje
MilvusConnectionErrorCode.NOT_SUPPORT_TYPE, sqlType.name());
}
}

public static DataType convertSqlTypeToDataType(SqlType sqlType) {
switch (sqlType) {
case BOOLEAN:
return DataType.Bool;
case TINYINT:
return DataType.Int8;
case SMALLINT:
return DataType.Int16;
case INT:
return DataType.Int32;
case BIGINT:
return DataType.Int64;
case FLOAT:
return DataType.Float;
case DOUBLE:
return DataType.Double;
case STRING:
return DataType.VarChar;
case ARRAY:
return DataType.Array;
case FLOAT_VECTOR:
return DataType.FloatVector;
case BINARY_VECTOR:
return DataType.BinaryVector;
case FLOAT16_VECTOR:
return DataType.Float16Vector;
case BFLOAT16_VECTOR:
return DataType.BFloat16Vector;
case SPARSE_FLOAT_VECTOR:
return DataType.SparseFloatVector;
case DATE:
return DataType.VarChar;
case ROW:
return DataType.VarChar;
}
throw new CatalogException(
String.format("Not support convert to milvus type, sqlType is %s", sqlType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public boolean needFlush() {
}

@Override
public boolean flush() {
public synchronized boolean flush() {
if (CollectionUtils.isEmpty(this.milvusDataCache)) {
return true;
}
Expand Down
7 changes: 7 additions & 0 deletions seatunnel-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-milvus</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- jdbc driver -->
<dependency>
<groupId>com.aliyun.phoenix</groupId>
Expand Down

0 comments on commit 22f5dc1

Please sign in to comment.