Skip to content

Commit

Permalink
Merge pull request #1607 from gdrouet/master
Browse files Browse the repository at this point in the history
  • Loading branch information
gdrouet committed May 26, 2014
2 parents dcf53dc + f273ff8 commit a12c996
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2014 Jeanfrancois Arcand
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.atmosphere;

import org.atmosphere.cpr.AtmosphereResourceEventImpl;
import org.atmosphere.cpr.AtmosphereResourceImpl;

/**
* <p>
* Specifies to the observable that {@link org.atmosphere.cpr.AtmosphereResourceEventListener#onHeartbeat(org.atmosphere.cpr.AtmosphereResourceEvent)}
* should be invoked when it fires event to observers.
* </p>
*
* @version 1.0
* @author Guillaume DROUET
* @since 2.2
*/
public class HeartbeatAtmosphereResourceEvent extends AtmosphereResourceEventImpl {

/**
* <p>
* Builds a new event.
* </p>
*
* @param resource the resource
*/
public HeartbeatAtmosphereResourceEvent(final AtmosphereResourceImpl resource) {
super(resource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.atmosphere.config.service.Delete;
import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.Get;
import org.atmosphere.config.service.Heartbeat;
import org.atmosphere.config.service.Message;
import org.atmosphere.config.service.PathParam;
import org.atmosphere.config.service.Post;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class ManagedAtmosphereHandler extends AbstractReflectorAtmosphereHandler
private final static List<Decoder<?, ?>> EMPTY = Collections.<Decoder<?, ?>>emptyList();
private Object proxiedInstance;
private List<MethodInfo> onRuntimeMethod;
private Method onHeartbeatMethod;
private Method onDisconnectMethod;
private Method onTimeoutMethod;
private Method onGetMethod;
Expand All @@ -91,6 +93,7 @@ public ManagedAtmosphereHandler() {
public AnnotatedProxy configure(AtmosphereConfig config, Object c) {
this.proxiedInstance = c;
this.onRuntimeMethod = populateMessage(c, Message.class);
this.onHeartbeatMethod = populate(c, Heartbeat.class);
this.onDisconnectMethod = populate(c, Disconnect.class);
this.onTimeoutMethod = populate(c, Resume.class);
this.onGetMethod = populate(c, Get.class);
Expand Down Expand Up @@ -416,6 +419,19 @@ protected void processReady(AtmosphereResource r) {
}
}

/**
* <p>
* Notifies the heartbeat for the given resource to the annotated method if exists.
* </p>
*
* @param event the event
*/
public void onHeartbeat(final AtmosphereResourceEvent event) {
if (onHeartbeatMethod != null && !Utils.pollableTransport(event.getResource().transport())) {
invoke(onHeartbeatMethod, event);
}
}

@Override
public String toString() {
return "ManagedAtmosphereHandler proxy for " + proxiedInstance.getClass().getName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2014 Jeanfrancois Arcand
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.atmosphere.config.service;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotate a method that will get invoked when the client sends an heartbeat. {@link org.atmosphere.interceptor.HeartbeatInterceptor}
* must be installed.
*
* @author Jeanfrancois Arcand
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Heartbeat {
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface AtmosphereResourceEventListener {
*/
void onResume(AtmosphereResourceEvent event);

/**
* Invoked when the remote connections send a heartbeat.
*
* @param event a {@link AtmosphereResourceEvent}
*/
void onHeartbeat(AtmosphereResourceEvent event);

/**
* Invoked when the remote connection gets closed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void onResume(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onDisconnect(AtmosphereResourceEvent event) {
logger.trace("{}", event);
Expand All @@ -62,6 +67,14 @@ public void onClose(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

/**
* On Heartbeat's Listener
*/
abstract static public class OnHeartbeat extends AtmosphereResourceEventListenerAdapter {
@Override
abstract public void onHeartbeat(AtmosphereResourceEvent event);
}

/**
* On Suspend's Listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.atmosphere.cpr;

import org.atmosphere.HeartbeatAtmosphereResourceEvent;
import org.atmosphere.interceptor.AllowInterceptor;
import org.atmosphere.util.Utils;
import org.atmosphere.websocket.WebSocket;
Expand Down Expand Up @@ -592,7 +593,9 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) {

Action oldAction = action;
try {
if (event.isClosedByApplication()) {
if (HeartbeatAtmosphereResourceEvent.class.isAssignableFrom(event.getClass())) {
onHeartbeat(event);
} else if (event.isClosedByApplication()) {
onClose(event);
} else if (event.isCancelled() || event.isClosedByClient()) {
if (!disconnected.getAndSet(true)) {
Expand Down Expand Up @@ -647,6 +650,19 @@ void onThrowable(AtmosphereResourceEvent e) {
}
}

/**
* <p>
* Notifies to all listeners that a heartbeat has been sent.
* </p>
*
* @param e the event
*/
void onHeartbeat(AtmosphereResourceEvent e) {
for (AtmosphereResourceEventListener r : listeners) {
r.onHeartbeat(e);
}
}

void onSuspend(AtmosphereResourceEvent e) {
for (AtmosphereResourceEventListener r : listeners) {
if (disableSuspendEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.atmosphere.interceptor;

import org.atmosphere.HeartbeatAtmosphereResourceEvent;
import org.atmosphere.config.managed.ManagedAtmosphereHandler;
import org.atmosphere.cpr.Action;
import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
import org.atmosphere.cpr.AsyncIOWriter;
Expand Down Expand Up @@ -216,12 +218,29 @@ public void postPayload(final AtmosphereResponse response, byte[] data, int offs
});
r.getRequest().setAttribute(INTERCEPTOR_ADDED, Boolean.TRUE);
} else {
// This is where we should dispatch an event to notify that an heartbeat has been intercepted
// See: https://github.com/Atmosphere/atmosphere/issues/1549
byte[] body = IOUtils.readEntirelyAsByte(r);

if (Arrays.equals(paddingBytes, body)) {
// Dispatch an event to notify that a heartbeat has been intercepted
// TODO: see https://github.com/Atmosphere/atmosphere/issues/1561
final AtmosphereResourceEvent event = new HeartbeatAtmosphereResourceEvent(AtmosphereResourceImpl.class.cast(r));

// Currently we fire heartbeat notification only for managed handler
if (r.getAtmosphereHandler().getClass().isAssignableFrom(ManagedAtmosphereHandler.class)) {
r.addEventListener(new AtmosphereResourceEventListenerAdapter.OnHeartbeat() {
@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
ManagedAtmosphereHandler.class.cast(r.getAtmosphereHandler()).onHeartbeat(event);
}
});
}

// Fire event
r.notifyListeners(event);

return Action.CANCELLED;
}

request.body(body);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void onResume(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onDisconnect(AtmosphereResourceEvent event) {
logger.trace("{}", event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.atmosphere.config.service.Put;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.Action;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AsyncIOWriter;
import org.atmosphere.cpr.AsyncIOWriterAdapter;
import org.atmosphere.cpr.AsynchronousProcessor;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
Expand All @@ -31,6 +34,8 @@
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.FrameworkConfig;
import org.atmosphere.interceptor.HeartbeatInterceptor;
import org.atmosphere.interceptor.InvokationOrder;
import org.atmosphere.util.ExcludeSessionBroadcaster;
import org.atmosphere.util.SimpleBroadcaster;
Expand All @@ -51,6 +56,7 @@

import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnSuspend;
import static org.atmosphere.cpr.HeaderConfig.LONG_POLLING_TRANSPORT;
import static org.atmosphere.cpr.HeaderConfig.WEBSOCKET_TRANSPORT;
import static org.atmosphere.cpr.HeaderConfig.X_ATMOSPHERE_TRANSPORT;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -442,13 +448,56 @@ public void message(InputStream reader) {

@Test
public void testInputStreamMessage() throws IOException, ServletException {

AtmosphereRequest request = new AtmosphereRequest.Builder().pathInfo("/inputStreamInjection").method("GET").build();
framework.doCometSupport(request, AtmosphereResponse.newInstance());
assertNotNull(r.get());
r.get().resume();
assertNotNull(message.get());
assertEquals(message.get(), "message");
}

@ManagedService(path = "/heartbeat")
public final static class Heartbeat {
static final String paddingData = new String(new HeartbeatInterceptor().getPaddingBytes());

@Get
public void get(AtmosphereResource resource) {
r.set(resource);
}

@org.atmosphere.config.service.Heartbeat
public void heartbeat(AtmosphereResourceEvent resource) {
message.set(paddingData);
}
}

@Test
public void testHeartbeat() throws IOException, ServletException {
// Open connection
AtmosphereRequest request = new AtmosphereRequest.Builder()
.pathInfo("/heartbeat")
.method("GET")
.build();

request.header(X_ATMOSPHERE_TRANSPORT, WEBSOCKET_TRANSPORT);
framework.doCometSupport(request, AtmosphereResponse.newInstance());

// Check suspend
final AtmosphereResource res = r.get();
assertNotNull(res);

// Send heartbeat
request = new AtmosphereRequest.Builder()
.pathInfo("/heartbeat")
.method("GET")
.body(Heartbeat.paddingData)
.build();
request.header(X_ATMOSPHERE_TRANSPORT, WEBSOCKET_TRANSPORT);
request.setAttribute(HeartbeatInterceptor.INTERCEPTOR_ADDED, "");
res.initialize(res.getAtmosphereConfig(), res.getBroadcaster(), request, AtmosphereResponse.newInstance(), framework.getAsyncSupport(), res.getAtmosphereHandler());
request.setAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE, res);
framework.doCometSupport(request, AtmosphereResponse.newInstance());
assertNotNull(message.get());
assertEquals(message.get(), Heartbeat.paddingData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void onResume(AtmosphereResourceEvent event) {
resumed.set(true);
}

@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
}


@Override
public void onDisconnect(AtmosphereResourceEvent event) {
disconnected.set(true);
Expand Down

0 comments on commit a12c996

Please sign in to comment.