十年河东,十年河西,莫欺少年穷
学无止境,精益求精
近几天一直都在看阿里云的IOT云服务及消息队列MNS,一头雾水好几天了,直到今天,总算有点收获了,记录下来,方便以后查阅。
首先借用阿里云的一张图来说明:设备是如何通过云服务平台和企业服务器‘通话的’
针对此图,作如下说明:
1、物联网平台作为中间组件,主要是通过消息队MNS列来实现设备和企业服务器对话的,具体可描述为:
1.1、设备发送指令至物联网平台的MNS队列,MNS队列将设备指令收录,需要说明的是:设备发送指令是通过嵌入式开发人员开发的,例如C语言
1.2、企业通过C#、JAVA、PHP等高级语言开发人员开发监听程序,当监听到MNS队列中的设备指令时,获取指令,做相关业务处理,并发送新的设备指令至MNS队列。【例如发送快递柜关门的指令】
1.3、企业发送的指令被MNS收录,设备同样通过监听程序获取企业服务器发送的关门指令,收到关门指令的设备执行相关指令,完成自动关门操作。
以上便是设备与企业服务器之间的对话过程
下面列出C#的监听MNS代码【需要MNS C# JDK 的支持】注意:消息是经过EncodeBase64编码,接受消息要解码,发送消息要编码
异步监听:
using System;
using System.Threading;
using System.Threading.Tasks;
using Aliyun.MNS;
using Aliyun.MNS.Model;
using IotCommon;
using IotDtos.MongodbDtos;
using IotService.Device;
using IotService.MongoDb;
namespace IotListener
{
class Program
{
private static MongoLogService _logService;
public static string _receiptHandle;
public static DeviceResponseService service = new DeviceResponseService();
public static Queue nativeQueue;
static void Main(string[] args)
{
LogstoreDatabaseSettings st = new LogstoreDatabaseSettings() { LogsCollectionName = "LogsForDg_" + DateTime.Now.ToString("yyyyMMdd") };
_logService = new MongoLogService(st);
while (true)
{
try
{
IMNS client = new Aliyun.MNS.MNSClient(IotParm._accessKeyId, IotParm._secretAccessKey, IotParm._endpoint, IotParm._stsToken);
nativeQueue = client.GetNativeQueue(IotParm._queueName);
for (int i = 0; i < IotParm._receiveTimes; i++)
{
ReceiveMessageRequest request = new ReceiveMessageRequest(1);
nativeQueue.BeginReceiveMessage(request, ListenerCallback, null);
Thread.Sleep(1);
}
}
catch (Exception ex)
{
Console.WriteLine("Receive message failed, exception info: " + ex.Message);
}
}
}
/// <summary>
/// 回调函数
/// </summary>
/// <param name="ar"></param>
public static void ListenerCallback(IAsyncResult ar)
{
try
{
Message message = nativeQueue.EndReceiveMessage(ar).Message;
string Json = Base64Helper.DecodeBase64(message.Body);
Console.WriteLine("Message: {0}", Json);
Console.WriteLine("----------------------------------------------------\n");
var methodValue = JsonKeyHelper.GetJsonValue(Json, "method");
DeviceResponse(methodValue, Json);
if (!string.IsNullOrEmpty(methodValue))
{
_logService.Create(new LogsForDgModel { CreateTime = DateTime.Now, data = Json, methodNo = methodValue });
}
_receiptHandle = message.ReceiptHandle;
nativeQueue.DeleteMessage(_receiptHandle);
}
catch (Exception ex)
{
Console.WriteLine("Receive message failed, exception info: " + ex.Message);
}
}
/// <summary>
/// 响应设备上传接口
/// </summary>
/// <param name="method"></param>
/// <param name="message"></param>
public static void DeviceResponse(string method, string message)
{
switch (method)
{
case "doorClosedReport": service.doorClosedReportResponse(message); break;
case "doorOpenReport": service.doorOpenReportResponse(message); break;
case "deviceStartReportToCloud": service.deviceStartReportToCloudResponse(message); break;
case "qryDeviceConfig": service.qryDeviceConfigResponse(message); break;
case "devicePingToCloud": service.devicePingToCloudResponse(message); break;
case "deviceFatalReport": service.deviceFatalReportResponse(message); break;
case "deviceVersionReport": service.deviceVersionReportResponse(message); break;
case "deviceFirmwareData": service.deviceFirmwareDataResponse(message); break;
case "deviceLocationReport": service.deviceLocationReportResponse(message); break;
}
}
}
}
View Code
同步监听:
using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MnsListener
{
class Program
{
#region Private Properties
private const string _accessKeyId = "";
private const string _secretAccessKey = "";
private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
private const string _stsToken = null;
private const string _queueName = "Sub";
private const string _queueNamePrefix = "my";
private const int _receiveTimes = 1;
private const int _receiveInterval = 2;
private const int batchSize = 6;
private static string _receiptHandle;
#endregion
static void Main(string[] args)
{
while (true)
{
try
{
IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
var nativeQueue = client.GetNativeQueue(_queueName);
for (int i = 0; i < _receiveTimes; i++)
{
var receiveMessageResponse = nativeQueue.ReceiveMessage(3);
Console.WriteLine("Receive message successfully, status code: {0}", receiveMessageResponse.HttpStatusCode);
Console.WriteLine("----------------------------------------------------");
Message message = receiveMessageResponse.Message;
string s = DecodeBase64(message.Body);
Console.WriteLine("MessageId: {0}", message.Id);
Console.WriteLine("ReceiptHandle: {0}", message.ReceiptHandle);
Console.WriteLine("MessageBody: {0}", message.Body);
Console.WriteLine("MessageBodyMD5: {0}", message.BodyMD5);
Console.WriteLine("EnqueueTime: {0}", message.EnqueueTime);
Console.WriteLine("NextVisibleTime: {0}", message.NextVisibleTime);
Console.WriteLine("FirstDequeueTime: {0}", message.FirstDequeueTime);
Console.WriteLine("DequeueCount: {0}", message.DequeueCount);
Console.WriteLine("Priority: {0}", message.Priority);
Console.WriteLine("----------------------------------------------------\n");
_receiptHandle = message.ReceiptHandle;
nativeQueue.DeleteMessage(_receiptHandle);
Thread.Sleep(_receiveInterval);
}
}
catch (Exception ex)
{
Console.WriteLine("Receive message failed, exception info: " + ex.Message);
}
}
}
///编码
public static string EncodeBase64(string code, string code_type= "utf-8")
{
string encode = "";
byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
try
{
encode = Convert.ToBase64String(bytes);
}
catch
{
encode = code;
}
return encode;
}
///解码
public static string DecodeBase64(string code, string code_type = "utf-8")
{
string decode = "";
byte[] bytes = Convert.FromBase64String(code);
try
{
decode = Encoding.GetEncoding(code_type).GetString(bytes);
}
catch
{
decode = code;
}
return decode;
}
}
}
View Code
发送消息:
using Aliyun.MNS;
using Aliyun.MNS.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MnsSendMsg
{
class Program
{
#region Private Properties
private const string _accessKeyId = "";
private const string _secretAccessKey = "";
private const string _endpoint = "http://.mns.cn-shanghai.aliyuncs.com/";
private const string _stsToken = null;
private const string _queueName = "Sub";
private const string _queueNamePrefix = "my";
private const int _receiveTimes = 1;
private const int _receiveInterval = 2;
private const int batchSize = 6;
private static string _receiptHandle;
#endregion
static void Main(string[] args)
{
try
{
IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint, _stsToken);
// 1. 获取Queue的实例
var nativeQueue = client.GetNativeQueue(_queueName);
var sendMessageRequest = new SendMessageRequest(EncodeBase64("阿里云<MessageBody>计算"));
sendMessageRequest.DelaySeconds = 2;
var sendMessageResponse = nativeQueue.SendMessage(sendMessageRequest);
Console.WriteLine("Send message successfully,{0}",
sendMessageResponse.ToString());
Thread.Sleep(2000);
}
catch (Exception ex)
{
Console.WriteLine("Send message failed, exception info: " + ex.Message);
}
}
///编码
public static string EncodeBase64(string code, string code_type = "utf-8")
{
string encode = "";
byte[] bytes = Encoding.GetEncoding(code_type).GetBytes(code);
try
{
encode = Convert.ToBase64String(bytes);
}
catch
{
encode = code;
}
return encode;
}
///解码
public static string DecodeBase64(string code, string code_type = "utf-8")
{
string decode = "";
byte[] bytes = Convert.FromBase64String(code);
try
{
decode = Encoding.GetEncoding(code_type).GetString(bytes);
}
catch
{
decode = code;
}
return decode;
}
}
}
View Code
关于MNS C# JDK下载,可以去阿里云:https://help.aliyun.com/document_detail/32447.html?spm=a2c4g.11186623.6.633.61395f64IfHTRo
关于MNS队列,主题,主题订阅相关知识:https://help.aliyun.com/document_detail/34445.html?spm=a2c4g.11186623.6.542.699f38c6RO3nDS
关于阿里云AMQP队列接入,可以查询:https://help.aliyun.com/document_detail/149716.html?spm=a2c4g.11186623.6.621.2cda31b4kS1zXR
关于阿里云物联网平台,请查阅:https://help.aliyun.com/document_detail/125800.html?spm=a2c4g.11186623.6.542.7b0241c8o5r6PT
最后:阿里云物联网平台
@天才卧龙的博客