大家好,好久没有来俺的空间了,请大家多多包含!!! #8-)

一个VC线程池的实现

上一篇 / 下一篇  2007-03-18 10:06:10 / 个人分类:软件开发小贴士

类定义如下

// ThreadPoolImp.h: interface for the ThreadPoolImp class.
//
//////////////////////////////////////////////////////////////////////

#if !defined(AFX_THREADPOOLIMP_H__82F4FC7E_2DB4_4D2A_ACC8_2EFC787CAE42__INCLUDED_)
#define AFX_THREADPOOLIMP_H__82F4FC7E_2DB4_4D2A_ACC8_2EFC787CAE42__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000


#pragma warning( disable : 4705 4786)

#include <map>
#include "AutoLock.h"

using namespace std;

class IJobDesc;
class IWorker;

class CThreadPoolImp 
{
public:
    class ThreadInfo
    {
      public:  
        ThreadInfo() { m_hThread=0; m_bBusyWorking=false; }
        ThreadInfo(HANDLE handle, bool bBusy) { m_hThread=handle; m_bBusyWorking=bBusy; }
        ThreadInfo(const ThreadInfo& info) { m_hThread=info.m_hThread; m_bBusyWorking=info.m_bBusyWorking; }
 ////////
        HANDLE m_hThread;
        bool m_bBusyWorking;
    };

    typedef map<DWORD,ThreadInfo> ThreadInfoMap;
    typedef ThreadInfoMap::iterator  Iterator_ThreadInfoMap;
 
    friend static unsigned int CThreadPoolImp::ManagerProc(void* p);
    friend static unsigned int CThreadPoolImp::WorkerProc(void* p);
protected:
    enum ThreadPoolStatus { BUSY, IDLE, NORMAL };
public:
    //interface to the outside
    void Start(unsigned short nStatic, unsigned short nmax);
    void Stop(bool bHash=false);
    void ProcessJob(IJobDesc* pJob, IWorker* pWorker) const;

 //constructor and destructor
    CThreadPoolImp();
    virtual ~CThreadPoolImp();

protected:
    //interfaces public:
    HANDLE GetMgrIoPort() const { return m_hMgrIoPort; }
    UINT GetMgrWaitTime() const { return 1000; }
    HANDLE GetWorkerIoPort() const { return m_hWorkerIoPort; }

private:
    static DWORD WINAPI ManagerProc(void* p);
    static DWORD WINAPI WorkerProc(void* p);
protected:
    //manager thread
    HANDLE m_hMgrThread;
    HANDLE m_hMgrIoPort;
protected:
    //configuration parameters
    mutable unsigned short m_nNumberOfStaticThreads;
    mutable unsigned short m_nNumberOfTotalThreads;

protected:
    //helper functions
    void AddThreads();
    void RemoveThreads();
    ThreadPoolStatus GetThreadPoolStatus();
    void ChangeStatus(DWORD threadId, bool status);
    void RemoveThread(DWORD threadId);

protected:
    //all the work threads
    ThreadInfoMap m_threadMap;
    CCriticalSection m_arrayCs;
    HANDLE m_hWorkerIoPort;
};

#endif // !defined(AFX_THREADPOOLIMP_H__82F4FC7E_2DB4_4D2A_ACC8_2EFC787CAE42__INCLUDED_)


实现如下


// ThreadPool.cpp: implementation of the CThreadPoolImp class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "ThreadPoolimp.h"
#include "outdebug.h"
#include <assert.h>
#include "work.h"

#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
//#define new DEBUG_NEW
#endif

CThreadPoolImp::CThreadPoolImp()
{
}

CThreadPoolImp::~CThreadPoolImp()
{

}

