Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify RocketMQ connect domain model #180

Open
lizhiboo opened this issue Jun 23, 2022 · 14 comments
Open

Specify RocketMQ connect domain model #180

lizhiboo opened this issue Jun 23, 2022 · 14 comments
Labels

Comments

@lizhiboo
Copy link
Contributor

Each sourceTask and sinkTask in RocketMQ connect architecture have its own conduct logic, the data flow and control flow is controlled by WorkerSinkTask and WorkerSourceTask. We just call current conduct logic as RocketMQ connect domain model, but I found it's not flexible enough. For example, SourceTask only commit after data sink, regardless of success or failure. SinkTask put records to RocketMQ without result handling. IMO, we can add callback for data sink, and open these callback to users.

public abstract class SourceTask {
    public boolean dismissFailedMsgs(final List<ConnectRecord> failedRecords) {
        // dismiss for sink to RocketMQ failed records
        return false;
    }

    public boolean customerFailedMsgs(final List<ConnectRecord> failedRecords) {
        // customer conduct logic for sink to RocketMQ failed records
        return false;
    }
}

public abstract class SinkTask {
    // Callback conduct for success or failed records that sink to RocketMQ
    public abstract void put(List<ConnectRecord> var1, CallBack<SuccessRecords,FailedRecords> callback) throws ConnectException;
}
@odbozhou
Copy link
Contributor

imo, the source task provides the void commitRecord(SourceRecord record, RecordOffset recordOffset) method. If the source task implementation wants to maintain the offset itself, it implements the commit method. If it is not implemented, the WorkerSourceTask will maintain the offset. Is it possible to solve this problem?
The sink task is also similar, and the corresponding api has been provided in the api
flush(Map<RecordPartition, RecordOffset> currentOffsets)
If the site that fails to submit can be flushed to remove the corresponding partition, WorkerSinkTask can not submit the site of the failed partition.

@lizhiboo
Copy link
Contributor Author

@odbozhou In WorkerSourceTask, if producer.send failed, sourceTask will also commit instead of call sourceTask's other method like dismissFailedMsgs to conduct produce failed messages.

@odbozhou
Copy link
Contributor

odbozhou commented Jun 23, 2022

Currently, WorkerSourceTask does not have the logic of commit call, and it will only automatically commit the offset when the sending is successful. In order to support the ability of SourceTask to maintain the offset by itself, the logic of the new commit can be optimized. The most important thing for success and failure is the impact on the offset.

@lizhiboo
Copy link
Contributor Author

Currently, WorkerSourceTask does not have the logic of commit call, and it will only automatically commit the offset when the sending is successful. In order to support the ability of SourceTask to maintain the offset by itself, the logic of the new commit can be optimized. The most important thing for success and failure is the impact on the offset.

Yes, the logic of the new commit will be optimized. IMO, open the conduction of sending failure to users will be better, like the interface dismissFailedMsgs in the above.

@sunxiaojian
Copy link
Contributor

  1. WorkerSourceTask中对一条数据的处理过程是 poll , transform, converter, producer;
    <1.> poll , transform过程是用户自定义,发生异常当在自定义插件逻辑中捕捉处理
    <2.> converter是正向序列化流程,由于序列化的都是标准的connectRecord ,一般不会有问题,问题经常发生在sink端对数据反序列化不出来
    <3.>producer发送失败,基本存在超时和服务不可用两种,非逻辑错误,这时首要的是保证offset不被提交,等恢复正常后能再次正常被处理, 这样发给dismissFailedMsgs、customerFailedMsgs就没有价值;由于系统的失败重试,可能也会存在一条数据被多次发向dismissFailedMsgs、customerFailedMsgs 中,无法保证一条数据只能被成功处理唯一;
    如果不使用系统内置的offset逻辑,自定义offset维护逻辑,那就需要dismissFailedMsgs、customerFailedMsgs和 commit,及commitRecord配合使用, 并需要对数据进行另存处理,这样就脱离了原数据处理的轨道,对一些要保序的数据就不适用

    所以整体来看source侧的核心是不是就只维护好offset就可以 ?

  2. WorkerSourceTask中对一条数据的处理过程是consumer , converter,transform , put;

    已经定义了死信队列的处理逻辑,错误数据可通过死信队列来暂存

@lizhiboo
Copy link
Contributor Author

lizhiboo commented Jul 1, 2022

@sunxiaojian 先来说一下引入这两个接口的背景,我需要自己维护offset,目前对于WorkerSourceTask发送异常的情况,是没有任何机制通知到SourceConnector的,如果要自己维护offset,是通过消费内部offset的消息来实现吗?
对于exactly once的问题,是需要外部存储来记录,从而来保证刚好一次,如果需要实现刚好一次的逻辑,dismissFailedMsgs 和 customerFailedMsg同样可以访问这个外部存储实现;
对于顺序性保证的问题,现在的实现同样不能保证顺序,如果需要实现保序,当发送失败时需要阻塞住,dismissFailedMsgs 和 customerFailedMsg也不会造成干扰,主要看这两个方法在WorkerSourceTask是什么时候被调用的。

@lizhiboo
Copy link
Contributor Author

lizhiboo commented Jul 1, 2022

@odbozhou @sunxiaojian 另外还有一个问题可以讨论一下,现在的流程是poll , transform, converter, producer,但是这样性能不高,如果要提高性能,就需要并发的poll,这样就需要自己管理offset了,在WorkerSourceTask在成功或失败时,需要有个机制能通知到SourceConnector做相应的offset管理。

