diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index 79317bdb..aa7412b8 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -276,7 +276,22 @@ public void onException(Throwable throwable) { } else { // Partition message ordering, // At the same time, ensure that the data is pulled in an orderly manner, which needs to be guaranteed by sourceTask in the business - producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys(), callback); + try { + SendResult result = producer.send(sourceMessage, new SelectMessageQueueByHash(), sourceMessage.getKeys()); + log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); + // complete record + counter.completeRecord(); + // commit record for custom + recordSent(preTransformRecord, sourceMessage, result); + // ack record position + submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack); + } catch (Exception e){ + log.error("Source task send record failed ,error msg {}. message {}", e.getMessage(), JSON.toJSONString(sourceMessage), e); + // skip record + counter.skipRecord(); + // record send failed + recordSendFailed(false, sourceMessage, preTransformRecord, e); + } } } catch (RetriableException e) {