Workqueue的创建
首先申请了workqueue_struct结构体内存,cpu_workqueue_struct结构体的内存。然后在init_cpu_workqueue函数中对cpu_workqueue_struct结构体进行初始化。同时调用create_workqueue_thread函数创建处理工作队列的内核进程。
create_workqueue_thread中创建了如下的内核进程
p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
最后调用start_workqueue_thread启动新创建的进程。
struct workqueue_struct *__create_workqueue_key(const char *name, int singlethread, int freezeable, int rt, struct lock_class_key *key, const char *lock_name) { struct workqueue_struct *wq; struct cpu_workqueue_struct *cwq; int err = 0, cpu;wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); if (!wq->cpu_wq) { kfree(wq); return NULL; } wq->name = name; lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); wq->singlethread = singlethread; wq->freezeable = freezeable; wq->rt = rt; INIT_LIST_HEAD(&wq->list); if (singlethread) { cwq = init_cpu_workqueue(wq, singlethread_cpu); err = create_workqueue_thread(cwq, singlethread_cpu); start_workqueue_thread(cwq, -1); } else { cpu_maps_update_begin(); /* * We must place this wq on list even if the code below fails. * cpu_down(cpu) can remove cpu from cpu_populated_map before * destroy_workqueue() takes the lock, in that case we leak * cwq[cpu]->thread. */ spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues); spin_unlock(&workqueue_lock); /* * We must initialize cwqs for each possible cpu even if we * are going to call destroy_workqueue() finally. Otherwise * cpu_up() can hit the uninitialized cwq once we drop the * lock. */ for_each_possible_cpu(cpu) { cwq = init_cpu_workqueue(wq, cpu); if (err || !cpu_online(cpu)) continue; err = create_workqueue_thread(cwq, cpu); start_workqueue_thread(cwq, cpu); } cpu_maps_update_done(); } if (err) { destroy_workqueue(wq); wq = NULL; } return wq; } EXPORT_SYMBOL_GPL(__create_workqueue_key); |
向工作队列中添加工作
schedule_work 函数向工作队列中添加任务。这个接口比较简单,无非是一些队列操作,不再叙述。
/**
 * schedule_work - queue a work item on the kernel-global workqueue
 * @work: the work item to queue
 *
 * Hands @work to queue_work() with keventd_wq as the target workqueue.
 *
 * Returns whatever queue_work() returns for this request.
 */
int schedule_work(struct work_struct *work)
{
	int queued;

	queued = queue_work(keventd_wq, work);

	return queued;
}
EXPORT_SYMBOL(schedule_work);
工作队列内核进程的处理过程
在创建工作队列的时候,我们创建了一个或者多个进程来处理挂到队列上的工作。这个内核进程的主要函数体为worker_thread,这个函数比较有意思的地方就是,它通过set_user_nice(current, -5)调整了自己的nice值。注意nice值越小优先级越高,-5意味着worker_thread的优先级比默认nice值为0的普通进程略高,而不是更低。不过在系统负载大的时候,采用工作队列执行的操作仍可能存在较大的延迟。
就函数的执行流程来说是真心的简单,只是从队列中取出work,从队列中删除掉,清除掉pending标记,并执行work设置的回调函数。
static int worker_thread(void *__cwq) { struct cpu_workqueue_struct *cwq = __cwq; DEFINE_WAIT(wait);if (cwq->wq->freezeable) set_freezable(); set_user_nice(current, -5); for (;;) { prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); if (!freezing(current) && !kthread_should_stop() && list_empty(&cwq->worklist)) schedule(); finish_wait(&cwq->more_work, &wait); try_to_freeze(); if (kthread_should_stop()) break; run_workqueue(cwq); } return 0; } static void run_workqueue(struct cpu_workqueue_struct *cwq) { spin_lock_irq(&cwq->lock); cwq->run_depth++; if (cwq->run_depth > 3) { /* morton gets to eat his hat */ printk("%s: recursion depth exceeded: %dn", __func__, cwq->run_depth); dump_stack(); } while (!list_empty(&cwq->worklist)) { struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); work_func_t f = work->func; #ifdef CONFIG_LOCKDEP /* * It is permissible to free the struct work_struct * from inside the function that is called from it, * this we need to take into account for lockdep too. * To avoid bogus "held lock freed" warnings as well * as problems when looking into work->lockdep_map, * make a copy and use that here. */ struct lockdep_map lockdep_map = work->lockdep_map; #endifcwq->current_work = work; list_del_init(cwq->worklist.next); spin_unlock_irq(&cwq->lock); BUG_ON(get_wq_data(work) != cwq); work_clear_pending(work); lock_map_acquire(&cwq->wq->lockdep_map); lock_map_acquire(&lockdep_map); f(work); lock_map_release(&lockdep_map); lock_map_release(&cwq->wq->lockdep_map); if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " "%s/0x%08x/%dn", current->comm, preempt_count(), task_pid_nr(current)); printk(KERN_ERR " last function: "); print_symbol("%sn", (unsigned long)f); debug_show_held_locks(current); dump_stack(); } spin_lock_irq(&cwq->lock); cwq->current_work = NULL; } cwq->run_depth--; spin_unlock_irq(&cwq->lock); } |