Skip to content

Commit

Permalink
More fix for #170 [long-polling] Message losts during concurrent susp…
Browse files Browse the repository at this point in the history
…end/broadcast operation
  • Loading branch information
jfarcand committed Feb 3, 2012
1 parent c8d03a5 commit fdac1ab
Showing 1 changed file with 73 additions and 74 deletions.
147 changes: 73 additions & 74 deletions modules/cpr/src/main/java/org/atmosphere/cpr/DefaultBroadcaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -523,55 +523,54 @@ protected void push(Entry entry) {
if (destroyed.get()) {
return;
}

recentActivity.set(true);

String prevMessage = entry.message.toString();
if (!delayedBroadcast.isEmpty()) {
Iterator<Entry> i = delayedBroadcast.iterator();
StringBuilder b = new StringBuilder();
while (i.hasNext()) {
Entry e = i.next();
e.future.cancel(true);
try {
// Append so we do a single flush
if (e.message instanceof String
&& entry.message instanceof String) {
b.append(e.message);
} else {
push(e);
// We need to synchronize t make sure there is no suspend operation retrieving cached messages concurrently.
// https://github.com/Atmosphere/atmosphere/issues/170
synchronized (concurrentSuspendBroadcast) {
recentActivity.set(true);

String prevMessage = entry.message.toString();
if (!delayedBroadcast.isEmpty()) {
Iterator<Entry> i = delayedBroadcast.iterator();
StringBuilder b = new StringBuilder();
while (i.hasNext()) {
Entry e = i.next();
e.future.cancel(true);
try {
// Append so we do a single flush
if (e.message instanceof String
&& entry.message instanceof String) {
b.append(e.message);
} else {
push(e);
}
} finally {
i.remove();
}
} finally {
i.remove();
}
}

if (b.length() > 0) {
entry.message = b.append(entry.message).toString();
if (b.length() > 0) {
entry.message = b.append(entry.message).toString();
}
}
}

Object finalMsg = translate(entry.message);
Object finalMsg = translate(entry.message);

if (finalMsg == null) {
logger.trace("Broascast message was null {}", finalMsg);
return;
}
if (finalMsg == null) {
logger.trace("Broascast message was null {}", finalMsg);
return;
}

Object prevM = entry.originalMessage;
entry.originalMessage = (entry.originalMessage != entry.message ? translate(entry.originalMessage) : finalMsg);
Object prevM = entry.originalMessage;
entry.originalMessage = (entry.originalMessage != entry.message ? translate(entry.originalMessage) : finalMsg);

if (entry.originalMessage == null) {
logger.trace("Broascast message was null {}", prevM);
return;
}
if (entry.originalMessage == null) {
logger.trace("Broascast message was null {}", prevM);
return;
}

entry.message = finalMsg;
entry.message = finalMsg;

if (resources.isEmpty()) {
// We need to synchronize
// https://github.com/Atmosphere/atmosphere/issues/170
synchronized (concurrentSuspendBroadcast) {
if (resources.isEmpty()) {
logger.debug("Broadcaster {} doesn't have any associated resource", getID());

AtmosphereResource<?, ?> r = null;
Expand All @@ -585,51 +584,51 @@ protected void push(Entry entry) {
}
return;
}
}

try {
if (entry.multipleAtmoResources == null) {
for (AtmosphereResource<?, ?> r : resources) {
finalMsg = perRequestFilter(r, entry);
try {
if (entry.multipleAtmoResources == null) {
for (AtmosphereResource<?, ?> r : resources) {
finalMsg = perRequestFilter(r, entry);

if (finalMsg == null) {
logger.debug("Skipping broadcast delivery resource {} ", r);
continue;
}
if (finalMsg == null) {
logger.debug("Skipping broadcast delivery resource {} ", r);
continue;
}

if (entry.writeLocally) {
queueWriteIO(r, finalMsg, entry);
if (entry.writeLocally) {
queueWriteIO(r, finalMsg, entry);
}
}
}
} else if (entry.multipleAtmoResources instanceof AtmosphereResource<?, ?>) {
finalMsg = perRequestFilter((AtmosphereResource<?, ?>) entry.multipleAtmoResources, entry);

if (finalMsg == null) {
logger.debug("Skipping broadcast delivery resource {} ", entry.multipleAtmoResources);
return;
}

if (entry.writeLocally) {
queueWriteIO((AtmosphereResource<?, ?>) entry.multipleAtmoResources, finalMsg, entry);
}
} else if (entry.multipleAtmoResources instanceof Set) {
Set<AtmosphereResource<?, ?>> sub = (Set<AtmosphereResource<?, ?>>) entry.multipleAtmoResources;
for (AtmosphereResource<?, ?> r : sub) {
finalMsg = perRequestFilter(r, entry);
} else if (entry.multipleAtmoResources instanceof AtmosphereResource<?, ?>) {
finalMsg = perRequestFilter((AtmosphereResource<?, ?>) entry.multipleAtmoResources, entry);

if (finalMsg == null) {
logger.debug("Skipping broadcast delivery resource {} ", r);
continue;
logger.debug("Skipping broadcast delivery resource {} ", entry.multipleAtmoResources);
return;
}

if (entry.writeLocally) {
queueWriteIO(r, finalMsg, entry);
queueWriteIO((AtmosphereResource<?, ?>) entry.multipleAtmoResources, finalMsg, entry);
}
} else if (entry.multipleAtmoResources instanceof Set) {
Set<AtmosphereResource<?, ?>> sub = (Set<AtmosphereResource<?, ?>>) entry.multipleAtmoResources;
for (AtmosphereResource<?, ?> r : sub) {
finalMsg = perRequestFilter(r, entry);

if (finalMsg == null) {
logger.debug("Skipping broadcast delivery resource {} ", r);
continue;
}

if (entry.writeLocally) {
queueWriteIO(r, finalMsg, entry);
}
}
}
entry.message = prevMessage;
} catch (InterruptedException ex) {
logger.debug(ex.getMessage(), ex);
}
entry.message = prevMessage;
} catch (InterruptedException ex) {
logger.debug(ex.getMessage(), ex);
}
}

Expand All @@ -641,7 +640,7 @@ protected Object perRequestFilter(AtmosphereResource<?, ?> r, Entry msg) {
Object finalMsg = msg.message;

if (AtmosphereResourceImpl.class.isAssignableFrom(r.getClass())) {
synchronized(r) {
synchronized (r) {
if (isAtmosphereResourceValid(r)) {
if (r.getRequest() instanceof HttpServletRequest && bc.hasPerRequestFilters()) {
Object message = msg.originalMessage;
Expand Down Expand Up @@ -1044,7 +1043,7 @@ public <T> Future<T> broadcast(T msg, Set<AtmosphereResource<?, ?>> subset) {
return r;
}

private boolean isAtmosphereResourceValid(AtmosphereResource<?,?> r) {
private boolean isAtmosphereResourceValid(AtmosphereResource<?, ?> r) {
return !AtmosphereResourceImpl.class.cast(r).isResumed()
&& !AtmosphereResourceImpl.class.cast(r).isCancelled()
&& AtmosphereResourceImpl.class.cast(r).isInScope();
Expand Down

0 comments on commit fdac1ab

Please sign in to comment.