Skip to content

Commit

Permalink
merge from branch 4.9.1
Browse files Browse the repository at this point in the history
  • Loading branch information
yongfeigao authored and yongfeigao committed Nov 11, 2021
1 parent 34e833f commit bf212d2
Show file tree
Hide file tree
Showing 135 changed files with 4,810 additions and 1,683 deletions.
2 changes: 1 addition & 1 deletion mq-client-common-open/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.sohu.tv</groupId>
<artifactId>mq</artifactId>
<version>4.7.2</version>
<version>4.9.1</version>
</parent>

<artifactId>mq-client-common-open</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
* @param <T> msg obj
* @param MessageExt
*/
public interface BatchConsumerCallback<T> {
public interface BatchConsumerCallback<T, C> {

/**
* 订阅回调方法
*
* @return
* @param batchMessage
* @param context @ConsumeConcurrentlyContext or @ConsumeOrderlyContext
* @throws Exception
*/
void call(List<MQMessage<T>> batchMessage) throws Exception;
void call(List<MQMessage<T>> batchMessage, C context) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package com.sohu.index.tv.mq.common;

import java.nio.ByteBuffer;

import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;

/**
* 批量消息
Expand All @@ -15,12 +13,27 @@
* @param <MessageExt>
*/
public class MQMessage<T> {

public static final String IDEMPOTENT_ID = "IDEMPOTENT_ID";

// 发送的原始对象
private T message;
private MessageExt messageExt;

public MQMessage(T message, MessageExt messageExt) {
// 可以重试的次数
private int retryTimes = -1;

// rocketmq 消息
private Message innerMessage;

// 发送异常,测试用
private boolean exceptionForTest;

public MQMessage() {
}

public MQMessage(T message, Message innerMessage) {
this.message = message;
this.messageExt = messageExt;
this.innerMessage = innerMessage;
}

public T getMessage() {
Expand All @@ -32,22 +45,129 @@ public void setMessage(T message) {
}

public MessageExt getMessageExt() {
return messageExt;
return (MessageExt) innerMessage;
}

public void setMessageExt(MessageExt messageExt) {
this.messageExt = messageExt;
setInnerMessage(messageExt);
}


public Message getInnerMessage() {
return innerMessage;
}

public void setInnerMessage(Message innerMessage) {
this.innerMessage = innerMessage;
}

public MQMessage<T> setKeys(String keys) {
innerMessage.setKeys(keys);
return this;
}

public String getKeys() {
return innerMessage.getKeys();
}

public String getTags() {
return innerMessage.getTags();
}

public MQMessage<T> setTags(String tags) {
innerMessage.setTags(tags);
return this;
}

public MQMessage<T> setDelayTimeLevel(int level) {
innerMessage.setDelayTimeLevel(level);
return this;
}

public int getDelayTimeLevel() {
return innerMessage.getDelayTimeLevel();
}

public byte[] getBody() {
return innerMessage.getBody();
}

public MQMessage<T> setBody(byte[] body) {
innerMessage.setBody(body);
return this;
}

public boolean isWaitStoreMsgOK() {
return innerMessage.isWaitStoreMsgOK();
}

public MQMessage<T> setWaitStoreMsgOK(boolean waitStoreMsgOK) {
innerMessage.setWaitStoreMsgOK(waitStoreMsgOK);
return this;
}

public MQMessage<T> setTopic(String topic) {
innerMessage.setTopic(topic);
return this;
}

public String getTopic() {
return innerMessage.getTopic();
}

public int getRetryTimes() {
return retryTimes;
}

public MQMessage<T> setRetryTimes(int retryTimes) {
this.retryTimes = retryTimes;
return this;
}

public MQMessage<T> resetRetryTimes(int retryTimes) {
if (this.retryTimes == -1) {
this.retryTimes = retryTimes;
}
return this;
}

/**
* 构建offsetMsgId
*
* @return
*/
public String buildOffsetMsgId() {
int msgIdLength = (messageExt.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8
: 16 + 4 + 8;
ByteBuffer byteBufferMsgId = ByteBuffer.allocate(msgIdLength);
return MessageDecoder.createMessageId(byteBufferMsgId, messageExt.getStoreHostBytes(),
messageExt.getCommitLogOffset());
return innerMessage instanceof MessageClientExt ? ((MessageClientExt)innerMessage).getOffsetMsgId() : null;
}

public static <T> MQMessage<T> build(T message) {
MQMessage<T> mqMessage = new MQMessage<>();
mqMessage.setMessage(message);
mqMessage.innerMessage = new Message();
mqMessage.setWaitStoreMsgOK(true);
return mqMessage;
}

/**
* 设置幂等id
*
* @param idempotentId
*/
public MQMessage<T> setIdempotentID(String idempotentId) {
innerMessage.putUserProperty(IDEMPOTENT_ID, idempotentId);
return this;
}

public MQMessage<T> setExceptionForTest(boolean exceptionForTest) {
this.exceptionForTest = exceptionForTest;
return this;
}

public boolean isExceptionForTest() {
return exceptionForTest;
}

@Override
public String toString() {
return "[topic=" + getTopic() + ", message=" + message + ", retryTimes=" + retryTimes + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ public class Result<T> implements Serializable {
/**
* 异常信息
*/
private Exception exception;
private Throwable exception;

// 正在重试
private boolean retrying;

// 重试过的次数
private int retriedTimes;

private MQMessage<?> mqMessage;

public Result(boolean isSuccess) {
this.isSuccess = isSuccess;
Expand All @@ -33,7 +41,7 @@ public Result(boolean isSuccess, T result) {
this.result = result;
}

public Result(boolean isSuccess, Exception exception) {
public Result(boolean isSuccess, Throwable exception) {
this.isSuccess = isSuccess;
this.exception = exception;
}
Expand All @@ -55,15 +63,42 @@ public void setResult(T result) {
}

public Exception getException() {
return exception;
return (Exception) exception;
}

public void setException(Exception exception) {
public void setException(Throwable exception) {
this.exception = exception;
}

public boolean isRetrying() {
return retrying;
}

public Result<T> setRetrying(boolean retrying) {
this.retrying = retrying;
return this;
}

public int getRetriedTimes() {
return retriedTimes;
}

public void setRetriedTimes(int retriedTimes) {
this.retriedTimes = retriedTimes;
}

@SuppressWarnings("unchecked")
public <R> MQMessage<R> getMqMessage() {
return (MQMessage<R>) mqMessage;
}

public void setMqMessage(MQMessage<?> mqMessage) {
this.mqMessage = mqMessage;
}

@Override
public String toString() {
return "Result [isSuccess=" + isSuccess + ", result=" + result + ", exception=" + exception + "]";
return "Result [isSuccess=" + isSuccess + ", result=" + result + ", exception=" + exception + ", retrying="
+ retrying + ", retriedTimes=" + retriedTimes + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,32 @@ public AbstractCommand(String groupKey, String commandKey, int poolSize, int tim
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(poolSize)));
this.alerter = alerter;
}

/**
* 构建(信号量隔离)
*
* @param groupKey
* @param commandKey
* @param timeout 超时时间
*/
public AbstractCommand(String groupKey, String commandKey, int timeout, Alerter alerter) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey + "-semaphore"))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey + "-semaphore"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(
HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withFallbackIsolationSemaphoreMaxConcurrentRequests(100)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(50)
.withExecutionTimeoutInMilliseconds(timeout)));
this.alerter = alerter;
}

protected T run() throws Exception {
try {
return invoke();
} catch (Exception e) {
logger.error("send err! "+getCommandGroup().name() + "-" +
getCommandKey().name() + ":" +
invokeErrorInfo(),
e);
throw new RuntimeException(e);
logger.error("group:{} command:{} param:{}", getCommandGroup().name(), getCommandKey().name(),
invokeErrorInfo(), e);
throw e;
}
}

Expand All @@ -77,7 +93,7 @@ public T getFallback() {
// 判断熔断器是否打开
if (super.isCircuitBreakerOpen()) {
if (null != alerter) {
String info = "group:" + getCommandGroup().name() + " command:" + getCommandKey().name() + " err!";
String info = "group:" + getCommandGroup().name() + " command:" + getCommandKey().name() + " circuitBreakerOpen!";
alerter.alert(info);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public abstract class AbstractConfig {

// 是否设置了instanceName
protected String instanceName;

protected SohuAsyncTraceDispatcher traceDispatcher;

public AbstractConfig(String group, String topic) {
this.topic = topic;
Expand Down Expand Up @@ -122,12 +124,14 @@ protected void init() {
logger.error("http err, topic:{},group:{}", topic, group, e);
}
if (clusterInfoDTO == null) {
if (clusterInfoDTOResult.getStatus() == 201) {
logger.warn("please register your {}:{} topic:{} in MQCloud first, times:{}",
role() == 1 ? "producer" : "consumer", group, topic, times++);
} else {
logger.warn("fetch topic:{} group:{} cluster info err:{}, times:{}", getTopic(), group,
clusterInfoDTOResult.getMessage(), times++);
if (clusterInfoDTOResult != null) {
if (clusterInfoDTOResult.getStatus() == 201) {
logger.warn("please register your {}:{} topic:{} in MQCloud first, times:{}",
role() == 1 ? "producer" : "consumer", group, topic, times++);
} else {
logger.warn("fetch topic:{} group:{} cluster info err:{}, times:{}", getTopic(), group,
clusterInfoDTOResult.getMessage(), times++);
}
}
try {
Thread.sleep(1000);
Expand Down Expand Up @@ -231,7 +235,7 @@ protected void initTrace() {
TraceRocketMQProducer traceRocketMQProducer = new TraceRocketMQProducer(
CommonUtil.buildTraceTopicProducer(traceTopic), traceTopic);
// 初始化TraceDispatcher
SohuAsyncTraceDispatcher traceDispatcher = new SohuAsyncTraceDispatcher(traceTopic);
traceDispatcher = new SohuAsyncTraceDispatcher(traceTopic);
// 设置producer属性
traceRocketMQProducer.getProducer().setSendMsgTimeout(5000);
traceRocketMQProducer.getProducer().setMaxMessageSize(traceDispatcher.getMaxMsgSize() - 10 * 1000);
Expand Down Expand Up @@ -316,4 +320,10 @@ public void setInstanceName(String instanceName) {
public String getInstanceName() {
return instanceName;
}

public void shutdown(){
if(traceDispatcher != null){
traceDispatcher.shutdown();
}
}
}
Loading

0 comments on commit bf212d2

Please sign in to comment.