【分析笔记】Linux tasklet 机制的理解
-
Tasklet 介绍
Linux 内核提供的四种中断下半部中 softirq(软中断)、tasklet(小任务)、workqueue(工作队列) 、request thread(中断线程)中的其中一种,其效率仅次于软中断,但远高于request thread 和 workqueue。
-
软中断(softirq) 之所以性能高的原因,在 SMP 系统下多个 cpu 同时并发处理
如网卡的 fifo 半满中断触发,被 cpu0 处理,cpu0 会在关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。若网卡的 fifo 全满中断有再次触发,就会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续相同的流程。由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。 -
小任务(tasklet) 之所以性能较软中断差,是因为同一种小任务在多个 cpu 上不会并发执行
由于 tasklet 基于 softirq 的基础实现,为了易用性考虑,同一种 tasklet 在多个 cpu 上不会并行执行,因此不存在并发问题,在使用上就可以少一些顾虑。但也正是因为不存在并发,导致了性能较之 softirq 差一些。 -
tasklet 之所以比 workqueue 和 request thread 性能高
原因是因为前者是在软中断上下文件工作(意味着不能调用任何阻塞的接口),而后两者是在进程上下文工作(实质上是在内核线程里面执行)。
使用示例模版
#include <linux/module.h> #include <linux/init.h> #include <linux/kernel.h> #include <linux/interrupt.h> static struct tasklet_struct my_tasklet; static void my_tasklet_handle(unsigned long data) { printk("tasklet handle running...\n"); } static irqreturn_t xxx_interrupt(int irq, void *dev_id) { // 调度 tasklet tasklet_schedule(&my_tasklet); } static int __init demo_driver_init(void) { // 初始化一个 tasklet ,关联处理函数 tasklet_init(&my_tasklet, my_tasklet_handle, 0); request_irq(xxx, xxx_interrupt, IRQF_SHARED, xxx, xxx); return 0; } static void __exit demo_driver_exit(void) { tasklet_kill(&my_tasklet); return ; } module_init(demo_driver_init); module_exit(demo_driver_exit); MODULE_LICENSE("GPL v2");
内核源码分析
Linux 内核被启动后,会执行 start_kernel() 函数,tasklet 是基于 softirq 实现的,会在 softirq_init() 里面进行必要的初始化,主要是初始化 tasklet 链表和与相应的软中断号建立关联。
tasklet_hi_action 是高优先级的 tasklet,tasklet_action 是普通的 tasklet,两者实现原理都一样。
// kernel\linux-4.9\init\main.c asmlinkage __visible void __init start_kernel(void) { ... softirq_init(); ... } // kernel\linux-4.9\kernel\softirq.c void __init softirq_init(void) { int cpu; // 初始化链表 for_each_possible_cpu(cpu) { per_cpu(tasklet_vec, cpu).tail = &per_cpu(tasklet_vec, cpu).head; per_cpu(tasklet_hi_vec, cpu).tail = &per_cpu(tasklet_hi_vec, cpu).head; } // 建立TASKLET_SOFTIRQ、HI_SOFTIRQ软中断号的对应服务接口 open_softirq(TASKLET_SOFTIRQ, tasklet_action); open_softirq(HI_SOFTIRQ, tasklet_hi_action); }
当调用 tasklet_schedule() 时,如果该 tasklet 没有被设置 TASKLET_STATE_SCHED 标记时,才会加入链表内,如果已经设置了 TASKLET_STATE_SCHED 了,那么就会忽略此次的 tasklet _schedule(),这就意味着如果在极短的时间内调用 tasklet_schedule() 只会触发一次(这里可能会存在丢失中断事件的情况),之后会通过 raise_softirq_irqoff() 启用 TASKLET_SOFTIRQ 软中断。
哪颗 cpu 受理该软中断,就将 tasklet 加入到该 cpu 的 tasklet 链表内,由于相同的软中断可以同时被其他 cpu 触发执行,因此会出现 cpu0\cpu1 的 tasklet 链表内有同一个 tasklet 的情况。
// kernel\linux-4.9\include\interrupt.h static inline void tasklet_schedule(struct tasklet_struct *t) { if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t); } // kernel\linux-4.9\kernel\softirq.c void __tasklet_schedule(struct tasklet_struct *t) { unsigned long flags; // 将指定的 tasklet 加入到链表内并设置软中断 local_irq_save(flags); t->next = NULL; *__this_cpu_read(tasklet_vec.tail) = t; __this_cpu_write(tasklet_vec.tail, &(t->next)); raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_restore(flags); }
在初始化的时候已经为 TASKLET_SOFTIRQ 软中断与 tasklet_action() 建立关联的关系,因此软中断触发时,就会调用 tasklet_action(),这里是精髓部分。
多次触发软中断时,当前 cpu 只能同一时间执行一次相同的软中断,但是如果有多个 cpu 的话,那么会有多个 cpu 并发执行软中断,所以下面的 tasklet_action() 要想到这一点。
-
tasklet_action() 屏蔽当前 CPU 中断,获取当前 CPU 的 tasklet 链表并清空原有的链表,在恢复中断。避免在操作链表的过程中,被硬件中断打断。
-
开始遍历链表取出 tasklet,先 TASKLET_STATE_RUN 标记确定该 tasklet 是否已经被其它 cpu 执行,因为该 tasklet 在执行的过程中,又被加入到当前的 cpu 的 tasklet 链表内。
-
如果没有被执行,就继续检查该 tasklet 是否被 tasklet_disable(),如果有被 disable 就清除 TASKLET_STATE_RUN 标记,这样可以重新被添加会当前 tasklet 链表内,等待再次执行。如果有被执行,即使又被设置了 TASKLET_STATE_RUN 标记,也会在第 5 步执行完成后,会清除掉该标记。
-
如果没有被 disable,那么就清除 TASKLET_STATE_SCHED 标记,该标记一旦被清除,就意味着在 tasklet 执行期间,tasklet_schedule() 可以继续添加新的 tasklet 其他 cpu 的 tasklet 链表内。如果当前 cpu 已经完成了 tasklet_action() ,新的 tasklet 也可能会重新添加到当前的 tasklet 链表。
-
这里就会执行通过 tasklet_init() 绑定的 func,也就是示例中的 my_tasklet_handle(),执行完成后再清除 TASKLET_STATE_RUN 标记,继续下一个 tasklet。
-
能走到这一步,会有两种情况,一种情况是即将执行 tasklet ,发现已经被 disable 掉了,另外一种情况是 tasklet 已经在其它 CPU 上执行中。无论哪种情况,都会将当前的 tasklet 重新放回到当前 cpu 的 tasklet 链表内,并调用 __raise_softirq_irqoff() 重新触发软中断(应该是启用该软中断)。
注意,以上获取 TASKLET_STATE_RUN 和 TASKLET_STATE_SCHED 标记都是位原子操作,所以不会出现因并发引发的问题。
// kernel\linux-4.9\kernel\softirq.c // 某 CPU 要调度各个 tasklet 的实现 static __latent_entropy void tasklet_action(struct softirq_action *a) { struct tasklet_struct *list; // 1 ------------------------------------------------------ local_irq_disable(); list = __this_cpu_read(tasklet_vec.head); // 获取 tasklet 链表 __this_cpu_write(tasklet_vec.head, NULL); // 清空 tasklet 链表 __this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head)); local_irq_enable(); // 如果存在 tasklet 就会进入循环 while (list) { struct tasklet_struct *t = list; list = list->next; // 2 ------------------------------------------------------ // TASKLET_STATE_SCHED: 表示该 tasklet 已经被挂接到某个 CPU 上 // TASKLET_STATE_RUN: 表示该 tasklet 正在某个 CPU 上执行 // 检查并设置 TASKLET_STATE_RUN 标记 // 返回 1: 表示 tasklet 没有被执行 返回 0: 表示 tasklet 已经被执行 if (tasklet_trylock(t)) { // 3 ------------------------------------------------------ // 如果当前 tasklet 没有被 tasklet_disable() if (!atomic_read(&t->count)) { // 4 ---------------------------------------------- // 清除 TASKLET_STATE_SCHED 状态,便于该 tasklet 可以再次被触发 if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state)) BUG(); // 5 ---------------------------------------------- // 这期间,该 tasklet 可以被 tasklet_schedule(),从而引出下面第二种情况 t->func(t->data); // 如果没有执行且没有 disable 则执行 tasklet_unlock(t); // 清理 TASKLET_STATE_RUN 标记 continue; // 继续下一个 tasklet } // 如果当前已经被 disable 了,那就清理 TASKLET_STATE_RUN 标记 tasklet_unlock(t); } // 6 ----------------------------------------------------- // 有两种情况下,会将该 tasklet 再挂接回链表内,并重新触发,等待下一次执行的机会 // 1. 如果没有被执行,但是被调用 tasklet_disable() 接口 disable 了 // 2. 当前 tasklet 已经在其它 CPU 正在执行 func 这时候 tasklet 又会被挂回在 // 原来的链表中,为了满足同一种类型的 tasklet 只能在一个 CPU 上执行的设计 // 因此此次不执行 tasklet,挂入链表后等待下一次被执行的时机执行 local_irq_disable(); t->next = NULL; *__this_cpu_read(tasklet_vec.tail) = t; __this_cpu_write(tasklet_vec.tail, &(t->next)); __raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_enable(); } }
贴出判断和标记和清除 tasklet 运行的 TASKLET_STATE_RUN 标记代码
// kernel\linux-4.9\include\interrupt.h static inline int tasklet_trylock(struct tasklet_struct *t) { return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state); } // kernel\linux-4.9\include\interrupt.h static inline void tasklet_unlock(struct tasklet_struct *t) { smp_mb__before_atomic(); clear_bit(TASKLET_STATE_RUN, &(t)->state); }
贴出关闭 tasklet 执行的代码
// kernel\linux-4.9\include\interrupt.h static inline void tasklet_disable(struct tasklet_struct *t) { tasklet_disable_nosync(t); tasklet_unlock_wait(t); smp_mb(); } // kernel\linux-4.9\include\interrupt.h // 关闭 tasklet(实际上应该理解为暂停调度执行) static inline void tasklet_disable_nosync(struct tasklet_struct *t) { // 这里对整型原子操作 count 自增了,对应 tasklet_action() 里面的 atomic_read() atomic_inc(&t->count); smp_mb__after_atomic(); } // kernel\linux-4.9\include\interrupt.h static inline void tasklet_enable(struct tasklet_struct *t) { // 这里对整型原子操作 count 自减了,对应 tasklet_action() 里面的 atomic_read() smp_mb__before_atomic(); atomic_dec(&t->count); } tasklet_init() 实现与用户函数关联的接口,也是很简单 // kernel\linux-4.9\kernel\softirq.c void tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data) { t->next = NULL; t->state = 0; atomic_set(&t->count, 0); t->func = func; t->data = data; }
tasklet_kill() 主要实现是尽可能快的让 tasklet 得到执行,等待执行完成后再退出。
-
判断该 tasklet 是否已经被挂接到某个 cpu 的 tasklet 链表内,如果有挂接到,那么就立即让出 cpu,直至 tasklet 清除 TASKLET_STATE_SCHED 标记(参考 tasklet_action() 第 4 个步骤),进入运行状态。
-
如果 tasklet 没有挂接或者一旦进入到执行状态,那么就会不停的检测 TASKLET_STATE_RUN 是否有被清除,被清除的话说明已经运行完成(参考 tasklet_action() 第5个步骤),可以放心的退出了。
// kernel\linux-4.9\kernel\softirq.c void tasklet_kill(struct tasklet_struct *t) { if (in_interrupt()) pr_notice("Attempt to kill tasklet from interrupt\n"); // 1 ---------------------------------------------- while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) { do { yield(); } while (test_bit(TASKLET_STATE_SCHED, &t->state)); } // 2 ---------------------------------------------- tasklet_unlock_wait(t); clear_bit(TASKLET_STATE_SCHED, &t->state); } // kernel\linux-4.9\include\interrupt.h static inline void tasklet_unlock_wait(struct tasklet_struct *t) { while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); } }
至此,tasklet 关键的实现原理分析完成,实际上还应联合 softirq,才算完整的了解整个机制。
Tasklet 机制总结
-
每颗 cpu 都有自己的 tasklet 链表,这样可以将 tasklet 分布在各个 cpu 上,可实现并发不同的 tasklet。
-
相同的 tasklet 只能在某一颗 cpu 上串行执行,其它 cpu 会暂时避让,在此情况下,不需要考虑并发问题(即不需要加锁)。
3. tasklet_schedule() 接口调用时,如果 tasklet 还未被执行,或者处于 disable 期间,指定的 tasklet 不会被加入链表内,即该请求不会被受理。
- tasklet_disable() 接口只是暂时停止指定的 tasklet 执行,依然会被加回待执行链表内。而在 disable 期间,相同的 tasklet 将无法被加入链表调度。
-
Copyright © 2024 深圳全志在线有限公司 粤ICP备2021084185号 粤公网安备44030502007680号