Skip to content

Commit

Permalink
update milvus connector to support dynamic schema, failed retry, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
nianliuu committed Oct 24, 2024
1 parent 4406fbc commit 54596ca
Show file tree
Hide file tree
Showing 25 changed files with 1,798 additions and 847 deletions.
10 changes: 7 additions & 3 deletions docs/en/connector-v2/sink/Mivlus.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
## Description

Write data to Milvus or Zilliz Cloud

This Milvus Sink connector write data to Milvus or Zilliz Cloud, it has the following features:
- support read and write data by partition
- support write dynamic schema data from Metadata Column
- json data will be converted to json string and sink as json as well
- retry automatically to bypass ratelimit and grpc limit
## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
Expand Down Expand Up @@ -34,7 +37,7 @@ Write data to Milvus or Zilliz Cloud

## Sink Options

| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|----------------------|---------|----------|------------------------------|-----------------------------------------------------------|
| url | String | Yes | - | The URL to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | User:password |
Expand All @@ -44,6 +47,7 @@ Write data to Milvus or Zilliz Cloud
| enable_upsert | boolean | No | false | Upsert data not insert. |
| enable_dynamic_field | boolean | No | true | Enable create table with dynamic field. |
| batch_size | int | No | 1000 | Write batch size. |
| partition_key | String | No | | Milvus partition key field |

## Task Example

Expand Down
8 changes: 7 additions & 1 deletion docs/en/connector-v2/source/Mivlus.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
## Description

Read data from Milvus or Zilliz Cloud
This Milvus source connector reads data from Milvus or Zilliz Cloud, it has the following features:
- support read and write data by partition
- support read dynamic schema data into Metadata Column
- json data will be converted to json string and sink as json as well
- retry automatically to bypass ratelimit and grpc limit

## Key Features

Expand Down Expand Up @@ -53,3 +57,5 @@ source {
}
```

## Changelog

Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,25 @@ public static PhysicalColumn of(
String comment,
String sourceType,
Map<String, Object> options) {
return new PhysicalColumn(
name, dataType, columnLength, nullable, defaultValue, comment, sourceType, options);
}

public static PhysicalColumn of(
String name,
SeaTunnelDataType<?> dataType,
Long columnLength,
Integer scale,
boolean nullable,
Object defaultValue,
String comment,
String sourceType,
Map<String, Object> options) {
return new PhysicalColumn(
name,
dataType,
columnLength,
null,
scale,
nullable,
defaultValue,
comment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -35,6 +36,8 @@ public final class SeaTunnelRow implements Serializable {

private volatile int size;

private Map<String, Object> options = new HashMap<>();

public SeaTunnelRow(int arity) {
this.fields = new Object[arity];
}
Expand All @@ -55,6 +58,10 @@ public void setRowKind(RowKind rowKind) {
this.rowKind = rowKind;
}

public void setOptions(Map<String, Object> options) {
this.options = options;
}

public int getArity() {
return fields.length;
}
Expand All @@ -67,6 +74,10 @@ public RowKind getRowKind() {
return this.rowKind;
}

public Map<String, Object> getOptions() {
return options;
}

public Object[] getFields() {
return fields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.milvus.sink.batch;
package org.apache.seatunnel.common.constants;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import lombok.Getter;

public interface MilvusBatchWriter {
@Getter
public enum CommonOptions {
JSON("Json"),
METADATA("Metadata"),
PARTITION("Partition"),
;

void addToBatch(SeaTunnelRow element);
private final String name;

boolean needFlush();

boolean flush();

void close();
CommonOptions(String name) {
this.name = name;
}
}
25 changes: 10 additions & 15 deletions seatunnel-connectors-v2/connector-milvus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,20 @@

<artifactId>connector-milvus</artifactId>
<name>SeaTunnel : Connectors V2 : Milvus</name>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.3</version>
<version>2.4.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand All @@ -42,19 +50,6 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.11.0</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Loading

0 comments on commit 54596ca

Please sign in to comment.