From 334cbfdbf8877e410aa0b36f92335dc6b344632e Mon Sep 17 00:00:00 2001 From: "DROUET, Guillaume" Date: Sun, 25 May 2014 19:46:41 +0200 Subject: [PATCH 1/2] #1549 --- .../HeartbeatAtmosphereResourceEvent.java | 44 ++++++++++++++++ .../managed/ManagedAtmosphereHandler.java | 13 +++++ .../atmosphere/config/service/Heartbeat.java | 32 ++++++++++++ .../cpr/AtmosphereResourceEventListener.java | 7 +++ ...tmosphereResourceEventListenerAdapter.java | 13 +++++ .../cpr/AtmosphereResourceImpl.java | 18 ++++++- .../interceptor/HeartbeatInterceptor.java | 48 ++++++++++++++++- .../WebSocketEventListenerAdapter.java | 5 ++ .../ManagedAtmosphereHandlerTest.java | 51 ++++++++++++++++++- .../cpr/AtmosphereResourceListenerTest.java | 5 ++ 10 files changed, 232 insertions(+), 4 deletions(-) create mode 100644 modules/cpr/src/main/java/org/atmosphere/HeartbeatAtmosphereResourceEvent.java create mode 100644 modules/cpr/src/main/java/org/atmosphere/config/service/Heartbeat.java diff --git a/modules/cpr/src/main/java/org/atmosphere/HeartbeatAtmosphereResourceEvent.java b/modules/cpr/src/main/java/org/atmosphere/HeartbeatAtmosphereResourceEvent.java new file mode 100644 index 00000000000..53f8c9e1303 --- /dev/null +++ b/modules/cpr/src/main/java/org/atmosphere/HeartbeatAtmosphereResourceEvent.java @@ -0,0 +1,44 @@ +/* + * Copyright 2014 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.atmosphere; + +import org.atmosphere.cpr.AtmosphereResourceEventImpl; +import org.atmosphere.cpr.AtmosphereResourceImpl; + +/** + *

