某源码thread,socket研究3

2023-01-31 01:01:50 源码 研究 Thread
///
/// @file Worker.h
/// @brief 用户接口类
/// @author guozhiming
/// @date 2007-05-16
///
#ifndef __WORKER__
#define __WORKER__
#include "ThreadPool.h"
/// @brief 抽象类
class G_Worker
{
    public:
        /// @brief 构造函数
        G_Worker(unsigned int num);
        /// @brief 析构函数
        ~G_Worker();
        /// @brief 服务器帮定端口
        ///
        /// @param nPort 帮定端口
        ///
        /// @return true表示成功 , false表示失败
        bool Bind(unsigned int nPort);
        /// @brief 存虚函数子类继承并实现逻辑
        ///
        /// @param pStr 客户端发送的字符串
        virtual void recvMessage(void *pStr , int nSocket) = 0;
        /// @brief 发送数据到客户端
        ///
        /// @param pStr 数据
        /// @param nSocket 发送到客户端的套接字
        //
        /// @return
        int sendMessage(int nSocket , const void *pStr);
    protected:
                                            
    private:
        G_ThreadPool *g_threadPool;
};
#endif



#include "Worker.h"
#include "Log.h"
G_Worker::G_Worker(unsigned int num)
{
    g_threadPool = new G_ThreadPool(num , this);  //开num个线程
}
G_Worker::~G_Worker()
{
    if(g_threadPool)
    {
        delete g_threadPool;
        g_threadPool = NULL;
    }
}
bool G_Worker::Bind(unsigned int nPort)
{
    return g_threadPool->Bind(nPort);//整个线程池,绑定一个端口。。
}
int G_Worker::sendMessage(int nSocket , const void *pStr)
{
    return g_threadPool->sendMessage(nSocket , pStr);//这是tcp还是udp发送呢。
}



///
/// @file ThreadPool.h
/// @brief 线程池的实现 , 是个管理线程 , 负责调用每个线程之间的互相调用的关系
/// @author guozhiming
/// @date 2007-05-16
///
#ifndef __G_THREADPOOL__
#define __G_THREADPOOL__
#include "ListenThread.h"
#include "SendMessThread.h"
#include "Queue.h"
#include "RecvMessThread.h"
#include "Worker.h"
class G_ListenThread;
class G_SendMessThread;
class G_RecvMessThread;
class G_Worker;
class G_ThreadPool : public G_Thread
{
    public:
        /// @brief 构造函数
        G_ThreadPool(unsigned int num , G_Worker *g_work);
        /// @brief 析构函数
        ~G_ThreadPool();
        /// @brief 服务器帮定端口
        ///
        /// @param nPort 帮定端口
        ///
        /// @return true表示成功 , false表示失败
        bool Bind(unsigned int nPort);
        /// @brief 主线程
        void Run();
        /// @brief 填加socket到队列中
        ///
        /// @param nSocket 套接口
        ///
        /// @return true 表示成功 , false 表示失败
        bool pushSocket(unsigned int nSocket);
        /// @brief 从队列中取套接字
        ///
        /// @param nSocket 取出的套接字存放在nSocket中
        ///
        /// @return true 表示成功 , false 表示失败
        bool popSocket(int &nSocket);
        /// @brief 从G_Data->G_RecvMessThread->G_ThreadPool->G_Worker 回掉
        ///
        /// @param pStr 客户发的字符串
        /// @param nSocket 接受客户连接的套接字
        void recvMessage(void *pStr , int nSocket);
        /// @brief 发送数据 从testPool->G_Worker->G_ThreadPool->G_SendMessThread->G_Data
        ///
        /// @param pStr 数据
        /// @param nSocket 套接口
        /// @return
        //
        int sendMessage(int nSocket , const void *pStr);
    private:
        G_Worker *g_worker;
        /// @brief 监听线程
        G_ListenThread *g_listenThread;
        /// @brief 发送消息线程
        G_SendMessThread *g_sendMessThread;
        /// @brief 存放socket队列
        G_Queue<int> g_sockQueue;
        /// @brief 存放空闲工作线程队列
        G_Queue<G_RecvMessThread*> g_idleRecvMessThreadQueue;
        /// @brief 存放忙碌工作线程队列
        G_Queue<G_RecvMessThread*> g_busyRecvMessThreadQueue;
        /// @brief 每个RecvMessThread线程中最大用户数
        static const int maxCounter = 2000;
        /// @brief 如果线程不够用新增加的线程
        static const int addTaskThread = 2;
};
#endif



