Skip to content

Commit

Permalink
Remove Retry From Abstract Event Consumer (#16405)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohityadav766 authored May 24, 2024
1 parent 3a9c247 commit f8ed079
Showing 1 changed file with 11 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@
import static org.openmetadata.service.events.subscription.AlertUtil.getFilteredEvents;
import static org.openmetadata.service.events.subscription.AlertUtil.getStartingOffset;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -91,62 +88,17 @@ public void handleFailedEvent(EventPublisherException ex) {
failingSubscriptionId,
changeEvent);

// Update Failed Event with details
FailedEvent failedEvent =
new FailedEvent()
.withFailingSubscriptionId(failingSubscriptionId)
.withChangeEvent(changeEvent)
.withRetriesLeft(eventSubscription.getRetries())
.withTimestamp(System.currentTimeMillis());

if (eventSubscription.getRetries() == 0) {
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertFailedEvent(
eventSubscription.getId().toString(),
String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()),
JsonUtils.pojoToJson(failedEvent));
} else {
// Check in Qtz Map
Set<FailedEvent> failedEventsList =
JsonUtils.convertValue(
jobDetail.getJobDataMap().get(FAILED_EVENT_EXTENSION), new TypeReference<>() {});
if (failedEventsList == null) {
failedEventsList = new HashSet<>();
} else {
// Remove exising change event
boolean removeChangeEvent =
failedEventsList.removeIf(
failedEvent1 -> {
if (failedEvent1
.getChangeEvent()
.getId()
.equals(failedEvent.getChangeEvent().getId())
&& failedEvent1.getFailingSubscriptionId().equals(failingSubscriptionId)) {
failedEvent.withRetriesLeft(failedEvent1.getRetriesLeft());
return true;
}
return false;
});

if (removeChangeEvent) {
if (failedEvent.getRetriesLeft() == 0) {
// If the Retries are exhausted, then remove the Event from the List to DLQ
failedEvent.withRetriesLeft(0);
} else {
failedEvent.withRetriesLeft(failedEvent.getRetriesLeft() - 1);
}
}
}
failedEventsList.add(failedEvent);
jobDetail.getJobDataMap().put(FAILED_EVENT_EXTENSION, failedEventsList);
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertFailedEvent(
eventSubscription.getId().toString(),
String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()),
JsonUtils.pojoToJson(failedEvent));
}
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertFailedEvent(
eventSubscription.getId().toString(),
String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()),
JsonUtils.pojoToJson(
new FailedEvent()
.withFailingSubscriptionId(failingSubscriptionId)
.withChangeEvent(changeEvent)
.withRetriesLeft(eventSubscription.getRetries())
.withTimestamp(System.currentTimeMillis())));
}

private long loadInitialOffset(JobExecutionContext context) {
Expand Down Expand Up @@ -278,20 +230,6 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
int batchSize = batch.size();
Map<ChangeEvent, Set<UUID>> eventsWithReceivers = createEventsWithReceivers(batch);
try {
// Retry Failed Events
Set<FailedEvent> failedEventsList =
JsonUtils.convertValue(
jobDetail.getJobDataMap().get(FAILED_EVENT_EXTENSION), new TypeReference<>() {});
if (failedEventsList != null) {
Map<ChangeEvent, Set<UUID>> failedChangeEvents =
failedEventsList.stream()
.filter(failedEvent -> failedEvent.getRetriesLeft() > 0)
.collect(
Collectors.toMap(
FailedEvent::getChangeEvent,
failedEvent -> Set.of(failedEvent.getFailingSubscriptionId())));
eventsWithReceivers.putAll(failedChangeEvents);
}
// Publish Events
if (!eventsWithReceivers.isEmpty()) {
alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + eventsWithReceivers.size());
Expand Down

0 comments on commit f8ed079

Please sign in to comment.