void CThreadPoolImp::Start(unsigned short nStatic, unsigned short nMax)
{
    assert(nMax>=nStatic);
    HANDLE  hThread;
    DWORD nThreadId;
    m_nNumberOfStaticThreads=nStatic;
    m_nNumberOfTotalThreads=nMax;

    //lock the resource
    CAutoLock AutoLock(m_arrayCs);

    //create an IO port
    m_hMgrIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);
    hThread = CreateThread(
        NULL, // SD
        0,                        // initial stack size
        (LPTHREAD_START_ROUTINE)ManagerProc,    // thread function
        (LPVOID)this,                       // thread argument
        0,                    // creation option
        &nThreadId );                       // thread identifier
    m_hMgrThread = hThread;

    //now we start these worker threads
    m_hWorkerIoPort = CreateIoCompletionPort((HANDLE)INVALID_HANDLE_VALUE, NULL, 0, 0);
    for(long n = 0; n < nStatic; n++)
    {
        hThread = CreateThread(
              NULL, // SD
              0,                        // initial stack size
              (LPTHREAD_START_ROUTINE)WorkerProc,    // thread function
              (LPVOID)this,                       // thread argument
              0,                    // creation option
              &nThreadId );   
        ReportDebug("generate a worker thread handle id is %d.\n", nThreadId);
        m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type (nThreadId,ThreadInfo(hThread, false)));
    }
}

void CThreadPoolImp::Stop(bool bHash)
{
    CAutoLock Lock(m_arrayCs);

    ::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
    WaitForSingleObject(m_hMgrThread, INFINITE);
    CloseHandle(m_hMgrThread);
    CloseHandle(m_hMgrIoPort);

    //shut down all the worker threads
    UINT nCount=m_threadMap.size();
    HANDLE* pThread = new HANDLE[nCount];
    long n=0;
    ThreadInfo info;
    Iterator_ThreadInfoMap i=m_threadMap.begin();
    while(i!=m_threadMap.end())
    {
       ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
       info=i->second;
       pThread[n++]=info.m_hThread;
       i++;
    }

    DWORD rc=WaitForMultipleObjects(nCount, pThread, TRUE, 30000);

    CloseHandle(m_hWorkerIoPort);
    if(rc>=WAIT_OBJECT_0 && rc<WAIT_OBJECT_0+nCount)
    {
        for(unsigned int n=0;n<nCount;n++)
        {
             CloseHandle(pThread[n]);
        }
    }
    else if(rc==WAIT_TIMEOUT&&bHash)
    {
        //some threads not terminated, we have to stop them.
        DWORD exitCode;
        for(unsigned int i=0; i<nCount; i++)
        {
            if (::GetExitCodeThread(pThread[i], &exitCode)==STILL_ACTIVE)
            {
                 TerminateThread(pThread[i], 99);
            }
            CloseHandle(pThread[i]);
        }
    }


    delete[] pThread;
}

DWORD WINAPI CThreadPoolImp::ManagerProc(void* p)
{
    //convert the parameter to the server pointer.
    CThreadPoolImp* pServer=(CThreadPoolImp*)p;
    HANDLE        IoPort = pServer->GetMgrIoPort();
    unsigned long      pN1, pN2;
    OVERLAPPED*       pOverLapped;

    LABEL_MANAGER_PROCESSING:
    while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2,
       &pOverLapped, pServer->GetMgrWaitTime() ))
    {
        if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
        {
            return 0;
        }
        else
        {
            ReportDebug("mgr events comes in!\n");
        }
    }

    //time out processing
    if (::GetLastError()==WAIT_TIMEOUT)
    {
        //time out processing
        ReportDebug("Time out processing!\n");
        //the manager will take a look at all the worker´s status. The
        if (pServer->GetThreadPoolStatus()==CThreadPoolImp::BUSY)
        pServer->AddThreads();
        if (pServer->GetThreadPoolStatus()==CThreadPoolImp::IDLE)
        pServer->RemoveThreads();

        goto LABEL_MANAGER_PROCESSING;
     }


     return 0;
}

