Skip to content

Commit

Permalink
add milvus reader plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
nianliuu committed Nov 28, 2024
1 parent 906bf3d commit d36fe0b
Show file tree
Hide file tree
Showing 16 changed files with 834 additions and 22 deletions.
109 changes: 109 additions & 0 deletions milvusreader/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>

<artifactId>milvusreader</artifactId>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
<version>32.0.1-jre</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit5</artifactId>
<version>2.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<resources>
<!--将resource目录也输出到target-->
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
36 changes: 36 additions & 0 deletions milvusreader/src/main/assembly/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>

<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/milvusreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>milvuswriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/milvusreader</outputDirectory>
</fileSet>
</fileSets>

<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/milvusreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
105 changes: 105 additions & 0 deletions milvusreader/src/main/doc/milvusreader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
### Datax MilvusReader
#### 1 快速介绍

MilvusReader 插件利用 Milvus 的java客户端MilvusClient进行Milvus的读操作。

#### 2 实现原理

MilvusReader通过Datax框架从Milvus读取数据,通过主控的JOB程序按照指定的规则对Milvus中的数据进行分片,并行读取,然后将Milvus支持的类型通过逐一判断转换成Datax支持的类型。

#### 3 功能说明
* 该示例从Milvus读一份Collection数据到另一个Milvus。
```json
{
"job": {
"content": [
{
"reader": {
"name": "milvusreader",
"parameter": {
"uri": "https://****.aws-us-west-2.vectordb.zillizcloud.com:19532",
"token": "*****",
"collection": "medium_articles",
"batchSize": 10
}
},
"writer": {
"name": "milvuswriter",
"parameter": {
"uri": "https://*****.aws-us-west-2.vectordb.zillizcloud.com:19530",
"token": "*****",
"collection": "medium_articles",
"batchSize": 10,
"schemaCreateMode": "createWhenTableNotExit",
"column": [
{
"name": "id",
"type": "Int64",
"isPrimaryKey": true
},
{
"name": "title_vector",
"type": "FloatVector",
"dimension": 768
},
{
"name": "title",
"type": "VarChar",
"maxLength": 1000
},
{
"name": "link",
"type": "VarChar",
"maxLength": 1000
},
{
"name": "reading_time",
"type": "Int64"
},
{
"name": "publication",
"type": "VarChar",
"maxLength": 1000
},
{
"name": "claps",
"type": "Int64"
},
{
"name": "responses",
"type": "Int64"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```
#### 4 参数说明

* uri: Milvus Cluster endpoint。【必填】
* token:Milvus的连接token。【必填】
* collection: 读取数据的collection。【必填】
* batchSize: 每次读取数据的行数【选填】

#### 5 类型转换

| DataX 内部类型| Milvus 数据类型 |
| -------- |-----------------|
| Long | int |
| Double | double |
| String | string, varchar |
| Boolean | bool |

- 当前暂不支持读取dynamic schema的数据,及按partition读取

#### 6 性能报告
#### 7 测试报告
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.datax.plugin.reader.milvusreader;

import java.nio.Buffer;
import java.nio.ByteBuffer;

public class BufferUtils {

public static ByteBuffer toByteBuffer(Short[] shortArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(shortArray.length * 2);

for (Short value : shortArray) {
byteBuffer.putShort(value);
}

// Compatible compilation and running versions are not consistent
// Flip the buffer to prepare for reading
((Buffer) byteBuffer).flip();

return byteBuffer;
}

public static Short[] toShortArray(ByteBuffer byteBuffer) {
Short[] shortArray = new Short[byteBuffer.capacity() / 2];

for (int i = 0; i < shortArray.length; i++) {
shortArray[i] = byteBuffer.getShort();
}

return shortArray;
}

public static ByteBuffer toByteBuffer(Float[] floatArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(floatArray.length * 4);

for (float value : floatArray) {
byteBuffer.putFloat(value);
}

((Buffer) byteBuffer).flip();

return byteBuffer;
}

public static Float[] toFloatArray(ByteBuffer byteBuffer) {
Float[] floatArray = new Float[byteBuffer.capacity() / 4];

for (int i = 0; i < floatArray.length; i++) {
floatArray[i] = byteBuffer.getFloat();
}

return floatArray;
}

public static ByteBuffer toByteBuffer(Double[] doubleArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(doubleArray.length * 8);

for (double value : doubleArray) {
byteBuffer.putDouble(value);
}

((Buffer) byteBuffer).flip();

return byteBuffer;
}

public static Double[] toDoubleArray(ByteBuffer byteBuffer) {
Double[] doubleArray = new Double[byteBuffer.capacity() / 8];

for (int i = 0; i < doubleArray.length; i++) {
doubleArray[i] = byteBuffer.getDouble();
}

return doubleArray;
}

public static ByteBuffer toByteBuffer(Integer[] intArray) {
ByteBuffer byteBuffer = ByteBuffer.allocate(intArray.length * 4);

for (int value : intArray) {
byteBuffer.putInt(value);
}

((Buffer) byteBuffer).flip();

return byteBuffer;
}

public static Integer[] toIntArray(ByteBuffer byteBuffer) {
Integer[] intArray = new Integer[byteBuffer.capacity() / 4];

for (int i = 0; i < intArray.length; i++) {
intArray[i] = byteBuffer.getInt();
}

return intArray;
}
}
Loading

0 comments on commit d36fe0b

Please sign in to comment.