Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #51] Optimize the source task commit method #52

Merged
merged 22 commits into from
Jul 14, 2022
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c15757b
SinkTask and SourceTask implement the validate method https://github.…
sunxiaojian Apr 12, 2022
dcc706d
Adjust the init and start methods of the component interface
sunxiaojian Apr 15, 2022
fa134ee
Set pause and resume to deprecated methods. It feels like they can be…
sunxiaojian Apr 15, 2022
55b9ac0
Add struct object and optimize schema and schema builder API #41
sunxiaojian May 24, 2022
c72659e
add offset storage writer #41
sunxiaojian May 24, 2022
e6deeb8
add getter and setter method #41
sunxiaojian Jun 4, 2022
aa9f315
add SchemaAndValue #41
sunxiaojian Jun 4, 2022
23ed62a
add logical type #41
sunxiaojian Jun 4, 2022
26fe155
Schemabuilder add required method
sunxiaojian Jun 6, 2022
3e6f1e6
schema add hashCode and equals method
sunxiaojian Jun 6, 2022
14765cb
fixed doc method
sunxiaojian Jun 6, 2022
a6b3c79
Field add equals and hashcode method
sunxiaojian Jun 7, 2022
427e009
optimize api #85
sunxiaojian Jun 8, 2022
1dc98a3
Merge branch 'dev'
sunxiaojian Jun 8, 2022
a38fa65
Merge branch 'optimize-api'
sunxiaojian Jun 8, 2022
c2314fb
Merge branch 'openmessaging:master' into master
sunxiaojian Jun 9, 2022
c4b1836
Optimize transform api #45
sunxiaojian Jun 9, 2022
c76d2fb
Optimize transform api and add RecordConverter
sunxiaojian Jun 18, 2022
ed258b0
Merge branch 'openmessaging:master' into master
sunxiaojian Jun 21, 2022
db8f31f
Merge branch 'openmessaging:master' into master
sunxiaojian Jun 28, 2022
c2205c3
Optimize the source task commit method #51
sunxiaojian Jul 4, 2022
5d7f13f
add batch commit #51
sunxiaojian Jul 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.data.ConnectRecord;

import java.util.List;
import java.util.Map;

/**
* The source task API definition is used to define the logic for data pulling
Expand All @@ -38,6 +40,23 @@ public void init(SourceTaskContext sourceTaskContext) {
*/
public abstract List<ConnectRecord> poll() throws InterruptedException;


/**
* batch commit
* @param records
* @param metadata
*/
public void commit(final List<ConnectRecord> records, Map<String,String> metadata) {
}
/**
* commit record
* @param record
* @param metadata
*/
public void commit(final ConnectRecord record, Map<String,String> metadata) {
commit(record);
}

/**
* <p>
* Commit an individual {@link ConnectRecord} when the callback from the producer client is received.
Expand All @@ -48,11 +67,9 @@ public void init(SourceTaskContext sourceTaskContext) {
* in their own system.
* </p>
*
* @throws InterruptedException task thread interupt exception
* @param connectRecords connect records
* @param record connect record
*/
public void commit(final List<ConnectRecord> connectRecords) throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对于某些Source保留Offset的场景,如果一次commit一批数据,Source可以根据不同的分区信息,选择每个分区的最新位点进行提交,以此降低提交的频次;如果是单个提交,类似的场景就不好处理。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对于某些Source保留Offset的场景,如果一次commit一批数据,Source可以根据不同的分区信息,选择每个分区的最新位点进行提交,以此降低提交的频次;如果是单个提交,类似的场景就不好处理。

次处主要的考虑主要有以下几点,

  1. 每条数据经过transform 、converter、send 成功后就可以commit了,如果是批次的commit,系统内还需要缓存record offset信息,数据丢了会出现重复按照历史offset拉取数据的问题
  2. 如果要降低批次提交,看用户自身需要可以在插件中自缓存list , 同样可以解决相同的问题
    所以,觉得提交单条还是更纯粹一点

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里我理解你讲的是send单条数据的场景,但是有些场景下,我们需要支持send一批数据。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里我理解你讲的是send单条数据的场景,但是有些场景下,我们需要支持send一批数据。

嗯,考虑过这个问题,但是一个source task拉取的数据可能不是写入同一个topic,可能是多个,这个要看写插件的逻辑定;所以可能暂时用不到,也可能一直用不到,如果需要时可以加上

commit();
public void commit(final ConnectRecord record) {
}

/**
Expand Down