首先,启动主线程,接收来自客户端的请求。并启动4个子线程接收已经建连的客户端发来的消息。此时主线不阻塞,继续接收新的注册请求。4个子线程处理发来的消息,并解析消息,将要做的任务交给线程池处理。自己继续处理发来的消息。
IOCP服务器实现
#pragma once #include <string> #include <winsock2.h> #include <Windows.h> #include <vector> #include <iostream> #include "CThreadPool.h" #include "WorkA.h" #include "WorkB.h" using namespace std; #pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库 //#pragma comment(lib, "Kernel32.lib") #define DefaultIP "127.0.0.1" #define DefaultPort 9999 #define DefaultClientNum 6000 #define MessMaxLen 1024 #define DataBuffSize 2 * 1024 /** * 结构体名称:PER_IO_DATA * 结构体功能:重叠I/O需要用到的结构体,临时记录IO数据 **/ typedef struct { OVERLAPPED overlapped; WSABUF databuff; char buffer[ DataBuffSize ]; int BufferLen; int operationType; SOCKET socket; }PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA; /** * 结构体名称:PER_HANDLE_DATA * 结构体存储:记录单个套接字的数据,包括了套接字的变量及套接字的对应的客户端的地址。 * 结构体作用:当服务器连接上客户端时,信息存储到该结构体中,知道客户端的地址以便于回访。 **/ typedef struct { SOCKET socket; SOCKADDR_STORAGE ClientAddr; }PER_HANDLE_DATA, *LPPER_HANDLE_DATA; //单例类 class CMYIOCPServer { public: ~CMYIOCPServer(void); bool ServerSetUp(); void SetServerIp(const string & sIP=DefaultIP); void SetPort(const int &iPort=DefaultPort); void SetMaxClientNum(const int &iMaxNum = DefaultClientNum); static DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID); static void SendMessage(SOCKET &tSOCKET,char MessAge[MessMaxLen]); static CMYIOCPServer* GetInstance(); private: //私有方法 CMYIOCPServer(void); bool LoadWindowsSocket(); bool InitServerSocket(); bool CreateServerSocker(); static void HandleMessage(); //私有数据 string m_sServerIP; int m_iLisenPoint; string m_sError; int m_iMaxClientNum; vector< PER_HANDLE_DATA* > m_vclientGroup;//保持客户端的连接信息 static HANDLE m_hMutex;//多线程访问互斥变量 static HANDLE m_completionPort; SOCKET m_srvSocket; static CMYIOCPServer *m_pInstance; static char m_byteMsg[MessMaxLen]; static CWorkQueue m_CWorkQueue;//线程池 }; #include "StdAfx.h" #include "MYIOCPServer.h" HANDLE CMYIOCPServer::m_completionPort = NULL; HANDLE CMYIOCPServer:: m_hMutex = NULL; CMYIOCPServer* CMYIOCPServer:: m_pInstance = NULL; char CMYIOCPServer::m_byteMsg[MessMaxLen] = {0} ; CWorkQueue CMYIOCPServer:: m_CWorkQueue; /************************** 获得单例对象 ***************************/ CMYIOCPServer* CMYIOCPServer::GetInstance() { if(NULL == m_pInstance) { m_pInstance = new CMYIOCPServer(); } m_CWorkQueue.Create(10); return m_pInstance; } /************************** 类的构造函数 **************************/ CMYIOCPServer::CMYIOCPServer(void) { m_iLisenPoint = DefaultPort; } /************************** 类的析构函数 **************************/ CMYIOCPServer::~CMYIOCPServer(void) { m_CWorkQueue.Destroy(5); } /************************** 设置服务器IP **************************/ void CMYIOCPServer::SetServerIp(const string & sIP) { m_sServerIP = sIP; } /************************** 设置服务器端口 **************************/ void CMYIOCPServer::SetPort(const int &iPort) { m_iLisenPoint = iPort; } /************************** 设置最大的客户端连接数目 **************************/ void CMYIOCPServer::SetMaxClientNum(const int &iMaxNum) { m_iMaxClientNum = iMaxNum; } /************************** 服务器接收客户端消息, 工作线程 **************************/ DWORD WINAPI CMYIOCPServer::ServerWorkThread(LPVOID CompletionPortID) { HANDLE CompletionPort = (HANDLE)CompletionPortID; DWORD BytesTransferred; LPOVERLAPPED IpOverlapped; LPPER_HANDLE_DATA PerHandleData = NULL; LPPER_IO_DATA PerIoData = NULL; DWORD RecvBytes; DWORD Flags = 0; BOOL bRet = false; while(true){ bRet = GetQueuedCompletionStatus(m_completionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE); if(bRet == 0){ cerr << "GetQueuedCompletionStatus Error: " << GetLastError() << endl; continue; //这里不能返回,返回子线程就结束了 //return -1; } PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped); // 检查在套接字上是否有错误发生 if(0 == BytesTransferred){ closesocket(PerHandleData->socket); GlobalFree(PerHandleData); GlobalFree(PerIoData); continue; } //得到消息码流 memset(m_byteMsg,0,MessMaxLen); memcpy(m_byteMsg,PerIoData->databuff.buf,MessMaxLen); //得到客户端SOCKET信息 SOCKET sClientSocket = PerHandleData->socket; printf("message is %s \n",m_byteMsg); HandleMessage(); //SendMessage(sClientSocket,m_byteMsg); // 为下一个重叠调用建立单I/O操作数据 ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存 PerIoData->databuff.len = 1024; PerIoData->databuff.buf = PerIoData->buffer; PerIoData->operationType = 0; // read WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL); } return 0; } |