Skip to content

Commit

Permalink
Merge pull request #2653 from guusdk/OF-2951_Async-packet-delivery
Browse files Browse the repository at this point in the history
OF-2951: Async stanza delivery
  • Loading branch information
akrherz authored Jan 14, 2025
2 parents 5606c82 + ede2313 commit 8ff4746
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2004-2008 Jive Software, 2017-2019 Ignite Realtime Foundation. All rights reserved.
* Copyright (C) 2004-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,10 @@
import org.xmpp.packet.Packet;
import org.jivesoftware.openfire.auth.UnauthorizedException;

import javax.annotation.Nonnull;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
* Delivers packets to locally connected streams. This is the opposite
* of the packet transporter.
Expand All @@ -28,15 +32,40 @@
public interface PacketDeliverer {

/**
* Delivers the given packet based on packet recipient and sender. The
* deliverer defers actual routing decisions to other classes.
* <h2>Warning</h2>
* Be careful to enforce concurrency DbC of concurrent by synchronizing
* any accesses to class resources.
* Delivers the given stanza based on its recipient and sender.
*
* Invocation of this method blocks until the deliverer finishes processing the stanza.
*
* The deliverer defers actual routing decisions to other classes.
*
* @param packet the packet to route
* @param stanza the stanza to route
* @throws PacketException if the packet is null or the packet could not be routed.
* @throws UnauthorizedException if the user is not authorised
*/
void deliver( Packet packet ) throws UnauthorizedException, PacketException;
void deliver(@Nonnull final Packet stanza) throws UnauthorizedException, PacketException;

/**
/**
* Delivers the given stanza based on its recipient and sender.
*
* Invocation of this method blocks until the deliverer finishes processing the stanza.
*
* The deliverer defers actual routing decisions to other classes.
*
* @param stanza the stanza to route
* @return A future from which any exception thrown while processing the stanza can be obtained.
*/
default CompletableFuture<Void> deliverAsync(@Nonnull final Packet stanza) {
try {
return CompletableFuture.runAsync(() -> {
try {
deliver(stanza);
} catch (UnauthorizedException e) {
throw new CompletionException(e);
}
});
} catch (Throwable t) {
return CompletableFuture.failedFuture(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2005-2008 Jive Software, 2017-2023 Ignite Realtime Foundation. All rights reserved.
* Copyright (C) 2005-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,8 @@
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;

import javax.annotation.Nonnull;

/**
* Fallback method used by {@link org.jivesoftware.openfire.nio.NettyConnection} when
* connected to a connection manager. The fallback method will be used when a connection
Expand All @@ -49,9 +51,9 @@ public class MultiplexerPacketDeliverer implements PacketDeliverer {

private static final Logger Log = LoggerFactory.getLogger(MultiplexerPacketDeliverer.class);

private OfflineMessageStrategy messageStrategy;
private final OfflineMessageStrategy messageStrategy;
private String connectionManagerDomain;
private ConnectionMultiplexerManager multiplexerManager;
private final ConnectionMultiplexerManager multiplexerManager;

public MultiplexerPacketDeliverer() {
this.messageStrategy = XMPPServer.getInstance().getOfflineMessageStrategy();
Expand All @@ -63,23 +65,23 @@ public void setConnectionManagerDomain(String connectionManagerDomain) {
}

@Override
public void deliver(Packet packet) throws UnauthorizedException, PacketException {
public void deliver(@Nonnull final Packet stanza) throws UnauthorizedException, PacketException {
// Check if we can send the packet using another session
if (connectionManagerDomain == null) {
// Packet deliverer has not yet been configured so handle unprocessed packet
handleUnprocessedPacket(packet);
handleUnprocessedPacket(stanza);
}
else {
// Try getting another session to the same connection manager
ConnectionMultiplexerSession session =
multiplexerManager.getMultiplexerSession(connectionManagerDomain);
if (session == null || session.isClosed()) {
// No other session was found so handle unprocessed packet
handleUnprocessedPacket(packet);
handleUnprocessedPacket(stanza);
}
else {
// Send the packet using this other session to the same connection manager
session.process(packet);
session.process(stanza);
}
}
}
Expand All @@ -93,8 +95,7 @@ private void handleUnprocessedPacket(Packet packet) {
messageStrategy.storeOffline((Message) packet);
}
else if (packet instanceof Presence) {
// presence packets are dropped silently
//dropPacket(packet);
Log.trace("Silently dropping presence stanza: {}", packet);
}
else if (packet instanceof IQ) {
IQ iq = (IQ) packet;
Expand All @@ -106,16 +107,15 @@ else if (packet instanceof IQ) {
Element send = child.element("send");
if (send != null) {
// Unwrap packet
Element wrappedElement = (Element) send.elements().get(0);
Element wrappedElement = send.elements().get(0);
if ("message".equals(wrappedElement.getName())) {
handleUnprocessedPacket(new Message(wrappedElement));
}
}
}
else {
// IQ packets are logged but dropped
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" +
packet.toString());
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" + packet);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2005-2015 Jive Software, 2017-2023 Ignite Realtime Foundation. All rights reserved.
* Copyright (C) 2005-2015 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import org.xmpp.packet.Packet;
import org.xmpp.packet.Presence;

import javax.annotation.Nonnull;

/**
* Fallback method used by {@link org.jivesoftware.openfire.nio.NettyConnection} when a
* connection fails to send a {@link Packet} (likely because it was closed). Message packets
Expand All @@ -39,25 +41,24 @@ public class OfflinePacketDeliverer implements PacketDeliverer {

private static final Logger Log = LoggerFactory.getLogger(OfflinePacketDeliverer.class);

private OfflineMessageStrategy messageStrategy;
private final OfflineMessageStrategy messageStrategy;

public OfflinePacketDeliverer() {
this.messageStrategy = XMPPServer.getInstance().getOfflineMessageStrategy();
}

@Override
public void deliver(Packet packet) throws UnauthorizedException, PacketException {
if (packet instanceof Message) {
messageStrategy.storeOffline((Message) packet);
public void deliver(final @Nonnull Packet stanza) throws UnauthorizedException, PacketException
{
if (stanza instanceof Message) {
messageStrategy.storeOffline((Message) stanza);
}
else if (packet instanceof Presence) {
// presence packets are dropped silently
else if (stanza instanceof Presence) {
Log.trace("Silently dropping a presence stanza.");
}
else if (packet instanceof IQ) {
else if (stanza instanceof IQ) {
// IQ packets are logged before being dropped
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" + packet.toString());
Log.warn(LocaleUtils.getLocalizedString("admin.error.routing") + "\n" + stanza);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2004-2008 Jive Software, 2017-2018 Ignite Realtime Foundation. All rights reserved.
* Copyright (C) 2004-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import org.jivesoftware.openfire.net.SocketPacketWriteHandler;
import org.xmpp.packet.Packet;

import javax.annotation.Nonnull;

/**
* In-memory implementation of the packet deliverer service
*
Expand All @@ -41,12 +43,12 @@ public PacketDelivererImpl() {
}

@Override
public void deliver(Packet packet) throws UnauthorizedException, PacketException {
public void deliver(@Nonnull final Packet packet) throws UnauthorizedException, PacketException {
if (packet == null) {
throw new PacketException("Packet was null");
}
if (deliverHandler == null) {
throw new PacketException("Could not send packet - no route" + packet.toString());
throw new PacketException("Could not send packet - no route: " + packet.toString());
}
// Let the SocketPacketWriteHandler process the packet. SocketPacketWriteHandler may send
// it over the socket or store it when user is offline or drop it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2004-2008 Jive Software, 2017-2023 Ignite Realtime Foundation. All rights reserved.
* Copyright (C) 2004-2008 Jive Software, 2017-2025 Ignite Realtime Foundation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -435,7 +435,7 @@ public void probePresence(JID prober, JID probee) {
probePresence.setFrom(prober);
probePresence.setTo(probee.toBareJID());
// Send the probe presence
deliverer.deliver(probePresence);
deliverer.deliverAsync(probePresence).whenComplete((v, t) -> { if (t != null) { Log.warn("Unable to probe presence from {} to {}", prober, probee, t); }});
}
else {
// The probee may be related to a component that has not yet been connected so
Expand Down

0 comments on commit 8ff4746

Please sign in to comment.