From 7994cd6c7d0dee88984248f8089a5a064274ea6c Mon Sep 17 00:00:00 2001
From: keyboardbobo <cjb20032003@126.com>
Date: Fri, 3 Mar 2023 10:07:13 +0800
Subject: [PATCH] If all partition messages are sent, cancel the delayed tasks
 to avoid full gc or oom (#1749)

Fixes #1739

### Motivation

The request.timeout.ms setting is too large to cause broker full gc or oom.

### Modifications

If all partition messages are sent, cancel the delayed tasks to avoid full gc or oom
---
 .../pulsar/handlers/kop/storage/ReplicaManager.java             | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java
index 17105d4e57..380fdb7850 100644
--- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java
+++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/ReplicaManager.java
@@ -143,6 +143,8 @@ public CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>>
                 return;
             }
             if (restTopicPartitionNum == 0) {
+                // If all tasks are sent, cancel the timer tasks to avoid full gc or oom
+                producePurgatory.checkAndComplete(new DelayedOperationKey.TopicPartitionOperationKey(topicPartition));
                 complete.run();
             }
         };