一、MSMQ简介
MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式、松散连接的消息通讯应用程序的开发工具。消息队列
和电子邮件有着很多相似处,他们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址;然而他们的用处却有着很大的
区别:消息队列的发送者和接收者是应用程序,而电子邮件的发送者和接收者通常是人。如同电子邮件一样,消息队列的发送和接收也不需要
发送者和接收者同时在场,可以存储在消息队列或是邮件服务器中。
二、消息队列的安装
默认情况下安装操作系统是不安装消息队列的,你可以在控制面板中找到添加/删除程序,然后选择添加/删除Windows组件一项,然后选择应
用程序服务器,双击它进入详细资料中选择消息队列一项进行安装,如图:
三、消息队列类型
消息对列分为3类:
公共队列
MachineName\QueueName
能被别的机器所访问,如果你的多个项目中用到消息队列,那么你可以把队列定义为公共队列
专用队列
MachineName\Private$\QueueName
只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。
日志队列
MachineName\QueueName\Journal$
四、消息队列的创建
MessageQueue Mq=new MessageQueue(“.\\private$\\Mymq”);
通过Path属性引用消息队列的代码也十分简单:
MessageQueue Mq=new MessageQueue();
Mq.Path=”.\\private$\\Mymq”;
使用 Create 方法可以在计算机上创建队列:
System.Messaging.MessageQueue.Create(@".\private$\Mymq");
这里注意由于在C#中要记住用反斜杠将“\”转义。
由于消息对列所放置的地方经常改变,所以建议消息队列路径不要写死,建议放在配置文件中。
五、消息的发送
消息的发送可以分为简单消息和复杂消息,简单消息类型就是常用的数据类型,例如整型、字符串等数据;复杂消息的数据类型通常对应于系统中的复杂数据类型,例如结构,对象等等。
Mq.Send("Hello!");
在这里建议你可以事先定义一个对象类,然后发送这个对象类的实例对象,这样以后无论在增加什么发送信息,只需在对象类中增加相应的属性即可。
六、消息的接收和阅读
(1)同步接收消息
接收消息的代码很简单:
Mq.Receive();
Mq.Receive(TimeSpan timeout); //设定超时时间
Mq.ReceiveById(ID);
Mq.Peek();
通过Receive方法接收消息同时永久性地从队列中删除消息;
通过Peek方法从队列中取出消息而不从队列中移除该消息。
如果知道消息的标识符(ID),还可以通过ReceiveById方法和PeekById方法完成相应的操作。
(2)异步接受消息
利用委托机制:MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
(3)消息阅读
在应用程序能够阅读的消息和消息队列中的消息格式不同,应用程序发送出去的消息经过序列化以后才发送给了消息队列
而在接受端必须反序列化,利用下面的代码可以实现:
public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
{
System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
Console.WriteLine("Message: " + (string)m.Body);
MessQueue.BeginReceive() ;
}
反序列化还有另一种写法:m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
七、由于消息队列的代码有些是固定不便的,所以把这些代码封装成一个类方便以后使用:
以下为引用的内容:
1using System;
2using System.Messaging;
3using System.Threading;
5
6namespace LoveStatusService
7{
8 /**////
9 /// Summary description for Msmq.
10 ///
11 public class Msmq
12 {
13 public Msmq()
14 {
15 //
16 // TODO: Add constructor logic here
17 //
18 }
19
20
21 private MessageQueue _messageQueue=null;
22 //最大并发线程数
23 private static int MAX_WORKER_THREADS=Convert.ToInt32( System.Configuration.ConfigurationSettings.AppSettings["MAX_WORKER_THREADS"].ToString());
24 //Msmq路径
25 private static string MsmqPath=System.Configuration.ConfigurationSettings.AppSettings["LoveStatusMQPath"];
26 //等待句柄
27 private WaitHandle[] waitHandleArray = new WaitHandle[MAX_WORKER_THREADS];
28 //任务类型
29 //1. Send Email 2. Send Message 3. Send Email and Message
30 private string TaskType=System.Configuration.ConfigurationSettings.AppSettings["TaskType"];
31 public MessageQueue MessQueue
32 {
33 get
34 {
35
36 if (_messageQueue==null)
37 {
38 if(MessageQueue.Exists(MsmqPath))
39 {
40 _messageQueue = new MessageQueue(MsmqPath);
41 }
42 else
43 {
44 _messageQueue = MessageQueue.Create(MsmqPath);
45 }
46 }
47
48
49 return _messageQueue;
50 }
51 }
52
53
54 Private Method#region Private Method
55
56 private void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
57 {
58 MessageQueue mqq = (MessageQueue)sender;
59 System.Messaging.Message m = mqq.EndReceive(e.AsyncResult);
60 //m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
61 m.Formatter =new System.Messaging.XmlMessageFormatter(new Type[] {typeof(UserObject)}) ;
62 //log.Info("Receive UserID: " + (string)m.Body) ;
63 UserObject obj=(UserObject)m.Body ;
64 long curUserId=obj.curUserID ;
65 long oppUserId=obj.oppUserID;
66 string curUserName=obj.curUserName;
67 string oppUserName=obj.oppUserName;
68 string curEmail=obj.curEmail ;
69 string oppEmail=obj.oppEmail;
70 string subject =obj.subject ;
71 string body=obj.body ;
72 //AppLog.log.Info("curUserId:"+curUserId) ;
73 //AppLog.log.Info("oppUserId:"+oppUserId) ;
74 AppLog.log.Info("==type="+TaskType) ;
75 switch(TaskType)
76 {
77 //Email
78 case "1":
79 EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
80 AppLog.log.Info("==Send to=="+oppEmail) ;
81 break;
82 //Message
83 case "2":
84 MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
85 AppLog.log.Info("==Send Msg to=="+oppUserId) ;
86 break;
87 //Email and Message
88 case "3":
89 EmailForMQ.SendEmailForLoveStatus(curEmail,oppEmail,curUserName,oppUserName,subject) ;
90 AppLog.log.Info("==Send to=="+oppEmail) ;
91 MessageForMQ.SendMessage(curUserId,oppUserId,subject,body) ;
92 AppLog.log.Info("==Send Msg to=="+oppUserId) ;
93 break;
94 default:
95 break;
96
97 }
98 mqq.BeginReceive() ;
99
100 }
101
102 #endregion
103
104 Public Method#region Public Method
105
106 //一个将对象发送到队列的方法,这里发送的是对象
107 public void SendUserIDToMQ(object arr)
108 {
109 MessQueue.Send(arr) ;
110 Console.WriteLine("Ok") ;
111 Console.Read() ;
112 }
113
114 //同步接受队列内容的方法
115 public void ReceiveFromMQ()
116 {
117 Message ms=new Message() ;
118
119 //ms=MessQueue.Peek();
120 try
121 {
122 ms=MessQueue.Receive(new TimeSpan(0,0,5));
123 if(ms!=null)
124 {
125 ms.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
126 AppLog.log.Info((string)ms.Body) ;
127 }
128 }
129 catch(Exception ex)
130 {
131
132 }
133
134
135 }
136
137 //开始监听工作线程
138 public void startListen()
139 {
140 AppLog.log.Info("--Thread--"+MAX_WORKER_THREADS) ;
141 MessQueue.ReceiveCompleted +=new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
142
143 //异步方式,并发
144
145 for(int i=0; i
146 {
147 // Begin asynchronous operations.
148 waitHandleArray[i] = MessQueue.BeginReceive().AsyncWaitHandle;
149 }
150
151 AppLog.log.Info("------Start Listen--------") ;
152
153 return;
154
155 }
156
157
158 //停止监听工作线程
159 public void stopListen()
160 {
161
162 for(int i=0;i163 {
164
165 try
166 {
167 waitHandleArray[i].Close();
168 }
169 catch
170 {
171 AppLog.log.Info("---waitHandleArray[i].Close() Error!-----") ;
172 }
173
174 }
175
176 try
177 {
178 // Specify to wait for all operations to return.
179 WaitHandle.WaitAll(waitHandleArray,1000,false);
180 }
181 catch
182 {
183 AppLog.log.Info("---WaitHandle.WaitAll Error!-----") ;
184 }
185 AppLog.log.Info("------Stop Listen--------") ;
186
187 }
188
189 #endregion
190
191
192
193
194 }
195}
196
UserObject的代码
以下为引用的内容:
1using System;
2
3namespace Goody9807
4{
5 /**////
6 /// 用与在MQ上传输数据的对象
7 ///
8 public class UserObject
9 {
10 public UserObject()
11 {
12 //
13 // TODO: Add constructor logic here
14 //
15 }
16
17 private long _curUserID;
18 public long curUserID
19 {
20 get
21 {
22 return _curUserID;
23 }
24 set
25 {
26 _curUserID=value;
27 }
28 }
29
30 private string _curUserName="";
31 public string curUserName
32 {
33 get
34 {
35 return _curUserName;
36 }
37 set
38 {
39 _curUserName=value;
40 }
41 }
42
43 private string _curEmail="";
44 public string curEmail
45 {
46 get
47 {
48 return _curEmail;
49 }
50 set
51 {
52 _curEmail=value;
53 }
54 }
55
56
57 private long _oppUserID;
58 public long oppUserID
59 {
60 get
61 {
62 return _oppUserID;
63 }
64 set
65 {
66 _oppUserID=value;
67 }
68 }
69
70 private string _oppUserName="";
71 public string oppUserName
72 {
73 get
74 {
75 return _oppUserName;
76 }
77 set
78 {
79 _oppUserName=value;
80 }
81 }
82
83 private string _oppEmail="";
84 public string oppEmail
85 {
86 get
87 {
88 return _oppEmail;
89 }
90 set
91 {
92 _oppEmail=value;
93 }
94 }
95
96 private string _subject ="";
97 public string subject
98 {
99 get
100 {
101 return _subject;
102 }
103 set
104 {
105 _subject=value;
106 }
107 }
108
109 private string _body="";
110 public string body
111 {
112 get
113 {
114 return _body;
115 }
116 set
117 {
118 _body=value;
119 }
120 }
121 }
122}
123
另一个同事写的封装类
以下为引用的内容:
1using System;
2
3using System.Threading;
4
5using System.Messaging;
6
7
8
9namespace Wapdm.SmsApp
10
11{
12
13 /**////
14
15 ///
16
17 /// A Logger implementation that writes messages to a message queue.
18
19 /// The default event formatter used is an instance of XMLEventFormatter
20
21 ///
22
23 ///
24
25 public sealed class MsgQueue
26
27 {
28
29
30
31 private const string BLANK_STRING = "";
32
33 private const string PERIOD = @".\private$"; //".";
34
35 private const string ELLIPSIS = "";
36
37
38
39 private string serverAddress;
40
41 private string queueName;
42
43 private string queuePath;
44
45
46
47 private bool IsContextEnabled;
48
49
50
51 private MessageQueue queue;
52
53
54
55 private object queueMonitor = new object();
56
57
58
59 private MsgQueue() {}
60
61
62
63 public static MsgQueue mq = null;
64
65 public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS];
66
67
68
69 public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern)
70
71 {
72
73 if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null))
74
75 {
76
77 throw new ArgumentNullException();
78
79 }
80
81 ServerAddress = _serverAddress;
82
83 QueueName = _queueName;
84
85 IsContextEnabled = true;
86
87 }
88
89
90
91 public MsgQueue(string _serverAddress, string _queueName)
92
93 {
94
95 if ((_serverAddress == null) || (_queueName == null))
96
97 {
98
99 throw new ArgumentNullException();
100
101 }
102
103 ServerAddress = _serverAddress;
104
105 QueueName = _queueName;
106
107 IsContextEnabled = true;
108
109 }
110
111
112
113 public MsgQueue(string _queueName)
114
115 {
116
117 if (_queueName == null)
118
119 {
120
121 throw new ArgumentNullException();
122
123 }
124
125 serverAddress = PERIOD;
126
127 QueueName = _queueName;
128
129 IsContextEnabled = true;
130
131 if ( IsContextEnabled == false )
132
133 throw new ArgumentNullException();
134
135 }
136
137
138
139 public string ServerAddress
140
141 {
142
143 get
144
145 {
146
147 return serverAddress;
148
149 }
150
151 set
152
153 {
154
155 if (value == null)
156
157 {
158
159 value = PERIOD;
160
161 }
162
163 value = value.Trim();
164
165 if (value.Equals(BLANK_STRING))
166
167 {
168
169 throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
170
171 }
172
173 lock (queueMonitor)
174
175 {
176
177 serverAddress = value;
178
179 queuePath = serverAddress + '\\' + queueName;
180
181 InitializeQueue();
182
183 }
184
185 }
186
187 }
188
189
190
191 public string QueueName
192
193 {
194
195 get
196
197 {
198
199 return queueName;
200
201 }
202
203 set
204
205 {
206
207 if (value == null)
208
209 {
210
211 throw new ArgumentNullException();
212
213 }
214
215 value = value.Trim();
216
217 if (value.Equals(BLANK_STRING))
218
219 {
220
221 throw new ArgumentException("Invalid value (must contain non-whitespace characters)");
222
223 }
224
225 lock (queueMonitor)
226
227 {
228
229 queueName = value;
230
231 queuePath = serverAddress + '\\' + queueName;
232
233 InitializeQueue();
234
235 }
236
237 }
238
239 }
240
241
242
243 private void InitializeQueue()
244
245 {
246
247 lock (queueMonitor)
248
249 {
250
251 if (queue != null)
252
253 {
254
255 try { queue.Close(); }
256
257 catch {}
258
259 queue = null;
260
261 }
262
263
264
265 try
266
267 {
268
269 if(!MessageQueue.Exists(queuePath))
270
271 MessageQueue.Create(queuePath);
272
273 }
274
275 catch {}
276
277 try
278
279 {
280
281 queue = new MessageQueue(queuePath);
282
283 queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl);
284
285 queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)});
286
287 }
288
289 catch (Exception e)
290
291 {
292
293 try { queue.Close(); }
294
295 catch {}
296
297 queue = null;
298
299 throw new ApplicationException("Couldn't open queue at '" + queuePath + "': " + e.GetType().FullName + ": " + e.Message);
300
301 }
302
303
304
305 }
306
307 }
308
309
310
311 private void AcquireResources()
312
313 {
314
315 InitializeQueue();
316
317 }
318
319
320
321 public void ReleaseResources()
322
323 {
324
325 lock (queueMonitor)
326
327 {
328
329 if (queue != null)
330
331 {
332
333 try
334
335 {
336
337 queue.Close();
338
339 }
340
341 catch {}
342
343 queue = null;
344
345 }
346
347 }
348
349 }
350
351
352
353 //阻塞方式
354
355 public MoMsg Read( )
356
357 {
358
359 MoMsg _event = null;
360
361 lock (queueMonitor)
362
363 {
364
365 if (queue == null)
366
367 {
368
369 InitializeQueue();
370
371 }
372
373 try
374
375 {
376
377 Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒
378
379 _event = (MoMsg) (message.Body);
380
381 return _event;
382
383 }
384
385 catch (Exception )
386
387 {
388
389 try { queue.Close(); }
390
391 catch {}
392
393 queue = null;
394
395 }
396
397 }
398
399 return null;
400
401 }
402
403
404
405 public void Write(MoMsg _event)
406
407 {
408
409 if (_event == null)
410
411 {
412
413 return;
414
415 }
416
417 lock (queueMonitor)
418
419 {
420
421 try
422
423 {
424
425 if (queue == null)
426
427 {
428
429 InitializeQueue();
430
431 }
432
433
434
435 Message message = new Message();
436
437 message.Priority = _event.Priority;
438
439 message.Recoverable = true;
440
441 message.Body = _event; //eventFormatter.Format(_event);
442
443
444
445 queue.Send(message);
446
447 }
448
449 catch (Exception e)
450
451 {
452
453 try { queue.Close(); }
454
455 catch {}
456
457 queue = null;
458
459 Util.Log.log("Couldn't write Message (" + e.GetType().FullName + ": " + e.Message + ")");
460
461 }
462
463 }
464
465 }
466
467
468
469 public static bool statusTest()
470
471 {
472
473 bool reValue = false;
474
475 try
476
477 {
478
479 MessageEnumerator re = mq.queue.GetMessageEnumerator();
480
481 bool rev = re.MoveNext();
482
483 reValue = true;
484
485 }
486
487 catch
488
489 {
490
491 reValue = false;
492
493 }
494
495
496
497 return reValue;
498
499 }
500
501
502
503 public static void startListen()
504
505 {
506
507 mq = new MsgQueue(Util.MqName);
508
509
510
511 mq.queue.ReceiveCompleted +=new ReceiveCompletedEventHandler(queue_ReceiveCompleted);
512
513
514
515 //异步方式,并发
516
517 for(int i=0; i518
519 {
520
521 // Begin asynchronous operations.
522
523 waitHandleArray[i] =
524
525 mq.queue.BeginReceive().AsyncWaitHandle;
526
527 }
528
529
530
531 return;
532
533 }
534
535
536
537 public static void stopListen()
538
539 {
540
541
542
543 for(int i=0;i544
545 {
546
547 try
548
549 {
550
551 waitHandleArray[i].Close();
552
553 }
554
555 catch
556
557 {
558
559 //忽略错误
560
561 }
562
563 }
564
565
566
567 try
568
569 {
570
571 // Specify to wait for all operations to return.
572
573 WaitHandle.WaitAll(waitHandleArray,1000,false);
574
575 }
576
577 catch
578
579 {
580
581 //忽略错误
582
583 }
584
585 }
586
587
588
589 private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
590
591 {
592
593 // Connect to the queue.
594
595 MessageQueue mqq = (MessageQueue)sender;
596
597
598
599 // End the asynchronous Receive operation.
600
601 Message m = mqq.EndReceive(e.AsyncResult);
602
603
604
605 Util.ProcessMo((MoMsg)(m.Body));
606
607
608
609 if(Util.isRunning)
610
611 {
612
613 // Restart the asynchronous Receive operation.
614
615 mqq.BeginReceive();
616
617 }
618
619
620
621 return;
622
623 }
624
625 }
626
627}
本文作者:佚名 来源:http://www.chinaz.com/
CIO之家 www.ciozj.com 微信公众号:imciow