Add milvusReader and milvusWriter to support read and write data for milvus #2249

Open: wants to merge 2 commits into base: master

nianliuu

Overview

Add MilvusReader and MilvusWriter plugins to support reading data from and writing data to Milvus.

Code Change

Currently, DataX provides no support for vector data. This PR introduces the MilvusReader and MilvusWriter plugins, which support moving data between Milvus instances.
The following vector types are supported for now:

  • float vector
  • binary vector

Test

Tested with the DataX example by migrating data from one Milvus instance to another.

Limitation

  • moving dynamic schema data is not supported
  • reading and writing by partition is not supported

@nianliuu nianliuu force-pushed the master branch 3 times, most recently from d36fe0b to ef56ae7 Compare November 28, 2024 05:15
@Override
public void destroy() {
log.info("Closing Milvus writer, committing data and closing connection");
this.milvusBufferWriter.commit();

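I think this.milvusBufferWriter.commit() would be better placed after the while loop in startWrite, called once after checking that the buffer is not empty. That would be easier to understand and maintain.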

For reference: [image]
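
The suggestion in this thread (flush the remaining buffer once, right after the read loop, instead of in destroy()) can be sketched roughly as follows. This is a minimal, self-contained illustration and not the PR's code: BufferedBatchWriter, its sink callback, and the static startWrite helper are hypothetical stand-ins for milvusBufferWriter and the plugin's startWrite method.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the PR's buffer writer; all names are illustrative.
class BufferedBatchWriter<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // e.g. a wrapper around the upsert call
    private final List<T> cache = new ArrayList<>();

    BufferedBatchWriter(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Buffer one record and flush automatically when the batch is full.
    void write(T record) {
        cache.add(record);
        if (cache.size() >= batchSize) {
            commit();
        }
    }

    boolean isEmpty() {
        return cache.isEmpty();
    }

    // Hand a copy of the buffered records to the sink, then clear the buffer.
    void commit() {
        if (cache.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(cache));
        cache.clear();
    }

    // Same shape as a startWrite method: drain the input, then flush the
    // remainder once, so destroy() only has to close the connection.
    static <T> void startWrite(Iterator<T> records, BufferedBatchWriter<T> writer) {
        while (records.hasNext()) {
            writer.write(records.next());
        }
        if (!writer.isEmpty()) {
            writer.commit();
        }
    }
}
```

With this layout a failed final flush surfaces while the task is still in its write phase, rather than during cleanup.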

.collectionName(collection)
.data(dataCache)
.build();
milvusClientV2.upsert(upsertReq);

Should retry on failure be considered here, with a configurable retry count and retry interval?
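
One way the retry question could be addressed is a small helper that wraps the upsert call. The sketch below is an assumption about how that might look, not something taken from the PR: the Retry class, withRetry, and the maxAttempts and intervalMillis parameters are all hypothetical names, and a fixed interval is used rather than backoff.

```java
import java.util.function.Supplier;

// Hypothetical retry helper; names and parameters are illustrative only.
final class Retry {
    private Retry() {
    }

    // Runs the action up to maxAttempts times, sleeping intervalMillis
    // between failed attempts, and rethrows the last failure if all fail.
    static <T> T withRetry(Supplier<T> action, int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw last;
    }
}
```

In the writer, the call site might then look like `Retry.withRetry(() -> milvusClientV2.upsert(upsertReq), retryTimes, retryIntervalMs)`, where retryTimes and retryIntervalMs would be read from the job configuration (both names are assumed here).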

.uri(writerSliceConfig.getString(KeyConstant.URI))
.token(writerSliceConfig.getString(KeyConstant.TOKEN))
.build();
if(writerSliceConfig.getString(KeyConstant.DATABASE) == null) {

As I understand it, shouldn't the condition here be != null?


@Getter
public enum SchemaCreateMode {
CREATE_WHEN_NOT_EXIST(0),

I suggest representing these with strings rather than ints; that would make the job configuration much more readable:
  • createWhenTableNotExit
  • Ignore
  • recreate
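
A string-backed version of the enum, as suggested in this comment, might look like the sketch below. It is only an illustration: the fromConfig method is a hypothetical name, the constants other than CREATE_WHEN_NOT_EXIST are assumed, and the first config string is spelled "createWhenTableNotExist" here on the assumption that the spelling in the list above is a typo. Lombok's @Getter is replaced by a plain getter so the block stands alone.

```java
// Hypothetical string-backed variant of SchemaCreateMode.
enum SchemaCreateMode {
    CREATE_WHEN_NOT_EXIST("createWhenTableNotExist"),
    IGNORE("ignore"),
    RECREATE("recreate");

    private final String type;

    SchemaCreateMode(String type) {
        this.type = type;
    }

    String getType() {
        return type;
    }

    // Resolve the mode from the job configuration, ignoring case and
    // surrounding whitespace; unknown values fail fast with a clear message.
    static SchemaCreateMode fromConfig(String value) {
        if (value == null) {
            throw new IllegalArgumentException("schemaCreateMode must not be null");
        }
        String normalized = value.trim();
        for (SchemaCreateMode mode : values()) {
            if (mode.type.equalsIgnoreCase(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported schemaCreateMode: " + value);
    }
}
```

A job configuration could then carry a readable value such as "recreate" instead of a number, and a misspelled value is rejected when the task is initialized.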

}
UpsertReq upsertReq = UpsertReq.builder()
.collectionName(collection)
.data(dataCache)

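It looks like this does not write to the configured partition and writes directly to the default partition instead. If a partition is configured, the data should be written to that partition.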

package com.alibaba.datax.plugin.writer.milvuswriter;

public class KeyConstant {
public static final String URI = "uri";

Please change uri to endpoint; DataX mostly uses endpoint for this kind of parameter.
