Skip to content

Commit

Permalink
ISSUE-5334 Disconnection: the distributed way (#1473)
Browse files Browse the repository at this point in the history
* ISSUE-5334 Disconnection: the distributed way
  • Loading branch information
vttranlina authored Jan 21, 2025
1 parent 8f4aff6 commit 1f8ef65
Show file tree
Hide file tree
Showing 17 changed files with 1,144 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
import com.linagora.tmail.OpenPaasContactsConsumerModule;
import com.linagora.tmail.OpenPaasModule;
import com.linagora.tmail.OpenPaasModuleChooserConfiguration;
import com.linagora.tmail.RabbitMQDisconnectorModule;
import com.linagora.tmail.ScheduledReconnectionHandler;
import com.linagora.tmail.UsersRepositoryModuleChooser;
import com.linagora.tmail.blob.guice.BlobStoreCacheModulesChooser;
Expand Down Expand Up @@ -338,6 +339,7 @@ protected void configure() {
new DistributedEmailAddressContactEventDeadLettersModule(),
new DistributedTaskSerializationModule(),
new JMAPEventBusModule(),
new RabbitMQDisconnectorModule(),
new RabbitMQEmailAddressContactModule(),
new RabbitMQEventBusModule(),
new RabbitMQModule(),
Expand Down
61 changes: 61 additions & 0 deletions tmail-backend/data/data-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.linagora.tmail</groupId>
<artifactId>tmail-backend</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>data-rabbitmq</artifactId>
<name>Twake Mail :: Data :: RabbitMQ</name>

<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-rabbitmq</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-rabbitmq</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-data-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>metrics-tests</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.javacrumbs.json-unit</groupId>
<artifactId>json-unit-assertj</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/********************************************************************
* As a subpart of Twake Mail, this file is edited by Linagora. *
* *
* https://twake-mail.com/ *
* https://linagora.com *
* *
* This file is subject to The Affero Gnu Public License *
* version 3. *
* *
* https://www.gnu.org/licenses/agpl-3.0.en.html *
* *
* This program is distributed in the hope that it will be *
* useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR *
* PURPOSE. See the GNU Affero General Public License for *
* more details. *
********************************************************************/

package com.linagora.tmail;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.james.DisconnectorNotifier.AllUsersRequest;
import org.apache.james.DisconnectorNotifier.MultipleUserRequest;
import org.apache.james.DisconnectorNotifier.Request;
import org.apache.james.core.Username;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DisconnectorRequestSerializer {

public static class DisconnectorRequestSerializeException extends RuntimeException {

public DisconnectorRequestSerializeException(String message, Throwable cause) {
super(message, cause);
}
}

public static final String ALL_USERS_REQUEST = "[]";
public static final byte[] ALL_USERS_REQUEST_BYTES = ALL_USERS_REQUEST.getBytes(StandardCharsets.UTF_8);
public static final TypeReference<List<String>> LIST_OF_STRING = new TypeReference<>() {
};

private final ObjectMapper objectMapper;

@Inject
@Singleton
public DisconnectorRequestSerializer() {
this.objectMapper = new ObjectMapper();
}

public byte[] serialize(Request request) throws JsonProcessingException {
return switch (request) {
case MultipleUserRequest multipleUserRequest -> objectMapper.writeValueAsBytes(
multipleUserRequest.usernameList().stream()
.map(Username::asString)
.toList());
case AllUsersRequest allUsersRequest -> ALL_USERS_REQUEST_BYTES;
};
}

public Request deserialize(byte[] serialized) {
if (serialized.length == 2 && serialized[0] == '[' && serialized[1] == ']') {
return AllUsersRequest.ALL_USERS_REQUEST;
}
try {
Set<Username> usernameSet = objectMapper.readValue(serialized, LIST_OF_STRING)
.stream().map(Username::of)
.collect(Collectors.toSet());
return new MultipleUserRequest(usernameSet);
} catch (Exception e) {
throw new DisconnectorRequestSerializeException("Error while deserializing: " + new String(serialized, StandardCharsets.UTF_8), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/********************************************************************
* As a subpart of Twake Mail, this file is edited by Linagora. *
* *
* https://twake-mail.com/ *
* https://linagora.com *
* *
* This file is subject to The Affero Gnu Public License *
* version 3. *
* *
* https://www.gnu.org/licenses/agpl-3.0.en.html *
* *
* This program is distributed in the hope that it will be *
* useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR *
* PURPOSE. See the GNU Affero General Public License for *
* more details. *
********************************************************************/

package com.linagora.tmail;

import java.io.Closeable;
import java.net.InetAddress;
import java.util.Optional;
import java.util.UUID;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.james.DisconnectorNotifier.InVMDisconnectorNotifier;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.Receiver;

public class RabbitMQDisconnectorConsumer implements Startable, Closeable {

public static final String TMAIL_DISCONNECTOR_QUEUE_NAME = "tmail-disconnector-" +
Throwing.supplier(() -> InetAddress.getLocalHost().getHostName()).get() +
"-" + UUID.randomUUID();

private static final boolean REQUEUE_ON_NACK = true;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorConsumer.class);

private final ReceiverProvider receiverProvider;
private final InVMDisconnectorNotifier inVMDisconnectorNotifier;
private final DisconnectorRequestSerializer deserializer;
private final String disconnectorQueueName;

private Disposable consumeMessages;

@VisibleForTesting
public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider,
InVMDisconnectorNotifier inVMDisconnectorNotifier,
DisconnectorRequestSerializer deserializer,
String disconnectorQueueName) {
this.receiverProvider = receiverProvider;
this.inVMDisconnectorNotifier = inVMDisconnectorNotifier;
this.deserializer = deserializer;
this.disconnectorQueueName = disconnectorQueueName;
}

@Inject
@Singleton
public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider,
InVMDisconnectorNotifier inVMDisconnectorNotifier,
DisconnectorRequestSerializer deserializer) {
this(receiverProvider, inVMDisconnectorNotifier, deserializer, TMAIL_DISCONNECTOR_QUEUE_NAME);
}

public void start() {
consumeMessages = doConsumeMessages();
}

public void restart() {
Disposable previousConsumer = consumeMessages;
consumeMessages = doConsumeMessages();
Optional.ofNullable(previousConsumer).ifPresent(Disposable::dispose);
}

private Disposable doConsumeMessages() {
return Flux.using(receiverProvider::createReceiver,
receiver -> receiver.consumeManualAck(disconnectorQueueName),
Receiver::close)
.flatMap(this::consumeMessage)
.subscribe();
}

private Mono<Void> consumeMessage(AcknowledgableDelivery ackDelivery) {
return Mono.fromCallable(() -> deserializer.deserialize(ackDelivery.getBody()))
.flatMap(disconnectorRequest -> Mono.fromRunnable(() -> inVMDisconnectorNotifier.disconnect(disconnectorRequest)).then())
.doOnSuccess(result -> ackDelivery.ack())
.onErrorResume(error -> {
LOGGER.error("Error when consume message", error);
ackDelivery.nack(!REQUEUE_ON_NACK);
return Mono.empty();
});
}

@Override
public void close() {
Optional.ofNullable(consumeMessages).ifPresent(Disposable::dispose);
}

public String disconnectorQueueName() {
return disconnectorQueueName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/********************************************************************
* As a subpart of Twake Mail, this file is edited by Linagora. *
* *
* https://twake-mail.com/ *
* https://linagora.com *
* *
* This file is subject to The Affero Gnu Public License *
* version 3. *
* *
* https://www.gnu.org/licenses/agpl-3.0.en.html *
* *
* This program is distributed in the hope that it will be *
* useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR *
* PURPOSE. See the GNU Affero General Public License for *
* more details. *
********************************************************************/

package com.linagora.tmail;

import java.time.Duration;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.commons.lang3.StringUtils;
import org.apache.james.DisconnectorNotifier;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

public class RabbitMQDisconnectorNotifier implements DisconnectorNotifier, Startable {
public static final String TMAIL_DISCONNECTOR_EXCHANGE_NAME = "tmail-disconnector";
public static final String ROUTING_KEY = StringUtils.EMPTY;

private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorNotifier.class);
private static final Retry RETRY_SPEC = Retry.backoff(2, Duration.ofMillis(100));

private final Sender sender;
private final DisconnectorRequestSerializer serializer;

@Inject
@Singleton
public RabbitMQDisconnectorNotifier(Sender sender,
DisconnectorRequestSerializer serializer) {
this.sender = sender;
this.serializer = serializer;
}

@Override
public void disconnect(Request request) {
try {
sender.send(Mono.just(new OutboundMessage(TMAIL_DISCONNECTOR_EXCHANGE_NAME,
ROUTING_KEY,
serializer.serialize(request))))
.retryWhen(RETRY_SPEC)
.block();
} catch (Exception exception) {
LOGGER.error("Error while sending disconnection request", exception);
}
}
}
Loading

0 comments on commit 1f8ef65

Please sign in to comment.