DWORD WINAPI CThreadPoolImp::WorkerProc(void* p)
{
    //convert the parameter to the server pointer.
    CThreadPoolImp* pServer=(CThreadPoolImp*)p;
    HANDLE        IoPort = pServer->GetWorkerIoPort();
    unsigned long      pN1, pN2;
    OVERLAPPED*       pOverLapped;

    DWORD threadId=::GetCurrentThreadId();
    ReportDebug("worker thread id is %d.\n", threadId);

    while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2,
          &pOverLapped, INFINITE ))
    {
        if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
        {
             pServer->RemoveThread(threadId);
             break;
        }
        else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
        {
             break;
        }
        else
        {
             ReportDebug("worker events comes in!\n");
             //before processing, we need to change the status to busy.
             pServer->ChangeStatus(threadId, true);
             //retrieve the job descrīption and agent pointer
             IWorker* pIWorker = reinterpret_cast<IWorker*>(pN1);
             IJobDesc* pIJob= reinterpret_cast<IJobDesc*>(pN2);
             pIWorker->ProcessJob(pIJob);
             pServer->ChangeStatus(threadId, false);
        }
     }


     return 0;
}

void CThreadPoolImp::ChangeStatus(DWORD threadId, bool status)
{
    CAutoLock CAutoLock(m_arrayCs);

    //retrieve the current thread handle
    Iterator_ThreadInfoMap i;
    ThreadInfo info;


    i=m_threadMap.find(threadId);
    info=i->second;
    info.m_bBusyWorking=status;
    m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(threadId, info));
}

void CThreadPoolImp::ProcessJob(IJobDesc* pJob, IWorker* pWorker) const
{
    ::PostQueuedCompletionStatus(m_hWorkerIoPort, \
                reinterpret_cast<DWORD>(pWorker), \
                reinterpret_cast<DWORD>(pJob),\
                NULL);
}

void CThreadPoolImp::AddThreads()
{
    HANDLE  hThread;
    DWORD nThreadId;
    unsigned int nCount=m_threadMap.size();
    unsigned int nTotal=min(nCount+2, m_nNumberOfTotalThreads);
    for(unsigned int i=0; i<nTotal-nCount; i++)
    {
        hThread = CreateThread(
           NULL, // SD
           0,    // initial stack size
           (LPTHREAD_START_ROUTINE)WorkerProc,    // thread function
           (LPVOID)this,                       // thread argument
           0,                    // creation option
           &nThreadId );   
       ReportDebug("generate a worker thread handle id is %d.\n", nThreadId);
       m_threadMap.insert(m_threadMap.end(),ThreadInfoMap::value_type(nThreadId,ThreadInfo(hThread, false)));
 }
}

void CThreadPoolImp::RemoveThread(DWORD threadId)
{
    CAutoLock lock(m_arrayCs);
    m_threadMap.erase(threadId);
}

void CThreadPoolImp::RemoveThreads()
{
    unsigned int nCount=m_threadMap.size();
    unsigned int nTotal=max(nCount-2, m_nNumberOfStaticThreads);
    for(unsigned int i=0; i<nCount-nTotal; i++)
    {
       ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFE);
    }
}

CThreadPoolImp::ThreadPoolStatus CThreadPoolImp::GetThreadPoolStatus()
{
    int nTotal = m_threadMap.size();
    ThreadInfo info;
    int nCount=0;
    Iterator_ThreadInfoMap i=m_threadMap.begin();

 
    while(i!=m_threadMap.end())
    {
        info=i->second;
        if (info.m_bBusyWorking==true) nCount++;
           i++;
    }
    if ( nCount/(1.0*nTotal) > 0.8 )
        return BUSY;
    if ( nCount/ (1.0*nTotal) < 0.2 )
        return IDLE;
    return NORMAL;
}


TAG: 软件开发小贴士

 

评分:0

我来说两句

日历

« 2024-03-21  
     12
3456789
10111213141516
17181920212223
24252627282930
31      

数据统计

  • 访问量: 6462
  • 日志数: 10
  • 建立时间: 2007-01-22
  • 更新时间: 2008-03-18

RSS订阅

Open Toolbar