#include "ThreadPool.h"
#include "Log.h"
G_ThreadPool::G_ThreadPool(unsigned int num , G_Worker *g_work) : g_worker(g_work)
{
    g_listenThread = new G_ListenThread(this);
    g_listenThread->Start();  ///启动监听线程
    g_sendMessThread = new G_SendMessThread();
    g_sendMessThread->Start();   ///发送消息线程
    for(int i=0; i<num; i++)
    {
        /// 启动处理client发送信息线程 , 收消息线程
        //明明只启动了接受线程!!!开了num个,为什么开这么多。网络的都交给epoll就好
        G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this);
        g_idleRecvMessThreadQueue.push(g_recvMessThread);//空闲列,busy列呢,没看到,暂时没放入值
        g_recvMessThread->Start();       
    }
    Start();   ///线程池自己启动
}
G_ThreadPool::~G_ThreadPool()
{
    if(g_listenThread)
    {
        delete g_listenThread;
        g_listenThread = NULL;
    }
    if(g_sendMessThread)
    {
        delete g_sendMessThread;
        g_sendMessThread = NULL;
    }
                    
    g_sockQueue.clear();
    g_idleRecvMessThreadQueue.clear();
    g_busyRecvMessThreadQueue.clear();
}
bool G_ThreadPool::Bind(unsigned int nPort)
{
    return g_listenThread->Bind(nPort);
}
int G_ThreadPool::sendMessage(int nSocket , const void *pStr)
{
    g_sendMessThread->sendMessage(nSocket , pStr);
}
void G_ThreadPool::Run()
{
    int nSocket; 
    G_RecvMessThread *g_recvMessThread;
    while(1)
    {
        pause();   ///等待ListenThread 发信号,这种事情,用epoll的epoll_wait就好了嘛
        while(popSocket(nSocket))     ///必须把存放socket队列中的套接口全部取出
        {
            g_sendMessThread->addEpoll(nSocket);//这里也是用epoll的啊
            while(1)//各种循环
            {
                ///从空闲队列中获得对首TaskThread
                if(g_idleRecvMessThreadQueue.getFront(g_recvMessThread))
                {
                    ///如果TaskThread线程中客户大于maxCounter , 从空闲队列中pop并放到忙碌队列中
                    if(g_recvMessThread->getCounter() >= maxCounter)  //接受线程里面,不止一个mess?还要count?
                    {
                        if(g_idleRecvMessThreadQueue.pop(g_recvMessThread))
                        {
                            g_busyRecvMessThreadQueue.push(g_recvMessThread);
                        }
                        else
                        {
                            ///表示空闲队列中再没有TaskThread可以用 , 创建addTaskThread个TaskThread线程 , 并且把busy队列中的TaskThread放到idle队列中这样可以防止busy队列中的用户数减少但是他还在busy队列中
                            for(int i=0; i<addTaskThread; i++)
                            {
                                G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this);
                                g_idleRecvMessThreadQueue.push(g_recvMessThread);
                                g_recvMessThread->Start();
                            }
                            while(g_busyRecvMessThreadQueue.pop(g_recvMessThread))//感觉这么麻烦。。。
                            {
                                g_idleRecvMessThreadQueue.push(g_recvMessThread);
                            }
                        }
                    }
                    else
                    {
                        /// 填加到TaskThread 线程中
                        g_recvMessThread->addSocket(nSocket);
                        g_recvMessThread->continues();   /// 唤醒TaskThread 线程
                        break;
                    }
                }
                else
                {
                    /// 空闲队列中没有任何线程 , 应该没有这种情况
                    debug_output("idleRecvMessThreadQueue is not g_recvMessThread\n");
                }
            }
        }
    }
}
bool G_ThreadPool::pushSocket(unsigned int nSocket)
{
    return g_sockQueue.push(nSocket);
}
bool G_ThreadPool::popSocket(int &nSocket)
{
    return g_sockQueue.pop(nSocket);
}
void G_ThreadPool::recvMessage(void *pStr , int nSocket)
{
    g_worker->recvMessage(pStr , nSocket);
}



