Skip to content

Commit

Permalink
java
Browse files Browse the repository at this point in the history
  • Loading branch information
aliyunmq committed Jan 2, 2019
1 parent 3c18c74 commit f443a6b
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 1 deletion.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2018 aliyun.mq
Copyright (c) 2019 aliyun.mq

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
6 changes: 6 additions & 0 deletions java/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.idea/
*.iml
replace.sh
RunConsumer.java
RunProducer.java
target/
19 changes: 19 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk-sample</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>
93 changes: 93 additions & 0 deletions java/src/main/java/Consumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;

import java.util.ArrayList;
import java.util.List;

public class Consumer {

public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
"${HTTP_ENDPOINT}",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${ACCESS_KEY}",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${SECRET_KEY}"
);

// 所属的 Topic
final String topic = "${TOPIC}";
// 您在控制台创建的 Consumer ID(Group ID)
final String groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空
final String instanceId = "${INSTANCE_ID}";

final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}

// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
do {
List<Message> messages = null;

try {
// 长轮询消费消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
messages = consumer.consumeMessage(
3,// 一次最多消费3条(最多可设置为16条)
3// 长轮询时间3秒(最多可设置为30秒)
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// 没有消息
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}

// 处理业务逻辑
for (Message message : messages) {
System.out.println("Receive message: " + message);
}

// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}

try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// 某些消息的句柄可能超时了会导致确认不成功
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}
57 changes: 57 additions & 0 deletions java/src/main/java/Producer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
"${HTTP_ENDPOINT}",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${ACCESS_KEY}",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${SECRET_KEY}"
);

// 所属的 Topic
final String topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空
final String instanceId = "${INSTANCE_ID}";

// 获取Topic的生产者
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}

try {
// 循环发送100条消息
for (int i = 0; i < 100; i++) {
TopicMessage pubMsg = new TopicMessage(
// 消息内容
"hello mq!".getBytes(),
// 消息标签
"A"
);
// 同步发送消息,只要不抛异常就是成功
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

// 同步发送消息,只要不抛异常就是成功
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}

mqClient.close();
}

}

0 comments on commit f443a6b

Please sign in to comment.