Skip to content

Commit

Permalink
refactor: subscription migration
Browse files Browse the repository at this point in the history
  • Loading branch information
guqing committed Apr 25, 2024
1 parent 2cbc375 commit c2a31d4
Show file tree
Hide file tree
Showing 15 changed files with 452 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ public Result reconcile(Request request) {
return;
}
if (addFinalizers(comment.getMetadata(), Set.of(FINALIZER_NAME))) {
replyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment);
client.update(comment);
eventPublisher.publishEvent(new CommentCreatedEvent(this, comment));
}

replyNotificationSubscriptionHelper.subscribeNewReplyReasonForComment(comment);

compatibleCreationTime(comment);
Comment.CommentStatus status = comment.getStatusOrDefault();
status.setHasNewReply(defaultIfNull(status.getUnreadReplyCount(), 0) > 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Result reconcile(Request request) {
return;
}
if (addFinalizers(reply.getMetadata(), Set.of(FINALIZER_NAME))) {
replyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply);
client.update(reply);
eventPublisher.publishEvent(new ReplyCreatedEvent(this, reply));
}
Expand All @@ -59,8 +60,6 @@ public Result reconcile(Request request) {
}
client.update(reply);

replyNotificationSubscriptionHelper.subscribeNewReplyReasonForReply(reply);

eventPublisher.publishEvent(new ReplyChangedEvent(this, reply));
});
return new Result(false, null);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package run.halo.app.infra;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ListOptions;

