diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SessionWrapper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SessionWrapper.java index d4063b41a154..5a02c7a187f6 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SessionWrapper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SessionWrapper.java @@ -174,4 +174,9 @@ public Principal getUserPrincipal() { public Set getOpenSessions() { return this.session.getOpenSessions(); } + + @Override + public String toString() { + return (null != this.session)?this.session.toString():"session null wrapper"; + } } // E:O:F:SessionWrapper. diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SystemEventsWebSocketEndPoint.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SystemEventsWebSocketEndPoint.java index bb2978fde384..952fbe31e37b 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SystemEventsWebSocketEndPoint.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/system/websocket/SystemEventsWebSocketEndPoint.java @@ -2,12 +2,12 @@ import com.dotcms.api.system.event.*; import com.dotcms.concurrent.DotConcurrentFactory; -import com.dotcms.concurrent.DotSubmitter; import com.dotcms.repackage.com.google.common.annotations.VisibleForTesting; import com.dotcms.repackage.javax.ws.rs.ForbiddenException; import com.dotmarketing.business.APILocator; import com.dotmarketing.business.UserAPI; import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.init.DotInitScheduler; import com.dotmarketing.util.Config; import com.dotmarketing.util.Logger; import com.liferay.portal.model.User; @@ -19,8 +19,10 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Date; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; /** * This Websocket end-point allows other parts of the system (such as the User @@ -52,47 +54,80 @@ public class SystemEventsWebSocketEndPoint implements Serializable { /** * Configuration for ping pong strategy */ - public static final String WEB_SOCKET_THREAD_POOL_SUBMITTER_NAME = "longpolling"; public static final String DOTCMS_WEBSOCKET_MILLIS_PINGPONG = "dotcms.websocket.millis.pingpong"; public static final String DOTCMS_WEBSOCKET_USEPINGPONG = "dotcms.websocket.usepingpong"; private static final ByteBuffer PING_RECEIVED = ByteBuffer.wrap("PING".getBytes()); - private static final ByteBuffer PONG_RECEIVED = ByteBuffer.wrap("PONG".getBytes()); - private final boolean usePingPong; - private final long millisForWaitPingPong; - private final DotSubmitter dotSubmitterPingPong; public SystemEventsWebSocketEndPoint() { this(new ConcurrentLinkedQueue(), APILocator.getUserAPI(), SystemEventProcessorFactory.getInstance(), - PayloadVerifierFactory.getInstance(), - DotConcurrentFactory.getInstance()); + PayloadVerifierFactory.getInstance()); } @VisibleForTesting public SystemEventsWebSocketEndPoint(final Queue queue, final UserAPI userAPI, final SystemEventProcessorFactory systemEventProcessorFactory, - final PayloadVerifierFactory payloadVerifierFactory, - final DotConcurrentFactory dotConcurrentFactory) { + final PayloadVerifierFactory payloadVerifierFactory) { this.queue = queue; this.userAPI = userAPI; this.systemEventProcessorFactory = systemEventProcessorFactory; this.payloadVerifierFactory = payloadVerifierFactory; - this.usePingPong = Config.getBooleanProperty(DOTCMS_WEBSOCKET_USEPINGPONG, false); - if (this.usePingPong) { + final boolean usePingPong = Config.getBooleanProperty(DOTCMS_WEBSOCKET_USEPINGPONG, true); + if (usePingPong) { - this.millisForWaitPingPong = Config.getLongProperty(DOTCMS_WEBSOCKET_MILLIS_PINGPONG, + final long millisForWaitPingPong = Config.getLongProperty(DOTCMS_WEBSOCKET_MILLIS_PINGPONG, DateUtil.MINUTE); // by default 1 min - this.dotSubmitterPingPong = dotConcurrentFactory.getSubmitter(WEB_SOCKET_THREAD_POOL_SUBMITTER_NAME); - } else { - this.millisForWaitPingPong = -1; - this.dotSubmitterPingPong = null; + DotInitScheduler.getScheduledThreadPoolExecutor(). + scheduleWithFixedDelay(this::processPingPongQueue, 0, millisForWaitPingPong, TimeUnit.MILLISECONDS); + } + } // SystemEventsWebSocketEndPoint. + + private void processPingPongQueue () { + + Logger.debug(this, + "Processing the session queue at: " + new Date()); + for (Session session : this.queue) { + + this.doPing(session); } } + @OnMessage + public void onPong(final PongMessage pongMessage, final Session session) { + // the browser will send the pong message automatically with the same data we sent on the ping. + if (PING_RECEIVED.equals(pongMessage.getApplicationData())) { + + Logger.debug(this, "Pong message received from session: " + session); + } + } // onPong. + + private void doPing (final Session session) { + + // wait for N seconds + try { + + // we wait for a N seconds and then send the ping message + if (session.isOpen()) { + Logger.debug(this, "Doing ping to: " + session + " at " + new Date()); + session.getAsyncRemote().sendPing(PING_RECEIVED); + } else { + + Logger.debug(this, "Couldn't do the ping to: " + session + ", session is closed"); + } + } catch (Exception e) { + if (Logger.isErrorEnabled(this.getClass())) { + + Logger.error(this.getClass(), e.getMessage(), e); + } + } + } // doPing. + + + @OnOpen public void open(final Session session) { @@ -141,57 +176,18 @@ public void open(final Session session) { } // open. @OnError - public void error(Session session, Throwable t) { + public void error(final Session session, final Throwable t) { + Logger.debug(this, "Error on the session: " + session + ", error: " + t); queue.remove(session); } @OnClose public void closedConnection(Session session) { + + Logger.debug(this, "Closing the session: " + session); queue.remove(session); } - @OnMessage - public void onPong(final PongMessage pongMessage, final Session session) { - // the browser will send the pong message automatically with the same data we sent on the ping. - if (PING_RECEIVED.equals(pongMessage.getApplicationData())) { - - this.doPing(session); - } - } // onPong. - - private void doPing (final Session session) { - // wait for N seconds - if (null != this.dotSubmitterPingPong) { - this.dotSubmitterPingPong.execute(() -> { - - try { - // we wait for a N seconds and then send the ping messsage - if (this.millisForWaitPingPong > 0) { - Thread.sleep(this.millisForWaitPingPong); - } - - if (session.isOpen()) { - Logger.debug(this, "Doing ping to: " + session); - session.getAsyncRemote().sendPing(PING_RECEIVED); - } else { - - Logger.debug(this, "Couldn't do the ping to: " + session + ", session is closed"); - } - } catch (InterruptedException e) { - - Logger.debug(this, e.getMessage(), e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - if (Logger.isErrorEnabled(this.getClass())) { - - Logger.error(this.getClass(), e.getMessage(), e); - } - } - }); - } - } // doPing. - - /** * Sends the specified {@link SystemEvent} object to all the clients * (front-end or back-end services) that are registered to this Websocket @@ -204,10 +200,9 @@ public void sendSystemEvent(final SystemEvent event) { final ArrayList closedSessions = new ArrayList<>(); - try { - for (Session session : queue) { + for (Session session : this.queue) { if (!session.isOpen()) { diff --git a/dotCMS/src/main/java/com/dotmarketing/init/DotInitScheduler.java b/dotCMS/src/main/java/com/dotmarketing/init/DotInitScheduler.java index b2826dd12dfc..ac1de7d5ff00 100644 --- a/dotCMS/src/main/java/com/dotmarketing/init/DotInitScheduler.java +++ b/dotCMS/src/main/java/com/dotmarketing/init/DotInitScheduler.java @@ -67,13 +67,22 @@ public class DotInitScheduler { private static ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null; - private static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { + /** + * Returns the {@link ScheduledThreadPoolExecutor} + * @return ScheduledThreadPoolExecutor + */ + public static ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { if (null == scheduledThreadPoolExecutor) { - final int corePoolSize = Config.getIntProperty(SCHEDULER_COREPOOLSIZE, 10); + synchronized (DotInitScheduler.class) { + + if (null == scheduledThreadPoolExecutor) { - scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize); + final int corePoolSize = Config.getIntProperty(SCHEDULER_COREPOOLSIZE, 10); + scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize); + } + } } return scheduledThreadPoolExecutor;