在开发过程中,经常会遇到需要处理消息队列中消息的情况。本文将介绍如何在.NET环境下,开发一个用于迁移MSMQ消息队列中消息的工具。这个工具可以帮助在消息发送过程中,由于使用了错误的IP地址导致消息卡在出队列中时,将消息从错误的出队列移动到正确的出队列。
在开始之前,需要对以下几个概念有所了解:
当客户端向WCF服务发送消息时,消息会被序列化。序列化的格式取决于所使用的绑定,本文中使用的是NetMsmqBinding。在WCF协议栈中,可以看到这个绑定的规范是MS-NMFMB,它基于MS-NMF。如果查看一个原始的MSMQ消息,会发现以下结构:
在.NET消息框架头中,包含了版本记录、模式记录和通过记录等信息。其中,通过记录是最重要的部分,它包含了消息发送的目标地址。
要迁移消息,需要执行以下步骤:
以下是使用System.Messaging API加载消息的示例代码:
private void readMessage()
{
try
{
message = sourceQueue.Peek(TimeSpan.FromSeconds(TIMEOUT));
if (message == null)
throw new Exception("cannot peek message, queue might be empty");
}
catch (Exception ex)
{
throw new Exception("failed to read message from queue", ex);
}
}
在这一步中,首先使用Peek方法来查看消息,而不从队列中移除它。这样可以确保在成功发送重定向消息之前,消息仍然保留在队列中。
接下来,需要更改消息中的地址。以下是更改消息的示例代码:
private void alterMessage()
{
byte[] data = new byte[message.BodyStream.Length];
byte[] source = getHost(PREFIX + sourceName);
byte[] dest = getHost(PREFIX + destName);
message.BodyStream.Read(data, 0, data.Length);
int offset = find(data, source);
if (offset == NOT_FOUND)
throw new Exception("cannot find source name in message");
//
get the total string size from the prefix byte which holds the size
byte sourceSize = Convert.ToByte(data[offset - 1]);
byte destSize = Convert.ToByte(sourceSize - (source.Length - dest.Length));
//
prefix the source & dest with a byte that holds the string size
source = prefixSize(source, sourceSize);
dest = prefixSize(dest, destSize);
data = replace(data, source, dest);
//
update framing message address
data = replace(data, source, dest);
//
update wcf channels message address
//
put the altered data back into the message
message.BodyStream = new MemoryStream(data);
}
在这段代码中,首先读取消息体,然后找到源地址,并将其替换为目标地址。同时,还需要更新.NET消息框架头中的地址。
发送消息和移除消息的代码如下:
private void sendMessage()
{
MessageQueue destQueue = new MessageQueue("FormatName:Direct=TCP:" + destName, QueueAccessMode.Send);
destQueue.Send(message, MessageQueueTransactionType.Single);
destQueue.Close();
}
private void removeMessage()
{
sourceQueue.ReceiveByLookupId(message.LookupId);
}