Skip to content

Commit

Permalink
#11348 Fixes to create a Schedule Fix Task to do the ping pong by def…
Browse files Browse the repository at this point in the history
…ault for each session (#11598)
  • Loading branch information
jdotcms authored and jgambarios committed May 17, 2017
1 parent b70d813 commit 74db8db
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,9 @@ public Principal getUserPrincipal() {
public Set<Session> getOpenSessions() {
return this.session.getOpenSessions();
}

@Override
public String toString() {
return (null != this.session)?this.session.toString():"session null wrapper";
}
} // E:O:F:SessionWrapper.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import com.dotcms.api.system.event.*;
import com.dotcms.concurrent.DotConcurrentFactory;
import com.dotcms.concurrent.DotSubmitter;
import com.dotcms.repackage.com.google.common.annotations.VisibleForTesting;
import com.dotcms.repackage.javax.ws.rs.ForbiddenException;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.UserAPI;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.init.DotInitScheduler;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.liferay.portal.model.User;
Expand All @@ -19,8 +19,10 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

/**
* This Websocket end-point allows other parts of the system (such as the User
Expand Down Expand Up @@ -52,47 +54,80 @@ public class SystemEventsWebSocketEndPoint implements Serializable {
/**
* Configuration for ping pong strategy
*/
public static final String WEB_SOCKET_THREAD_POOL_SUBMITTER_NAME = "longpolling";
public static final String DOTCMS_WEBSOCKET_MILLIS_PINGPONG = "dotcms.websocket.millis.pingpong";
public static final String DOTCMS_WEBSOCKET_USEPINGPONG = "dotcms.websocket.usepingpong";
private static final ByteBuffer PING_RECEIVED = ByteBuffer.wrap("PING".getBytes());
private static final ByteBuffer PONG_RECEIVED = ByteBuffer.wrap("PONG".getBytes());
private final boolean usePingPong;
private final long millisForWaitPingPong;
private final DotSubmitter dotSubmitterPingPong;

public SystemEventsWebSocketEndPoint() {

this(new ConcurrentLinkedQueue<Session>(),
APILocator.getUserAPI(),
SystemEventProcessorFactory.getInstance(),
PayloadVerifierFactory.getInstance(),
DotConcurrentFactory.getInstance());
PayloadVerifierFactory.getInstance());
}