/**
* Reactive extension paginated operator to handle extensions by pagination.
*
* @author guqing
* @since 2.15.0
*/
public interface ReactiveExtensionPaginatedOperator {

/**
* <p>Deletes all data, including any new entries added during the execution of this method.</p>
* <p>This method continuously monitors and removes data that appears throughout its runtime,
* ensuring that even data created during the deletion process is also removed.</p>
*/
<E extends Extension> Mono<Void> deleteContinuously(Class<E> type,
ListOptions listOptions);

/**
* <p>Deletes only the data that existed at the start of the operation.</p>
* <p>This method takes a snapshot of the data at the beginning and deletes only that dataset;
* any data added after the method starts will not be affected or removed.</p>
*/
<E extends Extension> Flux<E> deleteInitialBatch(Class<E> type,
ListOptions listOptions);

/**
* <p>Note that: This method can not be used for <code>deletion</code> operation, because
* deletion operation will change the total records.</p>
*/
<E extends Extension> Flux<E> list(Class<E> type, ListOptions listOptions);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package run.halo.app.infra;

import static run.halo.app.extension.index.query.QueryFactory.isNull;

import java.util.concurrent.atomic.AtomicLong;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ListResult;
import run.halo.app.extension.PageRequest;
import run.halo.app.extension.PageRequestImpl;
import run.halo.app.extension.ReactiveExtensionClient;

@Component
@RequiredArgsConstructor
public class ReactiveExtensionPaginatedOperatorImpl implements ReactiveExtensionPaginatedOperator {
private static final int DEFAULT_PAGE_SIZE = 200;
private final ReactiveExtensionClient client;

@Override
public <E extends Extension> Mono<Void> deleteContinuously(Class<E> type,
ListOptions listOptions) {
var pageRequest = createPageRequest();
return cleanupContinuously(type, listOptions, pageRequest);
}

private <E extends Extension> Mono<Void> cleanupContinuously(Class<E> type,
ListOptions listOptions,
PageRequest pageRequest) {
// forever loop first page until no more to delete
return pageBy(type, listOptions, pageRequest)
.flatMap(page -> Flux.fromIterable(page.getItems())
.flatMap(client::delete)
.then(page.hasNext() ? cleanupContinuously(type, listOptions, pageRequest)
: Mono.empty())
);
}

@Override
public <E extends Extension> Flux<E> deleteInitialBatch(Class<E> type,
ListOptions listOptions) {
var pageRequest = createPageRequest();
var newFieldQuery = listOptions.getFieldSelector()
.andQuery(isNull("metadata.deletionTimestamp"));
listOptions.setFieldSelector(newFieldQuery);

final AtomicLong totalRecords = new AtomicLong(0);
final AtomicLong consumedRecords = new AtomicLong(0);

return pageBy(type, listOptions, pageRequest)
.doOnNext(page -> totalRecords.compareAndSet(0, page.getTotal()))
// forever loop first page until no more to delete
.expandDeep(page -> hasMorePages(page, consumedRecords.get(), totalRecords.get())
? pageBy(type, listOptions, pageRequest) : Mono.empty())
.flatMap(page -> Flux.fromIterable(page.getItems()))
.takeWhile(item -> consumedRecords.incrementAndGet() <= totalRecords.get())
.flatMap(client::delete);
}

private <E extends Extension> boolean hasMorePages(ListResult<E> result, long consumedRecords,
long totalRecords) {
return result.hasNext() && consumedRecords < totalRecords;
}

@Override
public <E extends Extension> Flux<E> list(Class<E> type, ListOptions listOptions) {
var pageRequest = createPageRequest();
return list(type, listOptions, pageRequest);
}

/**
* Paginated list all items to avoid memory overflow.
* <pre>
* 1. Retrieve data multiple times until all data is consumed.
* 2. Fetch next page if current page has more data and consumed records is less than total
* records.
* 3. Take while consumed records is less than total records.
* 4. totalRecords from first page to ensure new inserted data will not be counted in during
* querying to avoid infinite loop.
* </pre>
*/
private <E extends Extension> Flux<E> list(Class<E> type, ListOptions listOptions,
PageRequest pageRequest) {
final AtomicLong totalRecords = new AtomicLong(0);
final AtomicLong consumedRecords = new AtomicLong(0);
return pageBy(type, listOptions, pageRequest)
// set total records in first page
.doOnNext(page -> totalRecords.compareAndSet(0, page.getTotal()))
.expandDeep(page -> {
if (hasMorePages(page, consumedRecords.get(), totalRecords.get())) {
// fetch next page
PageRequest nextPageRequest = pageRequest.next();
return pageBy(type, listOptions, nextPageRequest);
} else {
return Mono.empty();
}
})
.flatMap(page -> Flux.fromIterable(page.getItems()))
.takeWhile(item -> consumedRecords.incrementAndGet() <= totalRecords.get());
}

private PageRequest createPageRequest() {
return PageRequestImpl.of(1, DEFAULT_PAGE_SIZE,
Sort.by("metadata.creationTimestamp", "metadata.name"));
}

private <E extends Extension> Mono<ListResult<E>> pageBy(Class<E> type, ListOptions listOptions,
PageRequest pageRequest) {
return client.listBy(type, listOptions, pageRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ public Mono<Subscription> subscribe(Subscription.Subscriber subscriber,

@Override
public Mono<Void> unsubscribe(Subscription.Subscriber subscriber) {
return subscriptionService.list(subscriber)
.flatMap(subscriptionService::remove)
.then();
return subscriptionService.remove(subscriber).then();
}

@Override
public Mono<Void> unsubscribe(Subscription.Subscriber subscriber,
Subscription.InterestReason reason) {
return subscriptionService.list(subscriber, reason)
.flatMap(subscriptionService::remove)
.then();
return subscriptionService.remove(subscriber, reason).then();
}

Flux<String> getNotifiersBySubscriber(Subscriber subscriber, Reason reason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class RecipientResolverImpl implements RecipientResolver {
@Override
public Flux<Subscriber> resolve(Reason reason) {
var reasonType = reason.getSpec().getReasonType();
return subscriptionService.list(reasonType)
return subscriptionService.listByPerPage(reasonType)
.filter(this::isNotDisabled)
.filter(subscription -> {
var interestReason = subscription.getSpec().getReason();
Expand Down
Loading

0 comments on commit c2a31d4

Please sign in to comment.