Skip to content

Commit

Permalink
103 feature samples
Browse files Browse the repository at this point in the history
  • Loading branch information
aliyunmq committed Sep 3, 2020
1 parent 4d236fa commit 2b8d03a
Show file tree
Hide file tree
Showing 22 changed files with 873 additions and 31 deletions.
10 changes: 1 addition & 9 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,5 @@ include
include/
lib
lib/
consumer
producer
trans_producer
run_consumer
run_producer
run_trans_producer
run_consumer.cpp
run_producer.cpp
run_trans_producer.cpp
run_*
replace.sh
110 changes: 110 additions & 0 deletions cpp/order_consumer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

MQClient mqClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
"${HTTP_ENDPOINT}",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${ACCESS_KEY}",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${SECRET_KEY}"
);

// 所属的 Topic
string topic = "${TOPIC}";
// 您在控制台创建的 Consumer ID(Group ID)
string groupId = "${GROUP_ID}";
// Topic所属实例ID,默认实例为空
string instanceId = "${INSTANCE_ID}";

MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}

do {
try {
std::vector<Message> messages;
// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
// 只有所有消息确认消费成功才能消费下一批消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
consumer->consumeMessageOrderly(
3,//一次最多消费3条(最多可设置为16条)
3,//长轮询时间3秒(最多可设置为30秒)
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;

// 处理消息
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " ShardingKey: " << iter->getShardingKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}

// 确认消息消费成功
// Message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// 某些消息的句柄可能超时了会导致确认不成功
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}

} while(true);
}
55 changes: 55 additions & 0 deletions cpp/order_producer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


int main() {

MQClient mqClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
"${HTTP_ENDPOINT}",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${ACCESS_KEY}",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${SECRET_KEY}"
);

// 所属的 Topic
string topic = "${TOPIC}";
// Topic所属实例ID,默认实例为空
string instanceId = "${INSTANCE_ID}";

MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}

try {
for (int i = 0; i < 8; i++)
{
PublishMessageResponse pmResp;
TopicMessage pubMsg("Hello, mq!order msg!");
// 设置顺序消息的分区KEY
pubMsg.setShardingKey(std::to_string(i % 2));
pubMsg.putProperty("a",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}

return 0;
}
4 changes: 1 addition & 3 deletions csharp/.gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
.DS_Store
.idea/
replace.sh
run_consumer.cs
run_producer.cs
run_trans_producer.cs
run*
107 changes: 107 additions & 0 deletions csharp/order_consumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
public class OrderConsumerSample
{
// 设置HTTP接入域名(此处以公共云生产环境为例)
private const string _endpoint = "${HTTP_ENDPOINT}";
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
private const string _accessKeyId = "${ACCESS_KEY}";
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
private const string _secretAccessKey = "${SECRET_KEY}";
// 所属的 Topic
private const string _topicName = "${TOPIC}";
// Topic所属实例ID,默认实例为空
private const string _instanceId = "${INSTANCE_ID}";
// 您在控制台创建的 Consumer ID(Group ID)
private const string _groupId = "${GROUP_ID}";

private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

static void Main(string[] args)
{
// 在当前线程循环消费消息,建议是多开个几个线程并发消费消息
while (true)
{
try
{
// 长轮询顺序消费消息, 拿到的消息可能是多个分区的(对于分区顺序)一个分区的内的消息一定是顺序的
// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息
// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息
// 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
List<Message> messages = null;

try
{
messages = consumer.ConsumeMessageOrderly(
3, // 一次最多消费3条(最多可设置为16条)
3 // 长轮询时间3秒(最多可设置为30秒)
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}

if (messages == null)
{
continue;
}

List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// 处理业务逻辑
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Property a is:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// Message.nextConsumeTime前若不确认消息消费成功,则消息会重复消费
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// 某些消息的句柄可能超时了会导致确认不成功
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}
47 changes: 47 additions & 0 deletions csharp/order_producer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
public class OrderProducerSample
{
// 设置HTTP接入域名(此处以公共云生产环境为例)
private const string _endpoint = "${HTTP_ENDPOINT}";
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
private const string _accessKeyId = "${ACCESS_KEY}";
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
private const string _secretAccessKey = "${SECRET_KEY}";
// 所属的 Topic
private const string _topicName = "${TOPIC}";
// Topic所属实例ID,默认实例为空
private const string _instanceId = "${INSTANCE_ID}";

private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

static void Main(string[] args)
{
try
{
// 循环发送8条消息
for (int i = 0; i < 8; i++)
{
TopicMessage sendMsg = new TopicMessage("dfadfadfadf", "tag");
sendMsg.PutProperty("a", i.ToString());
sendMsg.ShardingKey = (i % 2).ToString();
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}
4 changes: 1 addition & 3 deletions go/.gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
.DS_Store
.idea/
replace.sh
run_consumer.go
run_producer.go
run_trans_producer.go
run*
src/
4 changes: 1 addition & 3 deletions java/.gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
.idea/
*.iml
replace.sh
RunConsumer.java
RunProducer.java
RunTransProducer.java
Run*
target/
4 changes: 2 additions & 2 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.1</version>
<classifier>jar-with-dependencies</classifier>
<version>1.0.3</version>
<!--<classifier>jar-with-dependencies</classifier>-->
</dependency>
</dependencies>

Expand Down
Loading

0 comments on commit 2b8d03a

Please sign in to comment.