Specify RocketMQ connect domain model #180
Comments
IMO, the source task provides the void commitRecord(SourceRecord record, RecordOffset recordOffset) method. If a source task implementation wants to maintain the offset itself, it implements the commit method; if it does not, WorkerSourceTask maintains the offset. Would that solve this problem?
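For illustration, here is a minimal Java sketch of that idea. The SourceRecord/RecordOffset types below are simplified placeholders for the connector API classes named above, and the in-memory map is just an assumption for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Placeholder types standing in for the connector API classes named above.
class SourceRecord {
    String partition;
    Object value;
}

class RecordOffset {
    long offset;
    RecordOffset(long offset) { this.offset = offset; }
}

// Sketch of a source task that maintains its own offsets via commitRecord.
class SelfManagedOffsetTask {
    private final Map<String, RecordOffset> committed = new ConcurrentHashMap<>();

    // Invoked by the worker after a record has been processed; the task records
    // the offset itself instead of relying on the worker's automatic commit.
    void commitRecord(SourceRecord record, RecordOffset recordOffset) {
        committed.put(record.partition, recordOffset);
    }

    // On restart, resume from the last offset this task committed for a partition.
    long resumeFrom(String partition) {
        RecordOffset last = committed.get(partition);
        return last == null ? 0L : last.offset;
    }
}
```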
@odbozhou In WorkerSourceTask, if producer.send fails, the sourceTask will still commit, instead of calling another sourceTask method such as dismissFailedMsgs to handle the messages that failed to be produced.
Currently, WorkerSourceTask has no commit-call logic; it only commits the offset automatically when sending succeeds. To support a SourceTask maintaining the offset by itself, the new commit logic can be optimized. What matters most about success and failure is their impact on the offset.
Yes, the new commit logic will be optimized. IMO, it would be better to expose the handling of send failures to users, for example through an interface like dismissFailedMsgs mentioned above.
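A rough sketch of what such a hook could look like; the interface and method names are hypothetical, taken from the dismissFailedMsgs suggestion above, not an existing API:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// A hypothetical hook, named after the dismissFailedMsgs idea above, that the worker
// could invoke when producer.send fails, so the task decides what happens to failures.
interface FailedMessageHandler<R> {
    void dismissFailedMsgs(List<R> failedRecords, Throwable cause);
}

// Example handler: keep failed records for retry instead of silently committing them.
class RequeueOnFailure<R> implements FailedMessageHandler<R> {
    private final Queue<R> retryQueue = new ConcurrentLinkedQueue<>();

    @Override
    public void dismissFailedMsgs(List<R> failedRecords, Throwable cause) {
        // The offset should only advance once these records are actually delivered.
        retryQueue.addAll(failedRecords);
    }

    public Queue<R> pendingRetries() {
        return retryQueue;
    }
}
```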
@sunxiaojian First, some background on why these two interfaces are being introduced: I need to maintain the offset myself. Currently, when WorkerSourceTask fails to send, there is no mechanism at all to notify the SourceConnector. If I want to maintain the offset myself, is that done by consuming the internal offset messages?
@odbozhou @sunxiaojian There is another question worth discussing. The current flow is poll, transform, converter, producer, but its performance is not high. To improve performance we need concurrent polling, which means managing offsets ourselves, so WorkerSourceTask needs a mechanism to notify the SourceConnector on success or failure so it can do the corresponding offset management.
Question 1: To maintain offsets yourself, you do not need to consume the offset messages. You can do it yourself via new PositionStorageWriter() and write your custom offsets into it. If you only need to read offset information, the source task context contains an offsetReader object from which you can get the current offset. If that does not meet your needs, please raise suggestions and we can improve it together.
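For illustration only, here is a simplified sketch of that idea. The position writer below is a stand-in with an assumed API, not the runtime's actual PositionStorageWriter class:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the position writer mentioned above; the real class in the
// runtime has a different constructor and API.
class SimplePositionWriter {
    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    void putPosition(String partition, long offset) {
        positions.put(partition, offset);
    }

    Long getPosition(String partition) {
        return positions.get(partition);
    }
}

// Sketch of a task writing its own offsets after each send and reading them on restart.
class CustomOffsetExample {
    private final SimplePositionWriter writer = new SimplePositionWriter();

    void afterSuccessfulSend(String partition, long offset) {
        // The task, not the worker, decides when an offset counts as committed.
        writer.putPosition(partition, offset);
    }

    long startFrom(String partition) {
        Long last = writer.getPosition(partition);
        return last == null ? 0L : last;
    }
}
```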
That flow is only the flow of a single task; a connector can contain multiple tasks. As for how tasks are assembled, you just assemble different task configs when declaring the connector's taskConfigs, and you can split the work with different task configs according to your own needs.
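A rough sketch of such a split; the "task.queues" config key and the round-robin assignment are assumptions made for the example, not the project's actual config format:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of splitting work when declaring task configs: one config per task, each
// assigned a subset of queues.
class TaskConfigSplitter {
    List<Map<String, String>> taskConfigs(List<String> queues, int maxTasks) {
        int taskCount = Math.max(1, Math.min(maxTasks, queues.size()));
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            StringBuilder assigned = new StringBuilder();
            // Simple round-robin assignment of queues to tasks.
            for (int q = i; q < queues.size(); q += taskCount) {
                if (assigned.length() > 0) {
                    assigned.append(',');
                }
                assigned.append(queues.get(q));
            }
            Map<String, String> config = new HashMap<>();
            config.put("task.queues", assigned.toString());
            configs.add(config);
        }
        return configs;
    }
}
```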
Question 1: Take the current RmqSourceRepicator as an example. Right now it is impossible to maintain offsets ourselves, because there is no notification mechanism for whether the producer inside WorkerSourceTask succeeded or failed.
That is one option, but from a performance standpoint, when one task is responsible for one queue, the best results require poll, transform, converter, and producer to each run in their own thread.
Question 1: Is the reason you cannot maintain offsets yourself that commit() and commitRecord() are never called? That will be fixed soon.
Question 2: Changing the parameters of commitRecord may not be a good idea, since it does not preserve compatibility. Consider adding a flag in ConnectRecord.extensions to indicate whether the send succeeded.
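A minimal sketch of that option, using a simplified record type and an illustrative extension key (both are assumptions, not the actual ConnectRecord API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the compatibility-friendly option above: keep the commitRecord signature
// and mark success/failure in the record's extensions instead.
class SendResultExtensions {
    static final String SEND_SUCCESS_KEY = "send-success";

    // Minimal stand-in for ConnectRecord with an extensions map.
    static class Record {
        final Map<String, String> extensions = new HashMap<>();
    }

    static void markSendResult(Record record, boolean success) {
        record.extensions.put(SEND_SUCCESS_KEY, Boolean.toString(success));
    }

    static boolean wasSendSuccessful(Record record) {
        return Boolean.parseBoolean(record.extensions.getOrDefault(SEND_SUCCESS_KEY, "false"));
    }
}
```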
OK. Considering that no users are using commitRecord yet, we will just upgrade it directly this time.
Each sourceTask and sinkTask in the RocketMQ connect architecture has its own processing logic; the data flow and control flow are controlled by WorkerSinkTask and WorkerSourceTask. We call the current processing logic the RocketMQ connect domain model, but I find it is not flexible enough. For example, SourceTask only commits after the data is sent, regardless of success or failure, and SinkTask puts records to RocketMQ without handling the result. IMO, we can add callbacks for sending data and open these callbacks to users.
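As a rough illustration of the proposal (interface and method names are hypothetical, not the project's actual API):

```java
// Sketch of the proposed send callback: the worker reports success or failure back to
// the task so it can manage offsets accordingly.
interface SendCallback<R> {
    void onSuccess(R record);

    void onFailure(R record, Throwable cause);
}

class CallbackAwareWorker<R> {
    private final SendCallback<R> callback;

    CallbackAwareWorker(SendCallback<R> callback) {
        this.callback = callback;
    }

    // Called when the producer send completes; routes the result to the user's
    // callback instead of committing the offset unconditionally.
    void onSendResult(R record, Throwable error) {
        if (error == null) {
            callback.onSuccess(record);
        } else {
            callback.onFailure(record, error);
        }
    }
}
```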