- 🚀 开箱即用
- 🍄 延时队列
- 🔆 ACK机制
- 📦 异步通信
- 🎨 消息故障修复
- 🌕 死信队列处理
- 🌪️ 消息、消费组、消费者监控管理
- 💫 灵活接口幂等控制
- 🪐 支持redis单机、主从、集群
- ..........(待续)
- redis v6.0.0+
- springboot v2.6.5
- jdk 1.8+
- ......
....(待续)
##此版本还未有监控页面
<dependency>
<groupId>io.github.disaster1-tesk</groupId>
<artifactId>fast-mq-core</artifactId>
<version>1.3.0</version>
</dependency>
注入FastMQTemplate即可使用
public class FastMQTemplateTest extends BaseTest {
@Autowired
private FastMQTemplate fastMQTemplate;
@Test
public void sendMsgTest() {
HashMap<String, Object> msg = Maps.newHashMap();
msg.put("name", "disaster");
msg.put("age", 20);
fastMQTemplate.sendMsgAsync("disaster_topic", msg);
fastMQTemplate.sendMsgAsync("disaster_topic", msg);
fastMQTemplate.sendMsgAsync(FastMQConstant.DEFAULT_TOPIC, msg);
while (true){
}
}
}
/**
* 不使用注解,则使用框架默认的topic和consumername
*
*/
@Service
@Slf4j
public class FastMQConsumerTest implements FastMQListener {
@Override
public void onMessage(Object o) {
log.info("result = {}", o);
}
}
/**
* 使用注解可指定topic和consumername,同时还支持接口幂等处理
*
*/
@Service
@FastMQMessageListener(idempotent = true,groupName = "disaster",consumeName = "disaster1",topic = "disaster_topic", readSize = 0)
@Slf4j
public class FastMQConsumerAnnotationTest implements FastMQListener{
@Override
public void onMessage(Object t) {
log.info("result = {}", t);
}
}
注入FastMQTemplate即可使用
public class FastMQDelayTemplateTest extends BaseTest {
@Autowired
private FastMQDelayTemplate fastMQDelayTemplate;
@Test
public void sendMsgTest() throws InterruptedException {
Thread.sleep(2000l);
fastMQDelayTemplate.msgEnQueue("hello", 20, null, TimeUnit.SECONDS);
while (true) {
}
}
}
/**
* 不使用注解则使用框架默认队列名和线程池
*/
@Service
@Slf4j
public class FastMQDelayConsumerTest implements FastMQDelayListener {
@Override
public void onMessage(Object t) throws Throwable {
log.info("result = {}", t);
}
}
/**
* 使用注解可自定义队列名称与线程池
*/
@FastMQDelayMessageListener(queueName = "test",executorName = "test_executor")
@Service
@Slf4j
public class FastMQDelayConsumerAnnotationTest implements FastMQDelayListener {
@Override
public void onMessage(Object t) throws Throwable {
log.info("result = {}", t);
}
}
fast-mq支持通过YAML配置Redission单机、主从、集群
## 单机版本
redisson:
server:
host: 127.0.0.1
port: 6379
database: 0
deployment: stand_alone
## 主从版本
redisson:
server:
host: 127.0.0.1
port: 6379
database: 0
nodes: 127.0.0.1:xxx,127.0.0.1:xxx,127.0.0.1:xxx
master: mymaster
deployment: master_slave
## 集群
server:
host: 127.0.0.1
port: 6379
database: 0
nodes: 127.0.0.1:xxx,127.0.0.1:xxx,127.0.0.1:xxx
deployment: cluster
如果不想使用fast-mq提供的Redission-YAML配置,则只需要在springboot项目中实例化一个RedissonClient对象并被spring管理即可
@Configuration
public class RedissionConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://" + "127.0.0.1:6379");
singleServerConfig.setDatabase(1);
singleServerConfig.setPassword("123456");
return Redisson.create(config);
}
}
fastmq:
config:
#是否开启fastmq
enable: false
# 每次拉取数据的量
fetchMessageSize: 5
#每次拉取PendingList的大小
pullPendingListSize: 1000
#死信门槛(秒)
deadLetterThreshold: 32
#是否从头开始订阅消息
isStartFromHead: true
#超过了该长度stream前面部分会被持久化(非严格模式——MAXLEN~)
trimThreshold: 10000
#是否是异步
isAsync: false
executor:
#拉取默认主题信息的周期
pullDefaultTopicMessagesPeriod: 10
#检查PendingList周期
pullTopicMessagesPeriod: 1
time-unit: seconds
#第一次延迟执行的时间
initial-delay: 1
#线程池的核心线程数,同步时调此参数能有效提高效率,如果采用的是异步消费的方式,使用默认配置即可
executor-core-size: 20
claim:
#认领门槛(单位毫秒)
claimThreshold: 20
time-unit: milliseconds
idle:
#检查consumer不活跃的门槛(单位秒)
pendingListIdleThreshold: 10
time-unit: seconds