Skip to content

Commit

Permalink
First tentative to for #170 [long-polling] Message losts during concu…
Browse files Browse the repository at this point in the history
…rrent suspend/broadcast
  • Loading branch information
jfarcand committed Jan 26, 2012
1 parent a0c13f0 commit 76279f3
Showing 1 changed file with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_DESTROY;
import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.IDLE_RESUME;
import static org.atmosphere.cpr.BroadcasterLifeCyclePolicy.ATMOSPHERE_RESOURCE_POLICY.NEVER;
import static org.atmosphere.cpr.HeaderConfig.X_CACHE_DATE;

/**
* {@link Broadcaster} implementation.
Expand Down Expand Up @@ -595,6 +596,7 @@ protected void push(Entry entry) {
}

if (entry.writeLocally) {
pushCachedMessage(r);
queueWriteIO(r, finalMsg, entry);
}
}
Expand All @@ -607,6 +609,7 @@ protected void push(Entry entry) {
}

if (entry.writeLocally) {
pushCachedMessage((AtmosphereResource<?, ?>) entry.multipleAtmoResources);
queueWriteIO((AtmosphereResource<?, ?>) entry.multipleAtmoResources, finalMsg, entry);
}
} else if (entry.multipleAtmoResources instanceof Set) {
Expand All @@ -620,6 +623,7 @@ protected void push(Entry entry) {
}

if (entry.writeLocally) {
pushCachedMessage(r);
queueWriteIO(r, finalMsg, entry);
}
}
Expand All @@ -630,6 +634,13 @@ protected void push(Entry entry) {
}
}

protected void pushCachedMessage(AtmosphereResource<?,?> r) {
AtmosphereResourceImpl ari = AtmosphereResourceImpl.class.cast(r);
// Check lost message send from the last 2 seconds.
ari.getRequest(false).setAttribute(X_CACHE_DATE, System.currentTimeMillis() - 2000);
retrieveTrackedBroadcast(r, new AtmosphereResourceEventImpl(ari, false, false));
}

protected void queueWriteIO(AtmosphereResource<?, ?> r, Object finalMsg, Entry entry) throws InterruptedException {
asyncWriteQueue.put(new AsyncWriteToken(r, finalMsg, entry.future, entry.originalMessage));
}
Expand Down

0 comments on commit 76279f3

Please sign in to comment.