static void *tp_work_thread(void *arg) { pthread_t curid;//current thread id TpThreadInfo *pTinfo = (TpThreadInfo *) arg; //wait cond for processing real job. while (!(pTinfo->tp_pool->stop_flag)) { pthread_mutex_lock(&pTinfo->thread_lock); pthread_cond_wait(&pTinfo->thread_cond, &pTinfo->thread_lock); pthread_mutex_unlock(&pTinfo->thread_lock); //process pTinfo->proc_fun(pTinfo->th_job); //thread state be set idle after work //pthread_mutex_lock(&pTinfo->thread_lock); pTinfo->is_busy = FALSE; enQueue(&pTinfo->tp_pool->idle_q, pTinfo); //pthread_mutex_unlock(&pTinfo->thread_lock); DEBUG("Job done, I am idle now.\n"); } } |
上面这个函数是任务处理函数,该函数将始终处理等待唤醒状态,直到新任务到达或者线程销毁时被唤醒,然后调用任务处理回调函数对任务进行处理;当任务处理完成时,则将自己置入空闲队列中,以供下一个任务处理。
TpThreadInfo *tp_add_thread(TpThreadPool *pTp) { int err; TpThreadInfo *new_thread; if (pTp->max_th_num <= pTp->cur_th_num) return NULL; //malloc new thread info struct new_thread = pTp->thread_info + pTp->cur_th_num; new_thread->tp_pool = pTp; //init new thread's cond & mutex pthread_cond_init(&new_thread->thread_cond, NULL); pthread_mutex_init(&new_thread->thread_lock, NULL); //init status is busy, only new process job will call this function new_thread->is_busy = TRUE; err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, new_thread); if (0 != err) { free(new_thread); return NULL; } //add current thread number in the pool. pTp->cur_th_num++; return new_thread; } |
上面这个函数用于向线程池中添加新的线程,该函数将会在当线程池没有空闲线程可用时被调用。
函数将会新建一个线程,并设置自己的状态为busy(立即就要被用于执行任务)。
线程池管理
线程池的管理主要是监控线程池的整体忙碌状态,当线程池大部分线程处于空闲状态时,管理线程将适当的销毁一定数量的空闲线程,以便减少线程池对系统资源的消耗。
这里设计认为,当空闲线程的数量超过线程池线程数量的1/2时,线程池总体处理空闲状态,可以适当销毁部分线程池的线程,以减少线程池对系统资源的开销。
线程池状态计算
这里的BUSY_THRESHOLD的值是0.5,也即是当空闲线程数量超过一半时,返回0,说明线程池整体状态为闲,否则返回1,说明为忙。