Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 11124 fixes/enhances Hazelcast implementation (#11414) #11493

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dotCMS/build-aws-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ sed -i "s,{password},$DB_PASSWORD,g" dotserver/tomcat/webapps/ROOT/META-INF/cont
sed -i "s,{valquery},$DB_VALIDATION_QUERY,g" dotserver/tomcat/webapps/ROOT/META-INF/context.xml

sed -i "s,dotCMSContentIndex,$ESCLUSTER,g" dotserver/tomcat/webapps/ROOT/WEB-INF/classes/dotcms-config-cluster.properties
sed -i "s,CLUSTER_AUTOWIRE=true,CLUSTER_AUTOWIRE=false,g" dotserver/tomcat/webapps/ROOT/WEB-INF/classes/dotcms-config-cluster.properties
sed -i "s,AUTOWIRE_CLUSTER_TRANSPORT=true,AUTOWIRE_CLUSTER_TRANSPORT=false,g" dotserver/tomcat/webapps/ROOT/WEB-INF/classes/dotcms-config-cluster.properties
sed -i "s,AUTOWIRE_CLUSTER_ES=true,AUTOWIRE_CLUSTER_ES=false,g" dotserver/tomcat/webapps/ROOT/WEB-INF/classes/dotcms-config-cluster.properties

sed -i "s,PUBLISHER_QUEUE_MAX_TRIES=3,PUBLISHER_QUEUE_MAX_TRIES=1,g" dotserver/tomcat/webapps/ROOT/WEB-INF/classes/dotmarketing-config.properties

Expand All @@ -66,7 +67,10 @@ sed -i "s,^es.path.work *=.*$,es.path.work=$PWD/dotserver/tomcat/webapps/ROOT/do
sed -i "s,^es.path.repo *=.*$,es.path.repo=$PWD/dotserver/tomcat/webapps/ROOT/dotsecure/esdata/essnapshot/snaphosts,g" core/dotCMS/src/integration-test/resources/it-dotcms-config-cluster.properties
sed -i "s,^es.path.logs *=.*$,es.path.logs=$PWD/dotserver/tomcat/webapps/ROOT/dotsecure/logs,g" core/dotCMS/src/integration-test/resources/it-dotcms-config-cluster.properties
echo "
CLUSTER_AUTOWIRE=false
AUTOWIRE_CLUSTER_TRANSPORT=false
" >> core/dotCMS/src/integration-test/resources/it-dotcms-config-cluster.properties
echo "
AUTOWIRE_CLUSTER_ES=false
" >> core/dotCMS/src/integration-test/resources/it-dotcms-config-cluster.properties

sed -i "s,^ASSET_REAL_PATH *=.*$,ASSET_REAL_PATH=$PWD/dotserver/tomcat/webapps/ROOT/assets,g" core/dotCMS/src/integration-test/resources/it-dotmarketing-config.properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static void prepare() throws Exception{

@Test
public void serializeTest() throws Exception {
CacheLocator.getCacheAdministrator().flushAlLocalOnly();
CacheLocator.getCacheAdministrator().flushAlLocalOnly(true);

VelocityEngine engine=VelocityUtil.getEngine();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.dotcms.cluster.bean.Server;
import com.dotcms.cluster.business.HazelcastUtil;
import com.dotcms.cluster.business.HazelcastUtil.HazelcastInstanceType;
import com.dotcms.repackage.org.apache.struts.Globals;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.CacheLocator;
Expand All @@ -11,35 +12,44 @@
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.hazelcast.config.TopicConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.liferay.portal.struts.MultiMessageResources;

import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by jasontesser on 3/28/17.
*/
public class HazelCastCacheTransport implements CacheTransport{
public abstract class AbstractHazelcastCacheTransport implements CacheTransport {

private Map<String, Map<String, Boolean>> cacheStatus;

private final AtomicLong receivedMessages = new AtomicLong(0);
private final AtomicLong receivedBytes = new AtomicLong(0);
private final AtomicLong sentMessages = new AtomicLong(0);
private final AtomicLong sentBytes = new AtomicLong(0);

private final String topicName = "dotCMSClusterCacheInvalidation";
private String topicId;

private final AtomicBoolean isInitialized = new AtomicBoolean(false);

protected abstract HazelcastInstanceType getHazelcastInstanceType();

@Override
public void init(Server localServer) throws CacheTransportException {
Logger.info(this,"Starting Hazelcast Cache Transport");
Logger.debug(this,"Calling HazelUtil to ensure Hazelcast member is up");
HazelcastInstance hazel = new HazelcastUtil().getHazel();
TopicConfig topicConfig = new TopicConfig();
topicConfig.setGlobalOrderingEnabled( false );
topicConfig.setStatisticsEnabled( false );
topicConfig.setName( topicName );
hazel.getConfig().addTopicConfig(topicConfig);

HazelcastInstance hazel = getHazelcastInstance(true);

MessageListener<Object> messageListener = new MessageListener<Object>() {
@Override
public void onMessage( Message<Object> message ) {
Expand All @@ -54,19 +64,23 @@ public void onMessage( Message<Object> message ) {
receive(msg);
}
};
topicId = new HazelcastUtil().getHazel().getTopic(topicName).addMessageListener(messageListener);
topicId = hazel.getTopic(topicName).addMessageListener(messageListener);

isInitialized.set(true);
}

public void receive(String msg){

receivedMessages.addAndGet(1);
receivedBytes.addAndGet(msg.length());

public void receive(String msg){
if ( msg.equals(ChainableCacheAdministratorImpl.TEST_MESSAGE) ) {

Logger.info(this, "Received Message Ping " + new Date());
try {
new HazelcastUtil().getHazel().getTopic(topicName).publish("ACK");
getHazelcastInstance().getTopic(topicName).publish("ACK");
} catch ( Exception e ) {
Logger.error(HazelCastCacheTransport.class, e.getMessage(), e);
Logger.error(AbstractHazelcastCacheTransport.class, e.getMessage(), e);
}

//Handle when other server is responding to ping.
Expand Down Expand Up @@ -128,10 +142,14 @@ public void receive(String msg){

@Override
public void send(String message) throws CacheTransportException {
try {
new HazelcastUtil().getHazel().getTopic(topicName).publish(message);

sentMessages.addAndGet(1);
sentBytes.addAndGet(message.length());

try {
getHazelcastInstance().getTopic(topicName).publish(message);
} catch ( Exception e ) {
Logger.error(HazelCastCacheTransport.class, "Unable to send message: " + e.getMessage(), e);
Logger.error(AbstractHazelcastCacheTransport.class, "Unable to send message: " + e.getMessage(), e);
throw new CacheTransportException("Unable to send message", e);
}
}
Expand All @@ -142,7 +160,7 @@ public void testCluster() throws CacheTransportException {
send(ChainableCacheAdministratorImpl.TEST_MESSAGE);
Logger.info(this, "Sending Ping to Cluster " + new Date());
} catch ( Exception e ) {
Logger.error(HazelCastCacheTransport.class, e.getMessage(), e);
Logger.error(AbstractHazelcastCacheTransport.class, e.getMessage(), e);
throw new CacheTransportException("Error testing cluster", e);
}
}
Expand Down Expand Up @@ -192,7 +210,73 @@ public Map<String, Boolean> validateCacheInCluster(String dateInMillis, int numb

@Override
public void shutdown() throws CacheTransportException {
new HazelcastUtil().getHazel().getTopic(topicName).removeMessageListener(topicId);
if (isInitialized.get()) {
getHazelcastInstance().getTopic(topicName).removeMessageListener(topicId);

isInitialized.set(false);
}
}

protected HazelcastInstance getHazelcastInstance() {
return getHazelcastInstance(false);
}

protected HazelcastInstance getHazelcastInstance(boolean reInitialize) {
return HazelcastUtil.getInstance().getHazel(getHazelcastInstanceType(), reInitialize);
}


@Override
public CacheTransportInfo getInfo() {
HazelcastInstance hazel = getHazelcastInstance();

return new CacheTransportInfo(){
@Override
public String getClusterName() {
return hazel.getName();
}

@Override
public String getAddress() {
return ((InetSocketAddress) hazel.getLocalEndpoint().getSocketAddress()).getHostString();
}

@Override
public int getPort() {
return ((InetSocketAddress) hazel.getLocalEndpoint().getSocketAddress()).getPort();
}


@Override
public boolean isOpen() {
return hazel.getLifecycleService().isRunning();
}

@Override
public int getNumberOfNodes() {
return hazel.getCluster().getMembers().size();
}


@Override
public long getReceivedBytes() {
return receivedBytes.get();
}

@Override
public long getReceivedMessages() {
return receivedMessages.get();
}

@Override
public long getSentBytes() {
return sentBytes.get();
}

@Override
public long getSentMessages() {
return sentMessages.get();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.dotcms.cache.transport;

import com.dotcms.cluster.business.HazelcastUtil.HazelcastInstanceType;

public class HazelcastCacheTransportClient extends AbstractHazelcastCacheTransport {

@Override
protected HazelcastInstanceType getHazelcastInstanceType() {
return HazelcastInstanceType.CLIENT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.dotcms.cache.transport;

import com.dotcms.cluster.business.HazelcastUtil.HazelcastInstanceType;

public class HazelcastCacheTransportEmbedded extends AbstractHazelcastCacheTransport {

@Override
protected HazelcastInstanceType getHazelcastInstanceType() {
return HazelcastInstanceType.EMBEDDED;
}
}
19 changes: 19 additions & 0 deletions dotCMS/src/main/java/com/dotcms/cluster/ClusterUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.dotcms.cluster;

import com.dotmarketing.util.Config;

public class ClusterUtils {

public static boolean isTransportAutoWire(){
return Config.getBooleanProperty("AUTOWIRE_CLUSTER_TRANSPORT", true);
}

public static boolean isESAutoWire(){
return Config.getBooleanProperty("AUTOWIRE_CLUSTER_ES", true);
}

public static boolean isESAutoWireReplicas(){
return isESAutoWire()
&& Config.getBooleanProperty("AUTOWIRE_MANAGE_ES_REPLICAS", true);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.dotcms.cluster.bean;

public enum ServerPort {
CACHE_PORT("cache_port", "CACHE_BINDPORT", "7800"),
CACHE_PORT("cache_port", "CACHE_BINDPORT", "5701"),
ES_TRANSPORT_TCP_PORT("es_transport_tcp_port", "es.transport.tcp.port", "9300"),
ES_HTTP_PORT("es_http_port", "es.http.port", "9200");

Expand Down
Loading