//附CMPPClient.cs:
/*
作者:TNT 时间:2003年12月
文件说明:本文件实现SP端的协议开发。
*/
using System;
using System.Security.Cryptography;
using System.Net.Sockets;
using System.Net;
using System.Text;
using System.Threading;
using System.Collections;
namespace CMPP.YOURCOMPANY
{
public delegate void ReportEventHandler(object sender, ReportEventArgs e); // Delegate type of the onReportHandler event
public delegate void SMSEventHandler(object sender, SMSEventArgs e); // Delegate type of the onSMSHandler event
public delegate void TerminateEventHandler(object sender,TerminateEventArgs e); // Raised when a terminate signal is received
public delegate void TerminateRespEventHandler(object sender,TerminateRespEventArgs e); // Raised when a terminate response arrives
public delegate void TestEventHandler(object sender,TestEventArgs e);
public delegate void TestRespEventHandler(object sender,TestRespEventArgs e);
public delegate void ConnectRespEventHandler(object sender,ConnectRespEventArgs e);
public delegate void CancelRespEventHandler(object sender,CancelRespEventArgs e);
public delegate void SubmitRespEventHandler(object sender,SubmitRespEventArgs e);
public delegate void QueryRespEventHandler(object sender,QueryRespEventArgs e);
public delegate void LogonSuccEventHandler(object sender,EventArgs e); // Raised after a successful login
public delegate void SocketClosedEventHandler(object sender,EventArgs e); // Raised when the socket is detected as closed
public delegate void FailedItemDeletedEventHandler(object sender,WaitingQueueItemEventArgs e); // Raised when a waiting-queue message got no reply for more than 60 seconds
public delegate void CMPPClientSvcStopEventHandler(object sender, ClientQueueStateArgs e); // Raised when the CMPP service stops
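/// <summary>
/// Client side of the CMPP protocol, providing login, send and receive functions.
/// It runs three threads: 1. one that handles the messages to be sent (labelled "MO (downstream)" in the original comment);
/// 2. one that handles the CMPP messages sent by the mobile operator's server;
/// 3. one that handles disconnects, checks for messages that need resending, checks received
/// reports and short messages, and raises the OnReport and OnSMS events.
/// </summary>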
public class CMPPClient
{
public static long CMPP_ACTIVE_TEST_C_TICKs= 30 ; // *3 ; // active-test (keep-alive) interval of the long-lived connection; isPingTime() compares it against elapsed seconds
public static long CMPP_ACTIVE_TEST_T_TICKs= 60 ; // message failure timeout: 60 seconds
public static int CMPP_ACTIVE_TEST_N_COUNT=3; // 3 attempts
//public static int CMPP_MSG_MAX=100; // maximum number of messages fetched at once
public static int CMPP_Port=7890; // server port; overwritten by _init()
public event ReportEventHandler onReportHandler; // handler for report events
public event SMSEventHandler onSMSHandler; // handler for incoming short messages
public event TestEventHandler onTestHandler;
public event TestRespEventHandler onTestRespHandler;
public event ConnectRespEventHandler onConnectRespHandler;
public event CancelRespEventHandler onCancelRespHandler;
public event TerminateEventHandler onTerminateHandler;
public event TerminateRespEventHandler onTerminateRespHandler;
public event SubmitRespEventHandler onSubmitRespHandler;
public event QueryRespEventHandler onQueryRespHandler;
public event LogonSuccEventHandler onLogonSuccEventHandler;
public event SocketClosedEventHandler onSocketClosedHandler;
public event FailedItemDeletedEventHandler onWaitingItemDeltedHandler; // raised when a waiting-queue message times out
public event CMPPClientSvcStopEventHandler onClientSvcStopedHandler; // raised when the service stops
// private members //////////////////////////////////////////////////////////////////////
private Socket tcp=null;
private IPHostEntry ip=null;
private IPEndPoint cmpp_ep=null;
private int RecvTimeOut =1000; // receive timeout; NOTE(review): the original comment said 2000 ms but the value is 1000 - confirm which is intended
private int SendTimeout =2000; // send timeout (2000 ms per the original comment)
private string CMPP_Server=""; // IP address or DNS name of the mobile operator's server
private string systemID=""; // enterprise (company) code
private string userName=""; // SP number / enterprise code
private string PassWord=""; // password
private bool isStop=false; // whether this service has been told to stop
private bool isLogin=false; // whether login has completed
private Thread Send_Thread; // sending thread, handles packets going to the operator
private Thread Recv_Thread; // thread dedicated to received packets
private Thread Deamo_Thread; // monitoring thread
private string ErrorInfo=""; // text of the most recent error(s) or diagnostic information
private DateTime _current_time=DateTime.Now; // time of the last ping
private uint lastSequence; // sequence number; must be re-initialised on every restart
private SortedList _outSeqQueue=new SortedList(); // queue of QueueItem keyed by sequence: items waiting to be sent and their state
private SortedList _waitingSeqQueue=new SortedList(); // queue of QueueItem keyed by sequence: items sent and awaiting a reply
private int sub_resp=0; // Sequence of the last returned packet (per the original comment)
private DateTime _lastOkTime; // time of the last successful message exchange
private bool _bNre=false; // set when a null-reference / socket error has been seen
//private ManualResetEvent _connectionDone=new ManualResetEvent(false); // whether connected to the socket server, i.e. the CMPP server
//private ManualResetEvent _lastsendDone=new ManualResetEvent(false); // whether the previous send has finished
//private ManualResetEvent _lastrecvDone=new ManualResetEvent(false); // whether the previous receive has finished
/// <summary>
/// Queues one CMPP_ACTIVE_TEST (keep-alive) packet: takes a fresh sequence number,
/// wraps the test message in a QueueItem and adds it to the outgoing queue.
/// </summary>
private void ping()
{
    uint sequence = this.getNextSequence();
    MSG.CMPP_MSG_TEST activeTest = new MSG.CMPP_MSG_TEST(sequence);

    // The last two constructor arguments are passed as 0, exactly as before.
    QueueItem item = new QueueItem(sequence, (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST, 0, 0);
    item.setmsgObj(activeTest);

    this.addToOutQueue(item);
}
/// <summary>
/// Returns the validity (expiry) time string of a short message: <paramref name="d"/> plus
/// two hours, formatted as yyMMddHHmmss and followed by the fixed suffix "032+".
/// </summary>
/// <param name="d">Reference time the two-hour lifetime is added to.</param>
/// <returns>A 16-character string such as "031201120509032+".</returns>
private string getValIdTime(DateTime d)
{
    DateTime expiry = d.AddHours(2); // message lifetime: 2 hours

    // One invariant-culture format replaces the hand-rolled Substring/PadLeft chain, so the
    // result is always exactly 12 digits regardless of the year or the thread culture.
    return expiry.ToString("yyMMddHHmmss", System.Globalization.CultureInfo.InvariantCulture) + "032+";
}
/// <summary>
/// Tells whether at least CMPP_ACTIVE_TEST_C_TICKs seconds have elapsed since the last ping.
/// When they have, the last-ping timestamp is reset to now before returning true.
/// </summary>
private bool isPingTime()
{
    double elapsedSeconds = (DateTime.Now - this._current_time).TotalSeconds;
    if (elapsedSeconds < CMPPClient.CMPP_ACTIVE_TEST_C_TICKs)
    {
        return false;
    }

    lock (this)
    {
        this._current_time = DateTime.Now;
    }
    return true;
}
/// <summary>
/// Walks the waiting queue and handles every item that has been waiting for a reply longer
/// than CMPP_ACTIVE_TEST_T_TICKs seconds: items that already failed CMPP_ACTIVE_TEST_N_COUNT
/// times are reported through onWaitingItemDeltedHandler and removed, all others are re-sent.
/// </summary>
/// <remarks>
/// The loop header was destroyed when this file was extracted from HTML and has been
/// reconstructed from the surviving tokens. Compared with the original, removing an item no
/// longer makes the scan skip the item that slides into the freed index.
/// </remarks>
private void checkReSend()
{
    for (int i = 0; i < this._waitingSeqQueue.Count; i++)
    {
        Thread.Sleep(20);
        QueueItem q = (QueueItem)this._waitingSeqQueue.GetByIndex(i);
        if (q == null)
        {
            continue;
        }

        DateTime this_time = DateTime.Now; // current time
        TimeSpan t = this_time - q.inQueueTime;
        if (t.TotalSeconds <= CMPPClient.CMPP_ACTIVE_TEST_T_TICKs)
        {
            continue; // not timed out yet
        }

        if (q.FailedCount >= CMPPClient.CMPP_ACTIVE_TEST_N_COUNT)
        {
            // Retries exhausted: report the failure, then drop the item.
            if (this.onWaitingItemDeltedHandler != null)
            {
                WaitingQueueItemEventArgs e = new WaitingQueueItemEventArgs(q);
                this.onWaitingItemDeltedHandler(this, e);
            }

            int countBefore = this._waitingSeqQueue.Count;
            this.delFromWaitingQueue(q);
            if (this._waitingSeqQueue.Count < countBefore)
            {
                i--; // the next item moved into slot i; look at it on the next pass
            }
        }
        else
        {
            // Still allowed to retry: restart the timeout clock and send again.
            q.inQueueTime = this_time;
            q.FailedCount++;
            q.MsgState = (int)MSG_STATE.SENDED_WAITTING;
            this.sendQueueItem(q);
        }
    }
}

/// <summary>
/// Creates and starts the monitoring thread, which runs DeamonThread.
/// </summary>
private void startThreads()
{
    Deamo_Thread = new Thread(new ThreadStart(this.DeamonThread));
    Deamo_Thread.Start();
}

/// <summary>
/// Creates a queue item for <paramref name="msg"/> under a freshly allocated sequence number.
/// </summary>
/// <param name="msgtype">Command id of the message, stored as the item's message type.</param>
/// <param name="msgstate">Initial state value passed to the QueueItem constructor.</param>
/// <param name="msg">The message object carried by the item.</param>
/// <returns>The new, not yet queued, item.</returns>
private QueueItem newQueueItem(int msgtype, int msgstate, object msg)
{
    uint seq = this.getNextSequence();
    QueueItem q = new QueueItem(seq, (uint)msgtype, 0, msgstate);
    q.setmsgObj(msg); // attach the payload
    return q;
}

/// <summary>
/// Looks up an item of the outgoing queue by sequence number.
/// </summary>
/// <returns>The item, or null when the queue holds no entry under <paramref name="seq"/>.</returns>
private QueueItem getOutQueueItem(uint seq)
{
    lock (this)
    {
        return (QueueItem)this._outSeqQueue[seq];
    }
}

/// <summary>
/// Looks up an item of the waiting queue by sequence number.
/// </summary>
/// <returns>The item, or null when the queue holds no entry under <paramref name="seq"/>.</returns>
private QueueItem getWaitingQueueItem(uint seq)
{
    // Take the same lock as getOutQueueItem and the waiting-queue writers.
    lock (this)
    {
        return (QueueItem)this._waitingSeqQueue[seq];
    }
}

/// <summary>
/// Adds <paramref name="q"/> to the outgoing queue under its sequence number.
/// </summary>
/// <remarks>
/// SortedList.Add throws ArgumentException if the sequence is already present; unlike
/// addToWaitingQueue, no duplicate check is made here.
/// </remarks>
private void addToOutQueue(QueueItem q)
{
    lock (this)
    {
        this._outSeqQueue.Add(q.Sequence, q);
    }
}

/// <summary>
/// Adds <paramref name="q"/> to the waiting queue unless an entry with the same sequence
/// number is already there (which happens when an item is re-sent).
/// </summary>
private void addToWaitingQueue(QueueItem q)
{
    lock (this)
    {
        if (this._waitingSeqQueue.ContainsKey(q.Sequence))
        {
            return;
        }
        this._waitingSeqQueue.Add(q.Sequence, q);
    }
}

/// <summary>
/// Finds the first item in the outgoing queue whose state is MSG_STATE.NEW, marks it
/// MSG_STATE.SENDING and returns it.
/// </summary>
/// <remarks>
/// Reconstructed: the body of this method and the head of getTop16Queue were fused into one
/// garbled line when the file was extracted from HTML. The scan and the state change now
/// happen under one lock so an item cannot be claimed twice.
/// </remarks>
/// <returns>The claimed item, or null when the queue holds no NEW item.</returns>
private QueueItem getTopOutQueue()
{
    lock (this)
    {
        for (int i = 0; i < this._outSeqQueue.Count; i++)
        {
            QueueItem q = (QueueItem)this._outSeqQueue.GetByIndex(i);
            if (q != null && q.MsgState == (int)MSG_STATE.NEW)
            {
                q.MsgState = (int)MSG_STATE.SENDING; // claimed for sending
                return q;
            }
        }
    }
    return null;
}

/// <summary>
/// Claims up to 16 NEW items from the head of the outgoing queue.
/// </summary>
/// <remarks>
/// The original loop condition (q != null || arrlength &lt;= 16) never enforced the limit.
/// An item is now claimed only while there is still room, so none is left marked SENDING
/// without being returned.
/// </remarks>
/// <returns>A list of the claimed items; never null, empty when nothing is pending.</returns>
private ArrayList getTop16Queue()
{
    const int maxBatch = 16;
    ArrayList reArr = new ArrayList(maxBatch);
    while (reArr.Count < maxBatch)
    {
        QueueItem q = getTopOutQueue();
        if (q == null)
        {
            break;
        }
        reArr.Add(q);
    }
    return reArr;
}

/// <summary>
/// Removes <paramref name="q"/> from the outgoing queue; a no-op if it is not queued.
/// </summary>
private void delFromOutQueue(QueueItem q)
{
    lock (this)
    {
        this._outSeqQueue.Remove(q.Sequence);
    }
}

/// <summary>
/// Removes the outgoing-queue entry stored under <paramref name="seq"/>; a no-op if absent.
/// </summary>
private void delFromOutQueue(uint seq)
{
    lock (this)
    {
        this._outSeqQueue.Remove(seq);
    }
}

/// <summary>
/// Removes <paramref name="q"/> from the waiting queue; a no-op if it is not queued.
/// </summary>
private void delFromWaitingQueue(QueueItem q)
{
    lock (this)
    {
        this._waitingSeqQueue.Remove(q.Sequence);
    }
}

/// <summary>
/// Removes the waiting-queue entry stored under <paramref name="seq"/>; a no-op if absent.
/// </summary>
private void delFromWaitingQueue(uint seq)
{
    // Locked like the QueueItem overload; the original removed without holding the lock.
    lock (this)
    {
        this._waitingSeqQueue.Remove(seq);
    }
}

/// <summary>
/// Remembers the credentials and sends a CMPP_CONNECT (login) packet directly on the socket,
/// bypassing the outgoing queue.
/// </summary>
/// <param name="SystemID">Enterprise code; trimmed and used as the packet's SourceAdd.</param>
/// <param name="spNum">SP number; only stored in userName, not placed in the packet.</param>
/// <param name="Password">Password; trimmed and placed in the packet.</param>
private void SendLogin(string SystemID, string spNum, string Password)
{
    systemID = SystemID;
    userName = spNum;
    PassWord = Password;

    uint seq = this.getNextSequence(); // take a sequence number
    MSG.CMPP_MSG_CONNECT cn = new MSG.CMPP_MSG_CONNECT(seq);
    cn.Password = Password.Trim();
    cn.SourceAdd = SystemID.Trim();
    tcp.Send(cn.ToBytes());
}


/// <summary>
/// Takes <paramref name="outitem"/> out of the outgoing queue, updates its state and returns
/// the serialized packet for it. Nothing is sent here.
/// </summary>
/// <remarks>
/// Requests (ACTIVE_TEST, CANCEL, QUERY, SUBMIT, TERMINATE) are also placed in the waiting
/// queue; responses (*_RESP) are not. Any unrecognised message type is handled exactly like
/// CMPP_ACTIVE_TEST, as in the original default branch.
/// </remarks>
/// <returns>The bytes produced by the message object's toBytes().</returns>
private byte[] prepairPKs(QueueItem outitem)
{
    switch (outitem.MsgType)
    {
        case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST_RESP:
        {
            MSG.CMPP_MSG_TEST_RESP test_reply = (MSG.CMPP_MSG_TEST_RESP)outitem.getMsgObj();
            this._pkLeaveOutQueue(outitem, false, (int)MSG_STATE.SENDING_FINISHED);
            return test_reply.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_CANCEL:
        {
            MSG.CMPP_MSG_CANCEL cancel = (MSG.CMPP_MSG_CANCEL)outitem.getMsgObj();
            this._pkLeaveOutQueue(outitem, true, (int)MSG_STATE.SENDED_WAITTING);
            return cancel.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_DELIVER_RESP:
        {
            MSG.CMPP_MSG_DELIVER_RESP deliver_resp = (MSG.CMPP_MSG_DELIVER_RESP)outitem.getMsgObj();
            this._pkLeaveOutQueue(outitem, false, (int)MSG_STATE.SENDING_FINISHED);
            return deliver_resp.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_QUERY:
        {
            MSG.CMPP_MSG_QUERY query = (MSG.CMPP_MSG_QUERY)outitem.getMsgObj();
            this._pkLeaveOutQueue(outitem, true, (int)MSG_STATE.SENDED_WAITTING);
            return query.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_SUBMIT:
        {
            MSG.CMPP_MSG_SUBMIT submit = (MSG.CMPP_MSG_SUBMIT)outitem.getMsgObj();
            // NOTE(review): SUBMIT is parked in the waiting queue yet marked SENDING_FINISHED,
            // unlike the other awaited requests - kept as in the original; confirm intent.
            this._pkLeaveOutQueue(outitem, true, (int)MSG_STATE.SENDING_FINISHED);
            return submit.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE:
        {
            MSG.CMPP_MSG_TERMINATE terminate = (MSG.CMPP_MSG_TERMINATE)outitem.getMsgObj();
            this._pkLeaveOutQueue(outitem, true, (int)MSG_STATE.SENDED_WAITTING);
            return terminate.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE_RESP:
        {
            MSG.CMPP_MSG_TERMINATE_RESP terminate_resp = (MSG.CMPP_MSG_TERMINATE_RESP)outitem.getMsgObj();
            this._pkLeaveOutQueue(outitem, false, (int)MSG_STATE.SENDING_FINISHED);
            return terminate_resp.toBytes();
        }

        case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST:
        default:
        {
            MSG.CMPP_MSG_TEST test = (MSG.CMPP_MSG_TEST)outitem.getMsgObj();
            // Waits for the server's ACTIVE_TEST_RESP.
            this._pkLeaveOutQueue(outitem, true, (int)MSG_STATE.SENDED_WAITTING);
            return test.toBytes();
        }
    }
}

/// <summary>
/// Helper for prepairPKs: under the lock marks the item SENDING, removes it from the outgoing
/// queue and optionally parks it in the waiting queue; afterwards sets its final state.
/// </summary>
private void _pkLeaveOutQueue(QueueItem item, bool awaitReply, int finalState)
{
    lock (this)
    {
        item.MsgState = (int)MSG_STATE.SENDING;
        this.delFromOutQueue(item.Sequence);
        if (awaitReply)
        {
            this.addToWaitingQueue(item);
        }
    }
    item.MsgState = finalState;
}

/// <summary>
/// Sends <paramref name="outitem"/> on the socket: takes it out of the outgoing queue, parks
/// requests in the waiting queue, writes the serialized packet and updates the item's state.
/// </summary>
/// <remarks>
/// Unknown message types are not sent. After a pass without exception the last-OK time is
/// refreshed. SocketException and NullReferenceException are not propagated: their text is
/// appended to ErrorInfo, and a NullReferenceException also sets the _bNre flag.
/// Sending CMPP_TERMINATE sets isStop so the other threads exit.
/// </remarks>
private void sendQueueItem(QueueItem outitem)
{
    uint msgtype = outitem.MsgType; // read outside the try, as before: a null item still throws
    try
    {
        switch (msgtype)
        {
            case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST:
            {
                MSG.CMPP_MSG_TEST test = (MSG.CMPP_MSG_TEST)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, true); // wait for the server's ACTIVE_TEST_RESP
                tcp.Send(test.toBytes());
                outitem.MsgState = (int)MSG_STATE.SENDED_WAITTING;
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST_RESP:
            {
                MSG.CMPP_MSG_TEST_RESP test_reply = (MSG.CMPP_MSG_TEST_RESP)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, false);
                tcp.Send(test_reply.toBytes());
                outitem.MsgState = (int)MSG_STATE.SENDING_FINISHED; // done
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_CANCEL:
            {
                MSG.CMPP_MSG_CANCEL cancel = (MSG.CMPP_MSG_CANCEL)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, true); // wait for the response
                tcp.Send(cancel.toBytes());
                outitem.MsgState = (int)MSG_STATE.SENDED_WAITTING;
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_DELIVER_RESP:
            {
                MSG.CMPP_MSG_DELIVER_RESP deliver_resp = (MSG.CMPP_MSG_DELIVER_RESP)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, false);
                tcp.Send(deliver_resp.toBytes());
                outitem.MsgState = (int)MSG_STATE.SENDING_FINISHED; // done
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_QUERY:
            {
                MSG.CMPP_MSG_QUERY query = (MSG.CMPP_MSG_QUERY)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, true);
                tcp.Send(query.toBytes());
                outitem.MsgState = (int)MSG_STATE.SENDED_WAITTING; // wait for the response
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_SUBMIT:
            {
                MSG.CMPP_MSG_SUBMIT submit = (MSG.CMPP_MSG_SUBMIT)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, true);
                tcp.Send(submit.toBytes());
                // NOTE(review): marked SENDING_FINISHED although it sits in the waiting queue;
                // kept as in the original - confirm intent.
                outitem.MsgState = (int)MSG_STATE.SENDING_FINISHED;
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE:
            {
                MSG.CMPP_MSG_TERMINATE terminate = (MSG.CMPP_MSG_TERMINATE)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, true);
                if (this.tcpIsCanUse())
                {
                    tcp.Send(terminate.toBytes());
                    outitem.MsgState = (int)MSG_STATE.SENDED_WAITTING;
                }
                this.isStop = true; // tell the other threads they may exit
                break;
            }

            case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE_RESP:
            {
                MSG.CMPP_MSG_TERMINATE_RESP terminate_resp = (MSG.CMPP_MSG_TERMINATE_RESP)outitem.getMsgObj();
                this._sendLeaveOutQueue(outitem, false);
                tcp.Send(terminate_resp.toBytes());
                outitem.MsgState = (int)MSG_STATE.SENDING_FINISHED;
                break;
            }
        }
        LogLastOkTime(DateTime.Now); // record the time of the last successful socket use
    }
    catch (SocketException se)
    {
        // Socket failure: keep the details for diagnostics.
        this.ErrorInfo = this.ErrorInfo + "\r\n" + se.ToString();
    }
    catch (NullReferenceException nre)
    {
        this._bNre = true; // a null reference was hit
        this.ErrorInfo = this.ErrorInfo + "\r\n" + nre.ToString();
    }
}

/// <summary>
/// Helper for sendQueueItem: under the lock marks the item SENDING, removes it from the
/// outgoing queue and, when a reply is expected, parks it in the waiting queue.
/// </summary>
private void _sendLeaveOutQueue(QueueItem item, bool awaitReply)
{
    lock (this)
    {
        item.MsgState = (int)MSG_STATE.SENDING;
        this.delFromOutQueue(item.Sequence);
        if (awaitReply)
        {
            this.addToWaitingQueue(item);
        }
    }
}

/// <summary>
/// Tells whether the TCP connection is considered usable: no null-reference error has been
/// flagged, and the last successful exchange is at most CMPP_ACTIVE_TEST_T_TICKs seconds old.
/// </summary>
private bool tcpIsCanUse()
{
    if (this._bNre)
    {
        return false;
    }

    TimeSpan sinceLastOk = DateTime.Now - this._lastOkTime;
    return sinceLastOk.TotalSeconds <= CMPPClient.CMPP_ACTIVE_TEST_T_TICKs; // 60 seconds
}

/// <summary>
/// Creates and starts the send thread (SendSPMsgThread), then the receive thread
/// (RecvISMGMsgThread). Threads previously held in these fields are not stopped here.
/// </summary>
private void _reStartRecvNSend()
{
    Send_Thread = new Thread(new ThreadStart(this.SendSPMsgThread));
    Send_Thread.Start();

    Recv_Thread = new Thread(new ThreadStart(this.RecvISMGMsgThread));
    Recv_Thread.Start();
}

/// <summary>
/// Records <paramref name="lastoktime"/> as the time of the last successful message exchange.
/// </summary>
private void LogLastOkTime(DateTime lastoktime)
{
    lock (this)
    {
        this._lastOkTime = lastoktime;
    }
}

// Default (fallback) event handlers. All of the ones below are intentionally empty.

/// <summary>Default handler for report events; does nothing.</summary>
private void defaultReportHandler()
{
}

/// <summary>Default handler for incoming short messages; does nothing.</summary>
private void defaultSMSHandler()
{
}

/// <summary>Default terminate handler; does nothing.</summary>
private void defaultTeminateHandler()
{
}

/// <summary>Default handler for active-test requests; does nothing.</summary>
private void defaultTestEventHandler()
{
}

/// <summary>Default handler for active-test responses; does nothing.</summary>
private void defaultTestRespEventHandler()
{
}

/// <summary>Default handler for terminate requests; does nothing.</summary>
private void defaultTerminateEventHandler()
{
}

/// <summary>Default handler for terminate responses; does nothing.</summary>
private void defaultTerminateRespEventHandler()
{
}

/// <summary>Default handler for cancel responses; does nothing.</summary>
private void defaultCancelRespEventHandler()
{
}

/// <summary>Default handler for query responses; does nothing.</summary>
private void defaultQueryRespEventHandler()
{
}

/// <summary>
/// Default handler for a connect response: queues a CMPP_ACTIVE_TEST packet in state
/// MSG_STATE.NEW so that a test packet is sent right away.
/// </summary>
private void defaultConnectRespEventHandler()
{
    QueueItem q = new QueueItem(this.getNextSequence(), (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST, 0, (int)MSG_STATE.NEW);
    MSG.CMPP_MSG_TEST test = new MSG.CMPP_MSG_TEST(q.Sequence);
    q.setmsgObj(test);
    this.addToOutQueue(q);
}
/// <summary>Default handler for submit responses; does nothing.</summary>
private void defaultSubmitRespEventHandler()
{
}

/// <summary>Default handler for the client-stopped event; does nothing.</summary>
private void defaultClientStopEventHandler()
{
}

/// <summary>Error-report hook; currently empty, <paramref name="info"/> is ignored.</summary>
private void rePortError(string info)
{
}

/// <summary>
/// Stores the server address and port, creates a TCP socket, resolves the host name and
/// connects to the first resolved address.
/// </summary>
/// <param name="CMPPServer">IP address or DNS name of the server.</param>
/// <param name="CMPPPort">Port to connect to; also overwrites the static CMPP_Port.</param>
/// <returns>
/// true when the connection was established; false when a SocketException occurred, in
/// which case ErrorInfo holds the details. Other exception types propagate to the caller.
/// </returns>
private bool _init(string CMPPServer, int CMPPPort)
{
    CMPP_Server = CMPPServer;
    CMPP_Port = CMPPPort;
    try
    {
        tcp = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        ip = Dns.GetHostByName(CMPP_Server);
        cmpp_ep = new IPEndPoint(ip.AddressList[0], CMPP_Port);
        tcp.Connect(cmpp_ep); // connect
        return true;
    }
    catch (SocketException se)
    {
        ErrorInfo = "Socker Error:" + se.ToString();
        return false;
    }
}
/// <summary>
/// Returns the next packet sequence number, wrapping from uint.MaxValue back to
/// uint.MinValue.
/// </summary>
/// <remarks>
/// The original relied on catching OverflowException from lastSequence++, which is never
/// thrown in C#'s default unchecked context. The wrap-around is now explicit and gives the
/// same value whether or not the project is compiled with overflow checking; the ErrorInfo
/// entry the catch block wrote under overflow checking is no longer produced.
/// </remarks>
private uint getNextSequence()
{
    // Lock object kept as in the original so the locking behaviour is unchanged.
    lock (typeof(CMPPClient))
    {
        if (lastSequence == uint.MaxValue)
        {
            lastSequence = uint.MinValue;
        }
        else
        {
            lastSequence++;
        }
        return lastSequence;
    }
}

477private void RecvISMGMsgThread() //处理ISMG消息的线程
478{
479while(!this.isStop )
480{
481try
482{
483byte[] rbuf=new byte[10240]; //结果缓冲区
484byte[] recv_temp=new Byte[1024]; //recv临时缓冲区
485int index=0;
486int msglength=tcp.Receive(rbuf); //阻塞接收//分析收到的数据
487
488MSG.CMPP_MSG_Header header; //=new MSG.CMPP_MSG_Header(rbuf,index); //取得一个消息
489while(index<msglength) !="null)" ");="" ";="" (the_pk,msg.cmpp_msg_header.headerlength);="" (uint)msg.cmpp_command_id.cmpp_active_test="" (uint)msg.cmpp_command_id.cmpp_active_test_resp="" (uint)msg.cmpp_command_id.cmpp_cancel_resp="" (uint)msg.cmpp_command_id.cmpp_connect_resp="" (uint)msg.cmpp_command_id.cmpp_deliver:="" (uint)msg.cmpp_command_id.cmpp_query_resp="" (uint)msg.cmpp_command_id.cmpp_submit_resp="" (uint)msg.cmpp_command_id.cmpp_terminate="" (uint)msg.cmpp_command_id.cmpp_terminate_resp="" );="" +"\r\n"+"发送:cmpp_active_test_resp="" +"\r\n"+"收到:cmpp_active_test";="" +"\r\n"+"收到:cmpp_terminate";="" +"\r\n"+"收到:cmpp_terminate_resp";="" +"\r\n"+("发送:cmpp__deliver_resp="" +"\r\n"+("收到:cmpp_active_test_resp="" +"\r\n"+("收到:cmpp_cancel_resp="" +"\r\n"+("收到:cmpp_connect_resp="" +"\r\n"+("收到:cmpp_deliver="" +"\r\n"+("收到:cmpp_query_resp="" +"\r\n"+("收到:cmpp_submit_resp="" ,更新="" 0.1秒="" :="" ;="" ;i++)="" _debugbs(byte[]="" _debugbs(the_pk);="" _restartrecvnsend();="" arg="new" biconvert.dumpbytes(initvalue,"c:\\\cmpp_submit_resp.txt");="" biconvert.dumpbytes(the_pk,"c:\\\cmpp_deliver.txt");="" biconvert.dumpbytes(the_pk,"c:\\\cmpp_submit_resp.txt");="" break;="" byte[]="" byte[header.msglength]="" cancel_reply="new" cancelrespeventargs="" cancelrespeventargs(cancel_reply);="" case="" catch(socketexception="" cn_reply="new" connectrespeventargs="" connectrespeventargs(cn_reply);="" deamonthread()="" debug="" defaultcancelrespeventhandler();="" defaultconnectrespeventhandler();="" defaultqueryrespeventhandler();="" defaultsmshandler();="" defaultsubmitrespeventhandler();="" defaultterminateeventhandler();="" defaultterminaterespeventhandler();="" defaulttesteventhandler();="" defaulttestrespeventhandler();="" delfromwaitingqueue(submit_resp.sequence);="" deliver="new" deliver_resp="new" deliver_resp.msgid="deliver.MsgID" deliver_resp.result="0;" e="new" else="" for(int="" header="new" i="0;i<header.MSGLength" if(cn_reply.isok)="" if(deliver.isreport)="" 
if(t_count="" if(tcpiscanuse())="" if(this.ispingtime())="" if(this.oncancelresphandler!="null)" if(this.onconnectresphandler="" if(this.onqueryresphandler!="null)" if(this.onreporthandler!="null)" if(this.onsmshandler!="null)" if(this.onsubmitresphandler!="null)" if(this.onterminatehandler!="null)" if(this.onterminateresphandler!="null)" if(this.ontesthandler!="null)" if(this.ontestresphandler!="null)" int="" ismg--〉sp="" loglastoktime(datetime.now="" msg.cmpp_msg_cancel_resp="" msg.cmpp_msg_cancel_resp(the_pk);="" msg.cmpp_msg_connect_resp="" msg.cmpp_msg_connect_resp(the_pk);="" msg.cmpp_msg_deliver="" msg.cmpp_msg_deliver(the_pk);="" msg.cmpp_msg_deliver_resp="" msg.cmpp_msg_deliver_resp(seq);="" msg.cmpp_msg_header(rbuf,index);="" msg.cmpp_msg_query_resp="" msg.cmpp_msg_query_resp(the_pk);="" msg.cmpp_msg_submit_resp="" msg.cmpp_msg_submit_resp(the_pk);="" msg.cmpp_msg_terminate="" msg.cmpp_msg_terminate(the_pk);="" msg.cmpp_msg_terminate_resp="" msg.cmpp_msg_terminate_resp(seq);="" msg.cmpp_msg_terminate_resp(the_pk);="" msg.cmpp_msg_test="" msg.cmpp_msg_test(the_pk);="" msg.cmpp_msg_test_resp="" msg.cmpp_msg_test_resp(seq);="" msg.cmpp_msg_test_resp(the_pk);="" msgid="" oncancelresphandler(this,e);="" onconnectresphandler(this,e);="" onreporthandler(this,arg);="" onsmshandler(this,smsarg);="" onsubmitresphandler(this,e);="" onterminatehandler(this,e);="" onterminateresphandler(this,e);="" ontesthandler(this,e);="" ontestresphandler(this,e);="" private="" query_resp="new" queryrespeventargs="" queryrespeventargs(query_resp);="" reporteventargs="" reporteventargs(reportmsgid.tostring(),="" reporteventargs(the_pk,msg.cmpp_msg_header.headerlength+8+21+10+1+1+1+21+1+1);="" reporteventargs传递的字节数组是="" reportmsgid="deliver.ReportMsgID" return;="" se)="" seq="ter_resp.Sequence" seq;="" smsarg="new" smseventargs="" statereport="deliver.StateReport;" string="" sub_resp++;="" submit_resp="new" submit_resp.="" submitrespeventargs="" submitrespeventargs(submit_resp);="" 
switch(header.command_id)="" t="deliver_resp.toBytes();" t_count="0;" t_count++;="" tcp.send(t);="" tcp.send(terminate_resp.tobytes());="" tcp.send(test_reply.tobytes());="" ter_resp="new" terminate="new" terminate_resp="new" terminateeventargs="" terminateeventargs(terminate);="" terminaterespeventargs="" terminaterespeventargs(ter_resp);="" test="new" test_reply="new" test_reply2="new" testeventargs="" testeventargs(test);="" testrespeventargs="" testrespeventargs(test_reply2);="" the_pk="new" the_pk)="" the_pk[i]="rbuf[index++];" this._stopme();="" this.defaultreporthandler();="" this.delfromoutqueue(seq);="" this.delfromwaitingqueue(query_resp.sequence="" this.delfromwaitingqueue(seq);="" this.errorinfo="this.ErrorInfo" this.islogin="false;" this.isstop)="" this.ping();="" this.stopme()="" thread.sleep(50);="" uint="" uint64="" void="" while(!="" {="" }="" 一条="" 传递的整个deliver包="" 保留映像="" 准备自我停止?="" 删除等待队列中的消息="" 删除等待队列的消息="" 删除输出表重点项目="" 删除队列中的等待连接信息包="" 删除需要等待的消息="" 发出终止设定="" 发过来的流水号,需要立即发送一个deliver_resp="" 发送一个ping包="" 取得一个消息="" 取得关于此消息的状态="" 取得发送过来的流水号="" 取得回复消息的下一个流水序列号="" 取得流水信号="" 取得消息id="" 取得消息的seq="" 启动接收和发送="" 失败,正确则处理是否状态包,不是状态包则存到mo缓存,表示收到信息,时状态包则判断缓存消息进行消息送达处理="" 存储byte字节="" 寻找="" 将等待的队列中的元素删除="" 循环时间计数="" 或者="" 报告信息包的数据,在此不考虑多个报告的情况="" 报告消息已经正确发送到="" 收到服务器送达的慧英消息="" 收到消息,处理后存入数据库="" 曾经发送过去的消息="" 服务器的回应消息,应当丢弃不管="" 服务器给客户的测试信号="" 构造报告事件参数="" 构造消息="" 检查下消息的正确性,清除等待队列="" 检查消息正确定,立即返回="" 正确="" 此线程是监视线程="" 清空等待回应队列="" 生成此消息的大小="" 的消息="" 监视本系统连接是否正常="" 触发事件,应当很快结束处理,不要靠考虑存储之类的耗费资源事宜="" 记录当前最后一次消息soket正确时间="" 设定连接成功标志="" 该变量仅供测试使用="" 超时="" 退出线程="" 逐个消息分析="" 马上送出回应包,不需要进入队列="">50) // 500*100=50000=50秒
490{
491t_count=0;
492checkReSend() ; //检查需要重新发送的消息
493//触发一个事件,让系统自动检查消息队列,存储消息队列中的消息状态
494}
495}
496else
497{
498EventArgs e=new EventArgs();
499if(this.onSocketClosedHandler!=null)
500{
501onSocketClosedHandler(this,e);
502}
503else
504{
505}
506this.isStop =true; //通知其他线程退出
507}
508Thread.Sleep(1000);
509}
510}
511
512private void SendSPMsgThread()
513{
514while (!this.isStop )
515{
516Thread.Sleep(10);
517if(this.isLogin)
518{
519ArrayList lists=this.getTop16Queue(); //取出16条最顶的消息
520if(lists!=null && lists.Count >0)
521{
522int count=lists.Count;
523ArrayList pks=new ArrayList( count); //定义容量
524for (int i=0;i<lists.count; )="" +"\r\n"+se.tostring();="" ++;="" ;="" _forcedsubthread(thread="" _stopme()="" bool="" byte[400];="" byte[]="" catch(exception="" catch(socketexception="" cmppport)="" cmppport,int="" cmppserver,int="" datetime="" i++)="" if(l="" if(outitem!="null)" init(string="" int="" l="tcp.Receive(rbuf)" l;="" lock(this)="" login(string="" outitem="(QueueItem)lists[i];" outitem.failedcount="" password)="" password);="" private="" public="" queueitem="" rbuf="new" recvtimeout)="" recvtimeout,int="" return(false);="" return(this._init(cmppserver,cmppport));="" se)="" sendlogin(systemid,="" sendqueueitem(outitem);="" sendtimeout)="" systemid,string="" t)="" t.abort();="" t.join();="" t1="DateTime.Now;" this.errorinfo="this.ErrorInfo" this.isstop="true;" this.loglastoktime(datetime.now);="" this.recvtimeout="recvtimeout;" this.sendtimeout="recvtimeout;" thread.sleep(100);="" try="" username,="" username,string="" void="" while(!this.islogin)="" {="" {}="" }="" 公用函数="" 函数区域="" 发送出错="" 发送失败="" 发送每一个消息="" 取出每一个消息对象="" 属性区域="" 强制停止线程="" 最后一次正确的发送="">16)
525{
526if(BIConvert.Bytes2UInt(rbuf,4)==(uint)MSG.CMPP_COMMAND_ID.CMPP_CONNECT_RESP)
527{
528MSG.CMPP_MSG_CONNECT_RESP resp=new MSG.CMPP_MSG_CONNECT_RESP(rbuf);
529if(resp.isOk)
530{
531EventArgs e=new EventArgs();
532if(onLogonSuccEventHandler!=null)
533{
534onLogonSuccEventHandler(this,e);
535}
536else
537{
538this.defaultConnectRespEventHandler();
539}
540this.isLogin =true;
541}
542else
543{
544}
545break;
546}
547}
548this._lastOkTime =DateTime.Now ; //更新当前最后成功收发套接字的时间
549}
550catch(SocketException)
551{
552}
553n</lists.count;></msglength)></this._outseqqueue.count;i++)></this._waitingseqqueue.count;i++)>