///
/// @file TaskThread.h
/// @brief 任务类 , 接受client发的消息进行处理
/// @author guozhiming
/// @date 2007-05-17
///
#ifndef __TASKTHREAD__
#define __TASKTHREAD__
#include "def.h"
#include "Thread.h"
#include "ThreadPool.h"
#include "Queue.h"
#include "Data.h"
class G_ThreadPool;
class G_Data;
class G_RecvMessThread : public G_Thread
{
    public:
        /// @brief 构造函数
        G_RecvMessThread(G_ThreadPool *pool);
        /// @brief 析构函数
        ~G_RecvMessThread();
        /// @brief 主线程运行
        void Run();
        /// @brief 填加套接字
        ///
        /// @param nSocket 套接字
        void addSocket(int nSocket);
        /// @brief 获得连接的客户端数目
        ///
        /// @return 数目
        unsigned int getCounter();
        /// @brief      往队列中存放数据 ,,哪个队列?下面定义了一个queue
        ///
        /// @param pStr  数据
        ///
        /// @return true 成功 , false 失败
        bool pushData(std::string pStr);
    private:
                
        /// @brief 设置套接口非阻塞模式
        ///
        /// @param sockfd 套接口
        ///
        /// @return true 成功 , false 失败
        bool setNonBlock(int sockfd);
        /// @brief epoll_create 返回文件描述符
        int epfd;
        struct epoll_event events[100];   //才100个。。封装了epoll啊。在recv里封了,难道send里,也封了一个
        /// @brief 记录接受客户端数目
        unsigned int counter;
        /// @brief 线程池对象
        G_ThreadPool *g_threadPool;
        /// @brief 存放数据的队列
        G_Queue<std::string> g_dataBufferQueue;
        G_Data *g_data;
};
#endif



#include "RecvMessThread.h"
#include "Log.h"
G_RecvMessThread::G_RecvMessThread(G_ThreadPool *pool) : g_threadPool(pool)
{
    counter = 0;
    epfd = epoll_create(256); //最多同时监视256个
    g_data = new G_Data(this); //这个数据结构要看看,有啥稀奇
}
G_RecvMessThread::~G_RecvMessThread()
{
    close(epfd);
}
unsigned int G_RecvMessThread::getCounter()
{
    return counter;
}
bool G_RecvMessThread::setNonBlock(int sockfd)
{
    int opts = fcntl(sockfd , F_GETFL);
    if(-1 == opts)
    {
        debug_output("%s\n" , "fcntl F_GETFL is faild");
        return false;
    }
    opts = opts | O_NONBLOCK;
    if(fcntl(sockfd , F_SETFL , opts) < 0)
    {
        debug_output("%s\n" , "fcntl F_SETFL is faild");
        return false;
    }
    return true;
}
void G_RecvMessThread::addSocket(int nSocket)
{
    struct epoll_event ev;
    bzero(&ev , sizeof(ev));
    setNonBlock(nSocket);
    ev.data.fd = nSocket;
    ev.events = EPOLLIN | EPOLLET;
    epoll_ctl(epfd , EPOLL_CTL_ADD , nSocket , &ev);
    counter++;
}
bool G_RecvMessThread::pushData(std::string pStr)
{
    return g_dataBufferQueue.push(pStr);
}
void G_RecvMessThread::Run()
{
    pause();    /// 暂停线程  //都用这招。。。
    int nfds , sock;
    struct epoll_event ev;
    bool nRet;
    char line[1024]; //一次最多1024个。。感觉这个模型蛮奇怪。。
    while(1)
    {
        nfds = epoll_wait(epfd,events,100,50);
        for(int i=0; i<nfds; i++)
        {
            if(events[i].events&EPOLLIN)
            {
                if((sock = events[i].data.fd) < 0)
                    continue;
                if(!(nRet=g_data->recvData(sock)))//这是最底层的收数据的啊
                {
                    debug_output("client is quit\n");
                    ev.data.fd= sock;
                    epoll_ctl(epfd , EPOLL_CTL_DEL , sock , &ev);//收到了直接关掉了
                    close(sock);
                    events[i].data.fd = -1;
                    counter --;
                }
                else
                {
                    std::string pBuffer;
                    while(g_dataBufferQueue.size())
                    {
                        g_dataBufferQueue.pop(pBuffer);
                        g_threadPool->recvMessage((void*)pBuffer.c_str() , sock); //这又是一个收数据的。
                    }
                }
                usleep(100);//还要休眠。。这么坑爹。。。
            }
        }
    }
}


相关文章