+ * Specifies to the observable that {@link org.atmosphere.cpr.AtmosphereResourceEventListener#onHeartbeat(org.atmosphere.cpr.AtmosphereResourceEvent)} + * should be invoked when it fires event to observers. + *

+ * + * @version 1.0 + * @author Guillaume DROUET + * @since 2.2 + */ +public class HeartbeatAtmosphereResourceEvent extends AtmosphereResourceEventImpl { + + /** + *

+ * Builds a new event. + *

+ * + * @param resource the resource + */ + public HeartbeatAtmosphereResourceEvent(final AtmosphereResourceImpl resource) { + super(resource); + } +} diff --git a/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java b/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java index 3733c6ee5a4..e3233c083fb 100644 --- a/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java +++ b/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java @@ -18,6 +18,7 @@ import org.atmosphere.config.service.Delete; import org.atmosphere.config.service.Disconnect; import org.atmosphere.config.service.Get; +import org.atmosphere.config.service.Heartbeat; import org.atmosphere.config.service.Message; import org.atmosphere.config.service.PathParam; import org.atmosphere.config.service.Post; @@ -54,6 +55,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnClose; +import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnHeartbeat; import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnResume; import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnSuspend; import static org.atmosphere.util.IOUtils.isBodyEmpty; @@ -70,6 +72,7 @@ public class ManagedAtmosphereHandler extends AbstractReflectorAtmosphereHandler private final static List> EMPTY = Collections.>emptyList(); private Object proxiedInstance; private List onRuntimeMethod; + private Method onHeartbeatMethod; private Method onDisconnectMethod; private Method onTimeoutMethod; private Method onGetMethod; @@ -91,6 +94,7 @@ public ManagedAtmosphereHandler() { public AnnotatedProxy configure(AtmosphereConfig config, Object c) { this.proxiedInstance = c; this.onRuntimeMethod = populateMessage(c, Message.class); + this.onHeartbeatMethod = populate(c, Heartbeat.class); this.onDisconnectMethod = populate(c, Disconnect.class); this.onTimeoutMethod = populate(c, Resume.class); this.onGetMethod = populate(c, Get.class); @@ -137,6 +141,15 @@ public void onResume(AtmosphereResourceEvent event) { }); } + if (onHeartbeatMethod != null && !polling) { + resource.addEventListener(new OnHeartbeat() { + @Override + public void onHeartbeat(AtmosphereResourceEvent event) { + invoke(onHeartbeatMethod, event); + } + }); + } + resource.addEventListener(new OnClose() { @Override public void onClose(AtmosphereResourceEvent event) { diff --git a/modules/cpr/src/main/java/org/atmosphere/config/service/Heartbeat.java b/modules/cpr/src/main/java/org/atmosphere/config/service/Heartbeat.java new file mode 100644 index 00000000000..a77cc8bdf92 --- /dev/null +++ b/modules/cpr/src/main/java/org/atmosphere/config/service/Heartbeat.java @@ -0,0 +1,32 @@ +/* + * Copyright 2014 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.config.service; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotate a method that will get invoked when the client sends an heartbeat. {@link org.atmosphere.interceptor.HeartbeatInterceptor} + * must be installed. + * + * @author Jeanfrancois Arcand + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Heartbeat { +} diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListener.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListener.java index 6e5e6577095..d02cc0b07d5 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListener.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListener.java @@ -48,6 +48,13 @@ public interface AtmosphereResourceEventListener { */ void onResume(AtmosphereResourceEvent event); + /** + * Invoked when the remote connections send a heartbeat. + * + * @param event a {@link AtmosphereResourceEvent} + */ + void onHeartbeat(AtmosphereResourceEvent event); + /** * Invoked when the remote connection gets closed. * diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListenerAdapter.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListenerAdapter.java index 25e196ffaeb..4daf5100691 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListenerAdapter.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceEventListenerAdapter.java @@ -42,6 +42,11 @@ public void onResume(AtmosphereResourceEvent event) { logger.trace("{}", event); } + @Override + public void onHeartbeat(AtmosphereResourceEvent event) { + logger.trace("{}", event); + } + @Override public void onDisconnect(AtmosphereResourceEvent event) { logger.trace("{}", event); @@ -62,6 +67,14 @@ public void onClose(AtmosphereResourceEvent event) { logger.trace("{}", event); } + /** + * On Heartbeat's Listener + */ + abstract static public class OnHeartbeat extends AtmosphereResourceEventListenerAdapter { + @Override + abstract public void onHeartbeat(AtmosphereResourceEvent event); + } + /** * On Suspend's Listener */ diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java index 34a852c0685..36f5348198b 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceImpl.java @@ -15,6 +15,7 @@ */ package org.atmosphere.cpr; +import org.atmosphere.HeartbeatAtmosphereResourceEvent; import org.atmosphere.interceptor.AllowInterceptor; import org.atmosphere.util.Utils; import org.atmosphere.websocket.WebSocket; @@ -592,7 +593,9 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) { Action oldAction = action; try { - if (event.isClosedByApplication()) { + if (HeartbeatAtmosphereResourceEvent.class.isAssignableFrom(event.getClass())) { + onHeartbeat(event); + } else if (event.isClosedByApplication()) { onClose(event); } else if (event.isCancelled() || event.isClosedByClient()) { if (!disconnected.getAndSet(true)) { @@ -647,6 +650,19 @@ void onThrowable(AtmosphereResourceEvent e) { } } + /** + *

+ * Notifies to all listeners that a heartbeat has been sent. + *

+ * + * @param e the event + */ + void onHeartbeat(AtmosphereResourceEvent e) { + for (AtmosphereResourceEventListener r : listeners) { + r.onHeartbeat(e); + } + } + void onSuspend(AtmosphereResourceEvent e) { for (AtmosphereResourceEventListener r : listeners) { if (disableSuspendEvent) { diff --git a/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java b/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java index 5a0c397ffcb..1c44480cfa4 100644 --- a/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java +++ b/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java @@ -15,6 +15,7 @@ */ package org.atmosphere.interceptor; +import org.atmosphere.HeartbeatAtmosphereResourceEvent; import org.atmosphere.cpr.Action; import org.atmosphere.cpr.AsyncIOInterceptorAdapter; import org.atmosphere.cpr.AsyncIOWriter; @@ -24,6 +25,7 @@ import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResourceEvent; +import org.atmosphere.cpr.AtmosphereResourceEventListener; import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter; import org.atmosphere.cpr.AtmosphereResourceImpl; import org.atmosphere.cpr.AtmosphereResponse; @@ -36,7 +38,10 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -79,6 +84,8 @@ public class HeartbeatInterceptor extends AtmosphereInterceptorAdapter { private int heartbeatFrequencyInSeconds = 60; + private Map onHeartbeatListeners = new ConcurrentHashMap(); + /** * Heartbeat from client disabled by default. */ @@ -148,6 +155,30 @@ public Clock() { } } + /** + * {@inheritDoc} + */ + @Override + public void postInspect(final AtmosphereResource r) { + // We track any OnHeartbeat when added by the handler cause the framework don't keep it during resource lifecycle + if (!onHeartbeatListeners.containsKey(r.uuid())) { + // TODO: see https://github.com/Atmosphere/atmosphere/issues/1561 + final Queue listeners = AtmosphereResourceImpl.class.cast(r).listeners(); + + for (final AtmosphereResourceEventListener l : listeners) { + if (AtmosphereResourceEventListenerAdapter.OnHeartbeat.class.isAssignableFrom(l.getClass())) { + r.addEventListener(new AtmosphereResourceEventListenerAdapter.OnDisconnect() { + public void onDisconnect(final AtmosphereResourceEvent event) { + onHeartbeatListeners.remove(event.getResource().uuid()); + } + }); + onHeartbeatListeners.put(r.uuid(), l); + break; + } + } + } + } + @Override public Action inspect(final AtmosphereResource r) { final int interval = extractHeartbeatInterval(r); @@ -216,10 +247,23 @@ public void postPayload(final AtmosphereResponse response, byte[] data, int offs }); r.getRequest().setAttribute(INTERCEPTOR_ADDED, Boolean.TRUE); } else { - // This is where we should dispatch an event to notify that an heartbeat has been intercepted - // See: https://github.com/Atmosphere/atmosphere/issues/1549 byte[] body = IOUtils.readEntirelyAsByte(r); if (Arrays.equals(paddingBytes, body)) { + // Dispatch an event to notify that a heartbeat has been intercepted + // TODO: see https://github.com/Atmosphere/atmosphere/issues/1561 + final AtmosphereResourceEvent event = new HeartbeatAtmosphereResourceEvent(AtmosphereResourceImpl.class.cast(r)); + + + // Check if a listener is defined + final AtmosphereResourceEventListener kept = onHeartbeatListeners.get(r.uuid()); + + if (kept != null) { + kept.onHeartbeat(event); + } else { + // In case of listener is still bound to the resource + r.notifyListeners(event); + } + return Action.CANCELLED; } request.body(body); diff --git a/modules/cpr/src/main/java/org/atmosphere/websocket/WebSocketEventListenerAdapter.java b/modules/cpr/src/main/java/org/atmosphere/websocket/WebSocketEventListenerAdapter.java index 01590cb22bc..e57898995f2 100644 --- a/modules/cpr/src/main/java/org/atmosphere/websocket/WebSocketEventListenerAdapter.java +++ b/modules/cpr/src/main/java/org/atmosphere/websocket/WebSocketEventListenerAdapter.java @@ -73,6 +73,11 @@ public void onResume(AtmosphereResourceEvent event) { logger.trace("{}", event); } + @Override + public void onHeartbeat(AtmosphereResourceEvent event) { + logger.trace("{}", event); + } + @Override public void onDisconnect(AtmosphereResourceEvent event) { logger.trace("{}", event); diff --git a/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java b/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java index 0b2c6c9fff6..8f7edb0c909 100644 --- a/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java +++ b/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java @@ -23,6 +23,9 @@ import org.atmosphere.config.service.Put; import org.atmosphere.config.service.Ready; import org.atmosphere.cpr.Action; +import org.atmosphere.cpr.ApplicationConfig; +import org.atmosphere.cpr.AsyncIOWriter; +import org.atmosphere.cpr.AsyncIOWriterAdapter; import org.atmosphere.cpr.AsynchronousProcessor; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereInterceptorAdapter; @@ -31,6 +34,8 @@ import org.atmosphere.cpr.AtmosphereResourceEvent; import org.atmosphere.cpr.AtmosphereResourceImpl; import org.atmosphere.cpr.AtmosphereResponse; +import org.atmosphere.cpr.FrameworkConfig; +import org.atmosphere.interceptor.HeartbeatInterceptor; import org.atmosphere.interceptor.InvokationOrder; import org.atmosphere.util.ExcludeSessionBroadcaster; import org.atmosphere.util.SimpleBroadcaster; @@ -51,6 +56,7 @@ import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnSuspend; import static org.atmosphere.cpr.HeaderConfig.LONG_POLLING_TRANSPORT; +import static org.atmosphere.cpr.HeaderConfig.WEBSOCKET_TRANSPORT; import static org.atmosphere.cpr.HeaderConfig.X_ATMOSPHERE_TRANSPORT; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; @@ -442,13 +448,56 @@ public void message(InputStream reader) { @Test public void testInputStreamMessage() throws IOException, ServletException { - AtmosphereRequest request = new AtmosphereRequest.Builder().pathInfo("/inputStreamInjection").method("GET").build(); framework.doCometSupport(request, AtmosphereResponse.newInstance()); assertNotNull(r.get()); r.get().resume(); assertNotNull(message.get()); assertEquals(message.get(), "message"); + } + + @ManagedService(path = "/heartbeat") + public final static class Heartbeat { + static final String paddingData = new String(new HeartbeatInterceptor().getPaddingBytes()); + + @Get + public void get(AtmosphereResource resource) { + r.set(resource); + } + @org.atmosphere.config.service.Heartbeat + public void heartbeat(AtmosphereResourceEvent resource) { + message.set(paddingData); + } + } + + @Test + public void testHeartbeat() throws IOException, ServletException { + // Open connection + AtmosphereRequest request = new AtmosphereRequest.Builder() + .pathInfo("/heartbeat") + .method("GET") + .build(); + + request.header(X_ATMOSPHERE_TRANSPORT, WEBSOCKET_TRANSPORT); + framework.doCometSupport(request, AtmosphereResponse.newInstance()); + + // Check suspend + final AtmosphereResource res = r.get(); + assertNotNull(res); + + // Send heartbeat + request = new AtmosphereRequest.Builder() + .pathInfo("/heartbeat") + .method("GET") + .body(Heartbeat.paddingData) + .build(); + request.header(X_ATMOSPHERE_TRANSPORT, WEBSOCKET_TRANSPORT); + request.setAttribute(HeartbeatInterceptor.INTERCEPTOR_ADDED, ""); + res.initialize(res.getAtmosphereConfig(), res.getBroadcaster(), request, AtmosphereResponse.newInstance(), framework.getAsyncSupport(), res.getAtmosphereHandler()); + request.setAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE, res); + framework.doCometSupport(request, AtmosphereResponse.newInstance()); + assertNotNull(message.get()); + assertEquals(message.get(), Heartbeat.paddingData); } } diff --git a/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceListenerTest.java b/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceListenerTest.java index 1821d80b4f6..1b6e8d477da 100644 --- a/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceListenerTest.java +++ b/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceListenerTest.java @@ -98,6 +98,11 @@ public void onResume(AtmosphereResourceEvent event) { resumed.set(true); } + @Override + public void onHeartbeat(AtmosphereResourceEvent event) { + } + + @Override public void onDisconnect(AtmosphereResourceEvent event) { disconnected.set(true); From f273ff86e63a1a62f88df02bf185be8f46ca42ce Mon Sep 17 00:00:00 2001 From: Benjamin Prato Date: Mon, 26 May 2014 16:48:20 +0200 Subject: [PATCH 2/2] Review listener tracking. Heartbeat notification managed only for ManagedAtmosphereHandler. --- .../managed/ManagedAtmosphereHandler.java | 23 ++++---- .../interceptor/HeartbeatInterceptor.java | 53 +++++-------------- 2 files changed, 27 insertions(+), 49 deletions(-) diff --git a/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java b/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java index e3233c083fb..8a2cb679226 100644 --- a/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java +++ b/modules/cpr/src/main/java/org/atmosphere/config/managed/ManagedAtmosphereHandler.java @@ -55,7 +55,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnClose; -import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnHeartbeat; import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnResume; import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnSuspend; import static org.atmosphere.util.IOUtils.isBodyEmpty; @@ -141,15 +140,6 @@ public void onResume(AtmosphereResourceEvent event) { }); } - if (onHeartbeatMethod != null && !polling) { - resource.addEventListener(new OnHeartbeat() { - @Override - public void onHeartbeat(AtmosphereResourceEvent event) { - invoke(onHeartbeatMethod, event); - } - }); - } - resource.addEventListener(new OnClose() { @Override public void onClose(AtmosphereResourceEvent event) { @@ -429,6 +419,19 @@ protected void processReady(AtmosphereResource r) { } } + /** + *

+ * Notifies the heartbeat for the given resource to the annotated method if exists. + *

+ * + * @param event the event + */ + public void onHeartbeat(final AtmosphereResourceEvent event) { + if (onHeartbeatMethod != null && !Utils.pollableTransport(event.getResource().transport())) { + invoke(onHeartbeatMethod, event); + } + } + @Override public String toString() { return "ManagedAtmosphereHandler proxy for " + proxiedInstance.getClass().getName(); diff --git a/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java b/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java index 1c44480cfa4..3ca27d1149b 100644 --- a/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java +++ b/modules/cpr/src/main/java/org/atmosphere/interceptor/HeartbeatInterceptor.java @@ -16,6 +16,7 @@ package org.atmosphere.interceptor; import org.atmosphere.HeartbeatAtmosphereResourceEvent; +import org.atmosphere.config.managed.ManagedAtmosphereHandler; import org.atmosphere.cpr.Action; import org.atmosphere.cpr.AsyncIOInterceptorAdapter; import org.atmosphere.cpr.AsyncIOWriter; @@ -25,7 +26,6 @@ import org.atmosphere.cpr.AtmosphereRequest; import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResourceEvent; -import org.atmosphere.cpr.AtmosphereResourceEventListener; import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter; import org.atmosphere.cpr.AtmosphereResourceImpl; import org.atmosphere.cpr.AtmosphereResponse; @@ -38,10 +38,7 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Map; -import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -84,8 +81,6 @@ public class HeartbeatInterceptor extends AtmosphereInterceptorAdapter { private int heartbeatFrequencyInSeconds = 60; - private Map onHeartbeatListeners = new ConcurrentHashMap(); - /** * Heartbeat from client disabled by default. */ @@ -155,30 +150,6 @@ public Clock() { } } - /** - * {@inheritDoc} - */ - @Override - public void postInspect(final AtmosphereResource r) { - // We track any OnHeartbeat when added by the handler cause the framework don't keep it during resource lifecycle - if (!onHeartbeatListeners.containsKey(r.uuid())) { - // TODO: see https://github.com/Atmosphere/atmosphere/issues/1561 - final Queue listeners = AtmosphereResourceImpl.class.cast(r).listeners(); - - for (final AtmosphereResourceEventListener l : listeners) { - if (AtmosphereResourceEventListenerAdapter.OnHeartbeat.class.isAssignableFrom(l.getClass())) { - r.addEventListener(new AtmosphereResourceEventListenerAdapter.OnDisconnect() { - public void onDisconnect(final AtmosphereResourceEvent event) { - onHeartbeatListeners.remove(event.getResource().uuid()); - } - }); - onHeartbeatListeners.put(r.uuid(), l); - break; - } - } - } - } - @Override public Action inspect(final AtmosphereResource r) { final int interval = extractHeartbeatInterval(r); @@ -248,24 +219,28 @@ public void postPayload(final AtmosphereResponse response, byte[] data, int offs r.getRequest().setAttribute(INTERCEPTOR_ADDED, Boolean.TRUE); } else { byte[] body = IOUtils.readEntirelyAsByte(r); + if (Arrays.equals(paddingBytes, body)) { // Dispatch an event to notify that a heartbeat has been intercepted // TODO: see https://github.com/Atmosphere/atmosphere/issues/1561 final AtmosphereResourceEvent event = new HeartbeatAtmosphereResourceEvent(AtmosphereResourceImpl.class.cast(r)); - - // Check if a listener is defined - final AtmosphereResourceEventListener kept = onHeartbeatListeners.get(r.uuid()); - - if (kept != null) { - kept.onHeartbeat(event); - } else { - // In case of listener is still bound to the resource - r.notifyListeners(event); + // Currently we fire heartbeat notification only for managed handler + if (r.getAtmosphereHandler().getClass().isAssignableFrom(ManagedAtmosphereHandler.class)) { + r.addEventListener(new AtmosphereResourceEventListenerAdapter.OnHeartbeat() { + @Override + public void onHeartbeat(AtmosphereResourceEvent event) { + ManagedAtmosphereHandler.class.cast(r.getAtmosphereHandler()).onHeartbeat(event); + } + }); } + // Fire event + r.notifyListeners(event); + return Action.CANCELLED; } + request.body(body); } }