diff --git a/milvusreader/pom.xml b/milvusreader/pom.xml
new file mode 100644
index 000000000..c66a6c7ba
--- /dev/null
+++ b/milvusreader/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>milvusreader</artifactId>
+
+    <properties>
+        <project-sourceEncoding>UTF-8</project-sourceEncoding>
+        <kotlin.code.style>official</kotlin.code.style>
+        <jdk-version>1.8</jdk-version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <artifactId>guava</artifactId>
+                <groupId>com.google.guava</groupId>
+                <version>32.0.1-jre</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test-junit5</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-stdlib</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/*.*</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/milvusreader/src/main/assembly/package.xml b/milvusreader/src/main/assembly/package.xml
new file mode 100644
index 000000000..3b12c5cb5
--- /dev/null
+++ b/milvusreader/src/main/assembly/package.xml
@@ -0,0 +1,36 @@
+<assembly
+        xmlns="http://maven.apache.org/plugin/assembly/assembly/1.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugin/assembly/assembly/1.0.0 http://maven.apache.org/xsd/assembly-1.0.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/milvusreader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>milvusreader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/milvusreader</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/milvusreader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/milvusreader/src/main/doc/milvusreader.md b/milvusreader/src/main/doc/milvusreader.md
new file mode 100644
index 000000000..bb2257422
--- /dev/null
+++ b/milvusreader/src/main/doc/milvusreader.md
@@ -0,0 +1,105 @@
+### DataX MilvusReader
+#### 1 Quick Introduction
+
+The MilvusReader plugin uses MilvusClient, the Milvus Java client, to read data from Milvus.
+
+#### 2 Implementation
+
+MilvusReader reads data from Milvus through the DataX framework: the controlling Job splits the source data into shards according to the configured rules, the shards are read in parallel, and each value is then converted, type by type, from a Milvus type into a type DataX supports. The sketch after this paragraph shows the underlying read loop.
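+
+The read loop is built on the Milvus `QueryIterator` API. Below is a minimal, standalone sketch of what `MilvusReader.Task.startRead` in this PR does; the endpoint, token, and collection are placeholder values:
+
+```java
+MilvusClientV2 client = new MilvusClientV2(ConnectConfig.builder()
+        .uri("https://<endpoint>:19530")   // placeholder
+        .token("<token>")                  // placeholder
+        .build());
+QueryIterator iterator = client.queryIterator(QueryIteratorReq.builder()
+        .collectionName("medium_articles")
+        .outputFields(Collections.singletonList("*")) // read every field
+        .batchSize(10)
+        .build());
+while (true) {
+    List<QueryResultsWrapper.RowRecord> rows = iterator.next();
+    if (rows.isEmpty()) {
+        break; // iterator exhausted
+    }
+    rows.forEach(row -> System.out.println(row.getFieldValues()));
+}
+client.close();
+```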
+
+#### 3 Feature Description
+* The example below copies one collection from one Milvus instance to another.
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "milvusreader",
+ "parameter": {
+ "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10
+ }
+ },
+ "writer": {
+ "name": "milvuswriter",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10,
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "isPrimaryKey": true
+ },
+ {
+ "name": "title_vector",
+ "type": "FloatVector",
+ "dimension": 768
+ },
+ {
+ "name": "title",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "link",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "reading_time",
+ "type": "Int64"
+ },
+ {
+ "name": "publication",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "claps",
+ "type": "Int64"
+ },
+ {
+ "name": "responses",
+ "type": "Int64"
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+#### 4 Parameter Description
+
+* uri: Milvus cluster endpoint. [Required]
+* token: token used to connect to Milvus. [Required]
+* collection: the collection to read from. [Required]
+* partition: the partition to read from. [Optional]
+* batchSize: number of rows fetched per read. [Optional, defaults to 100]
+
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| ------------------- | ---------------- |
+| Long | Int8, Int16, Int32, Int64 |
+| Double | Float, Double |
+| String | VarChar, String, JSON, Array, FloatVector, BinaryVector |
+| Boolean | Bool |
+
+- Reading dynamic-schema fields and reading by partition are not supported yet.
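+
+For reference, this mapping is implemented by `MilvusSourceConverter` in this PR; a condensed sketch of the per-type dispatch (vector and error handling omitted):
+
+```java
+static Column toColumn(DataType dataType, Object value) {
+    switch (dataType) {
+        case Bool:   return new BoolColumn(Boolean.parseBoolean(value.toString()));
+        case Int8:
+        case Int16:
+        case Int32:
+        case Int64:  return new LongColumn(Long.parseLong(value.toString()));
+        case Float:
+        case Double: return new DoubleColumn(java.lang.Double.parseDouble(value.toString()));
+        case VarChar:
+        case String: return new StringColumn(value.toString());
+        case JSON:
+        case Array:  return new StringColumn(new Gson().toJson(value));
+        default:     throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+}
+```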
+
+#### 6 Performance Report
+#### 7 Test Report
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java
new file mode 100644
index 000000000..ed22c129e
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+public class BufferUtils {
+
+ public static ByteBuffer toByteBuffer(Short[] shortArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+
+ for (Short value : shortArray) {
+ byteBuffer.putShort(value);
+ }
+
+ // Compatible compilation and running versions are not consistent
+ // Flip the buffer to prepare for reading
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Short[] toShortArray(ByteBuffer byteBuffer) {
+ Short[] shortArray = new Short[byteBuffer.capacity() / 2];
+
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = byteBuffer.getShort();
+ }
+
+ return shortArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Float[] floatArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+
+ for (float value : floatArray) {
+ byteBuffer.putFloat(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+ Float[] floatArray = new Float[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < floatArray.length; i++) {
+ floatArray[i] = byteBuffer.getFloat();
+ }
+
+ return floatArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+
+ for (double value : doubleArray) {
+ byteBuffer.putDouble(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+ Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
+
+ for (int i = 0; i < doubleArray.length; i++) {
+ doubleArray[i] = byteBuffer.getDouble();
+ }
+
+ return doubleArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Integer[] intArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+
+ for (int value : intArray) {
+ byteBuffer.putInt(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+ Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < intArray.length; i++) {
+ intArray[i] = byteBuffer.getInt();
+ }
+
+ return intArray;
+ }
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java
new file mode 100644
index 000000000..b0f608be1
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java
@@ -0,0 +1,10 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+public class KeyConstant {
+ public static final String URI = "uri";
+ public static final String TOKEN = "token";
+ public static final String DATABASE = "database";
+ public static final String COLLECTION = "collection";
+ public static final String PARTITION = "partition";
+ public static final String BATCH_SIZE = "batchSize";
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java
new file mode 100644
index 000000000..51e7798ed
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java
@@ -0,0 +1,130 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DescribeCollectionReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.vector.request.QueryIteratorReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class MilvusReader extends Reader {
+ public static class Job extends Reader.Job {
+ private Configuration originalConfig = null;
+        /**
+         * Split the job into tasks.
+         *
+         * @param mandatoryNumber the reader must split into exactly this many tasks so that
+         *                        reader and writer task counts match; otherwise the framework
+         *                        reports an error.
+         */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configList = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configList.add(this.originalConfig.clone());
+            }
+            return configList;
+        }
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+ public static class Task extends Reader.Task {
+
+ private MilvusClientV2 milvusClientV2;
+
+ private MilvusSourceConverter milvusSourceConverter;
+
+ private String collection = null;
+ private String partition = null;
+ private Integer batchSize;
+
+ private CreateCollectionReq.CollectionSchema collectionSchema;
+
+ @Override
+ public void init() {
+ log.info("Initializing Milvus writer");
+ // get configuration
+ Configuration writerSliceConfig = this.getPluginJobConf();
+ this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+ this.partition = writerSliceConfig.getString(KeyConstant.PARTITION, null);
+ this.batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+ log.info("Collection:{}", this.collection);
+ // connect to milvus
+ ConnectConfig connectConfig = ConnectConfig.builder()
+ .uri(writerSliceConfig.getString(KeyConstant.URI))
+ .token(writerSliceConfig.getString(KeyConstant.TOKEN))
+ .build();
+ if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {
+ log.warn("Database is set, using database{}", writerSliceConfig.getString(KeyConstant.DATABASE));
+ connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
+ }
+ this.milvusClientV2 = new MilvusClientV2(connectConfig);
+ this.milvusSourceConverter = new MilvusSourceConverter();
+ log.info("Milvus writer initialized");
+ }
+ @Override
+ public void prepare() {
+ super.prepare();
+ Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+ if (!hasCollection) {
+ log.error("Collection {} does not exist", collection);
+ throw new RuntimeException("Collection does not exist");
+ }
+ DescribeCollectionReq describeCollectionReq = DescribeCollectionReq.builder()
+ .collectionName(collection)
+ .build();
+ DescribeCollectionResp describeCollectionResp = milvusClientV2.describeCollection(describeCollectionReq);
+ this.collectionSchema = describeCollectionResp.getCollectionSchema();
+ }
+
+ @Override
+ public void destroy() {
+ log.info("Closing Milvus reader, closing connection");
+ this.milvusClientV2.close();
+ }
+
+ @Override
+ public void startRead(RecordSender recordSender) {
+ QueryIteratorReq queryIteratorReq = QueryIteratorReq.builder()
+ .collectionName(collection)
+ .outputFields(Collections.singletonList("*"))
+ .batchSize(batchSize)
+ .build();
+ if(partition != null) {
+ queryIteratorReq.setPartitionNames(Collections.singletonList(partition));
+ }
+ QueryIterator queryIterator = milvusClientV2.queryIterator(queryIteratorReq);
+ while (true){
+                List<QueryResultsWrapper.RowRecord> rowRecords = queryIterator.next();
+ if(rowRecords.isEmpty()){
+ break;
+ }
+ rowRecords.forEach(rowRecord -> {
+ Record record = recordSender.createRecord();
+ record = milvusSourceConverter.toDataXRecord(record, rowRecord, collectionSchema);
+ recordSender.sendToWriter(record);
+ });
+ }
+ }
+ }
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java
new file mode 100644
index 000000000..2f8eb6fd3
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java
@@ -0,0 +1,64 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.BoolColumn;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DoubleColumn;
+import com.alibaba.datax.common.element.LongColumn;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.google.gson.Gson;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class MilvusSourceConverter {
+
+ public Record toDataXRecord(Record record, QueryResultsWrapper.RowRecord rowRecord, CreateCollectionReq.CollectionSchema collectionSchema) {
+        Map<String, Object> fields = rowRecord.getFieldValues();
+
+ for (int i = 0; i < collectionSchema.getFieldSchemaList().size(); i++) {
+ CreateCollectionReq.FieldSchema fieldSchema = collectionSchema.getFieldSchemaList().get(i);
+ String fieldName = fieldSchema.getName();
+ Object fieldValue = fields.get(fieldName);
+ Column column = convertToDataXColumn(fieldSchema.getDataType(), fieldValue);
+ record.addColumn(column);
+ }
+ return record;
+ }
+
+    private Column convertToDataXColumn(DataType dataType, Object fieldValue) {
+        Gson gson = new Gson();
+        switch (dataType) {
+            case Bool:
+                // Boolean.parseBoolean parses the value itself; Boolean.getBoolean would read a system property
+                return new BoolColumn(Boolean.parseBoolean(fieldValue.toString()));
+            case Int8:
+            case Int16:
+            case Int32:
+            case Int64:
+                // parse as long so Int64 values do not overflow
+                return new LongColumn(Long.parseLong(fieldValue.toString()));
+            case Float:
+            case Double:
+                return new DoubleColumn(java.lang.Double.parseDouble(fieldValue.toString()));
+            case VarChar:
+            case String:
+                return new StringColumn(fieldValue.toString());
+            case JSON:
+            case Array:
+                return new StringColumn(gson.toJson(fieldValue));
+            case FloatVector:
+                List<Float> floats = (List<Float>) fieldValue;
+                return new StringColumn(Arrays.toString(floats.toArray()));
+            case BinaryVector:
+                Integer[] binaries = BufferUtils.toIntArray((ByteBuffer) fieldValue);
+                return new StringColumn(Arrays.toString(binaries));
+            default:
+                throw new IllegalArgumentException("Unsupported data type: " + dataType);
+        }
+    }
+}
diff --git a/milvusreader/src/main/resources/plugin.json b/milvusreader/src/main/resources/plugin.json
new file mode 100644
index 000000000..dc90019da
--- /dev/null
+++ b/milvusreader/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "milvusreader",
+ "class": "com.alibaba.datax.plugin.reader.milvusreader.MilvusReader",
+ "description": "useScene: prod. mechanism: via milvusclient connect milvus read data concurrent.",
+ "developer": "nianliuu"
+}
diff --git a/milvusreader/src/main/resources/plugin_job_template.json b/milvusreader/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..de33d6512
--- /dev/null
+++ b/milvusreader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,9 @@
+{
+ "name": "milvusreader",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10
+ }
+}
\ No newline at end of file
diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml
new file mode 100644
index 000000000..b889cbe41
--- /dev/null
+++ b/milvuswriter/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>milvuswriter</artifactId>
+
+    <properties>
+        <project-sourceEncoding>UTF-8</project-sourceEncoding>
+        <kotlin.code.style>official</kotlin.code.style>
+        <jdk-version>1.8</jdk-version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <artifactId>guava</artifactId>
+                <groupId>com.google.guava</groupId>
+                <version>32.0.1-jre</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test-junit5</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-stdlib</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/*.*</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/milvuswriter/src/doc/milvuswriter.md b/milvuswriter/src/doc/milvuswriter.md
new file mode 100644
index 000000000..a0d773f69
--- /dev/null
+++ b/milvuswriter/src/doc/milvuswriter.md
@@ -0,0 +1,116 @@
+### DataX MilvusWriter Plugin
+#### 1 Quick Introduction
+
+The MilvusWriter plugin uses MilvusClient, the Milvus Java client, to write data to Milvus.
+
+#### 2 Implementation
+
+MilvusWriter writes data to Milvus through the DataX framework: the controlling Job writes to Milvus according to the configured rules, converting each value, type by type, from a DataX type into a type Milvus supports. The sketch after this paragraph shows the buffered write path.
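+
+Rows are buffered and flushed to Milvus as upserts once `batchSize` rows have accumulated. Below is a minimal sketch of the batching logic in `MilvusBufferWriter` from this PR, assuming an already-connected `MilvusClientV2` named `client`; the collection name, field names, and sizes are placeholders:
+
+```java
+List<JsonObject> buffer = new ArrayList<>();
+Gson gson = new Gson();
+for (long id = 0; id < 25; id++) {
+    JsonObject row = new JsonObject();
+    row.addProperty("id", id); // placeholder fields
+    row.add("title_vector", gson.toJsonTree(Collections.nCopies(768, 0.1f)));
+    buffer.add(row);
+    if (buffer.size() >= 10) { // batchSize reached: flush
+        client.upsert(UpsertReq.builder()
+                .collectionName("medium_articles")
+                .data(buffer)
+                .build());
+        buffer = new ArrayList<>();
+    }
+}
+if (!buffer.isEmpty()) { // destroy() flushes the final partial batch
+    client.upsert(UpsertReq.builder().collectionName("medium_articles").data(buffer).build());
+}
+```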
+
+#### 3 Feature Description
+* The example below copies one collection from one Milvus instance to another.
+```json
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "milvusreader",
+ "parameter": {
+ "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10
+ }
+ },
+ "writer": {
+ "name": "milvuswriter",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+ "token": "*****",
+ "collection": "medium_articles",
+ "schemaCreateMode": 0,
+ "batchSize": 10,
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "isPrimaryKey": true
+ },
+ {
+ "name": "title_vector",
+ "type": "FloatVector",
+ "dimension": 768
+ },
+ {
+ "name": "title",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "link",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "reading_time",
+ "type": "Int64"
+ },
+ {
+ "name": "publication",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "claps",
+ "type": "Int64"
+ },
+ {
+ "name": "responses",
+ "type": "Int64"
+ }
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
+```
+
+#### 4 Parameter Description
+
+* uri: Milvus cluster endpoint. [Required]
+* token: token used to connect to Milvus. [Required]
+* collection: the collection to write to. [Required]
+* partition: the partition to write to. [Optional]
+* batchSize: number of rows written per batch. [Optional, defaults to 100]
+* schemaCreateMode: Integer, how to handle a missing collection; defaults to 0. 0 (createWhenTableNotExist: create the collection from `column`), 1 (exception: fail the job), 2 (ignore). [Optional]
+* enableDynamicSchema: whether to enable dynamic schema on a created collection; defaults to true. [Optional]
+* column: the fields to write; see the example after this list. [Required]
+  * name: field name. [Required]
+  * type: field type, one of [Int8, Int16, Int32, Int64, Float, Double, VarChar, FloatVector, JSON, Array]. [Required]
+  * isPrimaryKey: whether the field is the primary key. [Optional]
+  * isPartitionKey: whether the field is a partition key. [Optional]
+  * dimension: dimension of a FloatVector field. [Optional]
+  * maxLength: maximum length of a VarChar field. [Optional]
+  * elementType: element type of an Array field. [Optional]
+  * maxCapacity: maximum capacity of an Array field. [Optional]
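+
+For example, a hypothetical Array column holding up to 10 VarChar tags could be declared like this (the field name and sizes are invented for illustration):
+
+```json
+{
+  "name": "tags",
+  "type": "Array",
+  "elementType": "VarChar",
+  "maxLength": 100,
+  "maxCapacity": 10
+}
+```
+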
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| ------------------- | ---------------- |
+| Long | Int8, Int16, Int32, Int64 |
+| Double | Float, Double |
+| String | VarChar, String, JSON, Array, FloatVector, BinaryVector |
+| Boolean | Bool |
+
+- Writing dynamic-schema fields and writing by partition are not supported yet.
+
+#### 6 Performance Report
+#### 7 Test Report
diff --git a/milvuswriter/src/main/assembly/package.xml b/milvuswriter/src/main/assembly/package.xml
new file mode 100644
index 000000000..62357b4ae
--- /dev/null
+++ b/milvuswriter/src/main/assembly/package.xml
@@ -0,0 +1,36 @@
+<assembly
+        xmlns="http://maven.apache.org/plugin/assembly/assembly/1.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/plugin/assembly/assembly/1.0.0 http://maven.apache.org/xsd/assembly-1.0.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/milvuswriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>milvuswriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/milvuswriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/milvuswriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java
new file mode 100644
index 000000000..89153a6a4
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+public class BufferUtils {
+
+ public static ByteBuffer toByteBuffer(Short[] shortArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+
+ for (Short value : shortArray) {
+ byteBuffer.putShort(value);
+ }
+
+ // Compatible compilation and running versions are not consistent
+ // Flip the buffer to prepare for reading
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Short[] toShortArray(ByteBuffer byteBuffer) {
+ Short[] shortArray = new Short[byteBuffer.capacity() / 2];
+
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = byteBuffer.getShort();
+ }
+
+ return shortArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Float[] floatArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+
+ for (float value : floatArray) {
+ byteBuffer.putFloat(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+ Float[] floatArray = new Float[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < floatArray.length; i++) {
+ floatArray[i] = byteBuffer.getFloat();
+ }
+
+ return floatArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+
+ for (double value : doubleArray) {
+ byteBuffer.putDouble(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+ Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
+
+ for (int i = 0; i < doubleArray.length; i++) {
+ doubleArray[i] = byteBuffer.getDouble();
+ }
+
+ return doubleArray;
+ }
+
+ public static ByteBuffer toByteBuffer(Integer[] intArray) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+
+ for (int value : intArray) {
+ byteBuffer.putInt(value);
+ }
+
+ ((Buffer) byteBuffer).flip();
+
+ return byteBuffer;
+ }
+
+ public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+ Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
+
+ for (int i = 0; i < intArray.length; i++) {
+ intArray[i] = byteBuffer.getInt();
+ }
+
+ return intArray;
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
new file mode 100644
index 000000000..78fc302c5
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
@@ -0,0 +1,24 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+public class KeyConstant {
+ public static final String URI = "uri";
+ public static final String TOKEN = "token";
+ public static final String DATABASE = "database";
+ public static final String COLLECTION = "collection";
+ public static final String PARTITION = "partition";
+ public static final String AUTO_ID = "autoId";
+ public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
+ public static final String BATCH_SIZE = "batchSize";
+ public static final String COLUMN = "column";
+ public static final String COLUMN_TYPE = "type";
+ public static final String COLUMN_NAME = "name";
+ public static final String VECTOR_DIMENSION = "dimension";
+ public static final String IS_PRIMARY_KEY = "isPrimaryKey";
+// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception"
+ public static final String schemaCreateMode = "schemaCreateMode";
+ public static final String IS_PARTITION_KEY = "isPartitionKey";
+ public static final String MAX_LENGTH = "maxLength";
+ public static final String ELEMENT_TYPE = "elementType";
+ public static final String MAX_CAPACITY = "maxCapacity";
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
new file mode 100644
index 000000000..dc3590860
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
@@ -0,0 +1,48 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.UpsertReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class MilvusBufferWriter {
+
+ private final MilvusClientV2 milvusClientV2;
+ private final String collection;
+ private final String partition;
+ private final Integer batchSize;
+    private List<JsonObject> dataCache;
+
+ public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, String partition, Integer batchSize){
+ this.milvusClientV2 = milvusClientV2;
+ this.collection = collection;
+ this.partition = partition;
+ this.batchSize = batchSize;
+ this.dataCache = new ArrayList<>();
+ }
+ public void write(JsonObject data){
+ dataCache.add(data);
+ }
+ public Boolean needCommit(){
+ return dataCache.size() >= batchSize;
+ }
+ public void commit(){
+ if(dataCache.isEmpty()){
+ log.info("dataCache is empty, skip commit");
+ return;
+ }
+ UpsertReq upsertReq = UpsertReq.builder()
+ .collectionName(collection)
+ .data(dataCache)
+ .build();
+ if(partition != null){
+ upsertReq.setPartitionName(partition);
+ }
+ milvusClientV2.upsert(upsertReq);
+ dataCache = new ArrayList<>();
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
new file mode 100644
index 000000000..fd436f3b0
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
@@ -0,0 +1,121 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import io.milvus.v2.common.DataType;
+import static io.milvus.v2.common.DataType.*;
+import io.milvus.v2.service.collection.request.AddFieldReq;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MilvusSinkConverter {
+ public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) {
+ JsonObject data = new JsonObject();
+ Gson gson = new Gson();
+ for(int i = 0; i < record.getColumnNumber(); i++) {
+ String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE);
+ String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME);
+ Object rawData = record.getColumn(i).getRawData();
+ Object field = convertToMilvusField(fieldType, rawData);
+ data.add(fieldName, gson.toJsonTree(field));
+ }
+ return data;
+ }
+
+ private Object convertToMilvusField(String type, Object rawData) {
+ Gson gson = new Gson();
+ switch (valueOf(type)) {
+ case Int8:
+ return Byte.parseByte(rawData.toString());
+ case Int16:
+ return Short.parseShort(rawData.toString());
+ case Int32:
+ return Integer.parseInt(rawData.toString());
+ case Int64:
+ return Long.parseLong(rawData.toString());
+ case Float:
+ return java.lang.Float.parseFloat(rawData.toString());
+ case String:
+ case VarChar:
+ return rawData.toString();
+ case Bool:
+ return Boolean.parseBoolean(rawData.toString());
+ case JSON:
+ return gson.fromJson(rawData.toString(), JsonObject.class);
+ case Array:
+                Type listType = new TypeToken<List<Object>>() {}.getType();
+ return gson.fromJson(rawData.toString(), listType);
+ case FloatVector:
+                return Arrays.stream(processVectorString(rawData))
+                        .map(java.lang.Float::parseFloat)
+                        .collect(Collectors.toList());
+            case BinaryVector:
+                java.lang.Integer[] binaries = Arrays.stream(processVectorString(rawData))
+                        .map(java.lang.Integer::parseInt)
+                        .toArray(java.lang.Integer[]::new);
+                return BufferUtils.toByteBuffer(binaries);
+ case Float16Vector:
+ case BFloat16Vector:
+ // all these data is byte format in milvus
+ ByteBuffer binaryVector = (ByteBuffer) rawData;
+ return gson.toJsonTree(binaryVector.array());
+ case SparseFloatVector:
+ return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject();
+ default:
+ throw new RuntimeException("Unsupported data type: " + type);
+ }
+ }
+
+ private String[] processVectorString(Object rawData) {
+ // Step 1: Remove square brackets
+ String cleanedInput = rawData.toString().replace("[", "").replace("]", "");
+
+ // Step 2: Split the string into an array of string numbers
+ return cleanedInput.split(",\\s*");
+ }
+
+    public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) {
+        CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
+        for (int i = 0; i < milvusColumnMeta.size(); i++) {
+            JSONObject column = milvusColumnMeta.getJSONObject(i);
+            AddFieldReq addFieldReq = AddFieldReq.builder()
+                    .fieldName(column.getString(KeyConstant.COLUMN_NAME))
+                    .dataType(valueOf(column.getString(KeyConstant.COLUMN_TYPE)))
+                    .build();
+            if (column.containsKey(KeyConstant.IS_PRIMARY_KEY)) {
+                addFieldReq.setIsPrimaryKey(column.getBoolean(KeyConstant.IS_PRIMARY_KEY));
+            }
+            if (column.containsKey(KeyConstant.VECTOR_DIMENSION)) {
+                addFieldReq.setDimension(column.getInteger(KeyConstant.VECTOR_DIMENSION));
+            }
+            if (column.containsKey(KeyConstant.IS_PARTITION_KEY)) {
+                addFieldReq.setIsPartitionKey(column.getBoolean(KeyConstant.IS_PARTITION_KEY));
+            }
+            if (column.containsKey(KeyConstant.MAX_LENGTH)) {
+                addFieldReq.setMaxLength(column.getInteger(KeyConstant.MAX_LENGTH));
+            }
+            if (column.containsKey(KeyConstant.ELEMENT_TYPE)) {
+                addFieldReq.setElementType(DataType.valueOf(column.getString(KeyConstant.ELEMENT_TYPE)));
+            }
+            if (column.containsKey(KeyConstant.MAX_CAPACITY)) {
+                addFieldReq.setMaxCapacity(column.getInteger(KeyConstant.MAX_CAPACITY));
+            }
+            collectionSchema.addField(addFieldReq);
+        }
+        return collectionSchema;
+    }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
new file mode 100644
index 000000000..01ae39e52
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
@@ -0,0 +1,148 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.partition.request.CreatePartitionReq;
+import io.milvus.v2.service.partition.request.HasPartitionReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class MilvusWriter extends Writer {
+ public static class Job extends Writer.Job {
+ private Configuration originalConfig = null;
+        /**
+         * Split the job into tasks.
+         *
+         * @param mandatoryNumber to keep reader and writer task counts equal, the writer
+         *                        must split into exactly as many tasks as the source side;
+         *                        otherwise the framework reports an error.
+         */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configList = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configList.add(this.originalConfig.clone());
+            }
+            return configList;
+        }
+
+ @Override
+ public void init() {
+ this.originalConfig = super.getPluginJobConf();
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ }
+ public static class Task extends Writer.Task {
+
+ private MilvusClientV2 milvusClientV2;
+
+ private MilvusSinkConverter milvusSinkConverter;
+ private MilvusBufferWriter milvusBufferWriter;
+
+ private String collection = null;
+ private String partition = null;
+ private JSONArray milvusColumnMeta;
+
+ private boolean enableDynamicSchema;
+
+ private Integer schemaCreateMode;
+
+ @Override
+ public void startWrite(RecordReceiver lineReceiver) {
+ Record record = lineReceiver.getFromReader();
+ while(record != null){
+ JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
+ milvusBufferWriter.write(data);
+ if (milvusBufferWriter.needCommit()) {
+ log.info("Reached buffer limit, Committing data");
+ milvusBufferWriter.commit();
+ log.info("Data committed");
+ }
+ record = lineReceiver.getFromReader();
+ }
+ }
+
+ @Override
+ public void init() {
+ log.info("Initializing Milvus writer");
+ // get configuration
+ Configuration writerSliceConfig = this.getPluginJobConf();
+ this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+ this.partition = writerSliceConfig.getString(KeyConstant.PARTITION, null);
+ this.enableDynamicSchema = writerSliceConfig.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA, true);
+ this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
+            this.schemaCreateMode = writerSliceConfig.getInt(KeyConstant.SCHEMA_CREATE_MODE,
+                    SchemaCreateMode.CREATE_WHEN_NOT_EXIST.getMode());
+ int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+ log.info("Collection:{}", this.collection);
+ // connect to milvus
+ ConnectConfig connectConfig = ConnectConfig.builder()
+ .uri(writerSliceConfig.getString(KeyConstant.URI))
+ .token(writerSliceConfig.getString(KeyConstant.TOKEN))
+ .build();
+            if (writerSliceConfig.getString(KeyConstant.DATABASE) != null) {
+                log.info("Database is set, using database {}", writerSliceConfig.getString(KeyConstant.DATABASE));
+                connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
+ }
+ this.milvusClientV2 = new MilvusClientV2(connectConfig);
+ this.milvusSinkConverter = new MilvusSinkConverter();
+ this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, partition, batchSize);
+ log.info("Milvus writer initialized");
+ }
+ @Override
+ public void prepare() {
+ super.prepare();
+ Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+ if (!hasCollection) {
+ log.info("Collection not exist");
+ if (schemaCreateMode.equals(SchemaCreateMode.CREATE_WHEN_NOT_EXIST.getMode())) {
+ // create collection
+ log.info("Creating collection:{}", this.collection);
+ CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
+ collectionSchema.setEnableDynamicField(enableDynamicSchema);
+ CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
+ .collectionName(collection)
+ .collectionSchema(collectionSchema)
+ .build();
+ milvusClientV2.createCollection(createCollectionReq);
+ } else if (schemaCreateMode.equals(SchemaCreateMode.EXCEPTION.getMode())) {
+ log.error("Collection not exist, throw exception");
+ throw new RuntimeException("Collection not exist");
+ }
+ }
+ if(partition != null) {
+ Boolean hasPartition = milvusClientV2.hasPartition(HasPartitionReq.builder().collectionName(collection).partitionName(partition).build());
+ if (!hasPartition) {
+ log.info("Partition not exist, creating");
+ CreatePartitionReq createPartitionReq = CreatePartitionReq.builder()
+ .collectionName(collection)
+ .partitionName(partition)
+ .build();
+ milvusClientV2.createPartition(createPartitionReq);
+ log.info("Partition created");
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ log.info("Closing Milvus writer, committing data and closing connection");
+ this.milvusBufferWriter.commit();
+ this.milvusClientV2.close();
+ }
+ }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java
new file mode 100644
index 000000000..aa67945db
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java
@@ -0,0 +1,16 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import lombok.Getter;
+
+@Getter
+public enum SchemaCreateMode {
+ CREATE_WHEN_NOT_EXIST(0),
+ EXCEPTION(1),
+ IGNORE(2);
+
+    private final int mode;
+
+ SchemaCreateMode(int mode) {
+ this.mode = mode;
+ }
+}
diff --git a/milvuswriter/src/main/resources/plugin.json b/milvuswriter/src/main/resources/plugin.json
new file mode 100644
index 000000000..8b9123093
--- /dev/null
+++ b/milvuswriter/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+ "name": "milvuswriter",
+ "class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter",
+ "description": "useScene: prod. mechanism: via milvusclient connect milvus write data concurrent.",
+ "developer": "nianliuu"
+}
diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..b9bff3628
--- /dev/null
+++ b/milvuswriter/src/main/resources/plugin_job_template.json
@@ -0,0 +1,49 @@
+{
+ "name": "milvuswriter",
+ "parameter": {
+ "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+ "token": "*****",
+ "collection": "medium_articles",
+ "batchSize": 10,
+ "schemaCreateMode": "createWhenTableNotExit",
+ "column": [
+ {
+ "name": "id",
+ "type": "Int64",
+ "isPrimaryKey": true
+ },
+ {
+ "name": "title_vector",
+ "type": "FloatVector",
+ "dimension": 768
+ },
+ {
+ "name": "title",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "link",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "reading_time",
+ "type": "Int64"
+ },
+ {
+ "name": "publication",
+ "type": "VarChar",
+ "maxLength": 1000
+ },
+ {
+ "name": "claps",
+ "type": "Int64"
+ },
+ {
+ "name": "responses",
+ "type": "Int64"
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/package.xml b/package.xml
index 624109f79..49eff42d6 100644
--- a/package.xml
+++ b/package.xml
@@ -257,8 +257,22 @@
             <outputDirectory>datax</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>milvusreader/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>milvuswriter/target/datax/</directory>
+            <includes>
+                <include>**/*.*</include>
+            </includes>
+            <outputDirectory>datax</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>mysqlwriter/target/datax/</directory>
diff --git a/pom.xml b/pom.xml
index c7f43f172..2d4d3a66c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,7 @@
         <module>starrocksreader</module>
         <module>sybasereader</module>
         <module>dorisreader</module>
+        <module>milvusreader</module>
 
         <module>mysqlwriter</module>
         <module>starrockswriter</module>
@@ -129,6 +130,7 @@
         <module>adbmysqlwriter</module>
         <module>sybasewriter</module>
         <module>neo4jwriter</module>
+        <module>milvuswriter</module>
 
         <module>plugin-rdbms-util</module>
         <module>plugin-unstructured-storage-util</module>