1. 前语
作业行列(workqueue)的Linux内核中的界说的用来处理不是很紧急事情的回调办法处理办法。
以下代码的linux内核版别为2.6.19.2, 源代码文件主要为kernel/workqueue.c.
2. 数据结构
/* include/linux/workqueue.h */ // 作业节点结构struct work_struct { // 等候时间unsigned long pending;// 链表节点struct list_head entry;// workqueue回调函数void (*func)(void *);// 回调函数func的数据void *data;// 指向CPU相关数据, 一般指向struct cpu_workqueue_struct结构void *wq_data;// 定时器struct timer_list timer;};
struct execute_work { struct work_struct work;};
/* kernel/workqueue.c */ /* * The per-CPU workqueue (if single thread, we always use the first * possible cpu)。
* * The sequence counters are for flush_scheduled_work()。 It wants to wait * until all currently-scheduled works are completed, but it doesn’t * want to be livelocked by new, incoming ones. So it waits until * remove_sequence is >= the insert_sequence which pertained when * flush_scheduled_work() was called. */ // 这个结构是针对每个CPU的struct cpu_workqueue_struct { // 结构锁spinlock_t lock;// 下一个要履行的节点序号long remove_sequence; /* Least-recently added (next to run) */ // 下一个要刺进节点的序号long insert_sequence; /* Next to add */ // 作业组织链表节点struct list_head worklist;// 要进行处理的等候行列wait_queue_head_t more_work;// 处理完的等候行列wait_queue_head_t work_done;// 作业行列节点struct workqueue_struct *wq;// 进程指针struct task_struct *thread;int run_depth; /* Detect run_workqueue() recursion depth */ } ____cacheline_aligned;/* * The externally visible workqueue abstraction is an array of * per-CPU workqueues:*/ // 作业行列结构struct workqueue_struct { struct cpu_workqueue_struct *cpu_wq;const char *name;struct list_head list; /* Empty if single thread */ };
kernel/workqueue.c中界说了一个作业行列链表, 一切作业行列能够挂接到这个链表中:static LIST_HEAD(workqueues);
3. 一些宏界说
/* include/linux/workqueue.h */ // 初始化作业行列#define __WORK_INITIALIZER(n, f, d) { // 初始化list。entry = { (n)。entry, (n)。entry },// 回调函数。func = (f),// 回调函数参数。data = (d),// 初始化定时器。timer = TIMER_INITIALIZER(NULL, 0, 0),}
// 声明作业行列并初始化#define DECLARE_WORK(n, f, d)
struct work_struct n = __WORK_INITIALIZER(n, f, d)
/* * initialize a work-struct’s func and data pointers:*/ // 从头界说作业结构参数#define PREPARE_WORK(_work, _func, _data)
do {(_work)->func = _func;(_work)->data = _data;} while (0)
/* * initialize all of a work-struct:*/ // 初始化作业结构, 和__WORK_INITIALIZER功用相同,不过__WORK_INITIALIZER用在// 参数初始化界说, 而该宏用在程序之中对作业结构赋值#define INIT_WORK(_work, _func, _data)
do { INIT_LIST_HEAD((_work)->entry);(_work)->pending = 0;PREPARE_WORK((_work), (_func), (_data));init_timer((_work)->timer);} while (0)
4. 操作函数
4.1 创立作业行列
一般的创立函数是create_workqueue, 但这其实仅仅一个宏:/* include/linux/workqueue.h */ #define create_workqueue(name) __create_workqueue((name), 0)
在workqueue的初始化函数中, 界说了一个针对内核中一切线程可用的事情作业行列, 其他内核线程树立的事情作业结构就都挂接到该行列:void init_workqueues(void)
{……
keventd_wq = create_workqueue(events);……
}
中心创立函数是__create_workqueue:
struct workqueue_struct *__create_workqueue(const char *name,int singlethread)
{ int cpu, destroy = 0;struct workqueue_struct *wq;struct task_struct *p;// 分配作业行列结构空间wq = kzalloc(sizeof(*wq), GFP_KERNEL);if (!wq)
return NULL;// 为每个CPU分配独自的作业行列空间wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);if (!wq->cpu_wq) { kfree(wq);return NULL;} wq->name = name;mutex_lock(workqueue_mutex);if (singlethread) { // 运用create_workqueue宏时该参数一直为0 // 如果是单一线程形式, 在单线程中调用各个作业行列// 树立一个的作业行列内核线程INIT_LIST_HEAD(wq->list);// 树立作业行列的线程p = create_workqueue_thread(wq, singlethread_cpu);if (!p)
destroy = 1;else // 唤醒该线程wake_up_process(p);} else { // 链表形式, 将作业行列添加到作业行列链表list_add(wq->list, workqueues);// 为每个CPU树立一个作业行列线程for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu);if (p) { // 绑定CPU kthread_bind(p, cpu);// 唤醒线程wake_up_process(p);} else destroy = 1;} mutex_unlock(workqueue_mutex);/* * Was there any error during startup? If yes then clean up:*/ if (destroy) { // 树立线程失利, 开释作业行列destroy_workqueue(wq);wq = NULL;} return wq;} EXPORT_SYMBOL_GPL(__create_workqueue);
// 创立作业行列线程static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,int cpu)
{ // 每个CPU的作业行列struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);struct task_struct *p;spin_lock_init(cwq->lock);// 初始化cwq->wq = wq;cwq->thread = NULL;cwq->insert_sequence = 0;cwq->remove_sequence = 0;INIT_LIST_HEAD(cwq->worklist);// 初始化等候行列more_work, 该行列处理要履行的作业结构init_waitqueue_head(cwq->more_work);// 初始化等候行列work_done, 该行列处理履行完的作业结构init_waitqueue_head(cwq->work_done);// 树立内核线程work_thread if (is_single_threaded(wq))