diff --git a/milvusreader/pom.xml b/milvusreader/pom.xml
new file mode 100644
index 000000000..c66a6c7ba
--- /dev/null
+++ b/milvusreader/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>milvusreader</artifactId>
+
+    <properties>
+        <project-sourceEncoding>UTF-8</project-sourceEncoding>
+        <kotlin.code.style>official</kotlin.code.style>
+        <jdk-version>1.8</jdk-version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <artifactId>guava</artifactId>
+                <groupId>com.google.guava</groupId>
+                <version>32.0.1-jre</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test-junit5</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-stdlib</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/*.*</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/milvusreader/src/main/assembly/package.xml b/milvusreader/src/main/assembly/package.xml
new file mode 100644
index 000000000..3b12c5cb5
--- /dev/null
+++ b/milvusreader/src/main/assembly/package.xml
@@ -0,0 +1,36 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/milvusreader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>milvusreader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/milvusreader</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/milvusreader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/milvusreader/src/main/doc/milvusreader.md b/milvusreader/src/main/doc/milvusreader.md
new file mode 100644
index 000000000..bb2257422
--- /dev/null
+++ b/milvusreader/src/main/doc/milvusreader.md
@@ -0,0 +1,105 @@
+### DataX MilvusReader
+#### 1 Quick Introduction
+
+The MilvusReader plugin reads data out of Milvus through MilvusClient, the Milvus Java client.
+
+#### 2 Implementation
+
+MilvusReader reads from Milvus through the DataX framework: the controlling JOB splits the data in Milvus into shards according to the configured rules, reads the shards in parallel, and converts each Milvus-supported type to the matching DataX type field by field.
+
+#### 3 Function Description
+* The example below copies a collection from one Milvus cluster to another.
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "milvusreader",
+          "parameter": {
+            "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+            "token": "*****",
+            "collection": "medium_articles",
+            "batchSize": 10
+          }
+        },
+        "writer": {
+          "name": "milvuswriter",
+          "parameter": {
+            "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+            "token": "*****",
+            "collection": "medium_articles",
+            "batchSize": 10,
+            "column": [
+              { "name": "id", "type": "Int64", "isPrimaryKey": true },
+              { "name": "title_vector", "type": "FloatVector", "dimension": 768 },
+              { "name": "title", "type": "VarChar", "maxLength": 1000 },
+              { "name": "link", "type": "VarChar", "maxLength": 1000 },
+              { "name": "reading_time", "type": "Int64" },
+              { "name": "publication", "type": "VarChar", "maxLength": 1000 },
+              { "name": "claps", "type": "Int64" },
+              { "name": "responses", "type": "Int64" }
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+#### 4 Parameters
+
+* uri: Milvus cluster endpoint. (required)
+* token: Milvus connection token. (required)
+* collection: the collection to read from. (required)
+* partition: the partition to read from. (optional)
+* batchSize: number of rows fetched per read. (optional, default 100)
+
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| ------------------- | ---------------- |
+| Long                | int              |
+| Double              | double           |
+| String              | string, varchar  |
+| Boolean             | bool             |
+
+- Reading dynamic-schema fields is not supported yet.
+
+#### 6 Performance Report
+#### 7 Test Report
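+
+Appendix: a minimal, standalone sketch of the read path from section 2, based on the plugin
+source below. It uses the same milvus-sdk-java 2.4.x QueryIterator paging that
+MilvusReader.Task#startRead wraps; the uri/token values are placeholders and the class name
+is made up for illustration:
+
+```java
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.QueryIteratorReq;
+
+import java.util.Collections;
+import java.util.List;
+
+public class MilvusReadSketch {
+    public static void main(String[] args) {
+        MilvusClientV2 client = new MilvusClientV2(ConnectConfig.builder()
+                .uri("https://<your-cluster>.zillizcloud.com:19530") // placeholder
+                .token("<token>")                                    // placeholder
+                .build());
+        QueryIterator iterator = client.queryIterator(QueryIteratorReq.builder()
+                .collectionName("medium_articles")
+                .outputFields(Collections.singletonList("*"))
+                .batchSize(10L)
+                .build());
+        // Page through the collection until an empty batch signals the end,
+        // exactly like MilvusReader.Task#startRead.
+        while (true) {
+            List<QueryResultsWrapper.RowRecord> batch = iterator.next();
+            if (batch.isEmpty()) {
+                break;
+            }
+            batch.forEach(row -> System.out.println(row.getFieldValues()));
+        }
+        client.close();
+    }
+}
+```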
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java
new file mode 100644
index 000000000..ed22c129e
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+public class BufferUtils {
+
+    public static ByteBuffer toByteBuffer(Short[] shortArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+
+        for (Short value : shortArray) {
+            byteBuffer.putShort(value);
+        }
+
+        // Flip the buffer to prepare for reading. Cast to Buffer first: the covariant
+        // ByteBuffer return type of flip() only exists since JDK 9, so this keeps
+        // bytecode compiled on newer JDKs runnable on Java 8.
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Short[] toShortArray(ByteBuffer byteBuffer) {
+        Short[] shortArray = new Short[byteBuffer.capacity() / 2];
+
+        for (int i = 0; i < shortArray.length; i++) {
+            shortArray[i] = byteBuffer.getShort();
+        }
+
+        return shortArray;
+    }
+
+    public static ByteBuffer toByteBuffer(Float[] floatArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+
+        for (float value : floatArray) {
+            byteBuffer.putFloat(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+        Float[] floatArray = new Float[byteBuffer.capacity() / 4];
+
+        for (int i = 0; i < floatArray.length; i++) {
+            floatArray[i] = byteBuffer.getFloat();
+        }
+
+        return floatArray;
+    }
+
+    public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+
+        for (double value : doubleArray) {
+            byteBuffer.putDouble(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+        Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
+
+        for (int i = 0; i < doubleArray.length; i++) {
+            doubleArray[i] = byteBuffer.getDouble();
+        }
+
+        return doubleArray;
+    }
+
+    public static ByteBuffer toByteBuffer(Integer[] intArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+
+        for (int value : intArray) {
+            byteBuffer.putInt(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+        Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
+
+        for (int i = 0; i < intArray.length; i++) {
+            intArray[i] = byteBuffer.getInt();
+        }
+
+        return intArray;
+    }
+}
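+
+// Usage sketch (illustrative, not part of the plugin API): the converters round-trip
+// because toByteBuffer(...) flips the buffer to position 0 before returning, and the
+// to*Array(...) readers consume it with relative gets:
+//
+//   Float[] vec = {0.1f, 0.2f, 0.3f};
+//   ByteBuffer buf = BufferUtils.toByteBuffer(vec); // position 0, limit 12
+//   Float[] back = BufferUtils.toFloatArray(buf);   // {0.1f, 0.2f, 0.3f}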
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java
new file mode 100644
index 000000000..b0f608be1
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java
@@ -0,0 +1,10 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+public class KeyConstant {
+    public static final String URI = "uri";
+    public static final String TOKEN = "token";
+    public static final String DATABASE = "database";
+    public static final String COLLECTION = "collection";
+    public static final String PARTITION = "partition";
+    public static final String BATCH_SIZE = "batchSize";
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java
new file mode 100644
index 000000000..51e7798ed
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java
@@ -0,0 +1,130 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.spi.Reader;
+import com.alibaba.datax.common.util.Configuration;
+import io.milvus.orm.iterator.QueryIterator;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DescribeCollectionReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.vector.request.QueryIteratorReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class MilvusReader extends Reader {
+    public static class Job extends Reader.Job {
+        private Configuration originalConfig = null;
+
+        /**
+         * Split the job.
+         *
+         * @param mandatoryNumber the plugin must split into exactly this number of
+         *                        configurations so reader and writer task counts match;
+         *                        otherwise the framework reports an error.
+         */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configList = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configList.add(this.originalConfig.clone());
+            }
+            return configList;
+        }
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+    }
+
+    public static class Task extends Reader.Task {
+
+        private MilvusClientV2 milvusClientV2;
+
+        private MilvusSourceConverter milvusSourceConverter;
+
+        private String collection = null;
+        private String partition = null;
+        private Integer batchSize;
+
+        private CreateCollectionReq.CollectionSchema collectionSchema;
+
+        @Override
+        public void init() {
+            log.info("Initializing Milvus reader");
+            // get configuration
+            Configuration readerSliceConfig = this.getPluginJobConf();
+            this.collection = readerSliceConfig.getString(KeyConstant.COLLECTION);
+            this.partition = readerSliceConfig.getString(KeyConstant.PARTITION, null);
+            this.batchSize = readerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+            log.info("Collection:{}", this.collection);
+            // connect to milvus
+            ConnectConfig connectConfig = ConnectConfig.builder()
+                    .uri(readerSliceConfig.getString(KeyConstant.URI))
+                    .token(readerSliceConfig.getString(KeyConstant.TOKEN))
+                    .build();
+            if (readerSliceConfig.getString(KeyConstant.DATABASE) != null) {
+                log.info("Database is set, using database {}", readerSliceConfig.getString(KeyConstant.DATABASE));
+                connectConfig.setDbName(readerSliceConfig.getString(KeyConstant.DATABASE));
+            }
+            this.milvusClientV2 = new MilvusClientV2(connectConfig);
+            this.milvusSourceConverter = new MilvusSourceConverter();
+            log.info("Milvus reader initialized");
+        }
+
+        @Override
+        public void prepare() {
+            super.prepare();
+            Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+            if (!hasCollection) {
+                log.error("Collection {} does not exist", collection);
+                throw new RuntimeException("Collection does not exist");
+            }
+            DescribeCollectionReq describeCollectionReq = DescribeCollectionReq.builder()
+                    .collectionName(collection)
+                    .build();
+            DescribeCollectionResp describeCollectionResp = milvusClientV2.describeCollection(describeCollectionReq);
+            this.collectionSchema = describeCollectionResp.getCollectionSchema();
+        }
+
+        @Override
+        public void destroy() {
+            log.info("Closing Milvus reader, closing connection");
+            this.milvusClientV2.close();
+        }
+
+        @Override
+        public void startRead(RecordSender recordSender) {
+            QueryIteratorReq queryIteratorReq = QueryIteratorReq.builder()
+                    .collectionName(collection)
+                    .outputFields(Collections.singletonList("*"))
+                    .batchSize(batchSize)
+                    .build();
+            if (partition != null) {
+                queryIteratorReq.setPartitionNames(Collections.singletonList(partition));
+            }
+            QueryIterator queryIterator = milvusClientV2.queryIterator(queryIteratorReq);
+            // page through the collection until an empty batch signals the end
+            while (true) {
+                List<QueryResultsWrapper.RowRecord> rowRecords = queryIterator.next();
+                if (rowRecords.isEmpty()) {
+                    break;
+                }
+                rowRecords.forEach(rowRecord -> {
+                    Record record = recordSender.createRecord();
+                    record = milvusSourceConverter.toDataXRecord(record, rowRecord, collectionSchema);
+                    recordSender.sendToWriter(record);
+                });
+            }
+        }
+    }
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java
new file mode 100644
index 000000000..2f8eb6fd3
--- /dev/null
+++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java
@@ -0,0 +1,64 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.BoolColumn;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DoubleColumn;
+import com.alibaba.datax.common.element.LongColumn;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.google.gson.Gson;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class MilvusSourceConverter {
+
+    public Record toDataXRecord(Record record, QueryResultsWrapper.RowRecord rowRecord, CreateCollectionReq.CollectionSchema collectionSchema) {
+        Map<String, Object> fields = rowRecord.getFieldValues();
+
+        for (int i = 0; i < collectionSchema.getFieldSchemaList().size(); i++) {
+            CreateCollectionReq.FieldSchema fieldSchema = collectionSchema.getFieldSchemaList().get(i);
+            String fieldName = fieldSchema.getName();
+            Object fieldValue = fields.get(fieldName);
+            Column column = convertToDataXColumn(fieldSchema.getDataType(), fieldValue);
+            record.addColumn(column);
+        }
+        return record;
+    }
+
+    private Column convertToDataXColumn(DataType dataType, Object fieldValue) {
+        Gson gson = new Gson();
+        switch (dataType) {
+            case Bool:
+                // Boolean.parseBoolean, not Boolean.getBoolean (which reads a system property)
+                return new BoolColumn(Boolean.parseBoolean(fieldValue.toString()));
+            case Int8:
+            case Int16:
+            case Int32:
+            case Int64:
+                // parse as long so Int64 values outside the int range survive
+                return new LongColumn(Long.parseLong(fieldValue.toString()));
+            case Float:
+            case Double:
+                return new DoubleColumn(java.lang.Double.parseDouble(fieldValue.toString()));
+            case VarChar:
+            case String:
+                return new StringColumn(fieldValue.toString());
+            case JSON:
+            case Array:
+                return new StringColumn(gson.toJson(fieldValue));
+            case FloatVector:
+                List<Float> floats = (List<Float>) fieldValue;
+                return new StringColumn(Arrays.toString(floats.toArray()));
+            case BinaryVector:
+                Integer[] binarys = BufferUtils.toIntArray((ByteBuffer) fieldValue);
+                return new StringColumn(Arrays.toString(binarys));
+            default:
+                throw new IllegalArgumentException("Unsupported data type: " + dataType);
+        }
+    }
+}
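+
+// Conversion sketch (illustrative values): a row {"id": 42, "vec": [0.1, 0.2]} with schema
+// (id Int64, vec FloatVector) becomes LongColumn(42) followed by StringColumn("[0.1, 0.2]").
+// Columns are emitted in collection-schema order, so a downstream writer's column list
+// must be declared in the same field order as the source collection.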
diff --git a/milvusreader/src/main/resources/plugin.json b/milvusreader/src/main/resources/plugin.json
new file mode 100644
index 000000000..dc90019da
--- /dev/null
+++ b/milvusreader/src/main/resources/plugin.json
@@ -0,0 +1,6 @@
+{
+    "name": "milvusreader",
+    "class": "com.alibaba.datax.plugin.reader.milvusreader.MilvusReader",
+    "description": "useScene: prod. mechanism: connect to Milvus via MilvusClient and read data concurrently.",
+    "developer": "nianliuu"
+}
diff --git a/milvusreader/src/main/resources/plugin_job_template.json b/milvusreader/src/main/resources/plugin_job_template.json
new file mode 100644
index 000000000..de33d6512
--- /dev/null
+++ b/milvusreader/src/main/resources/plugin_job_template.json
@@ -0,0 +1,9 @@
+{
+  "name": "milvusreader",
+  "parameter": {
+    "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+    "token": "*****",
+    "collection": "medium_articles",
+    "batchSize": 10
+  }
+}
\ No newline at end of file
diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml
new file mode 100644
index 000000000..b889cbe41
--- /dev/null
+++ b/milvuswriter/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>milvuswriter</artifactId>
+
+    <properties>
+        <project-sourceEncoding>UTF-8</project-sourceEncoding>
+        <kotlin.code.style>official</kotlin.code.style>
+        <jdk-version>1.8</jdk-version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <artifactId>guava</artifactId>
+                <groupId>com.google.guava</groupId>
+                <version>32.0.1-jre</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.milvus</groupId>
+            <artifactId>milvus-sdk-java</artifactId>
+            <version>2.4.8</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test-junit5</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-stdlib</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.30</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/*.*</include>
+                </includes>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>${jdk-version}</source>
+                    <target>${jdk-version}</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/milvuswriter/src/doc/milvuswriter.md b/milvuswriter/src/doc/milvuswriter.md
new file mode 100644
index 000000000..a0d773f69
--- /dev/null
+++ b/milvuswriter/src/doc/milvuswriter.md
@@ -0,0 +1,116 @@
+### DataX MilvusWriter Plugin
+#### 1 Quick Introduction
+
+The MilvusWriter plugin writes data into Milvus through MilvusClient, the Milvus Java client.
+
+#### 2 Implementation
+
+MilvusWriter writes to Milvus through the DataX framework: the controlling JOB writes into Milvus according to the configured rules, converting each DataX type to the matching Milvus type field by field.
+
+#### 3 Function Description
+* The example below copies a collection from one Milvus cluster to another.
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "milvusreader",
+          "parameter": {
+            "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+            "token": "*****",
+            "collection": "medium_articles",
+            "batchSize": 10
+          }
+        },
+        "writer": {
+          "name": "milvuswriter",
+          "parameter": {
+            "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+            "token": "*****",
+            "collection": "medium_articles",
+            "schemaCreateMode": 0,
+            "batchSize": 10,
+            "column": [
+              { "name": "id", "type": "Int64", "isPrimaryKey": true },
+              { "name": "title_vector", "type": "FloatVector", "dimension": 768 },
+              { "name": "title", "type": "VarChar", "maxLength": 1000 },
+              { "name": "link", "type": "VarChar", "maxLength": 1000 },
+              { "name": "reading_time", "type": "Int64" },
+              { "name": "publication", "type": "VarChar", "maxLength": 1000 },
+              { "name": "claps", "type": "Int64" },
+              { "name": "responses", "type": "Int64" }
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+#### 4 Parameters
+
+* uri: Milvus cluster endpoint. (required)
+* token: Milvus connection token. (required)
+* collection: the collection to write to. (required)
+* partition: the partition to write to. (optional)
+* batchSize: number of rows written per batch. (optional, default 100)
+* schemaCreateMode: Integer; what to do when the target collection does not exist: 0 = createWhenNotExist (default), 1 = exception, 2 = ignore. (optional)
+* enableDynamicSchema: whether a created collection enables dynamic schema, default true. (optional)
+* column: the fields to write. (required)
+  * name: field name. (required)
+  * type: field type, one of [Int8, Int16, Int32, Int64, Float, Double, VarChar, FloatVector, JSON, Array]. (required)
+  * isPrimaryKey: whether this field is the primary key. (optional)
+  * isPartitionKey: whether this field is a partition key. (optional)
+  * dimension: dimension of a FloatVector field. (optional)
+  * maxLength: maximum length of a VarChar field. (optional)
+  * elementType: element type of an Array field. (optional)
+  * maxCapacity: maximum capacity of an Array field. (optional)
+
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| ------------------- | ---------------- |
+| Long                | int              |
+| Double              | double           |
+| String              | string, varchar  |
+| Boolean             | bool             |
+
+- Writing dynamic-schema fields is not supported yet.
+
+#### 6 Performance Report
+#### 7 Test Report
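+
+Appendix: a minimal, standalone sketch of the write path from section 2, based on the plugin
+source below. It issues the same batched upsert that MilvusBufferWriter#commit wraps; the
+uri/token values are placeholders, the class name is made up for illustration, and for brevity
+only a few of the example schema's fields are filled in; a real upsert must populate every
+schema field:
+
+```java
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.UpsertReq;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MilvusWriteSketch {
+    public static void main(String[] args) {
+        MilvusClientV2 client = new MilvusClientV2(ConnectConfig.builder()
+                .uri("https://<your-cluster>.zillizcloud.com:19530") // placeholder
+                .token("<token>")                                    // placeholder
+                .build());
+        Gson gson = new Gson();
+        List<JsonObject> rows = new ArrayList<>();
+        JsonObject row = new JsonObject();
+        row.addProperty("id", 1L);
+        row.addProperty("title", "hello milvus");
+        row.add("title_vector", gson.toJsonTree(new float[768])); // all-zero demo vector
+        rows.add(row);
+        // One batched upsert, the same call MilvusBufferWriter#commit issues
+        // once batchSize rows have accumulated.
+        client.upsert(UpsertReq.builder()
+                .collectionName("medium_articles")
+                .data(rows)
+                .build());
+        client.close();
+    }
+}
+```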
diff --git a/milvuswriter/src/main/assembly/package.xml b/milvuswriter/src/main/assembly/package.xml
new file mode 100644
index 000000000..62357b4ae
--- /dev/null
+++ b/milvuswriter/src/main/assembly/package.xml
@@ -0,0 +1,36 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/writer/milvuswriter</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>milvuswriter-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/writer/milvuswriter</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/writer/milvuswriter/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java
new file mode 100644
index 000000000..89153a6a4
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+public class BufferUtils {
+
+    public static ByteBuffer toByteBuffer(Short[] shortArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);
+
+        for (Short value : shortArray) {
+            byteBuffer.putShort(value);
+        }
+
+        // Flip the buffer to prepare for reading. Cast to Buffer first: the covariant
+        // ByteBuffer return type of flip() only exists since JDK 9, so this keeps
+        // bytecode compiled on newer JDKs runnable on Java 8.
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Short[] toShortArray(ByteBuffer byteBuffer) {
+        Short[] shortArray = new Short[byteBuffer.capacity() / 2];
+
+        for (int i = 0; i < shortArray.length; i++) {
+            shortArray[i] = byteBuffer.getShort();
+        }
+
+        return shortArray;
+    }
+
+    public static ByteBuffer toByteBuffer(Float[] floatArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);
+
+        for (float value : floatArray) {
+            byteBuffer.putFloat(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Float[] toFloatArray(ByteBuffer byteBuffer) {
+        Float[] floatArray = new Float[byteBuffer.capacity() / 4];
+
+        for (int i = 0; i < floatArray.length; i++) {
+            floatArray[i] = byteBuffer.getFloat();
+        }
+
+        return floatArray;
+    }
+
+    public static ByteBuffer toByteBuffer(Double[] doubleArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);
+
+        for (double value : doubleArray) {
+            byteBuffer.putDouble(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
+        Double[] doubleArray = new Double[byteBuffer.capacity() / 8];
+
+        for (int i = 0; i < doubleArray.length; i++) {
+            doubleArray[i] = byteBuffer.getDouble();
+        }
+
+        return doubleArray;
+    }
+
+    public static ByteBuffer toByteBuffer(Integer[] intArray) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);
+
+        for (int value : intArray) {
+            byteBuffer.putInt(value);
+        }
+
+        ((Buffer) byteBuffer).flip();
+
+        return byteBuffer;
+    }
+
+    public static Integer[] toIntArray(ByteBuffer byteBuffer) {
+        Integer[] intArray = new Integer[byteBuffer.capacity() / 4];
+
+        for (int i = 0; i < intArray.length; i++) {
+            intArray[i] = byteBuffer.getInt();
+        }
+
+        return intArray;
+    }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
new file mode 100644
index 000000000..78fc302c5
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java
@@ -0,0 +1,24 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+public class KeyConstant {
+    public static final String URI = "uri";
+    public static final String TOKEN = "token";
+    public static final String DATABASE = "database";
+    public static final String COLLECTION = "collection";
+    public static final String PARTITION = "partition";
+    public static final String AUTO_ID = "autoId";
+    public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema";
+    public static final String BATCH_SIZE = "batchSize";
+    public static final String COLUMN = "column";
+    public static final String COLUMN_TYPE = "type";
+    public static final String COLUMN_NAME = "name";
+    public static final String VECTOR_DIMENSION = "dimension";
+    public static final String IS_PRIMARY_KEY = "isPrimaryKey";
+    // "schemaCreateMode": 0 (createWhenNotExist) / 1 (exception) / 2 (ignore), see SchemaCreateMode
+    public static final String SCHEMA_CREATE_MODE = "schemaCreateMode";
+    public static final String IS_PARTITION_KEY = "isPartitionKey";
+    public static final String MAX_LENGTH = "maxLength";
+    public static final String ELEMENT_TYPE = "elementType";
+    public static final String MAX_CAPACITY = "maxCapacity";
+    public static final String IS_AUTO_INCREMENT = "autoId";
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
new file mode 100644
index 000000000..dc3590860
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java
@@ -0,0 +1,48 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.vector.request.UpsertReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class MilvusBufferWriter {
+
+    private final MilvusClientV2 milvusClientV2;
+    private final String collection;
+    private final String partition;
+    private final Integer batchSize;
+    private List<JsonObject> dataCache;
+
+    public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, String partition, Integer batchSize) {
+        this.milvusClientV2 = milvusClientV2;
+        this.collection = collection;
+        this.partition = partition;
+        this.batchSize = batchSize;
+        this.dataCache = new ArrayList<>();
+    }
+
+    public void write(JsonObject data) {
+        dataCache.add(data);
+    }
+
+    public Boolean needCommit() {
+        return dataCache.size() >= batchSize;
+    }
+
+    public void commit() {
+        if (dataCache.isEmpty()) {
+            log.info("dataCache is empty, skip commit");
+            return;
+        }
+        UpsertReq upsertReq = UpsertReq.builder()
+                .collectionName(collection)
+                .data(dataCache)
+                .build();
+        if (partition != null) {
+            upsertReq.setPartitionName(partition);
+        }
+        milvusClientV2.upsert(upsertReq);
+        dataCache = new ArrayList<>();
+    }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
new file mode 100644
index 000000000..fd436f3b0
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java
@@ -0,0 +1,121 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson2.JSONArray;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.AddFieldReq;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.lang.reflect.Type;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static io.milvus.v2.common.DataType.valueOf;
+
+public class MilvusSinkConverter {
+
+    public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) {
+        JsonObject data = new JsonObject();
+        Gson gson = new Gson();
+        // columns are matched to the configured field list positionally
+        for (int i = 0; i < record.getColumnNumber(); i++) {
+            String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE);
+            String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME);
+            Object rawData = record.getColumn(i).getRawData();
+            Object field = convertToMilvusField(fieldType, rawData);
+            data.add(fieldName, gson.toJsonTree(field));
+        }
+        return data;
+    }
+
+    private Object convertToMilvusField(String type, Object rawData) {
+        Gson gson = new Gson();
+        switch (valueOf(type)) {
+            case Int8:
+                return Byte.parseByte(rawData.toString());
+            case Int16:
+                return Short.parseShort(rawData.toString());
+            case Int32:
+                return Integer.parseInt(rawData.toString());
+            case Int64:
+                return Long.parseLong(rawData.toString());
+            case Float:
+                return java.lang.Float.parseFloat(rawData.toString());
+            case Double:
+                return java.lang.Double.parseDouble(rawData.toString());
+            case String:
+            case VarChar:
+                return rawData.toString();
+            case Bool:
+                return Boolean.parseBoolean(rawData.toString());
+            case JSON:
+                return gson.fromJson(rawData.toString(), JsonObject.class);
+            case Array:
+                Type listType = new TypeToken<List<Object>>() {}.getType();
+                return gson.fromJson(rawData.toString(), listType);
+            case FloatVector:
+                return Arrays.stream(processVectorString(rawData))
+                        .map(java.lang.Float::parseFloat)
+                        .collect(Collectors.toList());
+            case BinaryVector:
+                Integer[] binarys = Arrays.stream(processVectorString(rawData)).map(Integer::parseInt).toArray(Integer[]::new);
+                return BufferUtils.toByteBuffer(binarys);
+            case Float16Vector:
+            case BFloat16Vector:
+                // these vector types travel as raw bytes in milvus
+                ByteBuffer binaryVector = (ByteBuffer) rawData;
+                return gson.toJsonTree(binaryVector.array());
+            case SparseFloatVector:
+                return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject();
+            default:
+                throw new RuntimeException("Unsupported data type: " + type);
+        }
+    }
+
+    private String[] processVectorString(Object rawData) {
+        // Step 1: remove square brackets
+        String cleanedInput = rawData.toString().replace("[", "").replace("]", "");
+
+        // Step 2: split the string into an array of number strings
+        return cleanedInput.split(",\\s*");
+    }
+
+    public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) {
+        CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build();
+        for (int i = 0; i < milvusColumnMeta.size(); i++) {
+            AddFieldReq addFieldReq = AddFieldReq.builder()
+                    .fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME))
+                    .dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE)))
+                    .build();
+            if (milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) {
+                addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY));
+            }
+            if (milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) {
+                addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION));
+            }
+            if (milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) {
+                addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY));
+            }
+            if (milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) {
+                addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH));
+            }
+            if (milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.ELEMENT_TYPE)) {
+                addFieldReq.setElementType(DataType.valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.ELEMENT_TYPE)));
+            }
+            if (milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_CAPACITY)) {
+                addFieldReq.setMaxCapacity(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_CAPACITY));
+            }
+            collectionSchema.addField(addFieldReq);
+        }
+        return collectionSchema;
+    }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
new file mode 100644
index 000000000..01ae39e52
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java
@@ -0,0 +1,148 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONArray;
+import com.google.gson.JsonObject;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.partition.request.CreatePartitionReq;
+import io.milvus.v2.service.partition.request.HasPartitionReq;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class MilvusWriter extends Writer {
+    public static class Job extends Writer.Job {
+        private Configuration originalConfig = null;
+
+        /**
+         * Split the job.
+         *
+         * @param mandatoryNumber to keep reader and writer task counts equal, the writer
+         *                        plugin must split into exactly the number of shards produced
+         *                        on the source side; otherwise the framework reports an error.
+         */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configList = new ArrayList<>();
+            for (int i = 0; i < mandatoryNumber; i++) {
+                configList.add(this.originalConfig.clone());
+            }
+            return configList;
+        }
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+    }
+
+    public static class Task extends Writer.Task {
+
+        private MilvusClientV2 milvusClientV2;
+
+        private MilvusSinkConverter milvusSinkConverter;
+        private MilvusBufferWriter milvusBufferWriter;
+
+        private String collection = null;
+        private String partition = null;
+        private JSONArray milvusColumnMeta;
+
+        private boolean enableDynamicSchema;
+
+        private Integer schemaCreateMode;
+
+        @Override
+        public void startWrite(RecordReceiver lineReceiver) {
+            Record record = lineReceiver.getFromReader();
+            while (record != null) {
+                JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
+                milvusBufferWriter.write(data);
+                if (milvusBufferWriter.needCommit()) {
+                    log.info("Reached buffer limit, committing data");
+                    milvusBufferWriter.commit();
+                    log.info("Data committed");
+                }
+                record = lineReceiver.getFromReader();
+            }
+        }
+
+        @Override
+        public void init() {
+            log.info("Initializing Milvus writer");
+            // get configuration
+            Configuration writerSliceConfig = this.getPluginJobConf();
+            this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+            this.partition = writerSliceConfig.getString(KeyConstant.PARTITION, null);
+            this.enableDynamicSchema = writerSliceConfig.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA, true);
+            this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
+            this.schemaCreateMode = writerSliceConfig.getInt(KeyConstant.SCHEMA_CREATE_MODE,
+                    SchemaCreateMode.CREATE_WHEN_NOT_EXIST.getMode());
+            int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+            log.info("Collection:{}", this.collection);
+            // connect to milvus
+            ConnectConfig connectConfig = ConnectConfig.builder()
+                    .uri(writerSliceConfig.getString(KeyConstant.URI))
+                    .token(writerSliceConfig.getString(KeyConstant.TOKEN))
+                    .build();
+            if (writerSliceConfig.getString(KeyConstant.DATABASE) != null) {
+                log.info("Database is set, using database {}", writerSliceConfig.getString(KeyConstant.DATABASE));
+                connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
+            }
+            this.milvusClientV2 = new MilvusClientV2(connectConfig);
+            this.milvusSinkConverter = new MilvusSinkConverter();
+            this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, partition, batchSize);
+            log.info("Milvus writer initialized");
+        }
+
+        @Override
+        public void prepare() {
+            super.prepare();
+            Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+            if (!hasCollection) {
+                log.info("Collection not exist");
+                if (schemaCreateMode.equals(SchemaCreateMode.CREATE_WHEN_NOT_EXIST.getMode())) {
+                    // create collection
+                    log.info("Creating collection:{}", this.collection);
+                    CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
+                    collectionSchema.setEnableDynamicField(enableDynamicSchema);
+                    CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
+                            .collectionName(collection)
+                            .collectionSchema(collectionSchema)
+                            .build();
+                    milvusClientV2.createCollection(createCollectionReq);
+                } else if (schemaCreateMode.equals(SchemaCreateMode.EXCEPTION.getMode())) {
+                    log.error("Collection not exist, throw exception");
+                    throw new RuntimeException("Collection not exist");
+                }
+            }
+            if (partition != null) {
+                Boolean hasPartition = milvusClientV2.hasPartition(HasPartitionReq.builder().collectionName(collection).partitionName(partition).build());
+                if (!hasPartition) {
+                    log.info("Partition not exist, creating");
+                    CreatePartitionReq createPartitionReq = CreatePartitionReq.builder()
+                            .collectionName(collection)
+                            .partitionName(partition)
+                            .build();
+                    milvusClientV2.createPartition(createPartitionReq);
+                    log.info("Partition created");
+                }
+            }
+        }
+
+        @Override
+        public void destroy() {
+            log.info("Closing Milvus writer, committing data and closing connection");
+            this.milvusBufferWriter.commit();
+            this.milvusClientV2.close();
+        }
+    }
+}
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java
new file mode 100644
index 000000000..aa67945db
--- /dev/null
+++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java
@@ -0,0 +1,16 @@
+package com.alibaba.datax.plugin.writer.milvuswriter;
+
+import lombok.Getter;
+
+@Getter
+public enum SchemaCreateMode {
+    CREATE_WHEN_NOT_EXIST(0),
+    EXCEPTION(1),
+    IGNORE(2);
+
+    private final int mode;
+
+    SchemaCreateMode(int mode) {
+        this.mode = mode;
+    }
+}
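+
+// Behavior sketch for MilvusWriter#prepare: "schemaCreateMode": 0 creates a missing
+// collection from the configured columns, 1 makes prepare() throw, and 2 (IGNORE)
+// proceeds without creating the collection.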
"com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter", + "description": "useScene: prod. mechanism: via milvusclient connect milvus write data concurrent.", + "developer": "nianliuu" +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..b9bff3628 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,49 @@ +{ + "name": "milvuswriter", + "parameter": { + "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530", + "token": "*****", + "collection": "medium_articles", + "batchSize": 10, + "schemaCreateMode": "createWhenTableNotExit", + "column": [ + { + "name": "id", + "type": "Int64", + "isPrimaryKey": true + }, + { + "name": "title_vector", + "type": "FloatVector", + "dimension": 768 + }, + { + "name": "title", + "type": "VarChar", + "maxLength": 1000 + }, + { + "name": "link", + "type": "VarChar", + "maxLength": 1000 + }, + { + "name": "reading_time", + "type": "Int64" + }, + { + "name": "publication", + "type": "VarChar", + "maxLength": 1000 + }, + { + "name": "claps", + "type": "Int64" + }, + { + "name": "responses", + "type": "Int64" + } + ] + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index 624109f79..49eff42d6 100644 --- a/package.xml +++ b/package.xml @@ -257,8 +257,22 @@ datax + + milvusreader/target/datax/ + + **/*.* + + datax + + + milvuswriter/target/datax/ + + **/*.* + + datax + mysqlwriter/target/datax/ diff --git a/pom.xml b/pom.xml index c7f43f172..2d4d3a66c 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ starrocksreader sybasereader dorisreader + milvusreader mysqlwriter starrockswriter @@ -129,6 +130,7 @@ adbmysqlwriter sybasewriter neo4jwriter + milvuswriter plugin-rdbms-util plugin-unstructured-storage-util