Microsoft Message Queuing

Microsoft Message QueuingMSMQ微軟公司實現的一種消息隊列,始於Windows NT 4Windows 95Windows Server 2016Windows 10仍然包含這種組件。1999年起,Microsoft Embedded平台以及Windows CE 3.0也開始支持這一組件。[1]

簡介

MSMQ作為一種消息協議,允許多伺服器/多進程通信,即使不總是保持互聯。而sockets與其他網絡協議要求直連總是成立。

MSMQ從1997年開始可用[2]

MSMQ是可靠分發消息。分發失敗的消息保存在隊列中直到目標可達時重發該消息。還支持安全與優先級的消息機制。可以創建死信隊列英語Dead letter queue用於調試。

MSMQ支持可持續性與不可持續性消息,使得性能與消息是否寫到磁盤的一致性上可以權衡。不可持續性消息只能用於向非事務性隊列發送快速消息。

MSMQ支持事務處理。允許多個動作發給多個隊列中包裝為單個事務。微軟分佈式事務協調器 (MSDTC) 支持對MSMQ或其他資源的事務訪問。

MSMQ使用下述端口:

  • TCP: 1801
  • RPC: 135, 2101*, 2103*, 2105*
  • UDP: 3527, 1801
  • 這些端口可能增加11,如果RPC端口的初始選擇被使用。端口135用於查詢2xxx端口。[3]

