Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Don't perform topic lookup immediately when topics are loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Aug 15, 2021
1 parent f1641f2 commit 0423673
Showing 1 changed file with 0 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public static class OffsetAndTopicListener implements NamespaceBundleOwnershipLi
final NamespaceName kafkaMetaNs;
final NamespaceName kafkaTopicNs;
final GroupCoordinator groupCoordinator;
final LookupClient lookupClient;
final String brokerUrl;

public OffsetAndTopicListener(BrokerService service,
Expand All @@ -114,7 +113,6 @@ public OffsetAndTopicListener(BrokerService service,
this.groupCoordinator = groupCoordinator;
this.kafkaTopicNs = NamespaceName
.get(kafkaConfig.getKafkaTenant(), kafkaConfig.getKafkaNamespace());
this.lookupClient = KafkaProtocolHandler.getLookupClient(service.pulsar());
this.brokerUrl = service.pulsar().getBrokerServiceUrl();
}

Expand Down Expand Up @@ -144,11 +142,6 @@ public void onLoad(NamespaceBundle bundle) {
}
KafkaTopicManager.removeTopicManagerCache(name.toString());
KopBrokerLookupManager.removeTopicManagerCache(name.toString());
// update lookup cache when onload
final CompletableFuture<InetSocketAddress> retFuture =
lookupClient.getBrokerAddress(TopicName.get(topic));
KafkaTopicManager.LOOKUP_CACHE.put(topic, retFuture);
KopBrokerLookupManager.updateTopicManagerCache(topic, retFuture);
}
} else {
log.error("Failed to get owned topic list for "
Expand Down

0 comments on commit 0423673

Please sign in to comment.