Navigation

    全志在线开发者论坛

    • Register
    • Login
    • Search
    • Categories
    • Tags
    • 在线文档
    • 社区主页

    【分析笔记】Linux tasklet 机制的理解

    Linux
    1
    1
    1353
    Loading More Posts
    • Oldest to Newest
    • Newest to Oldest
    • Most Votes
    Reply
    • Reply as topic
    Log in to reply
    This topic has been deleted. Only users with topic management privileges can see it.
    • D
      dream LV 6 last edited by

      ​

      Tasklet 介绍

      Linux 内核提供的四种中断下半部中 softirq(软中断)、tasklet(小任务)、workqueue(工作队列) 、request thread(中断线程)中的其中一种,其效率仅次于软中断,但远高于request thread 和 workqueue。

      • 软中断(softirq) 之所以性能高的原因,在 SMP 系统下多个 cpu 同时并发处理
        如网卡的 fifo 半满中断触发,被 cpu0 处理,cpu0 会在关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。若网卡的 fifo 全满中断有再次触发,就会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续相同的流程。由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

      • 小任务(tasklet) 之所以性能较软中断差,是因为同一种小任务在多个 cpu 上不会并发执行
        由于 tasklet 基于 softirq 的基础实现,为了易用性考虑,同一种 tasklet 在多个 cpu 上不会并行执行,因此不存在并发问题,在使用上就可以少一些顾虑。但也正是因为不存在并发,导致了性能较之 softirq 差一些。

      • tasklet 之所以比 workqueue 和 request thread 性能高

      原因是因为前者是在软中断上下文件工作(意味着不能调用任何阻塞的接口),而后两者是在进程上下文工作(实质上是在内核线程里面执行)。

      使用示例模版

      #include <linux/module.h>
      #include <linux/init.h>
      #include <linux/kernel.h>
      #include <linux/interrupt.h>
      
      static struct tasklet_struct my_tasklet;
      
      static void my_tasklet_handle(unsigned long data)
      {
      	printk("tasklet handle running...\n");
      }
      
      static irqreturn_t xxx_interrupt(int irq, void *dev_id)
      {
          // 调度 tasklet
          tasklet_schedule(&my_tasklet);
      }
      
      static int __init demo_driver_init(void)
      {
          // 初始化一个 tasklet ,关联处理函数
      	tasklet_init(&my_tasklet, my_tasklet_handle, 0);
          request_irq(xxx, xxx_interrupt, IRQF_SHARED, xxx, xxx);
      	return 0;
      }
      
      static void __exit demo_driver_exit(void)
      {
      	tasklet_kill(&my_tasklet);
      	return ;
      }
      
      module_init(demo_driver_init);
      module_exit(demo_driver_exit);
      MODULE_LICENSE("GPL v2");
      

      内核源码分析

      Linux 内核被启动后,会执行 start_kernel() 函数,tasklet 是基于 softirq 实现的,会在 softirq_init() 里面进行必要的初始化,主要是初始化 tasklet 链表和与相应的软中断号建立关联。

      tasklet_hi_action 是高优先级的 tasklet,tasklet_action 是普通的 tasklet,两者实现原理都一样。

      // kernel\linux-4.9\init\main.c
      asmlinkage __visible void __init start_kernel(void)
      {
      	...
      	softirq_init();
      	...
      }
      
      // kernel\linux-4.9\kernel\softirq.c
      void __init softirq_init(void)
      {
      	int cpu;
      
          // 初始化链表
      	for_each_possible_cpu(cpu) {
      		per_cpu(tasklet_vec, cpu).tail =
      			&per_cpu(tasklet_vec, cpu).head;
      		per_cpu(tasklet_hi_vec, cpu).tail =
      			&per_cpu(tasklet_hi_vec, cpu).head;
      	}
      
          // 建立TASKLET_SOFTIRQ、HI_SOFTIRQ软中断号的对应服务接口
      	open_softirq(TASKLET_SOFTIRQ, tasklet_action);
      	open_softirq(HI_SOFTIRQ, tasklet_hi_action);
      }
      

      当调用 tasklet_schedule() 时,如果该 tasklet 没有被设置 TASKLET_STATE_SCHED 标记时,才会加入链表内,如果已经设置了 TASKLET_STATE_SCHED 了,那么就会忽略此次的 tasklet _schedule(),这就意味着如果在极短的时间内调用 tasklet_schedule() 只会触发一次(这里可能会存在丢失中断事件的情况),之后会通过 raise_softirq_irqoff() 启用 TASKLET_SOFTIRQ 软中断。

      哪颗 cpu 受理该软中断,就将 tasklet 加入到该 cpu 的 tasklet 链表内,由于相同的软中断可以同时被其他 cpu 触发执行,因此会出现 cpu0\cpu1 的 tasklet 链表内有同一个 tasklet 的情况。

      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_schedule(struct tasklet_struct *t)
      {
      	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
      		__tasklet_schedule(t);
      }
      
      // kernel\linux-4.9\kernel\softirq.c
      void __tasklet_schedule(struct tasklet_struct *t)
      {
      	unsigned long flags;
      
          // 将指定的 tasklet 加入到链表内并设置软中断
      	local_irq_save(flags);
      	t->next = NULL;
      	*__this_cpu_read(tasklet_vec.tail) = t;
      	__this_cpu_write(tasklet_vec.tail, &(t->next));
      	raise_softirq_irqoff(TASKLET_SOFTIRQ);
      	local_irq_restore(flags);
      }
      

      在初始化的时候已经为  TASKLET_SOFTIRQ 软中断与 tasklet_action() 建立关联的关系,因此软中断触发时,就会调用 tasklet_action(),这里是精髓部分。

      多次触发软中断时,当前 cpu 只能同一时间执行一次相同的软中断,但是如果有多个 cpu 的话,那么会有多个 cpu 并发执行软中断,所以下面的 tasklet_action() 要想到这一点。

      1. tasklet_action() 屏蔽当前 CPU 中断,获取当前 CPU 的 tasklet 链表并清空原有的链表,在恢复中断。避免在操作链表的过程中,被硬件中断打断。

      2. 开始遍历链表取出 tasklet,先 TASKLET_STATE_RUN 标记确定该 tasklet 是否已经被其它 cpu 执行,因为该 tasklet 在执行的过程中,又被加入到当前的 cpu 的 tasklet 链表内。

      3. 如果没有被执行,就继续检查该 tasklet 是否被 tasklet_disable(),如果有被 disable 就清除 TASKLET_STATE_RUN 标记,这样可以重新被添加会当前 tasklet 链表内,等待再次执行。如果有被执行,即使又被设置了 TASKLET_STATE_RUN 标记,也会在第 5 步执行完成后,会清除掉该标记。

      4. 如果没有被 disable,那么就清除 TASKLET_STATE_SCHED 标记,该标记一旦被清除,就意味着在 tasklet 执行期间,tasklet_schedule() 可以继续添加新的 tasklet 其他 cpu  的 tasklet 链表内。如果当前 cpu 已经完成了 tasklet_action() ,新的 tasklet 也可能会重新添加到当前的  tasklet 链表。

      5. 这里就会执行通过 tasklet_init() 绑定的 func,也就是示例中的 my_tasklet_handle(),执行完成后再清除 TASKLET_STATE_RUN 标记,继续下一个 tasklet。

      6. 能走到这一步,会有两种情况,一种情况是即将执行 tasklet ,发现已经被 disable 掉了,另外一种情况是 tasklet 已经在其它 CPU 上执行中。无论哪种情况,都会将当前的 tasklet 重新放回到当前 cpu 的 tasklet 链表内,并调用 __raise_softirq_irqoff() 重新触发软中断(应该是启用该软中断)。

      注意,以上获取 TASKLET_STATE_RUN 和 TASKLET_STATE_SCHED 标记都是位原子操作,所以不会出现因并发引发的问题。

      // kernel\linux-4.9\kernel\softirq.c
      // 某 CPU 要调度各个 tasklet 的实现
      static __latent_entropy void tasklet_action(struct softirq_action *a)
      {
      	struct tasklet_struct *list;
      
          // 1 ------------------------------------------------------
      	local_irq_disable();
      	list = __this_cpu_read(tasklet_vec.head); // 获取 tasklet 链表
      	__this_cpu_write(tasklet_vec.head, NULL); // 清空 tasklet 链表
      	__this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
      	local_irq_enable();
      
          // 如果存在 tasklet 就会进入循环
      	while (list) 
          {
      		struct tasklet_struct *t = list;
      		list = list->next;
      
              // 2 ------------------------------------------------------
              // TASKLET_STATE_SCHED: 表示该 tasklet 已经被挂接到某个 CPU 上
              // TASKLET_STATE_RUN:   表示该 tasklet 正在某个 CPU 上执行
              // 检查并设置 TASKLET_STATE_RUN 标记
              // 返回 1: 表示 tasklet 没有被执行  返回 0: 表示 tasklet 已经被执行
      		if (tasklet_trylock(t)) 
              {
                  
                  // 3 ------------------------------------------------------
                  // 如果当前 tasklet 没有被 tasklet_disable()
      			if (!atomic_read(&t->count))  
                  {
                      // 4 ----------------------------------------------
                      // 清除 TASKLET_STATE_SCHED 状态,便于该 tasklet 可以再次被触发
      				if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
      					BUG();
                      // 5 ----------------------------------------------
                      // 这期间,该 tasklet 可以被 tasklet_schedule(),从而引出下面第二种情况
      				t->func(t->data);        // 如果没有执行且没有 disable 则执行
      				tasklet_unlock(t);        // 清理 TASKLET_STATE_RUN 标记
      				continue;                 // 继续下一个 tasklet
      			}
      
                  // 如果当前已经被 disable 了,那就清理 TASKLET_STATE_RUN 标记
      			tasklet_unlock(t);
      		}
      
              // 6 -----------------------------------------------------
              // 有两种情况下,会将该 tasklet 再挂接回链表内,并重新触发,等待下一次执行的机会
              // 1. 如果没有被执行,但是被调用 tasklet_disable() 接口 disable 了
              // 2. 当前 tasklet 已经在其它 CPU 正在执行 func 这时候 tasklet 又会被挂回在
              //    原来的链表中,为了满足同一种类型的 tasklet 只能在一个 CPU 上执行的设计
              //    因此此次不执行 tasklet,挂入链表后等待下一次被执行的时机执行
      		local_irq_disable();
      		t->next = NULL;
      		*__this_cpu_read(tasklet_vec.tail) = t; 
      		__this_cpu_write(tasklet_vec.tail, &(t->next));
      		__raise_softirq_irqoff(TASKLET_SOFTIRQ);
      		local_irq_enable();
      	}
      }
      

      贴出判断和标记和清除 tasklet 运行的 TASKLET_STATE_RUN  标记代码

      // kernel\linux-4.9\include\interrupt.h
      static inline int tasklet_trylock(struct tasklet_struct *t)
      {
      	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_unlock(struct tasklet_struct *t)
      {
      	smp_mb__before_atomic();
      	clear_bit(TASKLET_STATE_RUN, &(t)->state);
      }
      

      贴出关闭 tasklet 执行的代码

      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_disable(struct tasklet_struct *t)
      {
      	tasklet_disable_nosync(t);
      	tasklet_unlock_wait(t);
      	smp_mb();
      }
      
      // kernel\linux-4.9\include\interrupt.h
      // 关闭 tasklet(实际上应该理解为暂停调度执行)
      static inline void tasklet_disable_nosync(struct tasklet_struct *t)
      {
          // 这里对整型原子操作 count 自增了,对应 tasklet_action() 里面的 atomic_read()
      	atomic_inc(&t->count);
      	smp_mb__after_atomic();
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_enable(struct tasklet_struct *t)
      {
          // 这里对整型原子操作 count 自减了,对应 tasklet_action() 里面的 atomic_read()
      	smp_mb__before_atomic();
      	atomic_dec(&t->count);
      }
      tasklet_init() 实现与用户函数关联的接口,也是很简单 
      
      // kernel\linux-4.9\kernel\softirq.c
      void tasklet_init(struct tasklet_struct *t,
      		  void (*func)(unsigned long), unsigned long data)
      {
      	t->next = NULL;
      	t->state = 0;
      	atomic_set(&t->count, 0);
      	t->func = func;
      	t->data = data;
      }
      

      tasklet_kill() 主要实现是尽可能快的让 tasklet 得到执行,等待执行完成后再退出。

      1. 判断该 tasklet 是否已经被挂接到某个 cpu 的 tasklet 链表内,如果有挂接到,那么就立即让出 cpu,直至 tasklet 清除 TASKLET_STATE_SCHED 标记(参考 tasklet_action() 第 4 个步骤),进入运行状态。

      2. 如果 tasklet 没有挂接或者一旦进入到执行状态,那么就会不停的检测 TASKLET_STATE_RUN 是否有被清除,被清除的话说明已经运行完成(参考 tasklet_action() 第5个步骤),可以放心的退出了。

      // kernel\linux-4.9\kernel\softirq.c
      void tasklet_kill(struct tasklet_struct *t)
      {
      	if (in_interrupt())
      		pr_notice("Attempt to kill tasklet from interrupt\n");
      
          // 1 ----------------------------------------------
      	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
      		do {
      			yield();
      		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
      	}
          // 2 ----------------------------------------------
      	tasklet_unlock_wait(t);
      	clear_bit(TASKLET_STATE_SCHED, &t->state);
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_unlock_wait(struct tasklet_struct *t)
      {
      	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
      }
      

      至此,tasklet 关键的实现原理分析完成,实际上还应联合 softirq,才算完整的了解整个机制。

      Tasklet 机制总结

      1. 每颗 cpu 都有自己的 tasklet 链表,这样可以将 tasklet 分布在各个 cpu 上,可实现并发不同的 tasklet。

      2. 相同的 tasklet 只能在某一颗 cpu 上串行执行,其它 cpu 会暂时避让,在此情况下,不需要考虑并发问题(即不需要加锁)。

      3. tasklet_schedule() 接口调用时,如果 tasklet 还未被执行,或者处于 disable 期间,指定的 tasklet 不会被加入链表内,即该请求不会被受理。

      1. tasklet_disable() 接口只是暂时停止指定的 tasklet 执行,依然会被加回待执行链表内。而在 disable 期间,相同的 tasklet 将无法被加入链表调度。
      1 Reply Last reply Reply Quote Share 1
      • 1 / 1
      • First post
        Last post

      Copyright © 2024 深圳全志在线有限公司 粤ICP备2021084185号 粤公网安备44030502007680号

      行为准则 | 用户协议 | 隐私权政策