CMPP SP端 C# 实例

//附CMPPClient.cs:
/*
作者:TNT 时间:2003年12月
文件说明:本文件实现SP端的协议开发。
*/
using System;
using System.Security.Cryptography;
using System.Net.Sockets;
using System.Net;
using System.Text;
using System.Threading;
using System.Collections;

namespace CMPP.YOURCOMPANY
{
public delegate void ReportEventHandler(object sender, ReportEventArgs e); //声明一个事件的指代(指针)
public delegate void SMSEventHandler(object sender, SMSEventArgs e); //声明一个事件的指代(指针)
public delegate void TerminateEventHandler(object sender,TerminateEventArgs e); //声明收到终止信号
public delegate void TerminateRespEventHandler(object sender,TerminateRespEventArgs e); //回应事件发生
public delegate void TestEventHandler(object sender,TestEventArgs e);
public delegate void TestRespEventHandler(object sender,TestRespEventArgs e);
public delegate void ConnectRespEventHandler(object sender,ConnectRespEventArgs e);
public delegate void CancelRespEventHandler(object sender,CancelRespEventArgs e);
public delegate void SubmitRespEventHandler(object sender,SubmitRespEventArgs e);
public delegate void QueryRespEventHandler(object sender,QueryRespEventArgs e);
public delegate void LogonSuccEventHandler(object sender,EventArgs e); //当成功登录系统
public delegate void SocketClosedEventHandler(object sender,EventArgs e); //当套接字被检测到关闭
public delegate void FailedItemDeletedEventHandler(object sender,WaitingQueueItemEventArgs e); //当一条等待队列的消息超过60秒没有回应

public delegate void CMPPClientSvcStopEventHandler(object sender, ClientQueueStateArgs e); //当CMPP服务停止时候触发事件

///

1<summary>   
2/// 作为CMPP协议的客户端,具有的登陆、发送、接受功能   
3/// 会开3 个线程处理: 1、处理需要发送 MO(下行)的消息   
4/// 2、处理从移动服务器发送过来CMPP的消息   
5/// 3、处理连接断等信息,检查需要重发的消息,检查收到的报告、短信,并调用 OnReport 事件 OnSMS事件   
6/// </summary>

public class CMPPClient
{
public static long CMPP_ACTIVE_TEST_C_TICKs= 30 ; // *3 ; //长连接的active_test测试时间
public static long CMPP_ACTIVE_TEST_T_TICKs= 60 ; // 消息失败时间 60秒
public static int CMPP_ACTIVE_TEST_N_COUNT=3; //3次
//public static int CMPP_MSG_MAX=100; //一次取得的最大消息数量
public static int CMPP_Port=7890;

public event ReportEventHandler onReportHandler; //指向事件处理代码的指针
public event SMSEventHandler onSMSHandler; //短信到来处理
public event TestEventHandler onTestHandler;
public event TestRespEventHandler onTestRespHandler;
public event ConnectRespEventHandler onConnectRespHandler;
public event CancelRespEventHandler onCancelRespHandler;
public event TerminateEventHandler onTerminateHandler;
public event TerminateRespEventHandler onTerminateRespHandler;
public event SubmitRespEventHandler onSubmitRespHandler;
public event QueryRespEventHandler onQueryRespHandler;
public event LogonSuccEventHandler onLogonSuccEventHandler;
public event SocketClosedEventHandler onSocketClosedHandler;
public event FailedItemDeletedEventHandler onWaitingItemDeltedHandler; //当等待队列消息超时

public event CMPPClientSvcStopEventHandler onClientSvcStopedHandler; //当服务停止时候的事件

//private 函数区域//////////////////////////////////////////////////////////////////////
private Socket tcp=null;
private IPHostEntry ip=null;
private IPEndPoint cmpp_ep=null;
private int RecvTimeOut =1000; //2000ms的接受超时
private int SendTimeout =2000; //2000ms的发送超时
private string CMPP_Server=""; //移动的服务器IP或者DNS名
private string systemID=""; //企业编号
private string userName=""; //sp的号码 /企业编号
private string PassWord=""; //口令
private bool isStop=false; //本服务是否终止运行
private bool isLogin=false; //是否已经登录
private Thread Send_Thread; //发送线程,专门处理对移动的数据包
private Thread Recv_Thread; //专门处理接收包
private Thread Deamo_Thread; //监控线程
private string ErrorInfo=""; //存放最后一次发生的错误信息 或者参考信息
private DateTime _current_time=DateTime.Now; //上一次 ping的时间
private uint lastSequence; //流水号,每一次重新启动都需要重新设定 lastSequence
private SortedList _outSeqQueue=new SortedList(); //消息队列存储 QueueItem,存储发送队列中的状态
private SortedList _waitingSeqQueue=new SortedList(); //消息队列存储 QueueItem
private int sub_resp=0; //最后返回的包 Sequence
private DateTime _lastOkTime; //最后正确发送消息时间
private bool _bNre=false; //空引用错误,套接字错误

//private ManualResetEvent _connectionDone=new ManualResetEvent(false); //是否连接到套接字服务器,也就是CMPP服务器
//private ManualResetEvent _lastsendDone=new ManualResetEvent(false); //上一次发送是否完毕
//private ManualResetEvent _lastrecvDone=new ManualResetEvent(false); //上一次接收是否完毕

private void ping() //发送一次ping包 ,不经过_outSeqQueue 直接存储在 out queue中
{
uint seq=this.getNextSequence();
MSG.CMPP_MSG_TEST test=new MSG.CMPP_MSG_TEST(seq);
QueueItem q=new QueueItem(seq,(uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST,0,0);
q.setmsgObj(test);
this.addToOutQueue(q);
}

private string getValIdTime(DateTime d) //返回短信存活时间
{
DateTime n=d.AddHours(2); //2小时
return(n.Year.ToString().Substring(2) + n.Month.ToString().PadLeft(2,'0')+n.Day.ToString().PadLeft(2,'0')+n.Hour.ToString().PadLeft(2,'0')+n.Minute.ToString().PadLeft(2,'0')+n.Second.ToString().PadLeft(2,'0')+"032+");
}

private bool isPingTime( ) //是否到了ping一次的时间
{
System.TimeSpan l=(DateTime.Now - this._current_time );

if ( l.TotalSeconds >= (CMPPClient.CMPP_ACTIVE_TEST_C_TICKs))
{
lock(this)
{
this._current_time =DateTime.Now;
return(true);
}
}
else
{
return(false);
}
}

private void checkReSend() //是否需要再一次ping //查询 _waitingSeqQueue 是否存在 上一次 没有相应的消息
{ //调查waiting queue 中的所有消息,如果入列时间超过60
for(int i=0;i

  1<this._waitingseqqueue.count;i++) ;="" datetime="" if(q!="null)" if(t.totalseconds="" q="(QueueItem)this._waitingSeqQueue.GetByIndex(i);" queueitem="" t="this_time-q.inQueueTime" this_time="DateTime.Now" thread.sleep(20);="" timespan="" {="" 去当前时间="">CMPPClient.CMPP_ACTIVE_TEST_T_TICKs ) //达到超时时间   
  2{//需要重新发送消息   
  3if(q.FailedCount&gt;=CMPPClient.CMPP_ACTIVE_TEST_N_COUNT)   
  4{   
  5//报告消息发送失败   
  6if(this.onWaitingItemDeltedHandler!=null)   
  7{   
  8WaitingQueueItemEventArgs e=new WaitingQueueItemEventArgs(q);   
  9this.onWaitingItemDeltedHandler(this,e);   
 10}   
 11this.delFromWaitingQueue(q); //从等待队列中删除   
 12//q.MsgState =(int)MSG_STATE.SENDED_WAITTING;   
 13}   
 14else   
 15{//可以尝试继续发送   
 16q.inQueueTime = this_time;   
 17q.FailedCount ++ ;   
 18q.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
 19this.sendQueueItem(q);   
 20}   
 21}   
 22}   
 23}   
 24  
 25}   
 26  
 27private void startThreads()   
 28{   
 29Deamo_Thread=new Thread(new ThreadStart(this.DeamonThread));   
 30Deamo_Thread.Start();   
 31}   
 32  
 33private QueueItem newQueueItem(int msgtype,int msgstate,object msg) //生成一个消息队列成员对象实例   
 34{   
 35uint seq=this.getNextSequence(); //   
 36QueueItem q=new QueueItem(seq,(uint)msgtype,0,msgstate);   
 37q.setmsgObj(msg); //设定消息为 object   
 38return(q);   
 39}   
 40  
 41private QueueItem getOutQueueItem(uint seq) //获取MT 队列中的消息项目   
 42{   
 43lock(this)   
 44{   
 45return((QueueItem)this._outSeqQueue[seq]) ;   
 46}   
 47}   
 48  
 49private QueueItem getWaitingQueueItem(uint seq) //获取等待队列中的消息   
 50{   
 51return((QueueItem) this._waitingSeqQueue[seq]);   
 52}   
 53  
 54private void addToOutQueue(QueueItem q)   
 55{   
 56lock(this)   
 57{   
 58this._outSeqQueue.Add(q.Sequence,q);   
 59}   
 60}   
 61  
 62private void addToWaitingQueue(QueueItem q)   
 63{   
 64lock(this)   
 65{   
 66if(!this._waitingSeqQueue.ContainsKey(q.Sequence))   
 67{   
 68this._waitingSeqQueue.Add(q.Sequence,q);   
 69}   
 70}   
 71}   
 72  
 73private QueueItem getTopOutQueue() //需要在取之前进行判断   
 74{   
 75for(int i=0;i<this._outseqqueue.count;i++) ;="" <="16)" arraylist="" arraylist()="" arrlength="" arrlength++;="" break;="" else="" gettop16queue()="" if(arrlength="" if(q!="null)" if(q.msgstate="(int)MSG_STATE.NEW)" int="" lock(this)="" private="" q="getTopOutQueue();" q.msgstate="(int)MSG_STATE.SENDING;" queueitem="" rearr="new" rearr.add(q);="" return(null);="" return(q);="" while(q!="null" {="" ||="" }="" 发送状态="" 新消息,立即返回="" 返回16条最顶的消息="">0)   
 76{   
 77return(reArr);   
 78}   
 79else   
 80{   
 81return(null);   
 82}   
 83}   
 84  
 85private void delFromOutQueue(QueueItem q)   
 86{   
 87lock(this)   
 88{   
 89this._outSeqQueue.Remove(q.Sequence);   
 90}   
 91}   
 92  
 93private void delFromOutQueue(uint seq)   
 94{   
 95lock(this)   
 96{   
 97this._outSeqQueue.Remove(seq);   
 98}   
 99}   
100  
101private void delFromWaitingQueue(QueueItem q)   
102{   
103lock(this)   
104{   
105this._waitingSeqQueue.Remove(q.Sequence);   
106}   
107}   
108  
109private void delFromWaitingQueue(uint seq)   
110{   
111this._waitingSeqQueue.Remove(seq);   
112}   
113  
114private void SendLogin(string SystemID,string spNum,string Password)   
115{//发送登录验证包   
116systemID=SystemID;   
117userName=spNum;   
118PassWord=Password;   
119uint seq=this.getNextSequence(); //取得一个流水号   
120MSG.CMPP_MSG_CONNECT cn=new MSG.CMPP_MSG_CONNECT(seq);   
121cn.Password =Password.Trim();   
122cn.SourceAdd =SystemID.Trim();   
123tcp.Send(cn.ToBytes());   
124}   
125
126
127private byte[] prepairPKs(QueueItem outitem)//将QueueItem发送出去   
128{   
129uint seq=outitem.Sequence ;   
130uint msgtype=outitem.MsgType;   
131switch(msgtype)   
132{   
133case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST :   
134MSG.CMPP_MSG_TEST test=(MSG.CMPP_MSG_TEST) outitem.getMsgObj(); //发送队列中取出   
135lock(this)   
136{   
137outitem.MsgState =(int)MSG_STATE.SENDING;   
138this.delFromOutQueue(seq);   
139this.addToWaitingQueue(outitem); //等待服务器的active_TEST_resp   
140}   
141outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
142return(test.toBytes());   
143  
144  
145case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST_RESP:   
146MSG.CMPP_MSG_TEST_RESP test_reply=(MSG.CMPP_MSG_TEST_RESP)outitem.getMsgObj(); //发送队列中取出//取出需要发送的具体消息   
147lock(this)   
148{   
149outitem.MsgState =(int)MSG_STATE.SENDING ;   
150this.delFromOutQueue(seq);   
151}   
152outitem.MsgState = (int)MSG_STATE.SENDING_FINISHED ; //完成   
153return(test_reply.toBytes());   
154  
155  
156  
157case (uint)MSG.CMPP_COMMAND_ID.CMPP_CANCEL :   
158MSG.CMPP_MSG_CANCEL cancel=(MSG.CMPP_MSG_CANCEL)outitem.getMsgObj(); //还原成消息类   
159lock(this)   
160{   
161outitem.MsgState =(int)MSG_STATE.SENDING ;   
162this.delFromOutQueue(seq);   
163this.addToWaitingQueue(outitem); //等待回应   
164}   
165outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
166return(cancel.toBytes());   
167  
168case (uint)MSG.CMPP_COMMAND_ID.CMPP_DELIVER_RESP:   
169MSG.CMPP_MSG_DELIVER_RESP deliver_resp=(MSG.CMPP_MSG_DELIVER_RESP)outitem.getMsgObj(); //发送队列中取出;   
170lock(this)   
171{   
172outitem.MsgState =(int)MSG_STATE.SENDING ;   
173this.delFromOutQueue(seq);   
174}   
175outitem.MsgState=(int)MSG_STATE.SENDING_FINISHED ; //完成   
176return (deliver_resp.toBytes());   
177  
178  
179case (uint)MSG.CMPP_COMMAND_ID.CMPP_QUERY :   
180MSG.CMPP_MSG_QUERY query = (MSG.CMPP_MSG_QUERY )outitem.getMsgObj(); //发送队列中取出;   
181lock(this)   
182{   
183outitem.MsgState =(int)MSG_STATE.SENDING ;   
184this.delFromOutQueue(seq);   
185this.addToWaitingQueue(outitem);   
186}   
187outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ; //等待回应   
188return(query.toBytes());   
189  
190case (uint)MSG.CMPP_COMMAND_ID.CMPP_SUBMIT :   
191MSG.CMPP_MSG_SUBMIT submit =(MSG.CMPP_MSG_SUBMIT)outitem.getMsgObj(); //发送队列中取出;   
192lock(this)   
193{   
194outitem.MsgState =(int)MSG_STATE.SENDING ;   
195this.delFromOutQueue(seq);   
196this.addToWaitingQueue (outitem);   
197}   
198outitem.MsgState =(int)MSG_STATE.SENDING_FINISHED ;   
199return(submit.toBytes());   
200  
201case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE :   
202MSG.CMPP_MSG_TERMINATE terminate=(MSG.CMPP_MSG_TERMINATE)outitem.getMsgObj(); //发送队列中取出;   
203lock(this)   
204{   
205outitem.MsgState =(int)MSG_STATE.SENDING ;   
206this.delFromOutQueue(seq);   
207this.addToWaitingQueue(outitem);   
208}   
209outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
210return(terminate.toBytes());   
211  
212case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE_RESP :   
213MSG.CMPP_MSG_TERMINATE_RESP terminate_resp=(MSG.CMPP_MSG_TERMINATE_RESP)outitem.getMsgObj(); //发送队列中取出;   
214lock(this)   
215{   
216outitem.MsgState =(int)MSG_STATE.SENDING ;   
217this.delFromOutQueue(seq);   
218}   
219outitem.MsgState =(int)MSG_STATE.SENDING_FINISHED ;   
220return(terminate_resp.toBytes()); 
221
222default:   
223test=(MSG.CMPP_MSG_TEST) outitem.getMsgObj(); //发送队列中取出   
224lock(this)   
225{   
226outitem.MsgState =(int)MSG_STATE.SENDING;   
227this.delFromOutQueue(seq);   
228this.addToWaitingQueue(outitem); //等待服务器的active_TEST_resp   
229}   
230outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
231return(test.toBytes());   
232}   
233} 
234
235private void sendQueueItem(QueueItem outitem)//将QueueItem发送出去   
236{   
237uint seq=outitem.Sequence ;   
238uint msgtype=outitem.MsgType;   
239try   
240{   
241switch(msgtype)   
242{   
243case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST :   
244MSG.CMPP_MSG_TEST test=(MSG.CMPP_MSG_TEST) outitem.getMsgObj(); //发送队列中取出   
245lock(this)   
246{   
247outitem.MsgState =(int)MSG_STATE.SENDING;   
248this.delFromOutQueue(seq);   
249this.addToWaitingQueue(outitem); //等待服务器的active_TEST_resp   
250}   
251tcp.Send(test.toBytes());   
252outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
253break;   
254  
255case (uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST_RESP:   
256MSG.CMPP_MSG_TEST_RESP test_reply=(MSG.CMPP_MSG_TEST_RESP)outitem.getMsgObj(); //发送队列中取出//取出需要发送的具体消息   
257lock(this)   
258{   
259outitem.MsgState =(int)MSG_STATE.SENDING ;   
260this.delFromOutQueue(seq);   
261}   
262tcp.Send(test_reply.toBytes());   
263outitem.MsgState = (int)MSG_STATE.SENDING_FINISHED ; //完成   
264break;   
265  
266case (uint)MSG.CMPP_COMMAND_ID.CMPP_CANCEL :   
267MSG.CMPP_MSG_CANCEL cancel=(MSG.CMPP_MSG_CANCEL)outitem.getMsgObj(); //还原成消息类   
268lock(this)   
269{   
270outitem.MsgState =(int)MSG_STATE.SENDING ;   
271this.delFromOutQueue(seq);   
272this.addToWaitingQueue(outitem); //等待回应   
273}   
274tcp.Send(cancel.toBytes());   
275outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
276break;   
277  
278case (uint)MSG.CMPP_COMMAND_ID.CMPP_DELIVER_RESP:   
279MSG.CMPP_MSG_DELIVER_RESP deliver_resp=(MSG.CMPP_MSG_DELIVER_RESP)outitem.getMsgObj(); //发送队列中取出;   
280lock(this)   
281{   
282outitem.MsgState =(int)MSG_STATE.SENDING ;   
283this.delFromOutQueue(seq);   
284}   
285tcp.Send(deliver_resp.toBytes());   
286outitem.MsgState=(int)MSG_STATE.SENDING_FINISHED ; //完成   
287break;   
288  
289case (uint)MSG.CMPP_COMMAND_ID.CMPP_QUERY :   
290MSG.CMPP_MSG_QUERY query = (MSG.CMPP_MSG_QUERY )outitem.getMsgObj(); //发送队列中取出;   
291lock(this)   
292{   
293outitem.MsgState =(int)MSG_STATE.SENDING ;   
294this.delFromOutQueue(seq);   
295this.addToWaitingQueue(outitem);   
296}   
297tcp.Send(query.toBytes());   
298outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ; //等待回应   
299break;   
300  
301case (uint)MSG.CMPP_COMMAND_ID.CMPP_SUBMIT :   
302MSG.CMPP_MSG_SUBMIT submit =(MSG.CMPP_MSG_SUBMIT)outitem.getMsgObj(); //发送队列中取出;   
303lock(this)   
304{   
305outitem.MsgState =(int)MSG_STATE.SENDING ;   
306this.delFromOutQueue(seq);   
307this.addToWaitingQueue (outitem);   
308}   
309tcp.Send(submit.toBytes());   
310outitem.MsgState =(int)MSG_STATE.SENDING_FINISHED ;   
311break;   
312  
313case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE :   
314MSG.CMPP_MSG_TERMINATE terminate=(MSG.CMPP_MSG_TERMINATE)outitem.getMsgObj(); //发送队列中取出;   
315lock(this)   
316{   
317outitem.MsgState =(int)MSG_STATE.SENDING ;   
318this.delFromOutQueue(seq);   
319this.addToWaitingQueue(outitem);   
320}   
321if(this.tcpIsCanUse())   
322{   
323tcp.Send(terminate.toBytes());   
324outitem.MsgState =(int)MSG_STATE.SENDED_WAITTING ;   
325}   
326this.isStop =true; //通知其他线程可以退出了   
327break;   
328  
329case (uint)MSG.CMPP_COMMAND_ID.CMPP_TERMINATE_RESP :   
330MSG.CMPP_MSG_TERMINATE_RESP terminate_resp=(MSG.CMPP_MSG_TERMINATE_RESP)outitem.getMsgObj(); //发送队列中取出;   
331lock(this)   
332{   
333outitem.MsgState =(int)MSG_STATE.SENDING ;   
334this.delFromOutQueue(seq);   
335}   
336tcp.Send(terminate_resp.toBytes());   
337outitem.MsgState =(int)MSG_STATE.SENDING_FINISHED ;   
338break;   
339}   
340LogLastOkTime(DateTime.Now ); //记录当前最后一次消息soket正确时间   
341}   
342catch(SocketException se)   
343{   
344//发生套接字错误   
345this.ErrorInfo =this.ErrorInfo +"\r\n"+se.ToString ();   
346}   
347catch(NullReferenceException nre)   
348{   
349this._bNre =true; //出现空引用错误   
350this.ErrorInfo =this.ErrorInfo +"\r\n"+nre.ToString ();   
351}   
352}   
353  
354private bool tcpIsCanUse() //测试当前tcp是否可用   
355{   
356bool reval=true;   
357DateTime t=DateTime.Now ;   
358TimeSpan ts=t- this._lastOkTime;   
359if(ts.TotalSeconds &gt; CMPPClient.CMPP_ACTIVE_TEST_T_TICKs ) //60秒   
360{   
361reval=false; //不可用   
362}   
363if(this._bNre )   
364{   
365reval=false;   
366}   
367return(reval);   
368}   
369  
370private void _reStartRecvNSend()   
371{   
372Send_Thread=new Thread(new ThreadStart(this.SendSPMsgThread));   
373Send_Thread.Start();   
374Recv_Thread=new Thread(new ThreadStart(this.RecvISMGMsgThread));   
375Recv_Thread.Start();   
376}   
377  
378private void LogLastOkTime(DateTime lastoktime)   
379{   
380lock(this)   
381{   
382this._lastOkTime=lastoktime; //设定最后成功消息交互时间   
383}   
384}   
385  
386private void defaultReportHandler() //却省的报告事件处理函数   
387{   
388  
389}   
390  
391private void defaultSMSHandler()   
392{   
393  
394}   
395  
396private void defaultTeminateHandler()   
397{   
398  
399}   
400  
401private void defaultTestEventHandler()   
402{   
403  
404}   
405private void defaultTestRespEventHandler()   
406{   
407  
408}   
409private void defaultTerminateEventHandler()   
410{   
411}   
412private void defaultTerminateRespEventHandler()   
413{   
414}   
415private void defaultCancelRespEventHandler()   
416{   
417}   
418private void defaultQueryRespEventHandler()   
419{   
420}   
421  
422private void defaultConnectRespEventHandler()   
423{   
424QueueItem q=new QueueItem(this.getNextSequence(),(uint)MSG.CMPP_COMMAND_ID.CMPP_ACTIVE_TEST,0,(int)MSG_STATE.NEW);   
425MSG.CMPP_MSG_TEST test=new MSG.CMPP_MSG_TEST(q.Sequence ); //立即发送包过去   
426q.setmsgObj(test);   
427this.addToOutQueue(q);   
428}   
429private void defaultSubmitRespEventHandler()   
430{   
431} 
432
433private void defaultClientStopEventHandler()   
434{}   
435  
436private void rePortError(string info)   
437{   
438  
439}   
440  
441private bool _init(string CMPPServer,int CMPPPort)   
442{   
443bool reVal=false;   
444CMPP_Server=CMPPServer;   
445CMPP_Port=CMPPPort;   
446try   
447{   
448tcp=new Socket(AddressFamily.InterNetwork ,SocketType.Stream ,ProtocolType.Tcp );   
449ip=Dns.GetHostByName(CMPP_Server);   
450cmpp_ep=new IPEndPoint(ip.AddressList[0],CMPP_Port);   
451tcp.Connect(cmpp_ep); //连接   
452reVal=true;   
453}   
454catch(SocketException se)   
455{   
456ErrorInfo="Socker Error:" + se.ToString();   
457}   
458return(reVal);   
459}   
460private uint getNextSequence()   
461{   
462lock(typeof(CMPPClient))   
463{   
464try   
465{   
466lastSequence++;   
467}   
468catch(OverflowException ofe)   
469{   
470this.ErrorInfo =this.ErrorInfo +"\r\n"+ofe.ToString();   
471lastSequence=uint.MinValue;   
472}   
473return(lastSequence);   
474}   
475}   
476  
477private void RecvISMGMsgThread() //处理ISMG消息的线程   
478{   
479while(!this.isStop )   
480{   
481try   
482{   
483byte[] rbuf=new byte[10240]; //结果缓冲区   
484byte[] recv_temp=new Byte[1024]; //recv临时缓冲区   
485int index=0;   
486int msglength=tcp.Receive(rbuf); //阻塞接收//分析收到的数据   
487  
488MSG.CMPP_MSG_Header header; //=new MSG.CMPP_MSG_Header(rbuf,index); //取得一个消息   
489while(index<msglength) !="null)" ");="" ";="" (the_pk,msg.cmpp_msg_header.headerlength);="" (uint)msg.cmpp_command_id.cmpp_active_test="" (uint)msg.cmpp_command_id.cmpp_active_test_resp="" (uint)msg.cmpp_command_id.cmpp_cancel_resp="" (uint)msg.cmpp_command_id.cmpp_connect_resp="" (uint)msg.cmpp_command_id.cmpp_deliver:="" (uint)msg.cmpp_command_id.cmpp_query_resp="" (uint)msg.cmpp_command_id.cmpp_submit_resp="" (uint)msg.cmpp_command_id.cmpp_terminate="" (uint)msg.cmpp_command_id.cmpp_terminate_resp="" );="" +"\r\n"+"发送:cmpp_active_test_resp="" +"\r\n"+"收到:cmpp_active_test";="" +"\r\n"+"收到:cmpp_terminate";="" +"\r\n"+"收到:cmpp_terminate_resp";="" +"\r\n"+("发送:cmpp__deliver_resp="" +"\r\n"+("收到:cmpp_active_test_resp="" +"\r\n"+("收到:cmpp_cancel_resp="" +"\r\n"+("收到:cmpp_connect_resp="" +"\r\n"+("收到:cmpp_deliver="" +"\r\n"+("收到:cmpp_query_resp="" +"\r\n"+("收到:cmpp_submit_resp="" ,更新="" 0.1秒="" :="" ;="" ;i++)="" _debugbs(byte[]="" _debugbs(the_pk);="" _restartrecvnsend();="" arg="new" biconvert.dumpbytes(initvalue,"c:\\\cmpp_submit_resp.txt");="" biconvert.dumpbytes(the_pk,"c:\\\cmpp_deliver.txt");="" biconvert.dumpbytes(the_pk,"c:\\\cmpp_submit_resp.txt");="" break;="" byte[]="" byte[header.msglength]="" cancel_reply="new" cancelrespeventargs="" cancelrespeventargs(cancel_reply);="" case="" catch(socketexception="" cn_reply="new" connectrespeventargs="" connectrespeventargs(cn_reply);="" deamonthread()="" debug="" defaultcancelrespeventhandler();="" defaultconnectrespeventhandler();="" defaultqueryrespeventhandler();="" defaultsmshandler();="" defaultsubmitrespeventhandler();="" defaultterminateeventhandler();="" defaultterminaterespeventhandler();="" defaulttesteventhandler();="" defaulttestrespeventhandler();="" delfromwaitingqueue(submit_resp.sequence);="" deliver="new" deliver_resp="new" deliver_resp.msgid="deliver.MsgID" deliver_resp.result="0;" e="new" else="" for(int="" header="new" i="0;i&lt;header.MSGLength" if(cn_reply.isok)="" if(deliver.isreport)="" if(t_count="" if(tcpiscanuse())="" if(this.ispingtime())="" if(this.oncancelresphandler!="null)" if(this.onconnectresphandler="" if(this.onqueryresphandler!="null)" if(this.onreporthandler!="null)" if(this.onsmshandler!="null)" if(this.onsubmitresphandler!="null)" if(this.onterminatehandler!="null)" if(this.onterminateresphandler!="null)" if(this.ontesthandler!="null)" if(this.ontestresphandler!="null)" int="" ismg--〉sp="" loglastoktime(datetime.now="" msg.cmpp_msg_cancel_resp="" msg.cmpp_msg_cancel_resp(the_pk);="" msg.cmpp_msg_connect_resp="" msg.cmpp_msg_connect_resp(the_pk);="" msg.cmpp_msg_deliver="" msg.cmpp_msg_deliver(the_pk);="" msg.cmpp_msg_deliver_resp="" msg.cmpp_msg_deliver_resp(seq);="" msg.cmpp_msg_header(rbuf,index);="" msg.cmpp_msg_query_resp="" msg.cmpp_msg_query_resp(the_pk);="" msg.cmpp_msg_submit_resp="" msg.cmpp_msg_submit_resp(the_pk);="" msg.cmpp_msg_terminate="" msg.cmpp_msg_terminate(the_pk);="" msg.cmpp_msg_terminate_resp="" msg.cmpp_msg_terminate_resp(seq);="" msg.cmpp_msg_terminate_resp(the_pk);="" msg.cmpp_msg_test="" msg.cmpp_msg_test(the_pk);="" msg.cmpp_msg_test_resp="" msg.cmpp_msg_test_resp(seq);="" msg.cmpp_msg_test_resp(the_pk);="" msgid="" oncancelresphandler(this,e);="" onconnectresphandler(this,e);="" onreporthandler(this,arg);="" onsmshandler(this,smsarg);="" onsubmitresphandler(this,e);="" onterminatehandler(this,e);="" onterminateresphandler(this,e);="" ontesthandler(this,e);="" ontestresphandler(this,e);="" private="" query_resp="new" queryrespeventargs="" queryrespeventargs(query_resp);="" reporteventargs="" reporteventargs(reportmsgid.tostring(),="" reporteventargs(the_pk,msg.cmpp_msg_header.headerlength+8+21+10+1+1+1+21+1+1);="" reporteventargs传递的字节数组是="" reportmsgid="deliver.ReportMsgID" return;="" se)="" seq="ter_resp.Sequence" seq;="" smsarg="new" smseventargs="" statereport="deliver.StateReport;" string="" sub_resp++;="" submit_resp="new" submit_resp.="" submitrespeventargs="" submitrespeventargs(submit_resp);="" switch(header.command_id)="" t="deliver_resp.toBytes();" t_count="0;" t_count++;="" tcp.send(t);="" tcp.send(terminate_resp.tobytes());="" tcp.send(test_reply.tobytes());="" ter_resp="new" terminate="new" terminate_resp="new" terminateeventargs="" terminateeventargs(terminate);="" terminaterespeventargs="" terminaterespeventargs(ter_resp);="" test="new" test_reply="new" test_reply2="new" testeventargs="" testeventargs(test);="" testrespeventargs="" testrespeventargs(test_reply2);="" the_pk="new" the_pk)="" the_pk[i]="rbuf[index++];" this._stopme();="" this.defaultreporthandler();="" this.delfromoutqueue(seq);="" this.delfromwaitingqueue(query_resp.sequence="" this.delfromwaitingqueue(seq);="" this.errorinfo="this.ErrorInfo" this.islogin="false;" this.isstop)="" this.ping();="" this.stopme()="" thread.sleep(50);="" uint="" uint64="" void="" while(!="" {="" }="" 一条="" 传递的整个deliver包="" 保留映像="" 准备自我停止?="" 删除等待队列中的消息="" 删除等待队列的消息="" 删除输出表重点项目="" 删除队列中的等待连接信息包="" 删除需要等待的消息="" 发出终止设定="" 发过来的流水号,需要立即发送一个deliver_resp="" 发送一个ping包="" 取得一个消息="" 取得关于此消息的状态="" 取得发送过来的流水号="" 取得回复消息的下一个流水序列号="" 取得流水信号="" 取得消息id="" 取得消息的seq="" 启动接收和发送="" 失败,正确则处理是否状态包,不是状态包则存到mo缓存,表示收到信息,时状态包则判断缓存消息进行消息送达处理="" 存储byte字节="" 寻找="" 将等待的队列中的元素删除="" 循环时间计数="" 或者="" 报告信息包的数据,在此不考虑多个报告的情况="" 报告消息已经正确发送到="" 收到服务器送达的慧英消息="" 收到消息,处理后存入数据库="" 曾经发送过去的消息="" 服务器的回应消息,应当丢弃不管="" 服务器给客户的测试信号="" 构造报告事件参数="" 构造消息="" 检查下消息的正确性,清除等待队列="" 检查消息正确定,立即返回="" 正确="" 此线程是监视线程="" 清空等待回应队列="" 生成此消息的大小="" 的消息="" 监视本系统连接是否正常="" 触发事件,应当很快结束处理,不要靠考虑存储之类的耗费资源事宜="" 记录当前最后一次消息soket正确时间="" 设定连接成功标志="" 该变量仅供测试使用="" 超时="" 退出线程="" 逐个消息分析="" 马上送出回应包,不需要进入队列="">50) // 500*100=50000=50秒   
490{   
491t_count=0;   
492checkReSend() ; //检查需要重新发送的消息   
493//触发一个事件,让系统自动检查消息队列,存储消息队列中的消息状态   
494}   
495}   
496else   
497{   
498EventArgs e=new EventArgs();   
499if(this.onSocketClosedHandler!=null)   
500{   
501onSocketClosedHandler(this,e);   
502}   
503else   
504{   
505}   
506this.isStop =true; //通知其他线程退出   
507}   
508Thread.Sleep(1000);   
509}   
510}   
511  
512private void SendSPMsgThread()   
513{   
514while (!this.isStop )   
515{   
516Thread.Sleep(10);   
517if(this.isLogin)   
518{   
519ArrayList lists=this.getTop16Queue(); //取出16条最顶的消息   
520if(lists!=null &amp;&amp; lists.Count &gt;0)   
521{   
522int count=lists.Count;   
523ArrayList pks=new ArrayList( count); //定义容量   
524for (int i=0;i<lists.count; )="" +"\r\n"+se.tostring();="" ++;="" ;="" _forcedsubthread(thread="" _stopme()="" bool="" byte[400];="" byte[]="" catch(exception="" catch(socketexception="" cmppport)="" cmppport,int="" cmppserver,int="" datetime="" i++)="" if(l="" if(outitem!="null)" init(string="" int="" l="tcp.Receive(rbuf)" l;="" lock(this)="" login(string="" outitem="(QueueItem)lists[i];" outitem.failedcount="" password)="" password);="" private="" public="" queueitem="" rbuf="new" recvtimeout)="" recvtimeout,int="" return(false);="" return(this._init(cmppserver,cmppport));="" se)="" sendlogin(systemid,="" sendqueueitem(outitem);="" sendtimeout)="" systemid,string="" t)="" t.abort();="" t.join();="" t1="DateTime.Now;" this.errorinfo="this.ErrorInfo" this.isstop="true;" this.loglastoktime(datetime.now);="" this.recvtimeout="recvtimeout;" this.sendtimeout="recvtimeout;" thread.sleep(100);="" try="" username,="" username,string="" void="" while(!this.islogin)="" {="" {}="" }="" 公用函数="" 函数区域="" 发送出错="" 发送失败="" 发送每一个消息="" 取出每一个消息对象="" 属性区域="" 强制停止线程="" 最后一次正确的发送="">16)   
525{   
526if(BIConvert.Bytes2UInt(rbuf,4)==(uint)MSG.CMPP_COMMAND_ID.CMPP_CONNECT_RESP)   
527{   
528MSG.CMPP_MSG_CONNECT_RESP resp=new MSG.CMPP_MSG_CONNECT_RESP(rbuf);   
529if(resp.isOk)   
530{   
531EventArgs e=new EventArgs();   
532if(onLogonSuccEventHandler!=null)   
533{   
534onLogonSuccEventHandler(this,e);   
535}   
536else   
537{   
538this.defaultConnectRespEventHandler();   
539}   
540this.isLogin =true;   
541}   
542else   
543{   
544}   
545break;   
546}   
547}   
548this._lastOkTime =DateTime.Now ; //更新当前最后成功收发套接字的时间   
549}   
550catch(SocketException)   
551{   
552}   
553n</lists.count;></msglength)></this._outseqqueue.count;i++)></this._waitingseqqueue.count;i++)>
Published At
Categories with Web编程
Tagged with
comments powered by Disqus