版本歷史

  • Version 1.0 (1997年5月). 支持Windows 95, Windows NT 4.0 SP3, Windows 98Windows Me.
  • Version 2.0, 包含在Windows 2000中.
    • 新特性包括:[4] Support for registering public message queues in Active Directory, 128-bit encryption and digital certificate support, full COM support for message properties (achieving functional parity with the Win32 API function calls, full DNS path name support, improved performance in multi-threaded applications.
  • Version 3.0, 包含在Windows XP (專業版, 非家庭版) 與 Windows Server 2003.
    • 新特性包括:[5] Internet Messaging (referencing queues via HTTP, SOAP-formatted messages, MSMQ support for Internet Information Services), queue aliases, multicasting of messages, and additional support for programmatic maintenance and administration of queues and MSMQ itself.
  • Version 4.0, 包含在Windows VistaWindows Server 2008.
    • 新特性包括:[6] Subqueues,[7] improved support for "poison messages" (messages which continually fail to be processed correctly by the receiver), and support for transactional receives of messages from a remote queue.
  • Version 5.0, 包含在Windows 7Windows Server 2008 R2.
    • 新特性包括:[8] support for Secure Hash Algorithm 2.0 (SHA2) and all advanced hash algorithms that are supported in Windows 2008 R2; by default, weaker hash algorithms are disabled.
  • Version 6.0, 包含在Windows 8Windows Server 2012.
  • Version 6.3, 包含在 Windows 8.1Windows Server 2012 R2.

使用

C#例子:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using System.Threading.Tasks;

namespace Test
{
    public class QueueManger
    {
        /// <summary>
        /// 创建MSMQ队列
        /// </summary>
        /// <param name="queuePath">队列路径</param>
        /// <param name="transactional">是否事务队列</param>
        public static void Createqueue(string queuePath, bool transactional = false)
        {
            try
            {
                //判断队列是否存在
                if (!MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Create(queuePath);
                    Console.WriteLine(queuePath + "已成功创建!");
                }
                else
                {
                    Console.WriteLine(queuePath + "已经存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }
        /// <summary>
        /// 删除队列
        /// </summary>
        /// <param name="queuePath"></param>
        public static void Deletequeue(string queuePath)
        {
            try
            {
                //判断队列是否存在
                if (MessageQueue.Exists(queuePath))
                {
                    MessageQueue.Delete(@".\private$\myQueue");
                    Console.WriteLine(queuePath + "已删除!");
                }
                else
                {
                    Console.WriteLine(queuePath + "不存在!");
                }
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
        }
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T">用户数据类型</typeparam>
        /// <param name="target">用户数据</param>
        /// <param name="queuePath">队列名称</param>
        /// <param name="tran"></param>
        /// <returns></returns>
        public static bool SendMessage<T>(T target, string queuePath, MessageQueueTransaction tran = null)
        {
            try
            {
                //连接到本地的队列
                MessageQueue myQueue = new MessageQueue(queuePath);
                System.Messaging.Message myMessage = new System.Messaging.Message();
                myMessage.Body = target;
                myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                //发送消息到队列中
                if (tran == null)
                {
                    myQueue.Send(myMessage);
                }
                else
                {
                    myQueue.Send(myMessage, tran);
                }
                Console.WriteLine("消息已成功发送到"+queuePath + "队列!");
                return true;
            }
            catch (ArgumentException e)
            {
                Console.WriteLine(e.Message);
                return false;
            }
        }
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T">用户的数据类型</typeparam>
        /// <param name="queuePath">消息路径</param>
        /// <returns>用户填充在消息当中的数据</returns>
        public static T ReceiveMessage<T>(string queuePath,MessageQueueTransaction tran=null)
        {
            //连接到本地队列
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //从队列中接收消息
                System.Messaging.Message myMessage = tran == null ? myQueue.Receive() : myQueue.Receive(tran);
                return (T)myMessage.Body; //获取消息的内容
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (InvalidCastException e)
            {
                Console.WriteLine(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 采用Peek方法接收消息
        /// </summary>
        /// <typeparam name="T">用户数据类型</typeparam>
        /// <param name="queuePath">队列路径</param>
        /// <returns>用户数据</returns>
        public static T ReceiveMessageByPeek<T>(string queuePath)
        {
            //连接到本地队列
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //从队列中接收消息
                System.Messaging.Message myMessage = myQueue.Peek();
                return (T)myMessage.Body; //获取消息的内容
            }
            catch (MessageQueueException e)
            {
                Console.WriteLine(e.Message);
            }
            catch (InvalidCastException e)
            {
                Console.WriteLine(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 获取队列中的所有消息
        /// </summary>
        /// <typeparam name="T">用户数据类型</typeparam>
        /// <param name="queuePath">队列路径</param>
        /// <returns>用户数据集合</returns>
        public static List<T> GetAllMessage<T>(string queuePath)
        {
            MessageQueue myQueue = new MessageQueue(queuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                Message[] msgArr=  myQueue.GetAllMessages();
                List<T> list=new List<T>();
                msgArr.ToList().ForEach((o) => 
                {
                    list.Add((T)o.Body);
                });
                return list;
            }
            catch(Exception e)
            {
                Console.WriteLine(e.Message);
            }
            return null;
        }
    }
}

namespace Test
{
    public class Student
    {
        /// <summary>
        /// 年龄
        /// </summary>
        public int Age { get; set; }
        /// <summary>
        /// 姓名
        /// </summary>
        public string Name { get; set; }
    }
}

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            string queuepath = @".\private$\myQueue";
            //QueueManger.Createqueue(queuepath);
            //Student stu = new Student() { Name="shaoshun",Age=18};
            //QueueManger.SendMessage<Student>(stu, queuepath);
            //Student stu=  QueueManger.ReceiveMessageByPeek<Student>(queuepath);
            //Student stu = QueueManger.ReceiveMessage<Student>(queuepath);
            //Console.WriteLine(stu.Name);

            QueueManger.Deletequeue(queuepath);
            QueueManger.Createqueue(queuepath);
            MessageQueueTransaction tran = new MessageQueueTransaction();
            tran.Begin();
            try
            {
                Student stu;
                for (int i = 0; i < 4; i++)
                {
                    stu=new Student(){Name="shaoshun"+i,Age=i};
                    QueueManger.SendMessage<Student>(stu, queuepath, tran);
                    if (i == 3)
                    {
                        throw new Exception();
                    }
                }
                tran.Commit();
            }
            catch
            {
                tran.Abort();
            }
            Console.ReadKey();
        }
    }
}

C語言調用Windows API例子:

#include "windows.h"  
#include "mq.h"  
#include "tchar.h"  
#define BUFLEN = 256 ;  
  
HRESULT CreateMSMQQueue(  
                        LPWSTR wszPathName,   
                        PSECURITY_DESCRIPTOR pSecurityDescriptor,  
                        LPWSTR wszOutFormatName,  
                        DWORD *pdwOutFormatNameLength  
                        )  
{  
  
  // Define the maximum number of queue properties.  
  const int NUMBEROFPROPERTIES = 2;  
  
  // Define a queue property structure and the structures needed to initialize it.  
  MQQUEUEPROPS   QueueProps;  
  MQPROPVARIANT  aQueuePropVar[NUMBEROFPROPERTIES];  
  QUEUEPROPID    aQueuePropId[NUMBEROFPROPERTIES];  
  HRESULT        aQueueStatus[NUMBEROFPROPERTIES];  
  HRESULT        hr = MQ_OK;  
  
  // Validate the input parameters.  
  if (wszPathName == NULL || wszOutFormatName == NULL || pdwOutFormatNameLength == NULL)  
  {  
    return MQ_ERROR_INVALID_PARAMETER;  
  }  
  
  // Set queue properties.  
  DWORD cPropId = 0;  
  aQueuePropId[cPropId] = PROPID_Q_PATHNAME;  
  aQueuePropVar[cPropId].vt = VT_LPWSTR;  
  aQueuePropVar[cPropId].pwszVal = wszPathName;  
  cPropId++;  
  
  WCHAR wszLabel[MQ_MAX_Q_LABEL_LEN] = L"Test Queue";  
  aQueuePropId[cPropId] = PROPID_Q_LABEL;  
  aQueuePropVar[cPropId].vt = VT_LPWSTR;  
  aQueuePropVar[cPropId].pwszVal = wszLabel;  
  cPropId++;  
  
  // Initialize the MQQUEUEPROPS structure.  
  QueueProps.cProp = cPropId;               // Number of properties  
  QueueProps.aPropID = aQueuePropId;        // IDs of the queue properties  
  QueueProps.aPropVar = aQueuePropVar;      // Values of the queue properties  
  QueueProps.aStatus = aQueueStatus;        // Pointer to the return status  
  
  // Call MQCreateQueue to create the queue.  
  WCHAR wszFormatNameBuffer[BUFLEN];  
  DWORD dwFormatNameBufferLength = BUFLEN;  
  hr = MQCreateQueue(pSecurityDescriptor,         // Security descriptor  
                     &QueueProps,                 // Address of queue property structure  
                     wszFormatNameBuffer,         // Pointer to format name buffer  
                     &dwFormatNameBufferLength);  // Pointer to receive the queue's format name length in Unicode characters not bytes.  
  
  // Return the format name if the queue is created successfully.  
  if (hr == MQ_OK || hr == MQ_INFORMATION_PROPERTY)  
  {  
    if (*pdwOutFormatNameLength >= dwFormatNameBufferLength)  
    {  
      wcsncpy_s(wszOutFormatName, *pdwOutFormatNameLength - 1, wszFormatNameBuffer, _TRUNCATE);  
      // ************************************  
      // You must copy wszFormatNameBuffer into the   
      // wszOutFormatName buffer.  
      // ************************************  
      wszOutFormatName[*pdwOutFormatNameLength - 1] = L'\0';  
      *pdwOutFormatNameLength = dwFormatNameBufferLength;  
    }  
    else  
    {  
      wprintf(L"The queue was created, but its format name cannot be returned.\n");  
    }  
  }  
  return hr;  
}

參見

參考文獻

  1. ^ Microsoft Windows CE 3.0 Message Queuing Service. Microsoft Developer Network. [2009-11-25]. (原始內容存檔於2018-10-11). 
  2. ^ InformationWeek News Connects The Business Technology Community. Informationweek.com (2014-02-04). Retrieved on 2014-02-22. (頁面存檔備份,存於互聯網檔案館
  3. ^ TCP ports, UDP ports, and RPC ports that are used by Message Queuing頁面存檔備份,存於互聯網檔案館). Support.microsoft.com (2011-09-28). Retrieved on 2014-02-22.
  4. ^ New Features for Windows 2000. Message Queuing (MSMQ). Microsoft Developer Network. [2006-08-05]. [永久失效連結]
  5. ^ New Features for Windows XP and the Windows 2003 Family. Message Queuing (MSMQ). Microsoft Developer Network. [2006-08-05]. (原始內容存檔於2007-12-24). 
  6. ^ What's New in Message Queuing 4.0. Message Queuing (MSMQ). Microsoft Developer Network. [2006-08-05]. (原始內容存檔於2007-02-12). 
  7. ^ Sub-queues in MSMQ 4.0. [2018-10-11]. (原始內容存檔於2008-01-30). 
  8. ^ What's New in Message Queuing 5.0. Message Queuing (MSMQ). Microsoft TechNet. [2006-08-05]. (原始內容存檔於2017-08-26). 

外部連結