Microsoft Message Queuing
Microsoft Message Queuing(簡稱MSMQ)是微軟公司實現的一種消息隊列,始於Windows NT 4與Windows 95。Windows Server 2016與Windows 10仍然包含這種組件。1999年起,Microsoft Embedded平台以及Windows CE 3.0也開始支持這一組件。[1]
簡介
MSMQ作為一種消息協議,允許運行在不同伺服器/進程上的應用程式相互通信,即使它們之間並不總是保持連接。相比之下,sockets與其他網絡協議要求通信雙方的直接連接始終存在。
MSMQ從1997年開始可用[2]。
MSMQ負責可靠地分發消息。分發失敗的消息會保存在隊列中,直到目標可達時再重發。MSMQ還支持消息的安全機制與優先級機制。可以創建死信隊列用於調試。
MSMQ支持持久性(durable)與非持久性消息,使得可以在性能與消息是否寫入磁盤所帶來的可靠性之間進行權衡。非持久性消息只能通過向非事務性隊列發送快速(express)消息來實現。
MSMQ支持事務處理,允許將針對多個隊列的多個操作包裝為單個事務。微軟分佈式事務協調器 (MSDTC) 支持對MSMQ或其他資源的事務訪問。
MSMQ使用下述端口:
- TCP: 1801
- RPC: 135, 2101*, 2103*, 2105*
- UDP: 3527, 1801
- (*) 如果消息隊列初始化時RPC端口的初始選擇已被佔用,帶星號的端口號可能會增加11。實際使用的2xxx端口可通過查詢端口135獲知。[3]
版本歷史
- Version 1.0 (1997年5月). 支持Windows 95, Windows NT 4.0 SP3, Windows 98 與 Windows Me.
- Version 2.0, 包含在Windows 2000中.
- 新特性包括:[4] Support for registering public message queues in Active Directory, 128-bit encryption and digital certificate support, full COM support for message properties (achieving functional parity with the Win32 API function calls), full DNS path name support, improved performance in multi-threaded applications.
- Version 3.0, 包含在Windows XP (專業版, 非家庭版) 與 Windows Server 2003.
- 新特性包括:[5] Internet Messaging (referencing queues via HTTP, SOAP-formatted messages, MSMQ support for Internet Information Services), queue aliases, multicasting of messages, and additional support for programmatic maintenance and administration of queues and MSMQ itself.
- Version 4.0, 包含在Windows Vista與Windows Server 2008.
- Version 5.0, 包含在Windows 7 與 Windows Server 2008 R2.
- Version 6.0, 包含在Windows 8 與 Windows Server 2012.
- Version 6.3, 包含在 Windows 8.1 與 Windows Server 2012 R2.
使用
C#例子:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;
namespace Test
{
    /// <summary>
    /// Static convenience wrappers around <see cref="MessageQueue"/> for creating and deleting
    /// queues and for sending, receiving, peeking and listing XML-formatted messages.
    /// Exceptions that a method catches are reported by writing their message to the console.
    /// </summary>
    public class QueueManger
    {
        /// <summary>
        /// Creates the MSMQ queue at <paramref name="queuePath"/> if it does not exist yet.
        /// </summary>
        /// <param name="queuePath">Path of the queue to create.</param>
        /// <param name="transactional">
        /// <c>true</c> to create a transactional queue; <c>false</c> (the default) for a
        /// non-transactional one.
        /// </param>
        public static void Createqueue(string queuePath, bool transactional = false)
        {
            try
            {
                // Nothing to do when the queue is already there.
                if (MessageQueue.Exists(queuePath))
                {
                    Console.WriteLine(queuePath + "已经存在!");
                    return;
                }

                // Honor the caller's choice of queue kind, and release the MessageQueue
                // object that Create returns once the queue has been created.
                using (MessageQueue created = MessageQueue.Create(queuePath, transactional))
                {
                    Console.WriteLine(queuePath + "已成功创建!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }

        /// <summary>
        /// Deletes the MSMQ queue at <paramref name="queuePath"/> if it exists.
        /// </summary>
        /// <param name="queuePath">Path of the queue to delete.</param>
        public static void Deletequeue(string queuePath)
        {
            try
            {
                if (!MessageQueue.Exists(queuePath))
                {
                    Console.WriteLine(queuePath + "不存在!");
                    return;
                }

                // Delete the queue the caller asked for, not a fixed path.
                MessageQueue.Delete(queuePath);
                Console.WriteLine(queuePath + "已删除!");
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }

        /// <summary>
        /// Sends <paramref name="target"/> to a queue as an XML-formatted message.
        /// </summary>
        /// <typeparam name="T">Type of the payload.</typeparam>
        /// <param name="target">Payload placed in the message body.</param>
        /// <param name="queuePath">Path of the destination queue.</param>
        /// <param name="tran">
        /// Transaction to send the message under, or <c>null</c> to send without one.
        /// </param>
        /// <returns>
        /// <c>true</c> when the send call completed; <c>false</c> when an
        /// <see cref="ArgumentException"/> was caught. Other exceptions (for example
        /// <see cref="MessageQueueException"/>) are not caught here and reach the caller.
        /// </returns>
        public static bool SendMessage<T>(T target, string queuePath, MessageQueueTransaction tran = null)
        {
            try
            {
                using (MessageQueue myQueue = new MessageQueue(queuePath))
                using (System.Messaging.Message myMessage = new System.Messaging.Message())
                {
                    myMessage.Body = target;
                    myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });

                    if (tran == null)
                    {
                        myQueue.Send(myMessage);
                    }
                    else
                    {
                        myQueue.Send(myMessage, tran);
                    }
                }

                Console.WriteLine("消息已成功发送到"+queuePath + "队列!");
                return true;
            }
            catch (ArgumentException e)
            {
                Console.WriteLine(e.Message);
                return false;
            }
        }

        /// <summary>
        /// Receives (removes) the next message from a queue and returns its body.
        /// </summary>
        /// <remarks>
        /// Uses the <c>Receive</c> overloads without a timeout, which according to the
        /// System.Messaging documentation wait until a message is available.
        /// </remarks>
        /// <typeparam name="T">Type the message body is deserialized to.</typeparam>
        /// <param name="queuePath">Path of the queue to read from.</param>
        /// <param name="tran">
        /// Transaction to receive under, or <c>null</c> to receive without one.
        /// </param>
        /// <returns>
        /// The message body, or <c>default(T)</c> when a <see cref="MessageQueueException"/>
        /// or <see cref="InvalidCastException"/> was caught.
        /// </returns>
        public static T ReceiveMessage<T>(string queuePath,MessageQueueTransaction tran=null)
        {
            using (MessageQueue myQueue = OpenTypedQueue<T>(queuePath))
            {
                try
                {
                    using (System.Messaging.Message myMessage =
                        tran == null ? myQueue.Receive() : myQueue.Receive(tran))
                    {
                        return (T)myMessage.Body;
                    }
                }
                catch (MessageQueueException e)
                {
                    Console.WriteLine(e.Message);
                }
                catch (InvalidCastException e)
                {
                    Console.WriteLine(e.Message);
                }

                return default(T);
            }
        }

        /// <summary>
        /// Returns the body of the first message in a queue using <c>Peek</c>, so the
        /// message is read without being taken through <c>Receive</c>.
        /// </summary>
        /// <typeparam name="T">Type the message body is deserialized to.</typeparam>
        /// <param name="queuePath">Path of the queue to read from.</param>
        /// <returns>
        /// The message body, or <c>default(T)</c> when a <see cref="MessageQueueException"/>
        /// or <see cref="InvalidCastException"/> was caught.
        /// </returns>
        public static T ReceiveMessageByPeek<T>(string queuePath)
        {
            using (MessageQueue myQueue = OpenTypedQueue<T>(queuePath))
            {
                try
                {
                    using (System.Messaging.Message myMessage = myQueue.Peek())
                    {
                        return (T)myMessage.Body;
                    }
                }
                catch (MessageQueueException e)
                {
                    Console.WriteLine(e.Message);
                }
                catch (InvalidCastException e)
                {
                    Console.WriteLine(e.Message);
                }

                return default(T);
            }
        }

        /// <summary>
        /// Returns the bodies of all messages obtained from <c>GetAllMessages</c> on a queue.
        /// </summary>
        /// <typeparam name="T">Type each message body is deserialized to.</typeparam>
        /// <param name="queuePath">Path of the queue to read from.</param>
        /// <returns>
        /// The message bodies in the order the queue returned them, or <c>null</c> when any
        /// exception was caught while reading (callers must check for <c>null</c>).
        /// </returns>
        public static List<T> GetAllMessage<T>(string queuePath)
        {
            using (MessageQueue myQueue = OpenTypedQueue<T>(queuePath))
            {
                try
                {
                    System.Messaging.Message[] messages = myQueue.GetAllMessages();
                    List<T> list = new List<T>(messages.Length);
                    foreach (System.Messaging.Message message in messages)
                    {
                        using (message)
                        {
                            list.Add((T)message.Body);
                        }
                    }

                    return list;
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }

                return null;
            }
        }

        /// <summary>
        /// Opens the queue at <paramref name="queuePath"/> and configures it to read message
        /// bodies of type <typeparamref name="T"/> with an <see cref="XmlMessageFormatter"/>.
        /// The caller owns the returned queue and must dispose it.
        /// </summary>
        private static MessageQueue OpenTypedQueue<T>(string queuePath)
        {
            MessageQueue queue = new MessageQueue(queuePath);
            queue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            return queue;
        }
    }
}
namespace Test
{
    /// <summary>
    /// Simple data object used as the message payload in the queue examples.
    /// </summary>
    public class Student
    {
        /// <summary>
        /// The student's age.
        /// </summary>
        public int Age
        {
            get;
            set;
        }

        /// <summary>
        /// The student's name.
        /// </summary>
        public string Name
        {
            get;
            set;
        }
    }
}
namespace Test
{
    /// <summary>
    /// Console demo: recreates a private queue, sends several messages inside one MSMQ
    /// transaction and then fails on purpose so that the transaction is aborted.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point of the demo.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";

            // Start from a fresh queue. It is created as transactional because the
            // messages below are sent under a MessageQueueTransaction.
            QueueManger.Deletequeue(queuepath);
            QueueManger.Createqueue(queuepath, true);

            using (MessageQueueTransaction tran = new MessageQueueTransaction())
            {
                tran.Begin();
                try
                {
                    for (int i = 0; i < 4; i++)
                    {
                        Student stu = new Student() { Name = "shaoshun" + i, Age = i };
                        if (!QueueManger.SendMessage<Student>(stu, queuepath, tran))
                        {
                            throw new InvalidOperationException("消息发送失败,事务将回滚。");
                        }

                        // Simulate a failure after the last send so that Commit is never
                        // reached and the whole batch is rolled back.
                        if (i == 3)
                        {
                            throw new InvalidOperationException("模拟发送过程中出错,事务将回滚。");
                        }
                    }

                    tran.Commit();
                }
                catch (Exception e)
                {
                    // Report why the batch failed, then undo every send made under tran.
                    Console.WriteLine(e.Message);
                    tran.Abort();
                }
            }

            Console.ReadKey();
        }
    }
}
C語言調用Windows API例子:
#include "windows.h"
#include "mq.h"
#include "tchar.h"
// Size, in WCHARs, of the local buffer that receives the new queue's format name.
#define BUFLEN 256

// Creates an MSMQ queue with the given path name and the label "Test Queue", and
// returns the queue's format name to the caller.
//
// Parameters:
//   wszPathName             - path name of the queue to create (must not be NULL).
//   pSecurityDescriptor     - security descriptor passed through to MQCreateQueue.
//   wszOutFormatName        - caller buffer that receives the format name (must not be NULL).
//   pdwOutFormatNameLength  - in: capacity of wszOutFormatName in WCHARs;
//                             out: length reported by MQCreateQueue, written only when
//                             the format name was copied (must not be NULL).
//
// Returns MQ_ERROR_INVALID_PARAMETER when a required pointer is NULL, otherwise the
// HRESULT of MQCreateQueue. If the queue is created but the caller's buffer is too
// small, a message is printed and the output buffer and length are left untouched.
HRESULT CreateMSMQQueue(
    LPWSTR wszPathName,
    PSECURITY_DESCRIPTOR pSecurityDescriptor,
    LPWSTR wszOutFormatName,
    DWORD *pdwOutFormatNameLength
    )
{
    // Define the maximum number of queue properties.
    const int NUMBEROFPROPERTIES = 2;

    // Define a queue property structure and the structures needed to initialize it.
    MQQUEUEPROPS QueueProps;
    MQPROPVARIANT aQueuePropVar[NUMBEROFPROPERTIES];
    QUEUEPROPID aQueuePropId[NUMBEROFPROPERTIES];
    HRESULT aQueueStatus[NUMBEROFPROPERTIES];
    HRESULT hr = MQ_OK;

    // Validate the input parameters.
    if (wszPathName == NULL || wszOutFormatName == NULL || pdwOutFormatNameLength == NULL)
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }

