Skip to content

Commit

Permalink
copy from mybank:
Browse files Browse the repository at this point in the history
1. Update Subscriber: support for push context
2. increase queueSize of checkPushExecutor
3. fix the isolation function of Gzone and Rzone
  • Loading branch information
kezhu.wukz committed Oct 29, 2019
1 parent 86a08b8 commit df07195
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;

import com.alipay.sofa.registry.common.model.ElementType;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.fasterxml.jackson.annotation.JsonIgnore;

Expand All @@ -31,16 +32,16 @@
public class Subscriber extends BaseInfo {

/** UID */
private static final long serialVersionUID = 98433360274932292L;
private static final long serialVersionUID = 98433360274932292L;
/** */
private ScopeEnum scope;
private ScopeEnum scope;
/** */
private ElementType elementType;
private ElementType elementType;

/**
* all dataCenter push dataInfo version
* last push context
*/
private Map<String/*dataCenter*/, Long> lastPushVersions = new ConcurrentHashMap<>();
private Map<String/*dataCenter*/, PushContext> lastPushContexts = new ConcurrentHashMap<>();

/**
* Getter method for property <tt>scope</tt>.
Expand Down Expand Up @@ -71,7 +72,11 @@ public ElementType getElementType() {
*/
public boolean checkVersion(String dataCenter, Long version) {

Long oldVersion = lastPushVersions.get(dataCenter);
PushContext lastPushContext = lastPushContexts.get(dataCenter);
if (lastPushContext == null) {
return version != null;
}
Long oldVersion = lastPushContext.pushVersion;
if (oldVersion == null) {
return version != null;
} else {
Expand All @@ -88,15 +93,26 @@ public boolean checkVersion(String dataCenter, Long version) {
* @return
*/
public void checkAndUpdateVersion(String dataCenter, Long version) {
checkAndUpdateVersion(dataCenter, version, -1);
}

/**
* check version input greater or equal to current version
* @param version
* @return
*/
public void checkAndUpdateVersion(String dataCenter, Long version, int pubCount) {

while (true) {
Long oldVersion = lastPushVersions.putIfAbsent(dataCenter, version);
PushContext pushContext = new PushContext(version, pubCount);
PushContext oldPushContext = lastPushContexts.putIfAbsent(dataCenter, pushContext);
// Add firstly
if (oldVersion == null) {
if (oldPushContext == null) {
break;
} else {
if (version > oldVersion) {
if (lastPushVersions.replace(dataCenter, oldVersion, version)) {
if (oldPushContext.pushVersion == null
|| (pushContext.pushVersion != null && pushContext.pushVersion > oldPushContext.pushVersion)) {
if (lastPushContexts.replace(dataCenter, oldPushContext, pushContext)) {
break;
}
} else {
Expand All @@ -106,6 +122,23 @@ public void checkAndUpdateVersion(String dataCenter, Long version) {
}
}

/**
* If the pushed data is empty, check the last push, for avoid continuous empty datum push
*/
public boolean allowPush(String dataCenter, int pubCount) {
boolean allowPush = true;
// condition of no push:
// 1. last push count is 0 and this time is also 0
// 2. last push is a valid push (version > 1)
if (pubCount == 0) {
PushContext pushContext = lastPushContexts.get(dataCenter);
allowPush = !(pushContext != null && pushContext.pushPubCount == 0
//last push is a valid push
&& pushContext.pushVersion != null && pushContext.pushVersion > ValueConstants.DEFAULT_NO_DATUM_VERSION);
}
return allowPush;
}

/**
* Setter method for property <tt>elementType</tt>.
*
Expand All @@ -126,28 +159,10 @@ protected String getOtherInfo() {
final StringBuilder sb = new StringBuilder("scope=");
sb.append(scope).append(",");
sb.append("elementType=").append(elementType).append(",");
sb.append("lastPushVersion=").append(lastPushVersions);
sb.append("pushVersion=").append(lastPushContexts);
return sb.toString();
}

/**
* Getter method for property <tt>lastPushVersions</tt>.
*
* @return property value of lastPushVersions
*/
public Map<String, Long> getLastPushVersions() {
return lastPushVersions;
}

/**
* Setter method for property <tt>lastPushVersions </tt>.
*
* @param lastPushVersions value to be assigned to property lastPushVersions
*/
public void setLastPushVersions(Map<String, Long> lastPushVersions) {
this.lastPushVersions = lastPushVersions;
}

/**
* @see Object#toString()
*/
Expand All @@ -156,7 +171,7 @@ public String toString() {
final StringBuilder sb = new StringBuilder("Subscriber{");
sb.append("scope=").append(scope);
sb.append(", elementType=").append(elementType);
sb.append(", lastPushVersions=").append(lastPushVersions);
sb.append(", lastPushContexts=").append(lastPushContexts);
sb.append(", super=").append(super.toString());
sb.append('}');
return sb.toString();
Expand All @@ -180,4 +195,33 @@ public static Subscriber internSubscriber(Subscriber subscriber) {

return subscriber;
}

static class PushContext {
/**
* last pushed dataInfo version
*/
private Long pushVersion;

/**
* push pushed dataInfo pubCount
*/
private int pushPubCount;

public PushContext(Long pushVersion, int pushPubCount) {
this.pushVersion = pushVersion;
this.pushPubCount = pushPubCount;
}

/**
* @see Object#toString()
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("PushContext{");
sb.append("pushVersion=").append(pushVersion);
sb.append(", pushPubCount=").append(pushPubCount);
sb.append('}');
return sb.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ private void fetchDatum(String targetIp, String dataCenter, String dataInfoId) {
if (response.isSuccess()) {
Datum datum = response.getData().get(dataCenter);


if (datum != null) {
// wrap by WordCache
datum = Datum.internDatum(datum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@
*/
package com.alipay.sofa.registry.server.session.scheduler;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.metrics.TaskMetrics;
Expand All @@ -28,18 +41,6 @@
import com.alipay.sofa.registry.timer.AsyncHashedWheelTimer.TaskFailedCallback;
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
*
Expand Down Expand Up @@ -161,7 +162,7 @@ public ExecutorManager(SessionServerConfig sessionServerConfig) {
.computeIfAbsent(USER_DATA_ELEMENT_PUSH_TASK_CHECK_EXECUTOR, k -> new SessionThreadPoolExecutor(
USER_DATA_ELEMENT_PUSH_TASK_CHECK_EXECUTOR, 100, 600, 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(100000),
new LinkedBlockingQueue(150000),
new NamedThreadFactory("UserDataElementPushCheck-executor", true)));

connectClientExecutor = reportExecutors.computeIfAbsent(CONNECT_CLIENT_EXECUTOR,k->new SessionThreadPoolExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private void fireReceivedDataMultiPushTask(Datum datum, List<String> subscriberR
// zone scope subscribe only return zone list
return true;

} else if (ScopeEnum.dataCenter == scopeEnum) {
} else if (ScopeEnum.dataCenter == scopeEnum || ScopeEnum.global == scopeEnum) {
// disable zone config
return sessionServerConfig.isInvalidForeverZone(zone) && !sessionServerConfig
.isInvalidIgnored(dataId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
*/
package com.alipay.sofa.registry.server.session.scheduler.task;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;

import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.sessionserver.DataPushRequest;
import com.alipay.sofa.registry.common.model.store.BaseInfo.ClientVersion;
Expand All @@ -33,15 +42,6 @@
import com.alipay.sofa.registry.task.listener.TaskEvent.TaskType;
import com.alipay.sofa.registry.task.listener.TaskListenerManager;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;

/**
*
* @author shangyu.wh
Expand Down Expand Up @@ -154,7 +154,7 @@ private void fireReceivedDataMultiPushTask(Datum datum, List<String> subscriberR
// zone scope subscribe only return zone list
return true;

} else if (ScopeEnum.dataCenter == scopeEnum) {
} else if (ScopeEnum.dataCenter == scopeEnum || ScopeEnum.global == scopeEnum) {
// disable zone config
if (sessionServerConfig.isInvalidForeverZone(zone)
&& !sessionServerConfig.isInvalidIgnored(dataId)) {
Expand Down

0 comments on commit df07195

Please sign in to comment.