From 906bf3ded923e7d87df34c952a87aa86d43321fb Mon Sep 17 00:00:00 2001 From: Nian Liu Date: Wed, 27 Nov 2024 17:52:09 +0800 Subject: [PATCH 1/2] add milvus writer plugin --- milvuswriter/pom.xml | 109 +++++++++++++++ milvuswriter/src/main/assembly/package.xml | 36 +++++ .../writer/milvuswriter/BufferUtils.java | 130 ++++++++++++++++++ .../writer/milvuswriter/KeyConstant.java | 20 +++ .../milvuswriter/MilvusBufferWriter.java | 43 ++++++ .../milvuswriter/MilvusSinkConverter.java | 86 ++++++++++++ .../writer/milvuswriter/MilvusWriter.java | 130 ++++++++++++++++++ milvuswriter/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 15 ++ pom.xml | 1 + 10 files changed, 576 insertions(+) create mode 100644 milvuswriter/pom.xml create mode 100644 milvuswriter/src/main/assembly/package.xml create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java create mode 100644 milvuswriter/src/main/resources/plugin.json create mode 100644 milvuswriter/src/main/resources/plugin_job_template.json diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml new file mode 100644 index 0000000000..b889cbe41a --- /dev/null +++ b/milvuswriter/pom.xml @@ -0,0 +1,109 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + milvuswriter + + + UTF-8 + official + 1.8 + + + + + guava + com.google.guava + 32.0.1-jre + + + + + + + io.milvus + milvus-sdk-java + 2.4.8 + + + org.jetbrains.kotlin + kotlin-test-junit5 + 2.0.0 + test + + + org.junit.jupiter + junit-jupiter + 5.10.0 + test + + + org.jetbrains.kotlin + kotlin-stdlib + 2.0.0 + + + com.alibaba.datax + datax-common + 0.0.1-SNAPSHOT + compile + + + org.projectlombok + lombok + 1.18.30 + provided + + + + + + + + src/main/resources + + **/*.* + + true + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file diff --git a/milvuswriter/src/main/assembly/package.xml b/milvuswriter/src/main/assembly/package.xml new file mode 100644 index 0000000000..62357b4ae5 --- /dev/null +++ b/milvuswriter/src/main/assembly/package.xml @@ -0,0 +1,36 @@ + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/milvuswriter + + + target/ + + milvuswriter-0.0.1-SNAPSHOT.jar + + plugin/writer/milvuswriter + + + + + + false + plugin/writer/milvuswriter/libs + runtime + + + diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java new file mode 100644 index 0000000000..89153a6a42 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/BufferUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.datax.plugin.writer.milvuswriter; + +import java.nio.Buffer; +import java.nio.ByteBuffer; + +public class BufferUtils { + + public static ByteBuffer toByteBuffer(Short[] shortArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); + + for (Short value : shortArray) { + byteBuffer.putShort(value); + } + + // Compatible compilation and running versions are not consistent + // Flip the buffer to prepare for reading + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Short[] toShortArray(ByteBuffer byteBuffer) { + Short[] shortArray = new Short[byteBuffer.capacity() / 2]; + + for (int i = 0; i < shortArray.length; i++) { + shortArray[i] = byteBuffer.getShort(); + } + + return shortArray; + } + + public static ByteBuffer toByteBuffer(Float[] floatArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4); + + for (float value : floatArray) { + byteBuffer.putFloat(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Float[] toFloatArray(ByteBuffer byteBuffer) { + Float[] floatArray = new Float[byteBuffer.capacity() / 4]; + + for (int i = 0; i < floatArray.length; i++) { + floatArray[i] = byteBuffer.getFloat(); + } + + return floatArray; + } + + public static ByteBuffer toByteBuffer(Double[] doubleArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8); + + for (double value : doubleArray) { + byteBuffer.putDouble(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Double[] toDoubleArray(ByteBuffer byteBuffer) { + Double[] doubleArray = new Double[byteBuffer.capacity() / 8]; + + for (int i = 0; i < doubleArray.length; i++) { + doubleArray[i] = byteBuffer.getDouble(); + } + + return doubleArray; + } + + public static ByteBuffer toByteBuffer(Integer[] intArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4); + + for (int value : intArray) { + byteBuffer.putInt(value); + } + + ((Buffer) byteBuffer).flip(); + 
+ return byteBuffer; + } + + public static Integer[] toIntArray(ByteBuffer byteBuffer) { + Integer[] intArray = new Integer[byteBuffer.capacity() / 4]; + + for (int i = 0; i < intArray.length; i++) { + intArray[i] = byteBuffer.getInt(); + } + + return intArray; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java new file mode 100644 index 0000000000..cc6364040d --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +public class KeyConstant { + public static final String URI = "uri"; + public static final String TOKEN = "token"; + public static final String DATABASE = "database"; + public static final String COLLECTION = "collection"; + public static final String AUTO_ID = "autoId"; + public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; + public static final String BATCH_SIZE = "batchSize"; + public static final String COLUMN = "column"; + public static final String COLUMN_TYPE = "type"; + public static final String COLUMN_NAME = "name"; + public static final String VECTOR_DIMENSION = "dimension"; + public static final String IS_PRIMARY_KEY = "isPrimaryKey"; +// "schemaCreateMode":"createWhenTableNotExit"/"Ignore"/"exception" + public static final String schemaCreateMode = "schemaCreateMode"; + public static final String IS_PARTITION_KEY = "isPartitionKey"; + public static final String MAX_LENGTH = "maxLength"; +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java new file mode 100644 index 0000000000..da686af229 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java @@ -0,0 +1,43 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.google.gson.JsonObject; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public class MilvusBufferWriter { + + private final MilvusClientV2 milvusClientV2; + private final String collection; + private final Integer batchSize; + private List dataCache; + + public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){ + this.milvusClientV2 = milvusClientV2; + this.collection = collection; + this.batchSize = batchSize; + this.dataCache = new ArrayList<>(); + } + public void write(JsonObject data){ + dataCache.add(data); + } + public Boolean needCommit(){ + return dataCache.size() >= batchSize; + } + public void commit(){ + if(dataCache.isEmpty()){ + log.info("dataCache is empty, skip commit"); + return; + } + UpsertReq upsertReq = UpsertReq.builder() + .collectionName(collection) + .data(dataCache) + .build(); + milvusClientV2.upsert(upsertReq); + dataCache = new ArrayList<>(); + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java new file mode 100644 index 0000000000..390f95e5f4 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java @@ 
-0,0 +1,86 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.milvus.v2.common.DataType; +import static io.milvus.v2.common.DataType.*; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class MilvusSinkConverter { + public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) { + JsonObject data = new JsonObject(); + Gson gson = new Gson(); + for(int i = 0; i < record.getColumnNumber(); i++) { + String fieldType = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE); + String fieldName = milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME); + Object rawData = record.getColumn(i).getRawData(); + Object field = convertToMilvusField(fieldType, rawData); + data.add(fieldName, gson.toJsonTree(field)); + } + return data; + } + + private Object convertToMilvusField(String type, Object rawData) { + Gson gson = new Gson(); + switch (valueOf(type)) { + case Int32: + return Integer.parseInt(rawData.toString()); + case Int64: + return Long.parseLong(rawData.toString()); + case Float: + return java.lang.Float.parseFloat(rawData.toString()); + case String: + case VarChar: + return rawData.toString(); + case Bool: + return Boolean.parseBoolean(rawData.toString()); + case FloatVector: + java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new); + return Arrays.stream(floats).collect(Collectors.toList()); + case BinaryVector: + java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new); + return BufferUtils.toByteBuffer(binarys); + case Float16Vector: + case BFloat16Vector: + // all these data is byte format in milvus + ByteBuffer binaryVector = (ByteBuffer) rawData; + return gson.toJsonTree(binaryVector.array()); + case SparseFloatVector: + return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject(); + default: + throw new RuntimeException("Unsupported data type"); + } + } + + public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) { + CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); + for (int i = 0; i < milvusColumnMeta.size(); i++) { + AddFieldReq addFieldReq = AddFieldReq.builder() + .fieldName(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_NAME)) + .dataType(valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.COLUMN_TYPE))) + .build(); + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PRIMARY_KEY)) { + addFieldReq.setIsPrimaryKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PRIMARY_KEY)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.VECTOR_DIMENSION)) { + addFieldReq.setDimension(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.VECTOR_DIMENSION)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.IS_PARTITION_KEY)) { + addFieldReq.setIsPartitionKey(milvusColumnMeta.getJSONObject(i).getBoolean(KeyConstant.IS_PARTITION_KEY)); + } + 
if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) { + addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH)); + } + collectionSchema.addField(addFieldReq); + } + return collectionSchema; + } +} diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java new file mode 100644 index 0000000000..c9b5a1bccb --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java @@ -0,0 +1,130 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONArray; +import com.google.gson.JsonObject; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.common.DataType; +import io.milvus.v2.service.collection.request.AddFieldReq; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.vector.request.UpsertReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class MilvusWriter extends Writer { + public static class Job extends Writer.Job { + private Configuration originalConfig = null; + /** + * 切分任务。
+     *
+     * @param mandatoryNumber 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
+     */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configList = new ArrayList<>();
+            for(int i = 0; i < mandatoryNumber; i++) {
+                configList.add(this.originalConfig.clone());
+            }
+            return configList;
+        }
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+    }
+    public static class Task extends Writer.Task {
+
+        private MilvusClientV2 milvusClientV2;
+
+        private MilvusSinkConverter milvusSinkConverter;
+        private MilvusBufferWriter milvusBufferWriter;
+
+        private String collection = null;
+        private JSONArray milvusColumnMeta;
+
+        private String schemaCreateMode = "createWhenTableNotExit";
+
+        @Override
+        public void startWrite(RecordReceiver lineReceiver) {
+            Record record = lineReceiver.getFromReader();
+            JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record);
+            milvusBufferWriter.write(data);
+            if(milvusBufferWriter.needCommit()){
+                log.info("Reached buffer limit, Committing data");
+                milvusBufferWriter.commit();
+                log.info("Data committed");
+            }
+        }
+
+        @Override
+        public void init() {
+            log.info("Initializing Milvus writer");
+            // get configuration
+            Configuration writerSliceConfig = this.getPluginJobConf();
+            this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION);
+            this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN));
+            this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ?
+                    "createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode);
+            int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+            log.info("Collection:{}", this.collection);
+            // connect to milvus
+            ConnectConfig connectConfig = ConnectConfig.builder()
+                    .uri(writerSliceConfig.getString(KeyConstant.URI))
+                    .token(writerSliceConfig.getString(KeyConstant.TOKEN))
+                    .build();
+            if(writerSliceConfig.getString(KeyConstant.DATABASE) != null) {
+                log.info("Database is set, using database: {}", writerSliceConfig.getString(KeyConstant.DATABASE));
+                connectConfig.setDbName(writerSliceConfig.getString(KeyConstant.DATABASE));
+            }
+            this.milvusClientV2 = new MilvusClientV2(connectConfig);
+            this.milvusSinkConverter = new MilvusSinkConverter();
+            this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize);
+            log.info("Milvus writer initialized");
+        }
+        @Override
+        public void prepare() {
+            super.prepare();
+            Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+            if (!hasCollection) {
+                log.info("Collection not exist");
+                if (schemaCreateMode.equals("createWhenTableNotExit")) {
+                    // create collection
+                    log.info("Creating collection:{}", this.collection);
+                    CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta);
+
+                    CreateCollectionReq createCollectionReq = CreateCollectionReq.builder()
+                            .collectionName(collection)
+                            .collectionSchema(collectionSchema)
+                            .build();
+                    milvusClientV2.createCollection(createCollectionReq);
+                } else if (schemaCreateMode.equals("exception")) {
+                    log.error("Collection not exist, throw exception");
+                    throw new RuntimeException("Collection not exist");
+                }
+            }
+        }
+
+        @Override
+        public void destroy() {
+            log.info("Closing Milvus writer, committing data and closing connection");
+            this.milvusBufferWriter.commit();
+            
this.milvusClientV2.close(); + } + } +} diff --git a/milvuswriter/src/main/resources/plugin.json b/milvuswriter/src/main/resources/plugin.json new file mode 100644 index 0000000000..8b91230934 --- /dev/null +++ b/milvuswriter/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "milvuswriter", + "class": "com.alibaba.datax.plugin.writer.milvuswriter.MilvusWriter", + "description": "useScene: prod. mechanism: via milvusclient connect milvus write data concurrent.", + "developer": "nianliuu" +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..d4ba4bf1fc --- /dev/null +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,15 @@ +{ + "name": "mongodbwriter", + "parameter": { + "address": [], + "userName": "", + "userPassword": "", + "dbName": "", + "collectionName": "", + "column": [], + "upsertInfo": { + "isUpsert": "", + "upsertKey": "" + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index c7f43f1725..1b364a7546 100644 --- a/pom.xml +++ b/pom.xml @@ -129,6 +129,7 @@ adbmysqlwriter sybasewriter neo4jwriter + milvuswriter plugin-rdbms-util plugin-unstructured-storage-util From f479264f2da45f53f2da8e948b5059626e929923 Mon Sep 17 00:00:00 2001 From: Nian Liu Date: Wed, 27 Nov 2024 18:51:41 +0800 Subject: [PATCH 2/2] add milvus reader plugin --- milvusreader/pom.xml | 109 +++++++++++++++ milvusreader/src/main/assembly/package.xml | 36 +++++ milvusreader/src/main/doc/milvusreader.md | 105 ++++++++++++++ .../reader/milvusreader/BufferUtils.java | 130 ++++++++++++++++++ .../reader/milvusreader/KeyConstant.java | 10 ++ .../reader/milvusreader/MilvusReader.java | 130 ++++++++++++++++++ .../milvusreader/MilvusSourceConverter.java | 64 +++++++++ milvusreader/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 9 ++ milvuswriter/src/doc/milvuswriter.md | 116 ++++++++++++++++ .../writer/milvuswriter/KeyConstant.java | 4 + .../milvuswriter/MilvusBufferWriter.java | 7 +- .../milvuswriter/MilvusSinkConverter.java | 41 +++++- .../writer/milvuswriter/MilvusWriter.java | 52 ++++--- .../writer/milvuswriter/SchemaCreateMode.java | 16 +++ .../main/resources/plugin_job_template.json | 56 ++++++-- package.xml | 14 ++ pom.xml | 1 + 18 files changed, 874 insertions(+), 32 deletions(-) create mode 100644 milvusreader/pom.xml create mode 100644 milvusreader/src/main/assembly/package.xml create mode 100644 milvusreader/src/main/doc/milvusreader.md create mode 100644 milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java create mode 100644 milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java create mode 100644 milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java create mode 100644 milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java create mode 100644 milvusreader/src/main/resources/plugin.json create mode 100644 milvusreader/src/main/resources/plugin_job_template.json create mode 100644 milvuswriter/src/doc/milvuswriter.md create mode 100644 milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java diff --git a/milvusreader/pom.xml b/milvusreader/pom.xml new file mode 100644 index 0000000000..c66a6c7bab --- /dev/null +++ b/milvusreader/pom.xml @@ -0,0 +1,109 @@ + + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + milvusreader 
+ + + UTF-8 + official + 1.8 + + + + guava + com.google.guava + 32.0.1-jre + + + + + + + io.milvus + milvus-sdk-java + 2.4.8 + + org.jetbrains.kotlin + kotlin-test-junit5 + 2.0.0 + test + + org.junit.jupiter + junit-jupiter + 5.10.0 + test + + org.jetbrains.kotlin + kotlin-stdlib + 2.0.0 + + com.alibaba.datax + datax-common + 0.0.1-SNAPSHOT + compile + + org.projectlombok + lombok + 1.18.30 + provided + + + + + + + + src/main/resources + + **/*.* + + true + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file
diff --git a/milvusreader/src/main/assembly/package.xml b/milvusreader/src/main/assembly/package.xml new file mode 100644 index 0000000000..3b12c5cb5d --- /dev/null +++ b/milvusreader/src/main/assembly/package.xml @@ -0,0 +1,36 @@ + + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/milvusreader + + + target/ + + milvusreader-0.0.1-SNAPSHOT.jar + + plugin/reader/milvusreader + + + + + + false + plugin/reader/milvusreader/libs + runtime + + +
diff --git a/milvusreader/src/main/doc/milvusreader.md b/milvusreader/src/main/doc/milvusreader.md new file mode 100644 index 0000000000..bb22574229 --- /dev/null +++ b/milvusreader/src/main/doc/milvusreader.md @@ -0,0 +1,105 @@
+### DataX MilvusReader
+#### 1 Quick Introduction
+
+The MilvusReader plugin uses MilvusClient, the Java client for Milvus, to read data from Milvus.
+
+#### 2 Implementation
+
+MilvusReader reads data from Milvus through the DataX framework: the master JOB program clones the job configuration into one task per channel (the collection itself is not sharded yet), each task pages through the collection with a query iterator, and every Milvus type is converted one by one into the corresponding DataX type.
+
+#### 3 Usage
+* The following job reads a collection from one Milvus instance and writes it into another.
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "milvusreader",
+          "parameter": {
+            "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+            "token": "*****",
+            "collection": "medium_articles",
+            "batchSize": 10
+          }
+        },
+        "writer": {
+          "name": "milvuswriter",
+          "parameter": {
+            "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+            "token": "*****",
+            "collection": "medium_articles",
+            "batchSize": 10,
+            "column": [
+              {
+                "name": "id",
+                "type": "Int64",
+                "isPrimaryKey": true
+              },
+              {
+                "name": "title_vector",
+                "type": "FloatVector",
+                "dimension": 768
+              },
+              {
+                "name": "title",
+                "type": "VarChar",
+                "maxLength": 1000
+              },
+              {
+                "name": "link",
+                "type": "VarChar",
+                "maxLength": 1000
+              },
+              {
+                "name": "reading_time",
+                "type": "Int64"
+              },
+              {
+                "name": "publication",
+                "type": "VarChar",
+                "maxLength": 1000
+              },
+              {
+                "name": "claps",
+                "type": "Int64"
+              },
+              {
+                "name": "responses",
+                "type": "Int64"
+              }
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+#### 4 Parameters
+
+* uri: endpoint of the Milvus cluster. [required]
+* token: token used to connect to Milvus. [required]
+* collection: the collection to read from. [required]
+* partition: the partition to read from; see the example after this list. [optional]
+* batchSize: number of rows fetched per iterator batch, defaults to 100. [optional]
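+
+For example, to read a single partition instead of the whole collection, a partition entry is added to the reader parameters. A minimal sketch (the partition name "partition_2024" is illustrative):
+```json
+"reader": {
+  "name": "milvusreader",
+  "parameter": {
+    "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+    "token": "*****",
+    "collection": "medium_articles",
+    "partition": "partition_2024",
+    "batchSize": 10
+  }
+}
+```
+
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| -------- |-----------------|
+| Long | Int8, Int16, Int32, Int64 |
+| Double | Float, Double |
+| String | VarChar, String, JSON, Array, FloatVector, BinaryVector |
+| Boolean | Bool |
+
+- Reading dynamic-schema fields is not supported yet.
+
+#### 6 Performance Report
+#### 7 Test Report
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java new file mode 100644 index 0000000000..ed22c129e6 --- /dev/null +++ 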
b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/BufferUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.datax.plugin.reader.milvusreader; + +import java.nio.Buffer; +import java.nio.ByteBuffer; + +public class BufferUtils { + + public static ByteBuffer toByteBuffer(Short[] shortArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2); + + for (Short value : shortArray) { + byteBuffer.putShort(value); + } + + // Compatible compilation and running versions are not consistent + // Flip the buffer to prepare for reading + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Short[] toShortArray(ByteBuffer byteBuffer) { + Short[] shortArray = new Short[byteBuffer.capacity() / 2]; + + for (int i = 0; i < shortArray.length; i++) { + shortArray[i] = byteBuffer.getShort(); + } + + return shortArray; + } + + public static ByteBuffer toByteBuffer(Float[] floatArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4); + + for (float value : floatArray) { + byteBuffer.putFloat(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Float[] toFloatArray(ByteBuffer byteBuffer) { + Float[] floatArray = new Float[byteBuffer.capacity() / 4]; + + for (int i = 0; i < floatArray.length; i++) { + floatArray[i] = byteBuffer.getFloat(); + } + + return floatArray; + } + + public static ByteBuffer toByteBuffer(Double[] doubleArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8); + + for (double value : doubleArray) { + byteBuffer.putDouble(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Double[] toDoubleArray(ByteBuffer byteBuffer) { + Double[] doubleArray = new Double[byteBuffer.capacity() / 8]; + + for (int i = 0; i < doubleArray.length; i++) { + doubleArray[i] = byteBuffer.getDouble(); + } + + return doubleArray; + } + + public 
static ByteBuffer toByteBuffer(Integer[] intArray) { + ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4); + + for (int value : intArray) { + byteBuffer.putInt(value); + } + + ((Buffer) byteBuffer).flip(); + + return byteBuffer; + } + + public static Integer[] toIntArray(ByteBuffer byteBuffer) { + Integer[] intArray = new Integer[byteBuffer.capacity() / 4]; + + for (int i = 0; i < intArray.length; i++) { + intArray[i] = byteBuffer.getInt(); + } + + return intArray; + } +} diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java new file mode 100644 index 0000000000..b0f608be19 --- /dev/null +++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/KeyConstant.java @@ -0,0 +1,10 @@ +package com.alibaba.datax.plugin.reader.milvusreader; + +public class KeyConstant { + public static final String URI = "uri"; + public static final String TOKEN = "token"; + public static final String DATABASE = "database"; + public static final String COLLECTION = "collection"; + public static final String PARTITION = "partition"; + public static final String BATCH_SIZE = "batchSize"; +} diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java new file mode 100644 index 0000000000..51e7798ed7 --- /dev/null +++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusReader.java @@ -0,0 +1,130 @@ +package com.alibaba.datax.plugin.reader.milvusreader; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import io.milvus.orm.iterator.QueryIterator; +import io.milvus.response.QueryResultsWrapper; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.DescribeCollectionReq; +import io.milvus.v2.service.collection.request.HasCollectionReq; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.vector.request.QueryIteratorReq; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +@Slf4j +public class MilvusReader extends Reader { + public static class Job extends Reader.Job { + private Configuration originalConfig = null; + /** + * 切分任务。
+     *
+     * @param mandatoryNumber 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
+     */
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configList = new ArrayList<>();
+            for(int i = 0; i < mandatoryNumber; i++) {
+                configList.add(this.originalConfig.clone());
+            }
+            return configList;
+        }
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+    }
+    public static class Task extends Reader.Task {
+
+        private MilvusClientV2 milvusClientV2;
+
+        private MilvusSourceConverter milvusSourceConverter;
+
+        private String collection = null;
+        private String partition = null;
+        private Integer batchSize;
+
+        private CreateCollectionReq.CollectionSchema collectionSchema;
+
+        @Override
+        public void init() {
+            log.info("Initializing Milvus reader");
+            // get configuration
+            Configuration readerSliceConfig = this.getPluginJobConf();
+            this.collection = readerSliceConfig.getString(KeyConstant.COLLECTION);
+            this.partition = readerSliceConfig.getString(KeyConstant.PARTITION, null);
+            this.batchSize = readerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100);
+            log.info("Collection:{}", this.collection);
+            // connect to milvus
+            ConnectConfig connectConfig = ConnectConfig.builder()
+                    .uri(readerSliceConfig.getString(KeyConstant.URI))
+                    .token(readerSliceConfig.getString(KeyConstant.TOKEN))
+                    .build();
+            if(readerSliceConfig.getString(KeyConstant.DATABASE) != null) {
+                log.info("Database is set, using database: {}", readerSliceConfig.getString(KeyConstant.DATABASE));
+                connectConfig.setDbName(readerSliceConfig.getString(KeyConstant.DATABASE));
+            }
+            this.milvusClientV2 = new MilvusClientV2(connectConfig);
+            this.milvusSourceConverter = new MilvusSourceConverter();
+            log.info("Milvus reader initialized");
+        }
+        @Override
+        public void prepare() {
+            super.prepare();
+            Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build());
+            if (!hasCollection) {
+                log.error("Collection {} does not exist", collection);
+                throw new RuntimeException("Collection does not exist");
+            }
+            DescribeCollectionReq describeCollectionReq = DescribeCollectionReq.builder()
+                    .collectionName(collection)
+                    .build();
+            DescribeCollectionResp describeCollectionResp = milvusClientV2.describeCollection(describeCollectionReq);
+            this.collectionSchema = describeCollectionResp.getCollectionSchema();
+        }
+
+        @Override
+        public void destroy() {
+            log.info("Closing Milvus reader, closing connection");
+            this.milvusClientV2.close();
+        }
+
+        @Override
+        public void startRead(RecordSender recordSender) {
+            QueryIteratorReq queryIteratorReq = QueryIteratorReq.builder()
+                    .collectionName(collection)
+                    .outputFields(Collections.singletonList("*"))
+                    .batchSize(batchSize)
+                    .build();
+            if(partition != null) {
+                queryIteratorReq.setPartitionNames(Collections.singletonList(partition));
+            }
+            QueryIterator queryIterator = milvusClientV2.queryIterator(queryIteratorReq);
+            while (true){
+                List<QueryResultsWrapper.RowRecord> rowRecords = queryIterator.next();
+                if(rowRecords.isEmpty()){
+                    break;
+                }
+                rowRecords.forEach(rowRecord -> {
+                    Record record = recordSender.createRecord();
+                    record = milvusSourceConverter.toDataXRecord(record, rowRecord, collectionSchema);
+                    recordSender.sendToWriter(record);
+                });
+            }
+        }
+    }
+}
diff --git a/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java new file mode 100644 index 0000000000..2f8eb6fd34 --- /dev/null +++ b/milvusreader/src/main/java/com/alibaba/datax/plugin/reader/milvusreader/MilvusSourceConverter.java @@ -0,0 +1,64 @@
+package com.alibaba.datax.plugin.reader.milvusreader;
+
+import com.alibaba.datax.common.element.BoolColumn;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DoubleColumn;
+import com.alibaba.datax.common.element.LongColumn;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.element.StringColumn;
+import com.google.gson.Gson;
+import io.milvus.response.QueryResultsWrapper;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class MilvusSourceConverter {
+
+    public Record toDataXRecord(Record record, QueryResultsWrapper.RowRecord rowRecord, CreateCollectionReq.CollectionSchema collectionSchema) {
+        Map<String, Object> fields = rowRecord.getFieldValues();
+
+        for (int i = 0; i < collectionSchema.getFieldSchemaList().size(); i++) {
+            CreateCollectionReq.FieldSchema fieldSchema = collectionSchema.getFieldSchemaList().get(i);
+            String fieldName = fieldSchema.getName();
+            Object fieldValue = fields.get(fieldName);
+            Column column = convertToDataXColumn(fieldSchema.getDataType(), fieldValue);
+            record.addColumn(column);
+        }
+        return record;
+    }
+
+    private Column convertToDataXColumn(DataType dataType, Object fieldValue) {
+        Gson gson = new Gson();
+        switch (dataType) {
+            case Bool:
+                return new BoolColumn(Boolean.parseBoolean(fieldValue.toString()));
+            case Int8:
+            case Int16:
+            case Int32:
+            case Int64:
+                return new LongColumn(Long.parseLong(fieldValue.toString()));
+            case Float:
+            case Double:
+                return new DoubleColumn(java.lang.Double.parseDouble(fieldValue.toString()));
+            case VarChar:
+            case String:
+                return new StringColumn(fieldValue.toString());
+            case JSON:
+                return new StringColumn(gson.toJson(fieldValue));
+            case Array:
+                return new StringColumn(gson.toJson(fieldValue));
+            case FloatVector:
+                List<Float> floats = (List<Float>) fieldValue;
+                return new StringColumn(Arrays.toString(floats.toArray()));
+            case BinaryVector:
+                Integer[] binarys = BufferUtils.toIntArray((ByteBuffer) fieldValue);
+                return new StringColumn(Arrays.toString(binarys));
+            default:
+                throw new IllegalArgumentException("Unsupported data type: " + dataType);
+        }
+    }
+}
diff --git a/milvusreader/src/main/resources/plugin.json b/milvusreader/src/main/resources/plugin.json new file mode 100644 index 0000000000..dc90019da4 --- /dev/null +++ b/milvusreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@
+{
+    "name": "milvusreader",
+    "class": "com.alibaba.datax.plugin.reader.milvusreader.MilvusReader",
+    "description": "useScene: prod. mechanism: connect to milvus via MilvusClient and read data concurrently.",
+    "developer": "nianliuu"
+}
diff --git a/milvusreader/src/main/resources/plugin_job_template.json b/milvusreader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..de33d6512c --- /dev/null +++ b/milvusreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,9 @@
+{
+  "name": "milvusreader",
+  "parameter": {
+    "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+    "token": "*****",
+    "collection": "medium_articles",
+    "batchSize": 10
+  }
+}
\ No newline at end of file
diff --git a/milvuswriter/src/doc/milvuswriter.md b/milvuswriter/src/doc/milvuswriter.md new file mode 100644 index 0000000000..a0d773f692 --- /dev/null +++ b/milvuswriter/src/doc/milvuswriter.md @@ -0,0 +1,116 @@
+### DataX MilvusWriter Plugin
+#### 1 Quick Introduction
+
+The MilvusWriter plugin uses MilvusClient, the Java client for Milvus, to write data into Milvus.
+
+#### 2 Implementation
+
+MilvusWriter writes data into Milvus through the DataX framework: the master JOB program writes to Milvus according to the configured rules, converting each DataX type one by one into a type supported by Milvus and upserting the rows in batches.
+
+#### 3 Usage
+* The following job reads a collection from one Milvus instance and writes it into another.
+```json
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "milvusreader",
+          "parameter": {
+            "uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
+            "token": "*****",
+            "collection": "medium_articles",
+            "batchSize": 10
+          }
+        },
+        "writer": {
+          "name": "milvuswriter",
+          "parameter": {
+            "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
+            "token": "*****",
+            "collection": "medium_articles",
+            "schemaCreateMode": 0,
+            "batchSize": 10,
+            "column": [
+              {
+                "name": "id",
+                "type": "Int64",
+                "isPrimaryKey": true
+              },
+              {
+                "name": "title_vector",
+                "type": "FloatVector",
+                "dimension": 768
+              },
+              {
+                "name": "title",
+                "type": "VarChar",
+                "maxLength": 1000
+              },
+              {
+                "name": "link",
+                "type": "VarChar",
+                "maxLength": 1000
+              },
+              {
+                "name": "reading_time",
+                "type": "Int64"
+              },
+              {
+                "name": "publication",
+                "type": "VarChar",
+                "maxLength": 1000
+              },
+              {
+                "name": "claps",
+                "type": "Int64"
+              },
+              {
+                "name": "responses",
+                "type": "Int64"
+              }
+            ]
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
+```
+
+#### 4 Parameters
+
+* uri: endpoint of the Milvus cluster. [required]
+* token: token used to connect to Milvus. [required]
+* collection: the collection to write into. [required]
+* partition: the partition to write into. [optional]
+* batchSize: number of rows per upsert batch, defaults to 100. [optional]
+* schemaCreateMode: Integer, what to do when the target collection does not exist, defaults to 0 [0 (create the collection), 1 (throw an exception), 2 (ignore)]. [optional]
+* enableDynamicSchema: whether to enable dynamic schema on a newly created collection, defaults to true. [optional]
+* column: definition of the fields to write. [required]
+  * name: field name. [required]
+  * type: field type [Int8, Int16, Int32, Int64, Float, Double, VarChar, FloatVector, JSON, Array]. [required]
+  * isPrimaryKey: whether the field is the primary key. [optional]
+  * isPartitionKey: whether the field is a partition key. [optional]
+  * dimension: dimension of a FloatVector field. [optional]
+  * maxLength: maximum length of a VarChar field. [optional]
+  * elementType: element type of an Array field; see the sketch after this list. [optional]
+  * maxCapacity: maximum capacity of an Array field. [optional]
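+
+An Array field, for example, combines elementType, maxLength and maxCapacity. A minimal sketch (the field name "tags" and the sizes are illustrative):
+```json
+{
+  "name": "tags",
+  "type": "Array",
+  "elementType": "VarChar",
+  "maxLength": 100,
+  "maxCapacity": 64
+}
+```
+
+#### 5 Type Conversion
+
+| DataX internal type | Milvus data type |
+| -------- |-----------------|
+| Long | Int8, Int16, Int32, Int64 |
+| Double | Float, Double |
+| String | VarChar, String, JSON, Array, FloatVector, BinaryVector, SparseFloatVector |
+| Boolean | Bool |
+
+- Writing data into dynamic-schema fields is not supported yet.
+
+#### 6 Performance Report
+#### 7 Test Report
diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java index cc6364040d..78fc302c50 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/KeyConstant.java @@ -5,6 +5,7 @@ public class KeyConstant { public static final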
String TOKEN = "token"; public static final String DATABASE = "database"; public static final String COLLECTION = "collection"; + public static final String PARTITION = "partition"; public static final String AUTO_ID = "autoId"; public static final String ENABLE_DYNAMIC_SCHEMA = "enableDynamicSchema"; public static final String BATCH_SIZE = "batchSize"; @@ -17,4 +18,7 @@ public class KeyConstant { public static final String schemaCreateMode = "schemaCreateMode"; public static final String IS_PARTITION_KEY = "isPartitionKey"; public static final String MAX_LENGTH = "maxLength"; + public static final String ELEMENT_TYPE = "elementType"; + public static final String MAX_CAPACITY = "maxCapacity"; + public static final String IS_AUTO_INCREMENT = "autoId"; } diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java index da686af229..dc35908603 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusBufferWriter.java @@ -13,12 +13,14 @@ public class MilvusBufferWriter { private final MilvusClientV2 milvusClientV2; private final String collection; + private final String partition; private final Integer batchSize; private List dataCache; - public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, Integer batchSize){ + public MilvusBufferWriter(MilvusClientV2 milvusClientV2, String collection, String partition, Integer batchSize){ this.milvusClientV2 = milvusClientV2; this.collection = collection; + this.partition = partition; this.batchSize = batchSize; this.dataCache = new ArrayList<>(); } @@ -37,6 +39,9 @@ public void commit(){ .collectionName(collection) .data(dataCache) .build(); + if(partition != null){ + upsertReq.setPartitionName(partition); + } milvusClientV2.upsert(upsertReq); dataCache = new ArrayList<>(); } diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java index 390f95e5f4..fd436f3b0c 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusSinkConverter.java @@ -5,13 +5,16 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; import io.milvus.v2.common.DataType; import static io.milvus.v2.common.DataType.*; import io.milvus.v2.service.collection.request.AddFieldReq; import io.milvus.v2.service.collection.request.CreateCollectionReq; +import java.lang.reflect.Type; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; import java.util.stream.Collectors; public class MilvusSinkConverter { @@ -31,6 +34,10 @@ public JsonObject convertByType(JSONArray milvusColumnMeta, Record record) { private Object convertToMilvusField(String type, Object rawData) { Gson gson = new Gson(); switch (valueOf(type)) { + case Int8: + return Byte.parseByte(rawData.toString()); + case Int16: + return Short.parseShort(rawData.toString()); case Int32: return Integer.parseInt(rawData.toString()); case Int64: @@ -42,11 +49,16 @@ private Object convertToMilvusField(String type, Object rawData) { return 
rawData.toString(); case Bool: return Boolean.parseBoolean(rawData.toString()); + case JSON: + return gson.fromJson(rawData.toString(), JsonObject.class); + case Array: + Type listType = new TypeToken>() {}.getType(); + return gson.fromJson(rawData.toString(), listType); case FloatVector: - java.lang.Float[] floats = Arrays.stream(rawData.toString().split(",")).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new); + java.lang.Float[] floats = Arrays.stream(processVectorString(rawData)).map(java.lang.Float::parseFloat).toArray(java.lang.Float[]::new); return Arrays.stream(floats).collect(Collectors.toList()); case BinaryVector: - java.lang.Integer[] binarys = Arrays.stream(rawData.toString().split(",")).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new); + java.lang.Integer[] binarys = Arrays.stream(processVectorString(rawData)).map(java.lang.Integer::parseInt).toArray(java.lang.Integer[]::new); return BufferUtils.toByteBuffer(binarys); case Float16Vector: case BFloat16Vector: @@ -56,10 +68,26 @@ private Object convertToMilvusField(String type, Object rawData) { case SparseFloatVector: return JsonParser.parseString(gson.toJson(rawData)).getAsJsonObject(); default: - throw new RuntimeException("Unsupported data type"); + throw new RuntimeException("Unsupported data type: " + type); } } + private String[] processArrayString(Object rawData) { + // Step 1: Remove square brackets + String cleanedInput = rawData.toString().replace("[", "").replace("]", ""); + + // Step 2: Split the string into an array of string numbers + return cleanedInput.split(",\\s*"); + } + + private String[] processVectorString(Object rawData) { + // Step 1: Remove square brackets + String cleanedInput = rawData.toString().replace("[", "").replace("]", ""); + + // Step 2: Split the string into an array of string numbers + return cleanedInput.split(",\\s*"); + } + public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray milvusColumnMeta) { CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder().build(); for (int i = 0; i < milvusColumnMeta.size(); i++) { @@ -79,6 +107,13 @@ public CreateCollectionReq.CollectionSchema prepareCollectionSchema(JSONArray mi if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_LENGTH)) { addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH)); } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.ELEMENT_TYPE)) { + addFieldReq.setElementType(DataType.valueOf(milvusColumnMeta.getJSONObject(i).getString(KeyConstant.ELEMENT_TYPE))); + addFieldReq.setMaxLength(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_LENGTH)); + } + if(milvusColumnMeta.getJSONObject(i).containsKey(KeyConstant.MAX_CAPACITY)) { + addFieldReq.setMaxCapacity(milvusColumnMeta.getJSONObject(i).getInteger(KeyConstant.MAX_CAPACITY)); + } collectionSchema.addField(addFieldReq); } return collectionSchema; diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java index c9b5a1bccb..01ae39e52e 100644 --- a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/MilvusWriter.java @@ -9,15 +9,13 @@ import com.google.gson.JsonObject; import io.milvus.v2.client.ConnectConfig; import io.milvus.v2.client.MilvusClientV2; -import 
io.milvus.v2.common.DataType; -import io.milvus.v2.service.collection.request.AddFieldReq; import io.milvus.v2.service.collection.request.CreateCollectionReq; import io.milvus.v2.service.collection.request.HasCollectionReq; -import io.milvus.v2.service.vector.request.UpsertReq; +import io.milvus.v2.service.partition.request.CreatePartitionReq; +import io.milvus.v2.service.partition.request.HasPartitionReq; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; -import java.util.Collections; import java.util.List; @Slf4j @@ -56,19 +54,25 @@ public static class Task extends Writer.Task { private MilvusBufferWriter milvusBufferWriter; private String collection = null; + private String partition = null; private JSONArray milvusColumnMeta; - private String schemaCreateMode = "createWhenTableNotExit"; + private boolean enableDynamicSchema; + + private Integer schemaCreateMode; @Override public void startWrite(RecordReceiver lineReceiver) { Record record = lineReceiver.getFromReader(); - JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record); - milvusBufferWriter.write(data); - if(milvusBufferWriter.needCommit()){ - log.info("Reached buffer limit, Committing data"); - milvusBufferWriter.commit(); - log.info("Data committed"); + while(record != null){ + JsonObject data = milvusSinkConverter.convertByType(milvusColumnMeta, record); + milvusBufferWriter.write(data); + if (milvusBufferWriter.needCommit()) { + log.info("Reached buffer limit, Committing data"); + milvusBufferWriter.commit(); + log.info("Data committed"); + } + record = lineReceiver.getFromReader(); } } @@ -78,9 +82,11 @@ public void init() { // get configuration Configuration writerSliceConfig = this.getPluginJobConf(); this.collection = writerSliceConfig.getString(KeyConstant.COLLECTION); + this.partition = writerSliceConfig.getString(KeyConstant.PARTITION, null); + this.enableDynamicSchema = writerSliceConfig.getBool(KeyConstant.ENABLE_DYNAMIC_SCHEMA, true); this.milvusColumnMeta = JSON.parseArray(writerSliceConfig.getString(KeyConstant.COLUMN)); - this.schemaCreateMode = writerSliceConfig.getString(KeyConstant.schemaCreateMode) == null ? - "createWhenTableNotExit" : writerSliceConfig.getString(KeyConstant.schemaCreateMode); + this.schemaCreateMode = writerSliceConfig.getInt(KeyConstant.schemaCreateMode) == null ? 
+ SchemaCreateMode.CREATE_WHEN_NOT_EXIST.getMode() : writerSliceConfig.getInt(KeyConstant.schemaCreateMode); int batchSize = writerSliceConfig.getInt(KeyConstant.BATCH_SIZE, 100); log.info("Collection:{}", this.collection); // connect to milvus @@ -94,7 +100,7 @@ public void init() { } this.milvusClientV2 = new MilvusClientV2(connectConfig); this.milvusSinkConverter = new MilvusSinkConverter(); - this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, batchSize); + this.milvusBufferWriter = new MilvusBufferWriter(milvusClientV2, collection, partition, batchSize); log.info("Milvus writer initialized"); } @Override @@ -103,21 +109,33 @@ public void prepare() { Boolean hasCollection = milvusClientV2.hasCollection(HasCollectionReq.builder().collectionName(collection).build()); if (!hasCollection) { log.info("Collection not exist"); - if (schemaCreateMode.equals("createWhenTableNotExit")) { + if (schemaCreateMode.equals(SchemaCreateMode.CREATE_WHEN_NOT_EXIST.getMode())) { // create collection log.info("Creating collection:{}", this.collection); CreateCollectionReq.CollectionSchema collectionSchema = milvusSinkConverter.prepareCollectionSchema(milvusColumnMeta); - + collectionSchema.setEnableDynamicField(enableDynamicSchema); CreateCollectionReq createCollectionReq = CreateCollectionReq.builder() .collectionName(collection) .collectionSchema(collectionSchema) .build(); milvusClientV2.createCollection(createCollectionReq); - } else if (schemaCreateMode.equals("exception")) { + } else if (schemaCreateMode.equals(SchemaCreateMode.EXCEPTION.getMode())) { log.error("Collection not exist, throw exception"); throw new RuntimeException("Collection not exist"); } } + if(partition != null) { + Boolean hasPartition = milvusClientV2.hasPartition(HasPartitionReq.builder().collectionName(collection).partitionName(partition).build()); + if (!hasPartition) { + log.info("Partition not exist, creating"); + CreatePartitionReq createPartitionReq = CreatePartitionReq.builder() + .collectionName(collection) + .partitionName(partition) + .build(); + milvusClientV2.createPartition(createPartitionReq); + log.info("Partition created"); + } + } } @Override diff --git a/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java new file mode 100644 index 0000000000..aa67945db4 --- /dev/null +++ b/milvuswriter/src/main/java/com/alibaba/datax/plugin/writer/milvuswriter/SchemaCreateMode.java @@ -0,0 +1,16 @@ +package com.alibaba.datax.plugin.writer.milvuswriter; + +import lombok.Getter; + +@Getter +public enum SchemaCreateMode { + CREATE_WHEN_NOT_EXIST(0), + EXCEPTION(1), + IGNORE(2); + + private int mode; + + SchemaCreateMode(int mode) { + this.mode = mode; + } +} diff --git a/milvuswriter/src/main/resources/plugin_job_template.json b/milvuswriter/src/main/resources/plugin_job_template.json index d4ba4bf1fc..b9bff3628e 100644 --- a/milvuswriter/src/main/resources/plugin_job_template.json +++ b/milvuswriter/src/main/resources/plugin_job_template.json @@ -1,15 +1,49 @@ { - "name": "mongodbwriter", + "name": "milvuswriter", "parameter": { - "address": [], - "userName": "", - "userPassword": "", - "dbName": "", - "collectionName": "", - "column": [], - "upsertInfo": { - "isUpsert": "", - "upsertKey": "" - } + "uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530", + "token": "*****", + "collection": "medium_articles", + "batchSize": 10, + "schemaCreateMode": 
"createWhenTableNotExit", + "column": [ + { + "name": "id", + "type": "Int64", + "isPrimaryKey": true + }, + { + "name": "title_vector", + "type": "FloatVector", + "dimension": 768 + }, + { + "name": "title", + "type": "VarChar", + "maxLength": 1000 + }, + { + "name": "link", + "type": "VarChar", + "maxLength": 1000 + }, + { + "name": "reading_time", + "type": "Int64" + }, + { + "name": "publication", + "type": "VarChar", + "maxLength": 1000 + }, + { + "name": "claps", + "type": "Int64" + }, + { + "name": "responses", + "type": "Int64" + } + ] } } \ No newline at end of file diff --git a/package.xml b/package.xml index 624109f799..49eff42d60 100644 --- a/package.xml +++ b/package.xml @@ -257,8 +257,22 @@ datax + + milvusreader/target/datax/ + + **/*.* + + datax + + + milvuswriter/target/datax/ + + **/*.* + + datax + mysqlwriter/target/datax/ diff --git a/pom.xml b/pom.xml index 1b364a7546..2d4d3a66c1 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ starrocksreader sybasereader dorisreader + milvusreader mysqlwriter starrockswriter