@VisibleForTesting
public SystemEventsWebSocketEndPoint(final Queue<Session> queue,
final UserAPI userAPI,
final SystemEventProcessorFactory systemEventProcessorFactory,
final PayloadVerifierFactory payloadVerifierFactory,
final DotConcurrentFactory dotConcurrentFactory) {
final PayloadVerifierFactory payloadVerifierFactory) {

this.queue = queue;
this.userAPI = userAPI;
this.systemEventProcessorFactory = systemEventProcessorFactory;
this.payloadVerifierFactory = payloadVerifierFactory;
this.usePingPong = Config.getBooleanProperty(DOTCMS_WEBSOCKET_USEPINGPONG, false);
if (this.usePingPong) {
final boolean usePingPong = Config.getBooleanProperty(DOTCMS_WEBSOCKET_USEPINGPONG, true);
if (usePingPong) {

this.millisForWaitPingPong = Config.getLongProperty(DOTCMS_WEBSOCKET_MILLIS_PINGPONG,
final long millisForWaitPingPong = Config.getLongProperty(DOTCMS_WEBSOCKET_MILLIS_PINGPONG,
DateUtil.MINUTE); // by default 1 min
this.dotSubmitterPingPong = dotConcurrentFactory.getSubmitter(WEB_SOCKET_THREAD_POOL_SUBMITTER_NAME);
} else {
this.millisForWaitPingPong = -1;
this.dotSubmitterPingPong = null;
DotInitScheduler.getScheduledThreadPoolExecutor().
scheduleWithFixedDelay(this::processPingPongQueue, 0, millisForWaitPingPong, TimeUnit.MILLISECONDS);
}
} // SystemEventsWebSocketEndPoint.

private void processPingPongQueue () {

Logger.debug(this,
"Processing the session queue at: " + new Date());
for (Session session : this.queue) {

this.doPing(session);
}
}

@OnMessage
public void onPong(final PongMessage pongMessage, final Session session) {
// the browser will send the pong message automatically with the same data we sent on the ping.
if (PING_RECEIVED.equals(pongMessage.getApplicationData())) {

Logger.debug(this, "Pong message received from session: " + session);
}
} // onPong.

private void doPing (final Session session) {

// wait for N seconds
try {

// we wait for a N seconds and then send the ping message
if (session.isOpen()) {
Logger.debug(this, "Doing ping to: " + session + " at " + new Date());
session.getAsyncRemote().sendPing(PING_RECEIVED);
} else {

Logger.debug(this, "Couldn't do the ping to: " + session + ", session is closed");
}
} catch (Exception e) {
if (Logger.isErrorEnabled(this.getClass())) {

Logger.error(this.getClass(), e.getMessage(), e);
}
}
} // doPing.



@OnOpen
public void open(final Session session) {

Expand Down Expand Up @@ -141,57 +176,18 @@ public void open(final Session session) {
} // open.

@OnError
public void error(Session session, Throwable t) {
public void error(final Session session, final Throwable t) {
Logger.debug(this, "Error on the session: " + session + ", error: " + t);
queue.remove(session);
}

@OnClose
public void closedConnection(Session session) {

Logger.debug(this, "Closing the session: " + session);
queue.remove(session);
}

@OnMessage
public void onPong(final PongMessage pongMessage, final Session session) {
// the browser will send the pong message automatically with the same data we sent on the ping.
if (PING_RECEIVED.equals(pongMessage.getApplicationData())) {

this.doPing(session);
}
} // onPong.

private void doPing (final Session session) {
// wait for N seconds
if (null != this.dotSubmitterPingPong) {
this.dotSubmitterPingPong.execute(() -> {

try {
// we wait for a N seconds and then send the ping messsage
if (this.millisForWaitPingPong > 0) {
Thread.sleep(this.millisForWaitPingPong);
}

if (session.isOpen()) {
Logger.debug(this, "Doing ping to: " + session);
session.getAsyncRemote().sendPing(PING_RECEIVED);
} else {

Logger.debug(this, "Couldn't do the ping to: " + session + ", session is closed");
}
} catch (InterruptedException e) {

Logger.debug(this, e.getMessage(), e);
Thread.currentThread().interrupt();
} catch (Exception e) {
if (Logger.isErrorEnabled(this.getClass())) {

Logger.error(this.getClass(), e.getMessage(), e);
}
}
});
}
} // doPing.


/**
* Sends the specified {@link SystemEvent} object to all the clients
* (front-end or back-end services) that are registered to this Websocket
Expand All @@ -204,10 +200,9 @@ public void sendSystemEvent(final SystemEvent event) {

final ArrayList<Session> closedSessions = new ArrayList<>();


try {

for (Session session : queue) {
for (Session session : this.queue) {

if (!session.isOpen()) {

Expand Down
15 changes: 12 additions & 3 deletions dotCMS/src/main/java/com/dotmarketing/init/DotInitScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,22 @@ public class DotInitScheduler {

private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;

private static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
/**
* Returns the {@link ScheduledThreadPoolExecutor}
* @return ScheduledThreadPoolExecutor
*/
public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {

if (null == scheduledThreadPoolExecutor) {

final int corePoolSize = Config.getIntProperty(SCHEDULER_COREPOOLSIZE, 10);
synchronized (DotInitScheduler.class) {

if (null == scheduledThreadPoolExecutor) {

scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize);
final int corePoolSize = Config.getIntProperty(SCHEDULER_COREPOOLSIZE, 10);
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize);
}
}
}

return scheduledThreadPoolExecutor;
Expand Down

0 comments on commit 74db8db

Please sign in to comment.