From 76279f338e497e55e02198d1b7f3bfcb412dadda Mon Sep 17 00:00:00 2001 From: jfarcand Date: Thu, 26 Jan 2012 17:21:34 -0500 Subject: [PATCH] First tentative to for #170 [long-polling] Message losts during concurrent suspend/broadcast --- .../java/org/atmosphere/cpr/DefaultBroadcaster.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java index 48543f9eeb0..2fa74110bf9 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java @@ -71,6 +71,7 @@ import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_DESTROY; import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_RESUME; import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.NEVER; +import static org.atmosphere.cpr.HeaderConfig.X_CACHE_DATE; /** * {@link Broadcaster} implementation. @@ -595,6 +596,7 @@ protected void push(Entry entry) { } if (entry.writeLocally) { + pushCachedMessage(r); queueWriteIO(r, finalMsg, entry); } } @@ -607,6 +609,7 @@ protected void push(Entry entry) { } if (entry.writeLocally) { + pushCachedMessage((AtmosphereResource) entry.multipleAtmoResources); queueWriteIO((AtmosphereResource) entry.multipleAtmoResources, finalMsg, entry); } } else if (entry.multipleAtmoResources instanceof Set) { @@ -620,6 +623,7 @@ protected void push(Entry entry) { } if (entry.writeLocally) { + pushCachedMessage(r); queueWriteIO(r, finalMsg, entry); } } @@ -630,6 +634,13 @@ protected void push(Entry entry) { } } + protected void pushCachedMessage(AtmosphereResource r) { + AtmosphereResourceImpl ari = AtmosphereResourceImpl.class.cast(r); + // Check lost message send from the last 2 seconds. + ari.getRequest(false).setAttribute(X_CACHE_DATE, System.currentTimeMillis() - 2000); + retrieveTrackedBroadcast(r, new AtmosphereResourceEventImpl(ari, false, false)); + } + protected void queueWriteIO(AtmosphereResource r, Object finalMsg, Entry entry) throws InterruptedException { asyncWriteQueue.put(new AsyncWriteToken(r, finalMsg, entry.future, entry.originalMessage)); }