    // Set queue properties: the path name and a fixed label.
    DWORD cPropId = 0;
    aQueuePropId[cPropId] = PROPID_Q_PATHNAME;
    aQueuePropVar[cPropId].vt = VT_LPWSTR;
    aQueuePropVar[cPropId].pwszVal = wszPathName;
    cPropId++;

    WCHAR wszLabel[MQ_MAX_Q_LABEL_LEN] = L"Test Queue";
    aQueuePropId[cPropId] = PROPID_Q_LABEL;
    aQueuePropVar[cPropId].vt = VT_LPWSTR;
    aQueuePropVar[cPropId].pwszVal = wszLabel;
    cPropId++;

    // Initialize the MQQUEUEPROPS structure.
    QueueProps.cProp = cPropId;           // Number of properties
    QueueProps.aPropID = aQueuePropId;    // IDs of the queue properties
    QueueProps.aPropVar = aQueuePropVar;  // Values of the queue properties
    QueueProps.aStatus = aQueueStatus;    // Pointer to the return status

    // Call MQCreateQueue to create the queue. The buffer starts out as an empty
    // string so it never holds indeterminate data.
    WCHAR wszFormatNameBuffer[BUFLEN] = L"";
    DWORD dwFormatNameBufferLength = BUFLEN;
    hr = MQCreateQueue(pSecurityDescriptor,         // Security descriptor
                       &QueueProps,                 // Address of queue property structure
                       wszFormatNameBuffer,         // Pointer to format name buffer
                       &dwFormatNameBufferLength);  // In/out: format name length in Unicode characters, not bytes

