某源码thread,socket研究3
///
/// @file Worker.h
/// @brief 用户接口类
/// @author guozhiming
/// @date 2007-05-16
///
#ifndef __WORKER__
#define __WORKER__
#include "ThreadPool.h"
/// @brief 抽象类
class G_Worker
{
public:
/// @brief 构造函数
G_Worker(unsigned int num);
/// @brief 析构函数
~G_Worker();
/// @brief 服务器帮定端口
///
/// @param nPort 帮定端口
///
/// @return true表示成功 , false表示失败
bool Bind(unsigned int nPort);
/// @brief 存虚函数子类继承并实现逻辑
///
/// @param pStr 客户端发送的字符串
virtual void recvMessage(void *pStr , int nSocket) = 0;
/// @brief 发送数据到客户端
///
/// @param pStr 数据
/// @param nSocket 发送到客户端的套接字
//
/// @return
int sendMessage(int nSocket , const void *pStr);
protected:
private:
G_ThreadPool *g_threadPool;
};
#endif
#include "Worker.h"
#include "Log.h"
G_Worker::G_Worker(unsigned int num)
{
g_threadPool = new G_ThreadPool(num , this); //开num个线程
}
G_Worker::~G_Worker()
{
if(g_threadPool)
{
delete g_threadPool;
g_threadPool = NULL;
}
}
bool G_Worker::Bind(unsigned int nPort)
{
return g_threadPool->Bind(nPort);//整个线程池,绑定一个端口。。
}
int G_Worker::sendMessage(int nSocket , const void *pStr)
{
return g_threadPool->sendMessage(nSocket , pStr);//这是tcp还是udp发送呢。
}
///
/// @file ThreadPool.h
/// @brief 线程池的实现 , 是个管理线程 , 负责调用每个线程之间的互相调用的关系
/// @author guozhiming
/// @date 2007-05-16
///
#ifndef __G_THREADPOOL__
#define __G_THREADPOOL__
#include "ListenThread.h"
#include "SendMessThread.h"
#include "Queue.h"
#include "RecvMessThread.h"
#include "Worker.h"
class G_ListenThread;
class G_SendMessThread;
class G_RecvMessThread;
class G_Worker;
class G_ThreadPool : public G_Thread
{
public:
/// @brief 构造函数
G_ThreadPool(unsigned int num , G_Worker *g_work);
/// @brief 析构函数
~G_ThreadPool();
/// @brief 服务器帮定端口
///
/// @param nPort 帮定端口
///
/// @return true表示成功 , false表示失败
bool Bind(unsigned int nPort);
/// @brief 主线程
void Run();
/// @brief 填加socket到队列中
///
/// @param nSocket 套接口
///
/// @return true 表示成功 , false 表示失败
bool pushSocket(unsigned int nSocket);
/// @brief 从队列中取套接字
///
/// @param nSocket 取出的套接字存放在nSocket中
///
/// @return true 表示成功 , false 表示失败
bool popSocket(int &nSocket);
/// @brief 从G_Data->G_RecvMessThread->G_ThreadPool->G_Worker 回掉
///
/// @param pStr 客户发的字符串
/// @param nSocket 接受客户连接的套接字
void recvMessage(void *pStr , int nSocket);
/// @brief 发送数据 从testPool->G_Worker->G_ThreadPool->G_SendMessThread->G_Data
///
/// @param pStr 数据
/// @param nSocket 套接口
/// @return
//
int sendMessage(int nSocket , const void *pStr);
private:
G_Worker *g_worker;
/// @brief 监听线程
G_ListenThread *g_listenThread;
/// @brief 发送消息线程
G_SendMessThread *g_sendMessThread;
/// @brief 存放socket队列
G_Queue<int> g_sockQueue;
/// @brief 存放空闲工作线程队列
G_Queue<G_RecvMessThread*> g_idleRecvMessThreadQueue;
/// @brief 存放忙碌工作线程队列
G_Queue<G_RecvMessThread*> g_busyRecvMessThreadQueue;
/// @brief 每个RecvMessThread线程中最大用户数
static const int maxCounter = 2000;
/// @brief 如果线程不够用新增加的线程
static const int addTaskThread = 2;
};
#endif
#include "ThreadPool.h"
#include "Log.h"
G_ThreadPool::G_ThreadPool(unsigned int num , G_Worker *g_work) : g_worker(g_work)
{
g_listenThread = new G_ListenThread(this);
g_listenThread->Start(); ///启动监听线程
g_sendMessThread = new G_SendMessThread();
g_sendMessThread->Start(); ///发送消息线程
for(int i=0; i<num; i++)
{
/// 启动处理client发送信息线程 , 收消息线程
//明明只启动了接受线程!!!开了num个,为什么开这么多。网络的都交给epoll就好
G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this);
g_idleRecvMessThreadQueue.push(g_recvMessThread);//空闲列,busy列呢,没看到,暂时没放入值
g_recvMessThread->Start();
}
Start(); ///线程池自己启动
}
G_ThreadPool::~G_ThreadPool()
{
if(g_listenThread)
{
delete g_listenThread;
g_listenThread = NULL;
}
if(g_sendMessThread)
{
delete g_sendMessThread;
g_sendMessThread = NULL;
}
g_sockQueue.clear();
g_idleRecvMessThreadQueue.clear();
g_busyRecvMessThreadQueue.clear();
}
bool G_ThreadPool::Bind(unsigned int nPort)
{
return g_listenThread->Bind(nPort);
}
int G_ThreadPool::sendMessage(int nSocket , const void *pStr)
{
g_sendMessThread->sendMessage(nSocket , pStr);
}
void G_ThreadPool::Run()
{
int nSocket;
G_RecvMessThread *g_recvMessThread;
while(1)
{
pause(); ///等待ListenThread 发信号,这种事情,用epoll的epoll_wait就好了嘛
while(popSocket(nSocket)) ///必须把存放socket队列中的套接口全部取出
{
g_sendMessThread->addEpoll(nSocket);//这里也是用epoll的啊
while(1)//各种循环
{
///从空闲队列中获得对首TaskThread
if(g_idleRecvMessThreadQueue.getFront(g_recvMessThread))
{
///如果TaskThread线程中客户大于maxCounter , 从空闲队列中pop并放到忙碌队列中
if(g_recvMessThread->getCounter() >= maxCounter) //接受线程里面,不止一个mess?还要count?
{
if(g_idleRecvMessThreadQueue.pop(g_recvMessThread))
{
g_busyRecvMessThreadQueue.push(g_recvMessThread);
}
else
{
///表示空闲队列中再没有TaskThread可以用 , 创建addTaskThread个TaskThread线程 , 并且把busy队列中的TaskThread放到idle队列中这样可以防止busy队列中的用户数减少但是他还在busy队列中
for(int i=0; i<addTaskThread; i++)
{
G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this);
g_idleRecvMessThreadQueue.push(g_recvMessThread);
g_recvMessThread->Start();
}
while(g_busyRecvMessThreadQueue.pop(g_recvMessThread))//感觉这么麻烦。。。
{
g_idleRecvMessThreadQueue.push(g_recvMessThread);
}
}
}
else
{
/// 填加到TaskThread 线程中
g_recvMessThread->addSocket(nSocket);
g_recvMessThread->continues(); /// 唤醒TaskThread 线程
break;
}
}
else
{
/// 空闲队列中没有任何线程 , 应该没有这种情况
debug_output("idleRecvMessThreadQueue is not g_recvMessThread\n");
}
}
}
}
}
bool G_ThreadPool::pushSocket(unsigned int nSocket)
{
return g_sockQueue.push(nSocket);
}
bool G_ThreadPool::popSocket(int &nSocket)
{
return g_sockQueue.pop(nSocket);
}
void G_ThreadPool::recvMessage(void *pStr , int nSocket)
{
g_worker->recvMessage(pStr , nSocket);
}
///
/// @file TaskThread.h
/// @brief 任务类 , 接受client发的消息进行处理
/// @author guozhiming
/// @date 2007-05-17
///
#ifndef __TASKTHREAD__
#define __TASKTHREAD__
#include "def.h"
#include "Thread.h"
#include "ThreadPool.h"
#include "Queue.h"
#include "Data.h"
class G_ThreadPool;
class G_Data;
class G_RecvMessThread : public G_Thread
{
public:
/// @brief 构造函数
G_RecvMessThread(G_ThreadPool *pool);
/// @brief 析构函数
~G_RecvMessThread();
/// @brief 主线程运行
void Run();
/// @brief 填加套接字
///
/// @param nSocket 套接字
void addSocket(int nSocket);
/// @brief 获得连接的客户端数目
///
/// @return 数目
unsigned int getCounter();
/// @brief 往队列中存放数据 ,,哪个队列?下面定义了一个queue
///
/// @param pStr 数据
///
/// @return true 成功 , false 失败
bool pushData(std::string pStr);
private:
/// @brief 设置套接口非阻塞模式
///
/// @param sockfd 套接口
///
/// @return true 成功 , false 失败
bool setNonBlock(int sockfd);
/// @brief epoll_create 返回文件描述符
int epfd;
struct epoll_event events[100]; //才100个。。封装了epoll啊。在recv里封了,难道send里,也封了一个
/// @brief 记录接受客户端数目
unsigned int counter;
/// @brief 线程池对象
G_ThreadPool *g_threadPool;
/// @brief 存放数据的队列
G_Queue<std::string> g_dataBufferQueue;
G_Data *g_data;
};
#endif
#include "RecvMessThread.h"
#include "Log.h"
G_RecvMessThread::G_RecvMessThread(G_ThreadPool *pool) : g_threadPool(pool)
{
counter = 0;
epfd = epoll_create(256); //最多同时监视256个
g_data = new G_Data(this); //这个数据结构要看看,有啥稀奇
}
G_RecvMessThread::~G_RecvMessThread()
{
close(epfd);
}
unsigned int G_RecvMessThread::getCounter()
{
return counter;
}
bool G_RecvMessThread::setNonBlock(int sockfd)
{
int opts = fcntl(sockfd , F_GETFL);
if(-1 == opts)
{
debug_output("%s\n" , "fcntl F_GETFL is faild");
return false;
}
opts = opts | O_NONBLOCK;
if(fcntl(sockfd , F_SETFL , opts) < 0)
{
debug_output("%s\n" , "fcntl F_SETFL is faild");
return false;
}
return true;
}
void G_RecvMessThread::addSocket(int nSocket)
{
struct epoll_event ev;
bzero(&ev , sizeof(ev));
setNonBlock(nSocket);
ev.data.fd = nSocket;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd , EPOLL_CTL_ADD , nSocket , &ev);
counter++;
}
bool G_RecvMessThread::pushData(std::string pStr)
{
return g_dataBufferQueue.push(pStr);
}
void G_RecvMessThread::Run()
{
pause(); /// 暂停线程 //都用这招。。。
int nfds , sock;
struct epoll_event ev;
bool nRet;
char line[1024]; //一次最多1024个。。感觉这个模型蛮奇怪。。
while(1)
{
nfds = epoll_wait(epfd,events,100,50);
for(int i=0; i<nfds; i++)
{
if(events[i].events&EPOLLIN)
{
if((sock = events[i].data.fd) < 0)
continue;
if(!(nRet=g_data->recvData(sock)))//这是最底层的收数据的啊
{
debug_output("client is quit\n");
ev.data.fd= sock;
epoll_ctl(epfd , EPOLL_CTL_DEL , sock , &ev);//收到了直接关掉了
close(sock);
events[i].data.fd = -1;
counter --;
}
else
{
std::string pBuffer;
while(g_dataBufferQueue.size())
{
g_dataBufferQueue.pop(pBuffer);
g_threadPool->recvMessage((void*)pBuffer.c_str() , sock); //这又是一个收数据的。
}
}
usleep(100);//还要休眠。。这么坑爹。。。
}
}
}
}
相关文章