Skip to content

Commit

Permalink
[ISSUES apache#530] fix order message bug
Browse files Browse the repository at this point in the history
  • Loading branch information
靖愉 committed Apr 2, 2024
1 parent 02a168c commit 7699aa6
Showing 1 changed file with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,22 @@ public void onException(Throwable throwable) {
} else {
// Partition message ordering,
// At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business
producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback);
try {
SendResult result = producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys());
log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
// complete record
counter.completeRecord();
// commit record for custom
recordSent(preTransformRecord, sourceMessage, result);
// ack record position
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
} catch (Exception e){
log.error("Source task send record failed ,error msg {}. message {}", e.getMessage(), JSON.toJSONString(sourceMessage), e);
// skip record
counter.skipRecord();
// record send failed
recordSendFailed(false, sourceMessage, preTransformRecord, e);
}
}

} catch (RetriableException e) {
Expand Down

0 comments on commit 7699aa6

Please sign in to comment.