    // Return the format name if the queue is created successfully.
    if (hr == MQ_OK || hr == MQ_INFORMATION_PROPERTY)
    {
        // The caller's buffer must be non-empty and at least as long as the length
        // MQCreateQueue reported.
        if (*pdwOutFormatNameLength > 0 && *pdwOutFormatNameLength >= dwFormatNameBufferLength)
        {
            // Pass the full destination capacity: wcsncpy_s with _TRUNCATE reserves
            // room for the terminating null itself, so shrinking the size by one
            // would drop the last character when the buffer is exactly big enough.
            wcsncpy_s(wszOutFormatName, *pdwOutFormatNameLength, wszFormatNameBuffer, _TRUNCATE);
            *pdwOutFormatNameLength = dwFormatNameBufferLength;
        }
        else
        {
            wprintf(L"The queue was created, but its format name cannot be returned.\n");
        }
    }
    return hr;
}
參見
參考文獻
- ^ Microsoft Windows CE 3.0 Message Queuing Service. Microsoft Developer Network. [2009-11-25]. (原始內容存檔於2018-10-11).
- ^ InformationWeek News Connects The Business Technology Community. Informationweek.com (2014-02-04). Retrieved on 2014-02-22. (頁面存檔備份,存於互聯網檔案館)
- ^ TCP ports, UDP ports, and RPC ports that are used by Message Queuing (頁面存檔備份,存於互聯網檔案館). Support.microsoft.com (2011-09-28). Retrieved on 2014-02-22.
- ^ New Features for Windows 2000. Message Queuing (MSMQ). Microsoft Developer Network. [2006-08-05].[永久失效連結]
- ^ New Features for Windows XP and the Windows 2003 Family. Message Queuing (MSMQ). Microsoft Developer Network. [2006-08-05]. (原始內容存檔於2007-12-24).
- ^ What's New in Message Queuing 4.0. Message Queuing (MSMQ). Microsoft Developer Network. [2006-08-05]. (原始內容存檔於2007-02-12).
- ^ Sub-queues in MSMQ 4.0. [2018-10-11]. (原始內容存檔於2008-01-30).
- ^ What's New in Message Queuing 5.0. Message Queuing (MSMQ). Microsoft TechNet. [2006-08-05]. (原始內容存檔於2017-08-26).