@sunxiaojian
Copy link
Contributor

sunxiaojian commented Jul 1, 2022

@sunxiaojian 先来说一下引入这两个接口的背景,我需要自己维护offset,目前对于WorkerSourceTask发送异常的情况,是没有任何机制通知到SourceConnector的,如果要自己维护offset,是通过消费内部offset的消息来实现吗? 对于exactly once的问题,是需要外部存储来记录,从而来保证刚好一次,如果需要实现刚好一次的逻辑,dismissFailedMsgs 和 customerFailedMsg同样可以访问这个外部存储实现; 对于顺序性保证的问题,现在的实现同样不能保证顺序,如果需要实现保序,当发送失败时需要阻塞住,dismissFailedMsgs 和 customerFailedMsg也不会造成干扰,主要看这两个方法在WorkerSourceTask是什么时候被调用的。

问题一:对于自己维护offset, 不需要消费offset消息,你可以自己通过new PositionStorageWriter()实现,自定义的offset放进去; 如果仅仅是读取offset信息,在source task context中包含 offsetReader对象可以获取到对应的当前offset信息,若不满足可以提出意见,我们共同做进一步改造
问题二:exactly once问题尽量让系统来保证,尽量通过系统offset来保证,如果不满足可以对其进行改进
问题三:对于顺序性保证的问题,如果系统不能保证需要对系统内进行优化,首先要系统保证,如果系统保证了是不是就不需要了加方法了?

@sunxiaojian
Copy link
Contributor

sunxiaojian commented Jul 1, 2022

@odbozhou @sunxiaojian 另外还有一个问题可以讨论一下,现在的流程是poll , transform, converter, producer,但是这样性能不高,如果要提高性能,就需要并发的poll,这样就需要自己管理offset了,在WorkerSourceTask在成功或失败时,需要有个机制能通知到SourceConnector做相应的offset管理。

这个流程只是一个task的流程,一个connector下可以包含多个task, 至于怎么组装task,在声明connector taskConfigs时做不同task config组装就可以了, 可以按照自己的需求去进行不同的task config, 来做任务的拆分

@lizhiboo
Copy link
Contributor Author

lizhiboo commented Jul 1, 2022

问题二:exactly once问题尽量让系统来保证,尽量通过系统offset来保证,如果不满足可以对其进行改进
问题三:对于顺序性保证的问题,如果系统不能保证需要对系统内进行优化,首先要系统保证,如果系统保证了是不是就不需要了加方法了?

问题一我们以现在的RmqSourceRepicator为例,现在是实现不了自己维护位点的,WorkerSourceTask里面的producer发送成功了或者失败了都没有通知机制;
问题二和问题三都是系统实现的问题,但是增加这两个接口不影响系统实现,所以这两个问题不会成为connector实现精准一次投递和保序的绊脚石。

@lizhiboo
Copy link
Contributor Author

lizhiboo commented Jul 1, 2022

@odbozhou @sunxiaojian 另外还有一个问题可以讨论一下,现在的流程是poll , transform, converter, producer,但是这样性能不高,如果要提高性能,就需要并发的poll,这样就需要自己管理offset了,在WorkerSourceTask在成功或失败时,需要有个机制能通知到SourceConnector做相应的offset管理。

这个流程只是一个task的流程,一个connector下可以包含多个task, 至于怎么组装task,在声明connector taskConfigs时做不同task config组装就可以了, 可以按照自己的需求去进行不同的task config, 来做任务的拆分

这是一个方案,但是从性能角度触发,一个task负责一个队列的情况,一定是poll、transform、converter、producer分别属于不同的线程才能达到最好的效果。

@sunxiaojian
Copy link
Contributor

sunxiaojian commented Jul 1, 2022

问题二:exactly once问题尽量让系统来保证,尽量通过系统offset来保证,如果不满足可以对其进行改进
问题三:对于顺序性保证的问题,如果系统不能保证需要对系统内进行优化,首先要系统保证,如果系统保证了是不是就不需要了加方法了?

问题一我们以现在的RmqSourceRepicator为例,现在是实现不了自己维护位点的,WorkerSourceTask里面的producer发送成功了或者失败了都没有通知机制; 问题二和问题三都是系统实现的问题,但是增加这两个接口不影响系统实现,所以这两个问题不会成为connector实现精准一次投递和保序的绊脚石。

问题一: 实现不了自己维护位点的,是否是因为commit()和commitRecord()未进行调用? 这个最近会做修正;
问题二: 可以对 commitRecord()进行丰富, 比如改成 commitRecord(ConnectRecord,RecordMetadata metadata) 来处理,metadata 为发送成功的信息描述, 如果metadata为空则任务是未发送成功的数据,作为错误数据处理;这样可以不新增方法,保证了SourceTask的简洁;如果offset自己维护,则不能再使用系统托管的offset,否则两处可能存在不一致

@lizhiboo @odbozhou

@lizhiboo
Copy link
Contributor Author

lizhiboo commented Jul 4, 2022

问题二修改commitRecord的参数可能不太好,没有保持兼容性,可以考虑在ConnectRecord.extensions增加标识是否成功

@sunxiaojian
Copy link
Contributor

问题二修改commitRecord的参数可能不太好,没有保持兼容性,可以考虑在ConnectRecord.extensions增加标识是否成功

嗯嗯,考虑到commitRecord目前还没有用户使用,所以本次就直接升级了

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants