Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1549 #1607

Merged
merged 2 commits into from
May 26, 2014
Merged

#1549 #1607

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2014 Jeanfrancois Arcand
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.atmosphere;

import org.atmosphere.cpr.AtmosphereResourceEventImpl;
import org.atmosphere.cpr.AtmosphereResourceImpl;

/**
* <p>
* Specifies to the observable that {@link org.atmosphere.cpr.AtmosphereResourceEventListener#onHeartbeat(org.atmosphere.cpr.AtmosphereResourceEvent)}
* should be invoked when it fires event to observers.
* </p>
*
* @version 1.0
* @author Guillaume DROUET
* @since 2.2
*/
public class HeartbeatAtmosphereResourceEvent extends AtmosphereResourceEventImpl {

/**
* <p>
* Builds a new event.
* </p>
*
* @param resource the resource
*/
public HeartbeatAtmosphereResourceEvent(final AtmosphereResourceImpl resource) {
super(resource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.atmosphere.config.service.Delete;
import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.Get;
import org.atmosphere.config.service.Heartbeat;
import org.atmosphere.config.service.Message;
import org.atmosphere.config.service.PathParam;
import org.atmosphere.config.service.Post;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class ManagedAtmosphereHandler extends AbstractReflectorAtmosphereHandler
private final static List<Decoder<?, ?>> EMPTY = Collections.<Decoder<?, ?>>emptyList();
private Object proxiedInstance;
private List<MethodInfo> onRuntimeMethod;
private Method onHeartbeatMethod;
private Method onDisconnectMethod;
private Method onTimeoutMethod;
private Method onGetMethod;
Expand All @@ -91,6 +93,7 @@ public ManagedAtmosphereHandler() {
public AnnotatedProxy configure(AtmosphereConfig config, Object c) {
this.proxiedInstance = c;
this.onRuntimeMethod = populateMessage(c, Message.class);
this.onHeartbeatMethod = populate(c, Heartbeat.class);
this.onDisconnectMethod = populate(c, Disconnect.class);
this.onTimeoutMethod = populate(c, Resume.class);
this.onGetMethod = populate(c, Get.class);
Expand Down Expand Up @@ -416,6 +419,19 @@ protected void processReady(AtmosphereResource r) {
}
}

/**
* <p>
* Notifies the heartbeat for the given resource to the annotated method if exists.
* </p>
*
* @param event the event
*/
public void onHeartbeat(final AtmosphereResourceEvent event) {
if (onHeartbeatMethod != null && !Utils.pollableTransport(event.getResource().transport())) {
invoke(onHeartbeatMethod, event);
}
}

@Override
public String toString() {
return "ManagedAtmosphereHandler proxy for " + proxiedInstance.getClass().getName();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2014 Jeanfrancois Arcand
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.atmosphere.config.service;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotate a method that will get invoked when the client sends an heartbeat. {@link org.atmosphere.interceptor.HeartbeatInterceptor}
* must be installed.
*
* @author Jeanfrancois Arcand
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Heartbeat {
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public interface AtmosphereResourceEventListener {
*/
void onResume(AtmosphereResourceEvent event);

/**
* Invoked when the remote connections send a heartbeat.
*
* @param event a {@link AtmosphereResourceEvent}
*/
void onHeartbeat(AtmosphereResourceEvent event);

/**
* Invoked when the remote connection gets closed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public void onResume(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onDisconnect(AtmosphereResourceEvent event) {
logger.trace("{}", event);
Expand All @@ -62,6 +67,14 @@ public void onClose(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

/**
* On Heartbeat's Listener
*/
abstract static public class OnHeartbeat extends AtmosphereResourceEventListenerAdapter {
@Override
abstract public void onHeartbeat(AtmosphereResourceEvent event);
}

/**
* On Suspend's Listener
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.atmosphere.cpr;

import org.atmosphere.HeartbeatAtmosphereResourceEvent;
import org.atmosphere.interceptor.AllowInterceptor;
import org.atmosphere.util.Utils;
import org.atmosphere.websocket.WebSocket;
Expand Down Expand Up @@ -592,7 +593,9 @@ public AtmosphereResource notifyListeners(AtmosphereResourceEvent event) {

Action oldAction = action;
try {
if (event.isClosedByApplication()) {
if (HeartbeatAtmosphereResourceEvent.class.isAssignableFrom(event.getClass())) {
onHeartbeat(event);
} else if (event.isClosedByApplication()) {
onClose(event);
} else if (event.isCancelled() || event.isClosedByClient()) {
if (!disconnected.getAndSet(true)) {
Expand Down Expand Up @@ -647,6 +650,19 @@ void onThrowable(AtmosphereResourceEvent e) {
}
}

/**
* <p>
* Notifies to all listeners that a heartbeat has been sent.
* </p>
*
* @param e the event
*/
void onHeartbeat(AtmosphereResourceEvent e) {
for (AtmosphereResourceEventListener r : listeners) {
r.onHeartbeat(e);
}
}

void onSuspend(AtmosphereResourceEvent e) {
for (AtmosphereResourceEventListener r : listeners) {
if (disableSuspendEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.atmosphere.interceptor;

import org.atmosphere.HeartbeatAtmosphereResourceEvent;
import org.atmosphere.config.managed.ManagedAtmosphereHandler;
import org.atmosphere.cpr.Action;
import org.atmosphere.cpr.AsyncIOInterceptorAdapter;
import org.atmosphere.cpr.AsyncIOWriter;
Expand Down Expand Up @@ -216,12 +218,29 @@ public void postPayload(final AtmosphereResponse response, byte[] data, int offs
});
r.getRequest().setAttribute(INTERCEPTOR_ADDED, Boolean.TRUE);
} else {
// This is where we should dispatch an event to notify that an heartbeat has been intercepted
// See: https://github.com/Atmosphere/atmosphere/issues/1549
byte[] body = IOUtils.readEntirelyAsByte(r);

if (Arrays.equals(paddingBytes, body)) {
// Dispatch an event to notify that a heartbeat has been intercepted
// TODO: see https://github.com/Atmosphere/atmosphere/issues/1561
final AtmosphereResourceEvent event = new HeartbeatAtmosphereResourceEvent(AtmosphereResourceImpl.class.cast(r));

// Currently we fire heartbeat notification only for managed handler
if (r.getAtmosphereHandler().getClass().isAssignableFrom(ManagedAtmosphereHandler.class)) {
r.addEventListener(new AtmosphereResourceEventListenerAdapter.OnHeartbeat() {
@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
ManagedAtmosphereHandler.class.cast(r.getAtmosphereHandler()).onHeartbeat(event);
}
});
}

// Fire event
r.notifyListeners(event);

return Action.CANCELLED;
}

request.body(body);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public void onResume(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
logger.trace("{}", event);
}

@Override
public void onDisconnect(AtmosphereResourceEvent event) {
logger.trace("{}", event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.atmosphere.config.service.Put;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.Action;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AsyncIOWriter;
import org.atmosphere.cpr.AsyncIOWriterAdapter;
import org.atmosphere.cpr.AsynchronousProcessor;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereInterceptorAdapter;
Expand All @@ -31,6 +34,8 @@
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.AtmosphereResponse;
import org.atmosphere.cpr.FrameworkConfig;
import org.atmosphere.interceptor.HeartbeatInterceptor;
import org.atmosphere.interceptor.InvokationOrder;
import org.atmosphere.util.ExcludeSessionBroadcaster;
import org.atmosphere.util.SimpleBroadcaster;
Expand All @@ -51,6 +56,7 @@

import static org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter.OnSuspend;
import static org.atmosphere.cpr.HeaderConfig.LONG_POLLING_TRANSPORT;
import static org.atmosphere.cpr.HeaderConfig.WEBSOCKET_TRANSPORT;
import static org.atmosphere.cpr.HeaderConfig.X_ATMOSPHERE_TRANSPORT;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -442,13 +448,56 @@ public void message(InputStream reader) {

@Test
public void testInputStreamMessage() throws IOException, ServletException {

AtmosphereRequest request = new AtmosphereRequest.Builder().pathInfo("/inputStreamInjection").method("GET").build();
framework.doCometSupport(request, AtmosphereResponse.newInstance());
assertNotNull(r.get());
r.get().resume();
assertNotNull(message.get());
assertEquals(message.get(), "message");
}

@ManagedService(path = "/heartbeat")
public final static class Heartbeat {
static final String paddingData = new String(new HeartbeatInterceptor().getPaddingBytes());

@Get
public void get(AtmosphereResource resource) {
r.set(resource);
}

@org.atmosphere.config.service.Heartbeat
public void heartbeat(AtmosphereResourceEvent resource) {
message.set(paddingData);
}
}

@Test
public void testHeartbeat() throws IOException, ServletException {
// Open connection
AtmosphereRequest request = new AtmosphereRequest.Builder()
.pathInfo("/heartbeat")
.method("GET")
.build();

request.header(X_ATMOSPHERE_TRANSPORT, WEBSOCKET_TRANSPORT);
framework.doCometSupport(request, AtmosphereResponse.newInstance());

// Check suspend
final AtmosphereResource res = r.get();
assertNotNull(res);

// Send heartbeat
request = new AtmosphereRequest.Builder()
.pathInfo("/heartbeat")
.method("GET")
.body(Heartbeat.paddingData)
.build();
request.header(X_ATMOSPHERE_TRANSPORT, WEBSOCKET_TRANSPORT);
request.setAttribute(HeartbeatInterceptor.INTERCEPTOR_ADDED, "");
res.initialize(res.getAtmosphereConfig(), res.getBroadcaster(), request, AtmosphereResponse.newInstance(), framework.getAsyncSupport(), res.getAtmosphereHandler());
request.setAttribute(FrameworkConfig.INJECTED_ATMOSPHERE_RESOURCE, res);
framework.doCometSupport(request, AtmosphereResponse.newInstance());
assertNotNull(message.get());
assertEquals(message.get(), Heartbeat.paddingData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public void onResume(AtmosphereResourceEvent event) {
resumed.set(true);
}

@Override
public void onHeartbeat(AtmosphereResourceEvent event) {
}


@Override
public void onDisconnect(AtmosphereResourceEvent event) {
disconnected.set(true);
Expand Down