Skip to content

Commit

Permalink
NOTIF-55 Use Multi when relevant, replace with Uni<List> otherwise
Browse files Browse the repository at this point in the history
  • Loading branch information
gwenneg committed May 10, 2021
1 parent c81f4d7 commit 65e698b
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.redhat.cloud.notifications.models.Application;
import com.redhat.cloud.notifications.models.Bundle;
import com.redhat.cloud.notifications.models.EventType;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.hibernate.reactive.mutiny.Mutiny;

Expand Down Expand Up @@ -66,12 +65,11 @@ public Uni<EventType> createEventType(EventType eventType) {
.onItem().transform(EventType::filterOutApplication);
}

public Multi<Application> getApplications(String bundleName) {
public Uni<List<Application>> getApplications(String bundleName) {
String query = "FROM Application WHERE bundle.name = :bundleName";
return session.createQuery(query, Application.class)
.setParameter("bundleName", bundleName)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

public Uni<Application> getApplication(UUID id) {
Expand All @@ -95,7 +93,7 @@ public Uni<EventType> getEventType(String bundleName, String applicationName, St
.getSingleResultOrNull();
}

public Multi<EventType> getEventTypes(UUID appId) {
public Uni<List<EventType>> getEventTypes(UUID appId) {
String query = "FROM EventType WHERE application.id = :appId";
return session.find(Application.class, appId)
.onItem().ifNull().failWith(new NotFoundException())
Expand All @@ -104,8 +102,11 @@ public Multi<EventType> getEventTypes(UUID appId) {
.setParameter("appId", appId)
.getResultList()
)
.onItem().transformToMulti(Multi.createFrom()::iterable)
.onItem().transform(EventType::filterOutApplication);
.onItem().invoke(eventTypes -> {
for (EventType eventType : eventTypes) {
eventType.filterOutApplication();
}
});
}

public Uni<Boolean> deleteEventTypeById(UUID id) {
Expand Down Expand Up @@ -152,14 +153,13 @@ public Uni<List<EventType>> getEventTypes(Query limiter, Set<UUID> appIds, UUID
}

// TODO [BG Phase 2] Delete this method
public Multi<EventType> getEventTypesByEndpointId(@NotNull String accountId, @NotNull UUID endpointId) {
public Uni<List<EventType>> getEventTypesByEndpointId(@NotNull String accountId, @NotNull UUID endpointId) {
String query = "SELECT e FROM EventType e LEFT JOIN FETCH e.application JOIN e.targets t " +
"WHERE t.id.accountId = :accountId AND t.endpoint.id = :endpointId";
return session.createQuery(query, EventType.class)
.setParameter("accountId", accountId)
.setParameter("endpointId", endpointId)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

// TODO [BG Phase 2] Remove '_BG' suffix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import com.redhat.cloud.notifications.models.EmailAggregation;
import com.redhat.cloud.notifications.models.EmailAggregationKey;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.hibernate.reactive.mutiny.Mutiny;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.time.LocalDateTime;
import java.util.List;

@ApplicationScoped
public class EmailAggregationResources {
Expand All @@ -24,26 +24,24 @@ public Uni<Boolean> addEmailAggregation(EmailAggregation aggregation) {
.onFailure().recoverWithItem(Boolean.FALSE);
}

public Multi<EmailAggregationKey> getApplicationsWithPendingAggregation(LocalDateTime start, LocalDateTime end) {
public Uni<List<EmailAggregationKey>> getApplicationsWithPendingAggregation(LocalDateTime start, LocalDateTime end) {
String query = "SELECT DISTINCT NEW com.redhat.cloud.notifications.models.EmailAggregationKey(ea.accountId, ea.bundleName, ea.applicationName) " +
"FROM EmailAggregation ea WHERE ea.created > :start AND ea.created <= :end";
return session.createQuery(query, EmailAggregationKey.class)
.setParameter("start", start)
.setParameter("end", end)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

public Multi<EmailAggregation> getEmailAggregation(EmailAggregationKey key, LocalDateTime start, LocalDateTime end) {
public Uni<List<EmailAggregation>> getEmailAggregation(EmailAggregationKey key, LocalDateTime start, LocalDateTime end) {
String query = "FROM EmailAggregation WHERE accountId = :accountId AND bundleName = :bundleName AND applicationName = :applicationName AND created > :start AND created <= :end ORDER BY created";
return session.createQuery(query, EmailAggregation.class)
.setParameter("accountId", key.getAccountId())
.setParameter("bundleName", key.getBundle())
.setParameter("applicationName", key.getApplication())
.setParameter("start", start)
.setParameter("end", end)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

public Uni<Integer> purgeOldAggregation(EmailAggregationKey key, LocalDateTime lastUsedTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import com.redhat.cloud.notifications.models.EmailSubscription;
import com.redhat.cloud.notifications.models.EmailSubscriptionType;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.hibernate.reactive.mutiny.Mutiny;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.util.List;

@ApplicationScoped
public class EndpointEmailSubscriptionResources {
Expand Down Expand Up @@ -59,14 +59,13 @@ public Uni<EmailSubscription> getEmailSubscription(String accountNumber, String
.getSingleResultOrNull();
}

public Multi<EmailSubscription> getEmailSubscriptionsForUser(String accountNumber, String username) {
public Uni<List<EmailSubscription>> getEmailSubscriptionsForUser(String accountNumber, String username) {
String query = "SELECT es FROM EmailSubscription es LEFT JOIN FETCH es.application a LEFT JOIN FETCH a.bundle b " +
"WHERE es.id.accountId = :accountId AND es.id.userId = :userId";
return session.createQuery(query, EmailSubscription.class)
.setParameter("accountId", accountNumber)
.setParameter("userId", username)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

public Uni<Long> getEmailSubscribersCount(String accountNumber, String bundleName, String applicationName, EmailSubscriptionType subscriptionType) {
Expand All @@ -80,15 +79,14 @@ public Uni<Long> getEmailSubscribersCount(String accountNumber, String bundleNam
.getSingleResult();
}

public Multi<EmailSubscription> getEmailSubscribers(String accountNumber, String bundleName, String applicationName, EmailSubscriptionType subscriptionType) {
public Uni<List<EmailSubscription>> getEmailSubscribers(String accountNumber, String bundleName, String applicationName, EmailSubscriptionType subscriptionType) {
String query = "FROM EmailSubscription WHERE id.accountId = :accountId AND application.bundle.name = :bundleName " +
"AND application.name = :applicationName AND id.subscriptionType = :subscriptionType";
return session.createQuery(query, EmailSubscription.class)
.setParameter("accountId", accountNumber)
.setParameter("bundleName", bundleName)
.setParameter("applicationName", applicationName)
.setParameter("subscriptionType", subscriptionType)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import com.redhat.cloud.notifications.models.EndpointWebhook;
import com.redhat.cloud.notifications.models.EventType;
import com.redhat.cloud.notifications.models.WebhookAttributes;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.hibernate.reactive.mutiny.Mutiny;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.PersistenceException;
import javax.ws.rs.BadRequestException;
import java.util.List;
import java.util.UUID;

@ApplicationScoped
Expand All @@ -32,7 +32,7 @@ public Uni<Endpoint> createEndpoint(Endpoint endpoint) {
.replaceWith(endpoint);
}

public Multi<Endpoint> getEndpointsPerType(String tenant, EndpointType type, Boolean activeOnly, Query limiter) {
public Uni<List<Endpoint>> getEndpointsPerType(String tenant, EndpointType type, Boolean activeOnly, Query limiter) {
// TODO Modify the parameter to take a vararg of Functions that modify the query
// TODO Modify to take account selective joins (JOIN (..) UNION (..)) based on the type, same for getEndpoints
String query = "SELECT e FROM Endpoint e LEFT JOIN FETCH e.webhook WHERE e.accountId = :accountId AND e.type = :endpointType";
Expand All @@ -57,8 +57,7 @@ public Multi<Endpoint> getEndpointsPerType(String tenant, EndpointType type, Boo
.setFirstResult(limiter.getLimit().getOffset());
}

return mutinyQuery.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
return mutinyQuery.getResultList();
}

public Uni<Long> getEndpointsCountPerType(String tenant, EndpointType type, Boolean activeOnly) {
Expand All @@ -79,7 +78,7 @@ public Uni<Long> getEndpointsCountPerType(String tenant, EndpointType type, Bool
}

// TODO [BG Phase 2] Delete this method
public Multi<Endpoint> getTargetEndpoints(String tenant, String bundleName, String applicationName, String eventTypeName) {
public Uni<List<Endpoint>> getTargetEndpoints(String tenant, String bundleName, String applicationName, String eventTypeName) {
// TODO Add UNION JOIN for different endpoint types here
String query = "SELECT e FROM Endpoint e LEFT JOIN FETCH e.webhook JOIN e.targets t " +
"WHERE e.enabled = TRUE AND t.eventType.name = :eventTypeName AND t.id.accountId = :accountId " +
Expand All @@ -90,12 +89,11 @@ public Multi<Endpoint> getTargetEndpoints(String tenant, String bundleName, Stri
.setParameter("eventTypeName", eventTypeName)
.setParameter("accountId", tenant)
.setParameter("bundleName", bundleName)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

// TODO [BG Phase 2] Remove '_BG' suffix
public Multi<Endpoint> getTargetEndpoints_BG(String tenant, String bundleName, String applicationName, String eventTypeName) {
public Uni<List<Endpoint>> getTargetEndpoints_BG(String tenant, String bundleName, String applicationName, String eventTypeName) {
// TODO Add UNION JOIN for different endpoint types here
String query = "SELECT e FROM Endpoint e LEFT JOIN FETCH e.webhook JOIN e.behaviorGroupActions bga JOIN bga.behaviorGroup.behaviors b " +
"WHERE e.enabled = TRUE AND b.eventType.name = :eventTypeName AND bga.behaviorGroup.accountId = :accountId " +
Expand All @@ -106,11 +104,10 @@ public Multi<Endpoint> getTargetEndpoints_BG(String tenant, String bundleName, S
.setParameter("eventTypeName", eventTypeName)
.setParameter("accountId", tenant)
.setParameter("bundleName", bundleName)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

public Multi<Endpoint> getEndpoints(String tenant, Query limiter) {
public Uni<List<Endpoint>> getEndpoints(String tenant, Query limiter) {
// TODO Add the ability to modify the getEndpoints to return also with JOIN to application_eventtypes_endpoints link table
// or should I just create a new method for it?
String query = "SELECT e FROM Endpoint e LEFT JOIN FETCH e.webhook WHERE e.accountId = :accountId";
Expand All @@ -128,8 +125,7 @@ public Multi<Endpoint> getEndpoints(String tenant, Query limiter) {
.setFirstResult(limiter.getLimit().getOffset());
}

return mutinyQuery.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
return mutinyQuery.getResultList();
}

public Uni<Long> getEndpointsCount(String tenant) {
Expand Down Expand Up @@ -205,7 +201,7 @@ public Uni<Boolean> unlinkEndpoint(String tenant, UUID endpointId, UUID eventTyp
}

// TODO [BG Phase 2] Delete this method
public Multi<Endpoint> getLinkedEndpoints(String tenant, UUID eventTypeId, Query limiter) {
public Uni<List<Endpoint>> getLinkedEndpoints(String tenant, UUID eventTypeId, Query limiter) {
String query = "SELECT e FROM Endpoint e LEFT JOIN FETCH e.webhook JOIN e.targets t WHERE t.id.accountId = :accountId AND t.eventType.id = :eventTypeId";

if (limiter != null) {
Expand All @@ -221,18 +217,16 @@ public Multi<Endpoint> getLinkedEndpoints(String tenant, UUID eventTypeId, Query
.setFirstResult(limiter.getLimit().getOffset());
}

return mutinyQuery.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
return mutinyQuery.getResultList();
}

// TODO [BG Phase 2] Delete this method
public Multi<Endpoint> getDefaultEndpoints(String tenant) {
public Uni<List<Endpoint>> getDefaultEndpoints(String tenant) {
String query = "SELECT e FROM Endpoint e LEFT JOIN FETCH e.webhook JOIN e.defaults d WHERE d.id.accountId = :accountId";

return session.createQuery(query, Endpoint.class)
.setParameter("accountId", tenant)
.getResultList()
.onItem().transformToMulti(Multi.createFrom()::iterable);
.getResultList();
}

// TODO [BG Phase 2] Delete this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ void init() {
public Multi<Endpoint> getDefaultEndpoints(Endpoint defaultEndpoint) {
processedItems.increment();
return resources.getDefaultEndpoints(defaultEndpoint.getAccountId())
.onItem().transformToMulti(Multi.createFrom()::iterable)
.select().where(Endpoint::isEnabled)
.onItem().invoke(() -> enrichedEndpoints.increment());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.hibernate.reactive.mutiny.Mutiny;
import org.reactivestreams.Publisher;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -108,7 +107,8 @@ public EndpointTypeProcessor endpointTypeToProcessor(EndpointType endpointType)
// TODO [BG Phase 2] Delete this method
public Multi<Endpoint> getEndpoints(String tenant, String bundleName, String applicationName, String eventTypeName) {
return resources.getTargetEndpoints(tenant, bundleName, applicationName, eventTypeName)
.onItem().transformToMultiAndConcatenate((Function<Endpoint, Publisher<Endpoint>>) endpoint -> {
.onItem().transformToMulti(Multi.createFrom()::iterable)
.onItem().transformToMultiAndConcatenate((Function<Endpoint, Multi<Endpoint>>) endpoint -> {
// If the tenant has a default endpoint for the eventType, then add the target endpoints here
if (endpoint.getType() == EndpointType.DEFAULT) {
return defaultProcessor.getDefaultEndpoints(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.redhat.cloud.notifications.ingress.Action;
import com.redhat.cloud.notifications.models.EmailAggregation;
import com.redhat.cloud.notifications.models.EmailAggregationKey;
import com.redhat.cloud.notifications.models.EmailSubscription;
import com.redhat.cloud.notifications.models.EmailSubscriptionType;
import com.redhat.cloud.notifications.models.Notification;
import com.redhat.cloud.notifications.models.NotificationHistory;
Expand Down Expand Up @@ -153,8 +154,11 @@ private Uni<NotificationHistory> sendEmail(Notification item, EmailSubscriptionT
final HttpRequest<Buffer> bopRequest = this.buildBOPHttpRequest();

return this.subscriptionResources.getEmailSubscribers(item.getTenant(), item.getAction().getBundle(), item.getAction().getApplication(), emailSubscriptionType)
.onItem().transform(emailSubscription -> emailSubscription.getUserId())
.collect().with(Collectors.toSet())
.onItem().transform(subscriptions -> {
return subscriptions.stream()
.map(EmailSubscription::getUserId)
.collect(Collectors.toSet());
})
.onItem().transform(userSet -> {
if (userSet.size() > 0) {
return this.buildEmail(userSet);
Expand Down Expand Up @@ -256,6 +260,7 @@ private Multi<Tuple2<NotificationHistory, EmailAggregationKey>> processAggregate

if (subscriberCount > 0 && aggregator != null) {
return emailAggregationResources.getEmailAggregation(aggregationKey, startTime, endTime)
.onItem().transformToMulti(Multi.createFrom()::iterable)
.collect().in(() -> aggregator, AbstractEmailPayloadAggregator::aggregate).toMulti();
}

Expand Down Expand Up @@ -318,6 +323,7 @@ public Uni<List<Tuple2<NotificationHistory, EmailAggregationKey>>> processAggreg
log.info(String.format("Running %s email aggregation for period (%s, %s)", emailSubscriptionType.toString(), startTime.toString(), endTime.toString()));

return emailAggregationResources.getApplicationsWithPendingAggregation(startTime, endTime)
.onItem().transformToMulti(Multi.createFrom()::iterable)
.onItem().transformToMulti(aggregationKey -> processAggregateEmailsByAggregationKey(aggregationKey, startTime, endTime, emailSubscriptionType, delete))
.concatenate().collect().asList()
.onItem().invoke(result -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class EndpointService {
public Uni<EndpointPage> getEndpoints(@Context SecurityContext sec, @BeanParam Query query, @QueryParam("type") String targetType, @QueryParam("active") Boolean activeOnly) {
RhIdPrincipal principal = (RhIdPrincipal) sec.getUserPrincipal();

Multi<Endpoint> endpoints;
Uni<List<Endpoint>> endpoints;
Uni<Long> count;

if (targetType != null) {
Expand All @@ -99,7 +99,7 @@ public Uni<EndpointPage> getEndpoints(@Context SecurityContext sec, @BeanParam Q
count = resources.getEndpointsCount(principal.getAccount());
}

return endpoints.collect().asList()
return endpoints
.onItem().transformToUni(endpointsList -> count
.onItem().transform(endpointsCount -> new EndpointPage(endpointsList, new HashMap<>(), new Meta(endpointsCount))));
}
Expand All @@ -116,6 +116,7 @@ public Uni<Endpoint> createEndpoint(@Context SecurityContext sec, @NotNull @Vali
} else if (endpoint.getType() == EndpointType.DEFAULT) {
// Only a single default endpoint is allowed
return resources.getEndpointsPerType(principal.getAccount(), EndpointType.DEFAULT, null, null)
.onItem().transformToMulti(Multi.createFrom()::iterable)
.toUni()
.onItem()
.ifNull()
Expand Down
Loading

0 comments on commit 65e698b

Please sign in to comment.