Navigation

    全志在线开发者论坛

    • Register
    • Login
    • Search
    • Categories
    • Tags
    • 在线文档
    • 社区主页
    1. Home
    2. dream
    D
    • Profile
    • Following 9
    • Followers 0
    • my integral 2776
    • Topics 10
    • Posts 68
    • Best 5
    • Groups 0

    dreamLV 6

    @dream

    2776
    integral
    7
    Reputation
    15
    Profile views
    68
    Posts
    0
    Followers
    9
    Following
    Joined Last Online

    dream Unfollow Follow

    Best posts made by dream

    • 【分析笔记】Linux I2C-Tools 使用踩坑笔记

      一、踩坑缘由

      在调试 I2C 器件时,我一般习惯于使用 i2cdetect 工具来确认芯片是否有应答,通常有应答之后,就会开始着手移植或者编写对应的驱动程序,但是在调试 sgp41 传感器时却不灵了。

      af47336d-7d7b-48d3-9364-c7444169fe07-image.png

      二、问题现象

      在连续完成多个 I2C 器件的调试和驱动开发之后,最后一个 sgp41 传感器却一直无法被检测到。在使用示波器再次测量芯片供电、检查I2C波形、引脚顺序、电平匹配都正确后,认为是芯片坏了,换了多颗芯片,都无法识别,寄给供应商,供应商又说检测良好,这就很神奇了。

      在同一个座子上,sht41 都能正常被检测到,但是 sgp41 却无法检测,更何况该总线上还挂了其它的 I2C 器件都能准确检测出来。

      79255a34-d236-47af-bc7e-d6f253c1a09c-image.png

      9de9cad8-a396-4d44-99d4-5ffc0dd27a8d-image.png

      三、问题分析

      百思不得其解,在仔细观察逻辑分析仪解析的结果,发现在检测 0x44(sht41) 器件地址的时候,i2cdetect 使用的是采用写的方式检测,而检测 0x59(sgp41) 器件地址时,采用读的方式检测。

      由于之前调试过 sht41 器件,知道这类传感器需要先写再读,才会有应答信号(规格书没有体现出来),因此推测跟这个有关。手写了一个 I2C 设备驱动,先写再读取,发现可以正常通信,证实了我的猜测。

      023395f3-6231-42f6-83ae-639177f866f6-image.png

      四、深入分析

      我特别好奇的是,为什么 i2cdetect 工具会对不同的地址段采用不同的方式进行检测,我分析了 i2cdetect.c 源代码,发现默认是以自动模式检测,而自动模式则会根据不同的地址段采取读或写来实现检测。

      源码下载:http://mirrors.edge.kernel.org/pub/software/utils/i2c-tools/i2c-tools-4.0.tar.gz

      3665f900-2e13-43b7-bd8b-f4a73d965f09-image.png

      如果需要检测类似 sht41、sgp41 这种必须要先写再读才会有应答的芯片,就必须要指定检测模式为写检测(MODE_QUICK),由于 sht41 的器件地址 0x44 恰好在自动模式中以写的方式检测,因此可以检测得到。通过分析源码发现,-r 和 -q 即可指定读写模式:

      74ab0cc3-dcc8-42c6-a9ce-b67146028b7b-image.png

      下图为指定以写的方式检测 sgp41(0x59),可以发现有应答信号,被检测出来了:

      81ff8c47-984b-4b79-b686-83f178c4a678-image.png

      五、经验总结

      1. 不是所有的 I2C 器件都会直接响应读请求。
      2. 使用 i2cdetect 工具检测芯片,建议使用 -q 和 -r 参数都试试。
      posted in Linux
      D
      dream
    • Reply: 【V853开发板试用】使用pack命令报错:ERROR: Unsupport PACK_PLATFORM: tinyos

      最方便的编译方法是:

      1. source build/envsetup.sh
      2. lunch 1

      你报错,大概是通过 build.sh 编译,在linux_dev 选择了 tinyos,其实你如果选择 openWRT 也是也可以的。

      posted in V853系列-AI视觉
      D
      dream
    • Reply: 【分析笔记】Linux I2C-Tools 使用踩坑笔记

      @dream 在 【分析笔记】Linux I2C-Tools 使用踩坑笔记 中说:

      @daizebin sht41 的器件地址是 0x44,就决定了在自动模式下,是以写方式检测,所以可以识别到,但是 sgp41 的器件地址是 0x59,落在了以读方式检测的范围,所以无法识别到。具体看本文 scan_i2c_bus() 的源码截图就能看出来。

      再补充一下,不是所有的 I2C 器件都必须要以写的方式才能检测出来,只有少部分,如 sht41\sgp41 这类器件比较特别,需要先写再读才有应答,正因为大部分 I2C 器件无论是读写都会有应答,所以才会因惯性思维,导致踩坑。

      posted in Linux
      D
      dream
    • 【分析笔记】Linux tasklet 机制的理解

      ​

      Tasklet 介绍

      Linux 内核提供的四种中断下半部中 softirq(软中断)、tasklet(小任务)、workqueue(工作队列) 、request thread(中断线程)中的其中一种,其效率仅次于软中断,但远高于request thread 和 workqueue。

      • 软中断(softirq) 之所以性能高的原因,在 SMP 系统下多个 cpu 同时并发处理
        如网卡的 fifo 半满中断触发,被 cpu0 处理,cpu0 会在关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。若网卡的 fifo 全满中断有再次触发,就会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续相同的流程。由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

      • 小任务(tasklet) 之所以性能较软中断差,是因为同一种小任务在多个 cpu 上不会并发执行
        由于 tasklet 基于 softirq 的基础实现,为了易用性考虑,同一种 tasklet 在多个 cpu 上不会并行执行,因此不存在并发问题,在使用上就可以少一些顾虑。但也正是因为不存在并发,导致了性能较之 softirq 差一些。

      • tasklet 之所以比 workqueue 和 request thread 性能高

      原因是因为前者是在软中断上下文件工作(意味着不能调用任何阻塞的接口),而后两者是在进程上下文工作(实质上是在内核线程里面执行)。

      使用示例模版

      #include <linux/module.h>
      #include <linux/init.h>
      #include <linux/kernel.h>
      #include <linux/interrupt.h>
      
      static struct tasklet_struct my_tasklet;
      
      static void my_tasklet_handle(unsigned long data)
      {
      	printk("tasklet handle running...\n");
      }
      
      static irqreturn_t xxx_interrupt(int irq, void *dev_id)
      {
          // 调度 tasklet
          tasklet_schedule(&my_tasklet);
      }
      
      static int __init demo_driver_init(void)
      {
          // 初始化一个 tasklet ,关联处理函数
      	tasklet_init(&my_tasklet, my_tasklet_handle, 0);
          request_irq(xxx, xxx_interrupt, IRQF_SHARED, xxx, xxx);
      	return 0;
      }
      
      static void __exit demo_driver_exit(void)
      {
      	tasklet_kill(&my_tasklet);
      	return ;
      }
      
      module_init(demo_driver_init);
      module_exit(demo_driver_exit);
      MODULE_LICENSE("GPL v2");
      

      内核源码分析

      Linux 内核被启动后,会执行 start_kernel() 函数,tasklet 是基于 softirq 实现的,会在 softirq_init() 里面进行必要的初始化,主要是初始化 tasklet 链表和与相应的软中断号建立关联。

      tasklet_hi_action 是高优先级的 tasklet,tasklet_action 是普通的 tasklet,两者实现原理都一样。

      // kernel\linux-4.9\init\main.c
      asmlinkage __visible void __init start_kernel(void)
      {
      	...
      	softirq_init();
      	...
      }
      
      // kernel\linux-4.9\kernel\softirq.c
      void __init softirq_init(void)
      {
      	int cpu;
      
          // 初始化链表
      	for_each_possible_cpu(cpu) {
      		per_cpu(tasklet_vec, cpu).tail =
      			&per_cpu(tasklet_vec, cpu).head;
      		per_cpu(tasklet_hi_vec, cpu).tail =
      			&per_cpu(tasklet_hi_vec, cpu).head;
      	}
      
          // 建立TASKLET_SOFTIRQ、HI_SOFTIRQ软中断号的对应服务接口
      	open_softirq(TASKLET_SOFTIRQ, tasklet_action);
      	open_softirq(HI_SOFTIRQ, tasklet_hi_action);
      }
      

      当调用 tasklet_schedule() 时,如果该 tasklet 没有被设置 TASKLET_STATE_SCHED 标记时,才会加入链表内,如果已经设置了 TASKLET_STATE_SCHED 了,那么就会忽略此次的 tasklet _schedule(),这就意味着如果在极短的时间内调用 tasklet_schedule() 只会触发一次(这里可能会存在丢失中断事件的情况),之后会通过 raise_softirq_irqoff() 启用 TASKLET_SOFTIRQ 软中断。

      哪颗 cpu 受理该软中断,就将 tasklet 加入到该 cpu 的 tasklet 链表内,由于相同的软中断可以同时被其他 cpu 触发执行,因此会出现 cpu0\cpu1 的 tasklet 链表内有同一个 tasklet 的情况。

      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_schedule(struct tasklet_struct *t)
      {
      	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
      		__tasklet_schedule(t);
      }
      
      // kernel\linux-4.9\kernel\softirq.c
      void __tasklet_schedule(struct tasklet_struct *t)
      {
      	unsigned long flags;
      
          // 将指定的 tasklet 加入到链表内并设置软中断
      	local_irq_save(flags);
      	t->next = NULL;
      	*__this_cpu_read(tasklet_vec.tail) = t;
      	__this_cpu_write(tasklet_vec.tail, &(t->next));
      	raise_softirq_irqoff(TASKLET_SOFTIRQ);
      	local_irq_restore(flags);
      }
      

      在初始化的时候已经为  TASKLET_SOFTIRQ 软中断与 tasklet_action() 建立关联的关系,因此软中断触发时,就会调用 tasklet_action(),这里是精髓部分。

      多次触发软中断时,当前 cpu 只能同一时间执行一次相同的软中断,但是如果有多个 cpu 的话,那么会有多个 cpu 并发执行软中断,所以下面的 tasklet_action() 要想到这一点。

      1. tasklet_action() 屏蔽当前 CPU 中断,获取当前 CPU 的 tasklet 链表并清空原有的链表,在恢复中断。避免在操作链表的过程中,被硬件中断打断。

      2. 开始遍历链表取出 tasklet,先 TASKLET_STATE_RUN 标记确定该 tasklet 是否已经被其它 cpu 执行,因为该 tasklet 在执行的过程中,又被加入到当前的 cpu 的 tasklet 链表内。

      3. 如果没有被执行,就继续检查该 tasklet 是否被 tasklet_disable(),如果有被 disable 就清除 TASKLET_STATE_RUN 标记,这样可以重新被添加会当前 tasklet 链表内,等待再次执行。如果有被执行,即使又被设置了 TASKLET_STATE_RUN 标记,也会在第 5 步执行完成后,会清除掉该标记。

      4. 如果没有被 disable,那么就清除 TASKLET_STATE_SCHED 标记,该标记一旦被清除,就意味着在 tasklet 执行期间,tasklet_schedule() 可以继续添加新的 tasklet 其他 cpu  的 tasklet 链表内。如果当前 cpu 已经完成了 tasklet_action() ,新的 tasklet 也可能会重新添加到当前的  tasklet 链表。

      5. 这里就会执行通过 tasklet_init() 绑定的 func,也就是示例中的 my_tasklet_handle(),执行完成后再清除 TASKLET_STATE_RUN 标记,继续下一个 tasklet。

      6. 能走到这一步,会有两种情况,一种情况是即将执行 tasklet ,发现已经被 disable 掉了,另外一种情况是 tasklet 已经在其它 CPU 上执行中。无论哪种情况,都会将当前的 tasklet 重新放回到当前 cpu 的 tasklet 链表内,并调用 __raise_softirq_irqoff() 重新触发软中断(应该是启用该软中断)。

      注意,以上获取 TASKLET_STATE_RUN 和 TASKLET_STATE_SCHED 标记都是位原子操作,所以不会出现因并发引发的问题。

      // kernel\linux-4.9\kernel\softirq.c
      // 某 CPU 要调度各个 tasklet 的实现
      static __latent_entropy void tasklet_action(struct softirq_action *a)
      {
      	struct tasklet_struct *list;
      
          // 1 ------------------------------------------------------
      	local_irq_disable();
      	list = __this_cpu_read(tasklet_vec.head); // 获取 tasklet 链表
      	__this_cpu_write(tasklet_vec.head, NULL); // 清空 tasklet 链表
      	__this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
      	local_irq_enable();
      
          // 如果存在 tasklet 就会进入循环
      	while (list) 
          {
      		struct tasklet_struct *t = list;
      		list = list->next;
      
              // 2 ------------------------------------------------------
              // TASKLET_STATE_SCHED: 表示该 tasklet 已经被挂接到某个 CPU 上
              // TASKLET_STATE_RUN:   表示该 tasklet 正在某个 CPU 上执行
              // 检查并设置 TASKLET_STATE_RUN 标记
              // 返回 1: 表示 tasklet 没有被执行  返回 0: 表示 tasklet 已经被执行
      		if (tasklet_trylock(t)) 
              {
                  
                  // 3 ------------------------------------------------------
                  // 如果当前 tasklet 没有被 tasklet_disable()
      			if (!atomic_read(&t->count))  
                  {
                      // 4 ----------------------------------------------
                      // 清除 TASKLET_STATE_SCHED 状态,便于该 tasklet 可以再次被触发
      				if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
      					BUG();
                      // 5 ----------------------------------------------
                      // 这期间,该 tasklet 可以被 tasklet_schedule(),从而引出下面第二种情况
      				t->func(t->data);        // 如果没有执行且没有 disable 则执行
      				tasklet_unlock(t);        // 清理 TASKLET_STATE_RUN 标记
      				continue;                 // 继续下一个 tasklet
      			}
      
                  // 如果当前已经被 disable 了,那就清理 TASKLET_STATE_RUN 标记
      			tasklet_unlock(t);
      		}
      
              // 6 -----------------------------------------------------
              // 有两种情况下,会将该 tasklet 再挂接回链表内,并重新触发,等待下一次执行的机会
              // 1. 如果没有被执行,但是被调用 tasklet_disable() 接口 disable 了
              // 2. 当前 tasklet 已经在其它 CPU 正在执行 func 这时候 tasklet 又会被挂回在
              //    原来的链表中,为了满足同一种类型的 tasklet 只能在一个 CPU 上执行的设计
              //    因此此次不执行 tasklet,挂入链表后等待下一次被执行的时机执行
      		local_irq_disable();
      		t->next = NULL;
      		*__this_cpu_read(tasklet_vec.tail) = t; 
      		__this_cpu_write(tasklet_vec.tail, &(t->next));
      		__raise_softirq_irqoff(TASKLET_SOFTIRQ);
      		local_irq_enable();
      	}
      }
      

      贴出判断和标记和清除 tasklet 运行的 TASKLET_STATE_RUN  标记代码

      // kernel\linux-4.9\include\interrupt.h
      static inline int tasklet_trylock(struct tasklet_struct *t)
      {
      	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_unlock(struct tasklet_struct *t)
      {
      	smp_mb__before_atomic();
      	clear_bit(TASKLET_STATE_RUN, &(t)->state);
      }
      

      贴出关闭 tasklet 执行的代码

      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_disable(struct tasklet_struct *t)
      {
      	tasklet_disable_nosync(t);
      	tasklet_unlock_wait(t);
      	smp_mb();
      }
      
      // kernel\linux-4.9\include\interrupt.h
      // 关闭 tasklet(实际上应该理解为暂停调度执行)
      static inline void tasklet_disable_nosync(struct tasklet_struct *t)
      {
          // 这里对整型原子操作 count 自增了,对应 tasklet_action() 里面的 atomic_read()
      	atomic_inc(&t->count);
      	smp_mb__after_atomic();
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_enable(struct tasklet_struct *t)
      {
          // 这里对整型原子操作 count 自减了,对应 tasklet_action() 里面的 atomic_read()
      	smp_mb__before_atomic();
      	atomic_dec(&t->count);
      }
      tasklet_init() 实现与用户函数关联的接口,也是很简单 
      
      // kernel\linux-4.9\kernel\softirq.c
      void tasklet_init(struct tasklet_struct *t,
      		  void (*func)(unsigned long), unsigned long data)
      {
      	t->next = NULL;
      	t->state = 0;
      	atomic_set(&t->count, 0);
      	t->func = func;
      	t->data = data;
      }
      

      tasklet_kill() 主要实现是尽可能快的让 tasklet 得到执行,等待执行完成后再退出。

      1. 判断该 tasklet 是否已经被挂接到某个 cpu 的 tasklet 链表内,如果有挂接到,那么就立即让出 cpu,直至 tasklet 清除 TASKLET_STATE_SCHED 标记(参考 tasklet_action() 第 4 个步骤),进入运行状态。

      2. 如果 tasklet 没有挂接或者一旦进入到执行状态,那么就会不停的检测 TASKLET_STATE_RUN 是否有被清除,被清除的话说明已经运行完成(参考 tasklet_action() 第5个步骤),可以放心的退出了。

      // kernel\linux-4.9\kernel\softirq.c
      void tasklet_kill(struct tasklet_struct *t)
      {
      	if (in_interrupt())
      		pr_notice("Attempt to kill tasklet from interrupt\n");
      
          // 1 ----------------------------------------------
      	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
      		do {
      			yield();
      		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
      	}
          // 2 ----------------------------------------------
      	tasklet_unlock_wait(t);
      	clear_bit(TASKLET_STATE_SCHED, &t->state);
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_unlock_wait(struct tasklet_struct *t)
      {
      	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
      }
      

      至此,tasklet 关键的实现原理分析完成,实际上还应联合 softirq,才算完整的了解整个机制。

      Tasklet 机制总结

      1. 每颗 cpu 都有自己的 tasklet 链表,这样可以将 tasklet 分布在各个 cpu 上,可实现并发不同的 tasklet。

      2. 相同的 tasklet 只能在某一颗 cpu 上串行执行,其它 cpu 会暂时避让,在此情况下,不需要考虑并发问题(即不需要加锁)。

      3. tasklet_schedule() 接口调用时,如果 tasklet 还未被执行,或者处于 disable 期间,指定的 tasklet 不会被加入链表内,即该请求不会被受理。

      1. tasklet_disable() 接口只是暂时停止指定的 tasklet 执行,依然会被加回待执行链表内。而在 disable 期间,相同的 tasklet 将无法被加入链表调度。
      posted in Linux
      D
      dream
    • 【分析笔记】Linux 内核自旋锁的理解和使用原则

      ​
      自旋锁简单说明

      自旋锁主要解决在竞态并发下,保护执行时间很短的临界区。它只允许一个执行单位进入临界区,在该执行单位离开前,其它的执行单位将会在进入临界区前不停的循环等待(即所谓的自旋),直至该执行单位离开临界区后,最先等待的一个执行单位会立即进入临界区。此方式不涉及到上下文切换,因此效率极高。

      出现并发的场景

      硬中断触发打断当前进程、softirq、tasklet、timer等形成的并发
      softirq(软中断)、tasklet(小任务)、timer(内核定时器) 触发打断 当前进程(或内核线程)形成的并发
      在 SMP 系统下,多次触发 softirq 之间形成的并发(同一个 softirq 可在多个 cpu 并发执行)
      在 SMP 系统下,不同 tasklet、timer 之间的并发(同一个 tasklet 和 timer 不会并发执行)
      在内核抢占的调度机制形成高低优先级进程之间(或内核线程)的并发

      额外的注意事项

      一、软中断在同一个cpu下并不会并发,但是在多个cpu下是可以并发的,因此性能很高。

      如网卡接受数据,产生一个中断后,被 cpu0 处理,关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。此时网卡又再次产生中断,会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续参与,由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

      二、同一个 tasklet、timer 在同一时间,只会在一个cpu上运行,是为了易用性做出的牺牲。

      由于 tasklet,timer 都是基于 softirq 的基础实现,为了易用性考虑,与 softirq 不同的是,同一种tasklet、timer 在多个cpu上也不会并行执行,因此不存在并发问题。

      三、新版本的 Linux 内核不再支持中断嵌套(不确定是从哪个版本开始,以下为内核补丁说明)
      https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=e58aa3d2d0cc

      自旋锁的种类说明

      最基础的自旋锁有三个版本:

      1. spin_lock()\spin_unlock()

      这是最基础的自旋锁,也是对系统影响最小的自旋锁,在未获得锁时,会自旋等待进入临界区。

      1. spin_lock_bh()\spin_unlock_bh()

      这是在最基础的自旋锁上获取锁之前,先关闭中断底半部,明确的来说就是关闭软中断(包含基于软中断实现的 tasklet 和 timer),主要影响系统的软中断类的并发。

      1. spin_lock_irq()\spin_unlock_irq()、spin_lock_irqsave()\spin_unlock_irqrestore()

      这是在最基础的自旋锁上获取锁之前,先屏蔽当前 cpu 的中断,禁止内核抢占当前进程,主要用于防止软硬件中断并发,影响最大,它影响了当前 CPU 的软硬中断和进程调度。

      spin_lock_irq() 是会屏蔽当前 cpu 所有的中断,spin_unlock_irq() 会开启当前 cpu 所有的中断。spin_lock_irqsave() 是现将当前 cpu 的中断使能位取出来,然后在屏蔽当前 cpu 所有中断,spin_unlock_irqrestore() 再恢复之前的中断使能位。

      凡是用到 spin_lock_irq()\spin_unlock_irq() 都可以用 spin_lock_irqsave()\spin_unlock_irqrestore() 替换,根据使用情况决定选择哪种方式即可。例如希望中断执行完成后,所有的中断都要开启,那就选择 spin_lock_irq()\spin_unlock_irq(),如果希望中断执行完成后,只需要恢复执行前的中断开关状态,那么就选择 spin_lock_irqsave()\spin_unlock_irqrestore(),如执行前 A中断 本来就要求关闭的,那么执行完之后,还是希望 A中断 仍处于关闭状态。

      使用自旋锁的原则

      首先要先明确硬件中断的优先级最高,它可以随时打断软中断和内核线程与用户进程,他们之间的优先级如下:

      硬中断 >>> 软中断(含基于软中断实现的 tasklet、timer) >>> 内核线程\用户进程

      然后需要确定谁可能会并发访问临界区,然后遵循如下规则,选择合适的锁即可:

      • 低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁
      • 同等级要防着同等级的, 就使用最简单自旋锁

      一、低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁

      例1:用户进程上下文或内核线程 和 硬件中断 都会访问同一个临界区

      用户进程:使用 spin_lock_irq()\spin_unlock_irq()

      硬件中断:使用 spin_lock()\spin_unlock()

      进程上下文访问临界区要防止被硬件中断打断侵入,就需要通过调用 spin_lock_irq()\spin_unlock_irq()  禁止当前 CPU 的中断再去获取锁,那么临界区内就不会被硬件中断访问。但它也只能关闭当前 cpu 的中断,此时其它 cpu 还能继续响应中断,所以中断内部还是需要加上 spin_lock()\spin_unlock() 来保护临界区,即使该中断未拿到锁而持续自旋,也不会影响进程上下文继续执行,顶多就自旋等待一会就能获得锁。

      这里也能说明,被自旋锁保护的临界区代码不能太过复杂,不然在这种场景下,就会导致中断自旋时间过长,在该中断自旋期间就无法响应其它的中断,如 tick 心跳中断,最终可能导致系统异常死机。

      例2:软中断(softirq、tasklet、timer) 和 硬件中断 都会访问同一个临界区

      软件中断:使用 spin_lock_irq()\spin_unlock_irq()

      硬件中断:使用 spin_lock()\spin_unlock()

      例3:用户进程上下文或内核线程 和 软中断(softirq、tasklet、timer) 都会访问同一个临界区

      用户进程:使用 spin_lock_bh()\spin_unlock_bh()

      软件中断:使用 spin_lock()\spin_unlock()

      进程上下文访问临界区要防止被软中断打断侵入,就需要使用 spin_lock_bh()\spin_unlock_bh() 禁用软中断,但只能关闭当前 cpu 的软中断,其它 cpu 依然能响应软中断,因此还需在软中断中使用 spin_lock()\spin_unlock() 来保护临界区。

      二、同等级要防着同等级的, 就使用最简单自旋锁

      例1:用户进程上下文或内核线程 和 用户进程上下文和内核线程 即多个进程会访问同一个临界区

      只需要使用:spin_lock()\spin_unlock(),因为内核支持抢占调度,所以需要上锁。

      例2:不同的 硬件中断 都会访问同一个临界区

      只需要使用: spin_lock()\spin_unlock(),不同的硬件中断是可以同时被多颗 cpu 响应处理的,因此需要使用自旋锁进行保护。

      如果是旧版内核支持中断嵌套的,则应该使用 spin_lock_irq()\spin_unlock_irq(),以避免被高优先级中断抢占,从而导致出现死锁情况。

      例3:不同的 tasklet、timer 会访问同一个临界区

      只需要使用:spin_lock()\spin_unlock(),因为不同的 tasklet 或 timer 是可以在不同的 cpu 并发执行。

      注意,如果只有相同的 tasklet 或者 timer 访问临界区,是不需要加锁的,因为相同的 tasklet 或 timer 不会并发,即使是有多个 cpu 也不会。

      例4:在一个或者多个软中断(softirq) 中会访问同一个临界区

      只需要使用:spin_lock()\spin_unlock(),虽然同一时间一个 cpu 只能执行一个软中断,但其它的 cpu 还是可以并发执行相同的软中断的。

      三、workqueue、waitqueue、completion 用锁规则

      workqueue(工作队列)是基于内核线程实现、waitqueue(等待队列)工作在用户进程上下文、completion(完成量)是基于等待队列实现也是工作在用户进程上下文,因此它们的用锁规则等同于用户进程。

      自旋锁的代码分析

      自旋锁在不同的硬件环境的实现不一样,此处分析以最复杂的环境下自旋锁的实现原理,即:SMP 下支持任务抢占的硬件环境。

      一、自旋锁初始化

      锁的数据结构定义,这个数据结构用的是结构体内嵌共用体设计,理解这点非常重要。

      #define TICKET_SHIFT	16    // 指明 owner 和 next 的位宽
      
      typedef struct {
      	union {                
      		u32 slock;        // 32 位
      		struct __raw_tickets {
      			u16 owner;    // 16 位 解锁计数
      			u16 next;     // 16 位 上锁计数
      		} tickets;
      	};
      } arch_spinlock_t;
      锁的初始化 spin_lock_init() :lock->raw_lock 成员变量被设置为 __ARCH_SPIN_LOCK_UNLOCKED    { { 0 } },即 lock->tickets.owner = 0, lock->tickets.next = 0
      
      #define spin_lock_init(_lock)				\
      do {							\
      	spinlock_check(_lock);				\
      	raw_spin_lock_init(&(_lock)->rlock);		\
      } while (0)
      
      # define raw_spin_lock_init(lock)				\
      	do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)
      
      #define __RAW_SPIN_LOCK_UNLOCKED(lockname)	\
      	(raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)
      
      #define __RAW_SPIN_LOCK_INITIALIZER(lockname)	\
      	{					\
      	.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,	\
      	SPIN_DEBUG_INIT(lockname)		\
      	SPIN_DEP_MAP_INIT(lockname) }
      
      #define __ARCH_SPIN_LOCK_UNLOCKED	{ { 0 } }
      

      二、自旋锁上锁的分析

      static __always_inline void spin_lock(spinlock_t *lock)
      {
      	raw_spin_lock(&lock->rlock);   // 调用宏
      }
      
      #define raw_spin_lock(lock)	_raw_spin_lock(lock)  // 调用  _raw_spin_lock
      
      void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
      {
      	__raw_spin_lock(lock); // 继续调用 __raw_spin_lock
      }
      
      static inline void __raw_spin_lock(raw_spinlock_t *lock)
      {
          // 设置当前进程不可被抢占
      	preempt_disable();
          spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
          // 调用 do_raw_spin_lock
      	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
      }
      

      上面的代码使用 preempt_disable() 设置当前进程不可被抢占,此举可避免在持锁期间被高优先级进程抢占当前进程去访问临界区。

      void do_raw_spin_lock(raw_spinlock_t *lock)
      {
      	debug_spin_lock_before(lock);
      	arch_spin_lock(&lock->raw_lock);  // 调用 arch_spin_lock
      	debug_spin_lock_after(lock);
      }
      
      static inline void arch_spin_lock(arch_spinlock_t *lock)
      {
      	unsigned long tmp;
      	u32 newval;
      	arch_spinlock_t lockval;
      
      	prefetchw(&lock->slock);
      	__asm__ __volatile__(
          "1:	ldrex	%0, [%3]\n"          // 4 -------------------------
          "	add	%1, %0, %4\n"            // 5 -------------------------
          "	strex	%2, %1, [%3]\n"      // 6 -------------------------
          "	teq	%2, #0\n"                // 7 -------------------------
          "	bne	1b"                         
      	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp) // 1 -----------
      	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT) // 2 ------------
      	: "cc");  // 3 指明上述汇编指令会改变条件寄存器
      
          // 8 ---------------------------------------------
      	while (lockval.tickets.next != lockval.tickets.owner) {
      		wfe(); // 让当前 cpu 进入低功耗模式
      		lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
      	}
      
          // 9 ---------------------------------------------
      	smp_mb();
      }
      

      这部分的代码是实现自旋锁的核心,是通过内嵌汇编指令实现,使用比较关键的汇编指令 ldrex\strex 实现原子访问:

      • ldrex Rx, [Ry]:读取寄存器Ry指向的4字节内存值,将其保存到Rx寄存器中,同时标记对Ry指向内存区域的独占访问

      • strex Rx, Ry, [Rz]:如果发现已经被标记为独占访问了,则将寄存器Ry中的值更新到寄存器Rz指向的内存,并将寄存器Rx设置成0。指令执行成功后,会将独占访问标记位清除。

      • ldrex 负责拷贝数据和独占访问标记,strex 在根据标记存在与否拷贝数据和清除标记的过程是原子操作。

      以下为 arch_spin_lock() 的代码进行逐行解释:

      1. 将 lockval、newval、tmp 局部变量分别与 %0、%1、%2 编号关联(编号对应由编译器指定 CPU 的寄存器)

      2. 将 &lock->slock 、(1 << TICKET_SHIFT) 分别与 %3、%4 编号关联(编号会由编译器指定 CPU 的寄存器),这里的 TICKET_SHIFT  含义是指明内部变量位宽,代码定义的是 16,表面是 16 位宽的变量,也就是说 %4 是与 1 << 16 关联,便于汇编指令计算。

      3. 指定此内嵌的汇编指令会修改条件寄存器

      4. ldrex 指令实现读取 %3(lock->slock) 里面的数据到 %0(lockval),也就是将入参 lock 的数据拷贝到局部变量 lockval 中,并标记 lock 所在内存的独占访问标记。这一步主要是记录当前锁的计数。

      5. add 指令实现将 %0(lockval) + %4(1 << 16) ,结果放到 %1(newval),实现的效果等同于 newval= lockval.slock + (1 << 16)。这个步骤是根据 arch_spinlock_t  数据结构设计的,它内部是一个共同体,slock 与 tickets 使用的是相同的内存空间,slock 的低 16 位等同于 tickets.owner,高 16 位宽等同于 tickets.next。

      6. strex 会先检查 %3(lock->slock) 这块内存的独占标记是否还在,如果不在则设置 %2(tmp) 为 1,说明已经被其它线程修改了,如果还在的话设置为 0,将 %1(newval) 数据覆盖到  %3(lock->slock) ,再将独占访问标记清除。这两步主要是借助中间变量 newval 对 next 计数进行自增后,更新到原 lock 里面。

      7. teq 和 bne 指令实现判断如果 %2(tmp) 不等于 0,说明已经被其它线程修改了,重新再跳转到标签 1 执行,也就是重新跳转回第 4 个步骤继续执行,否则就继续往下执行。

      8. 这里主要是不停的判断当前的当前锁的 owen 是否与当前的 next 相等,如果不相等则一直循环检查,这个步骤就是实现了我们所说的自旋功能。当其它持有锁的线程要对该锁进行解锁,解锁的操作会将 owen 自增,当 owen 与当前记录的 next 相等,就会让当前线程退出自旋。

      9. 自旋退出了,就意味着拿到锁了,就可以访问临界区了。

      三、自旋锁解锁的分析

      static __always_inline void spin_unlock(spinlock_t *lock)
      {
      	raw_spin_unlock(&lock->rlock);
      }
      
      #define raw_spin_unlock(lock)		_raw_spin_unlock(lock)
      
      void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
      {
      	__raw_spin_unlock(lock);
      }
      
      static inline void __raw_spin_unlock(raw_spinlock_t *lock)
      {
      	spin_release(&lock->dep_map, 1, _RET_IP_);
      	do_raw_spin_unlock(lock);
           // 2 -------------------------------------
      	preempt_enable();
      }
      
      void do_raw_spin_unlock(raw_spinlock_t *lock)
      {
      	debug_spin_unlock(lock);
      	arch_spin_unlock(&lock->raw_lock);
      }
      
      static inline void arch_spin_unlock(arch_spinlock_t *lock)
      {
      	smp_mb();
          // 1 -------------------------------------
      	lock->tickets.owner++;
      	dsb_sev();
      }
      
      1. 对 lock->tickets.owner 进行自增,这样可以让等待锁的线程退出自旋

      2. 恢复内核对当前线程的抢占

      推演自旋锁的工作过程:

      从自旋锁刚进行初始化的状态来推演:lock->tickets.owner = 0,lock->tickets.next = 0

      1. 线程A:开始申请锁:读取锁 lock 到临时变量 A.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0

      1. 线程B:开始申请锁:读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner =0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0

      1. 线程A:计算锁序号:对 A.newval =  A.lockval.slock + (1<<16) = 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval = 0x10000

      4. 线程C:开始申请锁:读取锁 lock 到临时变量 lockvalC 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0

      5. 线程B:计算锁序号:对 B.newval =  B.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0,B.newval = 0x10000

      1. 线程A:更新锁计数:检查该内存区域的独占访问标记存在,将 A.newval 覆盖到 lock->slock 并清除独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval​​​​​​​ = 0x10000

      1. 线程A:锁持有检测:由于 A.lockval.tickets.next == A.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

      lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval​​​​​​​ = 0x10000

      1. 线程B:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner =0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1​​​​​​​

      9. 线程B:计算锁序号:对 B.newval​​​​​​​ =  B.lockval.slock + (1<<16) = 0x10000 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

      1. 线程B:更新锁计数:检查该内存区域的独占访问标记存在,将 B.newval​​​​​​​ 覆盖到 lock->slock 并清除独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

      1. 线程B:锁持有检测:由于 B.lockval.tickets.next != B.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner

      lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

      12. 线程C:计算锁序号:对 C.newval​​​​​​​ =  C.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0,C.newval = 0x10000

      1. 线程C:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 C.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2

      14. 线程C:计算锁序号:对 C.newval​​​​​​​ =  C.lockval.slock + (1<<16) = 0x20000 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval​​​​​​​ = 0x30000​​​​​​​

      1. 线程C:更新锁计数:检查该内存区域的独占访问标记存在,将 C.newval​​​​​​​ 覆盖到 lock->slock ​​​​​​​并清除独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval​​​​​​​ = 0x30000

      1. 线程C:锁持有检测:由于 C.lockval.tickets.next != C.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner

      lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2

      至此,线程B 一直在等待 lock->tickets.owner ​​​​​​​等于 1,而 线程C 则一直在等待  lock->tickets.owner ​​​​​​​等于 2

      1. 线程A:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1

      lock->tickets.owner = 1,lock->tickets.next = 3

      1. 线程B:锁持有检测:读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner,由于 B.lockval.tickets.next == B.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

      lock->tickets.owner = 1,lock->tickets.next = 3,B.lockval.tickets.owner = 1,B.lockval.tickets.next = 1

      1. 线程C:锁持有检测:继续读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next != C.lockval.tickets.owner,继续循环自旋。

      lock->tickets.owner = 1,lock->tickets.next = 3,C.lockval.tickets.owner = 1,C.lockval.tickets.next = 2​​​​​​​

      1. 线程B:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1

      lock->tickets.owner = 2,lock->tickets.next = 3

      1. 线程C:锁持有检测:读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next == C.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

      lock->tickets.owner = 2,lock->tickets.next = 3,C.lockval.tickets.owner = 2,C.lockval.tickets.next = 2​​​​​​​

      整个过程推演完毕,很巧妙的借助两个计数器和局部变量,实现等锁线程的有序排队,该思路也适用于应用程序开发。

      自旋锁的原理总结

      1. 自旋锁是通过 ldrex、strex 来确保读写锁的计数器是原子操作的,这是 arm 芯片级实现的。

      2. 在上锁的过程中,会有两处循环,第一处是汇编指令循环, 第二处是 C 语言的 while 循环,两个循环的意义不一样:

      • 汇编指令循环:实现对锁计数器的原子读写,确保得到的锁数据是最新的,锁的计数更新是准确无误的。

      • C语言的循环:实现锁在等待时的自旋功能,通过比较计数器,实现谁先等待锁,谁就先得到锁的有序排队。

      以上两个循环的协同工作,前者实现原子操作和记录,后者实现了有序排队,完成了自旋锁的核心互斥功能。

      posted in Linux
      D
      dream

    Latest posts made by dream

    • Reply: 系统启动打印key pressed value

      @jinxiangwzh 有个通过 ADC 检测按键的 ADC,加颗上拉电阻。 不然只能修改 BOOT0

      posted in Linux
      D
      dream
    • Reply: PnpFe|In: 非法ID

      @rayzhang 你这像是固件问题。

      posted in 编译和烧写问题专区
      D
      dream
    • Reply: V853我们来了!!!

      @allwinnertech 支持,期待配套视频出来。

      posted in V853系列-AI视觉
      D
      dream
    • 【分析笔记】Linux tasklet 机制的理解

      ​

      Tasklet 介绍

      Linux 内核提供的四种中断下半部中 softirq(软中断)、tasklet(小任务)、workqueue(工作队列) 、request thread(中断线程)中的其中一种,其效率仅次于软中断,但远高于request thread 和 workqueue。

      • 软中断(softirq) 之所以性能高的原因,在 SMP 系统下多个 cpu 同时并发处理
        如网卡的 fifo 半满中断触发,被 cpu0 处理,cpu0 会在关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。若网卡的 fifo 全满中断有再次触发,就会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续相同的流程。由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

      • 小任务(tasklet) 之所以性能较软中断差,是因为同一种小任务在多个 cpu 上不会并发执行
        由于 tasklet 基于 softirq 的基础实现,为了易用性考虑,同一种 tasklet 在多个 cpu 上不会并行执行,因此不存在并发问题,在使用上就可以少一些顾虑。但也正是因为不存在并发,导致了性能较之 softirq 差一些。

      • tasklet 之所以比 workqueue 和 request thread 性能高

      原因是因为前者是在软中断上下文件工作(意味着不能调用任何阻塞的接口),而后两者是在进程上下文工作(实质上是在内核线程里面执行)。

      使用示例模版

      #include <linux/module.h>
      #include <linux/init.h>
      #include <linux/kernel.h>
      #include <linux/interrupt.h>
      
      static struct tasklet_struct my_tasklet;
      
      static void my_tasklet_handle(unsigned long data)
      {
      	printk("tasklet handle running...\n");
      }
      
      static irqreturn_t xxx_interrupt(int irq, void *dev_id)
      {
          // 调度 tasklet
          tasklet_schedule(&my_tasklet);
      }
      
      static int __init demo_driver_init(void)
      {
          // 初始化一个 tasklet ,关联处理函数
      	tasklet_init(&my_tasklet, my_tasklet_handle, 0);
          request_irq(xxx, xxx_interrupt, IRQF_SHARED, xxx, xxx);
      	return 0;
      }
      
      static void __exit demo_driver_exit(void)
      {
      	tasklet_kill(&my_tasklet);
      	return ;
      }
      
      module_init(demo_driver_init);
      module_exit(demo_driver_exit);
      MODULE_LICENSE("GPL v2");
      

      内核源码分析

      Linux 内核被启动后,会执行 start_kernel() 函数,tasklet 是基于 softirq 实现的,会在 softirq_init() 里面进行必要的初始化,主要是初始化 tasklet 链表和与相应的软中断号建立关联。

      tasklet_hi_action 是高优先级的 tasklet,tasklet_action 是普通的 tasklet,两者实现原理都一样。

      // kernel\linux-4.9\init\main.c
      asmlinkage __visible void __init start_kernel(void)
      {
      	...
      	softirq_init();
      	...
      }
      
      // kernel\linux-4.9\kernel\softirq.c
      void __init softirq_init(void)
      {
      	int cpu;
      
          // 初始化链表
      	for_each_possible_cpu(cpu) {
      		per_cpu(tasklet_vec, cpu).tail =
      			&per_cpu(tasklet_vec, cpu).head;
      		per_cpu(tasklet_hi_vec, cpu).tail =
      			&per_cpu(tasklet_hi_vec, cpu).head;
      	}
      
          // 建立TASKLET_SOFTIRQ、HI_SOFTIRQ软中断号的对应服务接口
      	open_softirq(TASKLET_SOFTIRQ, tasklet_action);
      	open_softirq(HI_SOFTIRQ, tasklet_hi_action);
      }
      

      当调用 tasklet_schedule() 时,如果该 tasklet 没有被设置 TASKLET_STATE_SCHED 标记时,才会加入链表内,如果已经设置了 TASKLET_STATE_SCHED 了,那么就会忽略此次的 tasklet _schedule(),这就意味着如果在极短的时间内调用 tasklet_schedule() 只会触发一次(这里可能会存在丢失中断事件的情况),之后会通过 raise_softirq_irqoff() 启用 TASKLET_SOFTIRQ 软中断。

      哪颗 cpu 受理该软中断,就将 tasklet 加入到该 cpu 的 tasklet 链表内,由于相同的软中断可以同时被其他 cpu 触发执行,因此会出现 cpu0\cpu1 的 tasklet 链表内有同一个 tasklet 的情况。

      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_schedule(struct tasklet_struct *t)
      {
      	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
      		__tasklet_schedule(t);
      }
      
      // kernel\linux-4.9\kernel\softirq.c
      void __tasklet_schedule(struct tasklet_struct *t)
      {
      	unsigned long flags;
      
          // 将指定的 tasklet 加入到链表内并设置软中断
      	local_irq_save(flags);
      	t->next = NULL;
      	*__this_cpu_read(tasklet_vec.tail) = t;
      	__this_cpu_write(tasklet_vec.tail, &(t->next));
      	raise_softirq_irqoff(TASKLET_SOFTIRQ);
      	local_irq_restore(flags);
      }
      

      在初始化的时候已经为  TASKLET_SOFTIRQ 软中断与 tasklet_action() 建立关联的关系,因此软中断触发时,就会调用 tasklet_action(),这里是精髓部分。

      多次触发软中断时,当前 cpu 只能同一时间执行一次相同的软中断,但是如果有多个 cpu 的话,那么会有多个 cpu 并发执行软中断,所以下面的 tasklet_action() 要想到这一点。

      1. tasklet_action() 屏蔽当前 CPU 中断,获取当前 CPU 的 tasklet 链表并清空原有的链表,在恢复中断。避免在操作链表的过程中,被硬件中断打断。

      2. 开始遍历链表取出 tasklet,先 TASKLET_STATE_RUN 标记确定该 tasklet 是否已经被其它 cpu 执行,因为该 tasklet 在执行的过程中,又被加入到当前的 cpu 的 tasklet 链表内。

      3. 如果没有被执行,就继续检查该 tasklet 是否被 tasklet_disable(),如果有被 disable 就清除 TASKLET_STATE_RUN 标记,这样可以重新被添加会当前 tasklet 链表内,等待再次执行。如果有被执行,即使又被设置了 TASKLET_STATE_RUN 标记,也会在第 5 步执行完成后,会清除掉该标记。

      4. 如果没有被 disable,那么就清除 TASKLET_STATE_SCHED 标记,该标记一旦被清除,就意味着在 tasklet 执行期间,tasklet_schedule() 可以继续添加新的 tasklet 其他 cpu  的 tasklet 链表内。如果当前 cpu 已经完成了 tasklet_action() ,新的 tasklet 也可能会重新添加到当前的  tasklet 链表。

      5. 这里就会执行通过 tasklet_init() 绑定的 func,也就是示例中的 my_tasklet_handle(),执行完成后再清除 TASKLET_STATE_RUN 标记,继续下一个 tasklet。

      6. 能走到这一步,会有两种情况,一种情况是即将执行 tasklet ,发现已经被 disable 掉了,另外一种情况是 tasklet 已经在其它 CPU 上执行中。无论哪种情况,都会将当前的 tasklet 重新放回到当前 cpu 的 tasklet 链表内,并调用 __raise_softirq_irqoff() 重新触发软中断(应该是启用该软中断)。

      注意,以上获取 TASKLET_STATE_RUN 和 TASKLET_STATE_SCHED 标记都是位原子操作,所以不会出现因并发引发的问题。

      // kernel\linux-4.9\kernel\softirq.c
      // 某 CPU 要调度各个 tasklet 的实现
      static __latent_entropy void tasklet_action(struct softirq_action *a)
      {
      	struct tasklet_struct *list;
      
          // 1 ------------------------------------------------------
      	local_irq_disable();
      	list = __this_cpu_read(tasklet_vec.head); // 获取 tasklet 链表
      	__this_cpu_write(tasklet_vec.head, NULL); // 清空 tasklet 链表
      	__this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
      	local_irq_enable();
      
          // 如果存在 tasklet 就会进入循环
      	while (list) 
          {
      		struct tasklet_struct *t = list;
      		list = list->next;
      
              // 2 ------------------------------------------------------
              // TASKLET_STATE_SCHED: 表示该 tasklet 已经被挂接到某个 CPU 上
              // TASKLET_STATE_RUN:   表示该 tasklet 正在某个 CPU 上执行
              // 检查并设置 TASKLET_STATE_RUN 标记
              // 返回 1: 表示 tasklet 没有被执行  返回 0: 表示 tasklet 已经被执行
      		if (tasklet_trylock(t)) 
              {
                  
                  // 3 ------------------------------------------------------
                  // 如果当前 tasklet 没有被 tasklet_disable()
      			if (!atomic_read(&t->count))  
                  {
                      // 4 ----------------------------------------------
                      // 清除 TASKLET_STATE_SCHED 状态,便于该 tasklet 可以再次被触发
      				if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
      					BUG();
                      // 5 ----------------------------------------------
                      // 这期间,该 tasklet 可以被 tasklet_schedule(),从而引出下面第二种情况
      				t->func(t->data);        // 如果没有执行且没有 disable 则执行
      				tasklet_unlock(t);        // 清理 TASKLET_STATE_RUN 标记
      				continue;                 // 继续下一个 tasklet
      			}
      
                  // 如果当前已经被 disable 了,那就清理 TASKLET_STATE_RUN 标记
      			tasklet_unlock(t);
      		}
      
              // 6 -----------------------------------------------------
              // 有两种情况下,会将该 tasklet 再挂接回链表内,并重新触发,等待下一次执行的机会
              // 1. 如果没有被执行,但是被调用 tasklet_disable() 接口 disable 了
              // 2. 当前 tasklet 已经在其它 CPU 正在执行 func 这时候 tasklet 又会被挂回在
              //    原来的链表中,为了满足同一种类型的 tasklet 只能在一个 CPU 上执行的设计
              //    因此此次不执行 tasklet,挂入链表后等待下一次被执行的时机执行
      		local_irq_disable();
      		t->next = NULL;
      		*__this_cpu_read(tasklet_vec.tail) = t; 
      		__this_cpu_write(tasklet_vec.tail, &(t->next));
      		__raise_softirq_irqoff(TASKLET_SOFTIRQ);
      		local_irq_enable();
      	}
      }
      

      贴出判断和标记和清除 tasklet 运行的 TASKLET_STATE_RUN  标记代码

      // kernel\linux-4.9\include\interrupt.h
      static inline int tasklet_trylock(struct tasklet_struct *t)
      {
      	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_unlock(struct tasklet_struct *t)
      {
      	smp_mb__before_atomic();
      	clear_bit(TASKLET_STATE_RUN, &(t)->state);
      }
      

      贴出关闭 tasklet 执行的代码

      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_disable(struct tasklet_struct *t)
      {
      	tasklet_disable_nosync(t);
      	tasklet_unlock_wait(t);
      	smp_mb();
      }
      
      // kernel\linux-4.9\include\interrupt.h
      // 关闭 tasklet(实际上应该理解为暂停调度执行)
      static inline void tasklet_disable_nosync(struct tasklet_struct *t)
      {
          // 这里对整型原子操作 count 自增了,对应 tasklet_action() 里面的 atomic_read()
      	atomic_inc(&t->count);
      	smp_mb__after_atomic();
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_enable(struct tasklet_struct *t)
      {
          // 这里对整型原子操作 count 自减了,对应 tasklet_action() 里面的 atomic_read()
      	smp_mb__before_atomic();
      	atomic_dec(&t->count);
      }
      tasklet_init() 实现与用户函数关联的接口,也是很简单 
      
      // kernel\linux-4.9\kernel\softirq.c
      void tasklet_init(struct tasklet_struct *t,
      		  void (*func)(unsigned long), unsigned long data)
      {
      	t->next = NULL;
      	t->state = 0;
      	atomic_set(&t->count, 0);
      	t->func = func;
      	t->data = data;
      }
      

      tasklet_kill() 主要实现是尽可能快的让 tasklet 得到执行,等待执行完成后再退出。

      1. 判断该 tasklet 是否已经被挂接到某个 cpu 的 tasklet 链表内,如果有挂接到,那么就立即让出 cpu,直至 tasklet 清除 TASKLET_STATE_SCHED 标记(参考 tasklet_action() 第 4 个步骤),进入运行状态。

      2. 如果 tasklet 没有挂接或者一旦进入到执行状态,那么就会不停的检测 TASKLET_STATE_RUN 是否有被清除,被清除的话说明已经运行完成(参考 tasklet_action() 第5个步骤),可以放心的退出了。

      // kernel\linux-4.9\kernel\softirq.c
      void tasklet_kill(struct tasklet_struct *t)
      {
      	if (in_interrupt())
      		pr_notice("Attempt to kill tasklet from interrupt\n");
      
          // 1 ----------------------------------------------
      	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
      		do {
      			yield();
      		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
      	}
          // 2 ----------------------------------------------
      	tasklet_unlock_wait(t);
      	clear_bit(TASKLET_STATE_SCHED, &t->state);
      }
      
      // kernel\linux-4.9\include\interrupt.h
      static inline void tasklet_unlock_wait(struct tasklet_struct *t)
      {
      	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
      }
      

      至此,tasklet 关键的实现原理分析完成,实际上还应联合 softirq,才算完整的了解整个机制。

      Tasklet 机制总结

      1. 每颗 cpu 都有自己的 tasklet 链表,这样可以将 tasklet 分布在各个 cpu 上,可实现并发不同的 tasklet。

      2. 相同的 tasklet 只能在某一颗 cpu 上串行执行,其它 cpu 会暂时避让,在此情况下,不需要考虑并发问题(即不需要加锁)。

      3. tasklet_schedule() 接口调用时,如果 tasklet 还未被执行,或者处于 disable 期间,指定的 tasklet 不会被加入链表内,即该请求不会被受理。

      1. tasklet_disable() 接口只是暂时停止指定的 tasklet 执行,依然会被加回待执行链表内。而在 disable 期间,相同的 tasklet 将无法被加入链表调度。
      posted in Linux
      D
      dream
    • 【分析笔记】Linux 内核自旋锁的理解和使用原则

      ​
      自旋锁简单说明

      自旋锁主要解决在竞态并发下,保护执行时间很短的临界区。它只允许一个执行单位进入临界区,在该执行单位离开前,其它的执行单位将会在进入临界区前不停的循环等待(即所谓的自旋),直至该执行单位离开临界区后,最先等待的一个执行单位会立即进入临界区。此方式不涉及到上下文切换,因此效率极高。

      出现并发的场景

      硬中断触发打断当前进程、softirq、tasklet、timer等形成的并发
      softirq(软中断)、tasklet(小任务)、timer(内核定时器) 触发打断 当前进程(或内核线程)形成的并发
      在 SMP 系统下,多次触发 softirq 之间形成的并发(同一个 softirq 可在多个 cpu 并发执行)
      在 SMP 系统下,不同 tasklet、timer 之间的并发(同一个 tasklet 和 timer 不会并发执行)
      在内核抢占的调度机制形成高低优先级进程之间(或内核线程)的并发

      额外的注意事项

      一、软中断在同一个cpu下并不会并发,但是在多个cpu下是可以并发的,因此性能很高。

      如网卡接受数据,产生一个中断后,被 cpu0 处理,关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。此时网卡又再次产生中断,会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续参与,由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。

      二、同一个 tasklet、timer 在同一时间,只会在一个cpu上运行,是为了易用性做出的牺牲。

      由于 tasklet,timer 都是基于 softirq 的基础实现,为了易用性考虑,与 softirq 不同的是,同一种tasklet、timer 在多个cpu上也不会并行执行,因此不存在并发问题。

      三、新版本的 Linux 内核不再支持中断嵌套(不确定是从哪个版本开始,以下为内核补丁说明)
      https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=e58aa3d2d0cc

      自旋锁的种类说明

      最基础的自旋锁有三个版本:

      1. spin_lock()\spin_unlock()

      这是最基础的自旋锁,也是对系统影响最小的自旋锁,在未获得锁时,会自旋等待进入临界区。

      1. spin_lock_bh()\spin_unlock_bh()

      这是在最基础的自旋锁上获取锁之前,先关闭中断底半部,明确的来说就是关闭软中断(包含基于软中断实现的 tasklet 和 timer),主要影响系统的软中断类的并发。

      1. spin_lock_irq()\spin_unlock_irq()、spin_lock_irqsave()\spin_unlock_irqrestore()

      这是在最基础的自旋锁上获取锁之前,先屏蔽当前 cpu 的中断,禁止内核抢占当前进程,主要用于防止软硬件中断并发,影响最大,它影响了当前 CPU 的软硬中断和进程调度。

      spin_lock_irq() 是会屏蔽当前 cpu 所有的中断,spin_unlock_irq() 会开启当前 cpu 所有的中断。spin_lock_irqsave() 是现将当前 cpu 的中断使能位取出来,然后在屏蔽当前 cpu 所有中断,spin_unlock_irqrestore() 再恢复之前的中断使能位。

      凡是用到 spin_lock_irq()\spin_unlock_irq() 都可以用 spin_lock_irqsave()\spin_unlock_irqrestore() 替换,根据使用情况决定选择哪种方式即可。例如希望中断执行完成后,所有的中断都要开启,那就选择 spin_lock_irq()\spin_unlock_irq(),如果希望中断执行完成后,只需要恢复执行前的中断开关状态,那么就选择 spin_lock_irqsave()\spin_unlock_irqrestore(),如执行前 A中断 本来就要求关闭的,那么执行完之后,还是希望 A中断 仍处于关闭状态。

      使用自旋锁的原则

      首先要先明确硬件中断的优先级最高,它可以随时打断软中断和内核线程与用户进程,他们之间的优先级如下:

      硬中断 >>> 软中断(含基于软中断实现的 tasklet、timer) >>> 内核线程\用户进程

      然后需要确定谁可能会并发访问临界区,然后遵循如下规则,选择合适的锁即可:

      • 低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁
      • 同等级要防着同等级的, 就使用最简单自旋锁

      一、低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁

      例1:用户进程上下文或内核线程 和 硬件中断 都会访问同一个临界区

      用户进程:使用 spin_lock_irq()\spin_unlock_irq()

      硬件中断:使用 spin_lock()\spin_unlock()

      进程上下文访问临界区要防止被硬件中断打断侵入,就需要通过调用 spin_lock_irq()\spin_unlock_irq()  禁止当前 CPU 的中断再去获取锁,那么临界区内就不会被硬件中断访问。但它也只能关闭当前 cpu 的中断,此时其它 cpu 还能继续响应中断,所以中断内部还是需要加上 spin_lock()\spin_unlock() 来保护临界区,即使该中断未拿到锁而持续自旋,也不会影响进程上下文继续执行,顶多就自旋等待一会就能获得锁。

      这里也能说明,被自旋锁保护的临界区代码不能太过复杂,不然在这种场景下,就会导致中断自旋时间过长,在该中断自旋期间就无法响应其它的中断,如 tick 心跳中断,最终可能导致系统异常死机。

      例2:软中断(softirq、tasklet、timer) 和 硬件中断 都会访问同一个临界区

      软件中断:使用 spin_lock_irq()\spin_unlock_irq()

      硬件中断:使用 spin_lock()\spin_unlock()

      例3:用户进程上下文或内核线程 和 软中断(softirq、tasklet、timer) 都会访问同一个临界区

      用户进程:使用 spin_lock_bh()\spin_unlock_bh()

      软件中断:使用 spin_lock()\spin_unlock()

      进程上下文访问临界区要防止被软中断打断侵入,就需要使用 spin_lock_bh()\spin_unlock_bh() 禁用软中断,但只能关闭当前 cpu 的软中断,其它 cpu 依然能响应软中断,因此还需在软中断中使用 spin_lock()\spin_unlock() 来保护临界区。

      二、同等级要防着同等级的, 就使用最简单自旋锁

      例1:用户进程上下文或内核线程 和 用户进程上下文和内核线程 即多个进程会访问同一个临界区

      只需要使用:spin_lock()\spin_unlock(),因为内核支持抢占调度,所以需要上锁。

      例2:不同的 硬件中断 都会访问同一个临界区

      只需要使用: spin_lock()\spin_unlock(),不同的硬件中断是可以同时被多颗 cpu 响应处理的,因此需要使用自旋锁进行保护。

      如果是旧版内核支持中断嵌套的,则应该使用 spin_lock_irq()\spin_unlock_irq(),以避免被高优先级中断抢占,从而导致出现死锁情况。

      例3:不同的 tasklet、timer 会访问同一个临界区

      只需要使用:spin_lock()\spin_unlock(),因为不同的 tasklet 或 timer 是可以在不同的 cpu 并发执行。

      注意,如果只有相同的 tasklet 或者 timer 访问临界区,是不需要加锁的,因为相同的 tasklet 或 timer 不会并发,即使是有多个 cpu 也不会。

      例4:在一个或者多个软中断(softirq) 中会访问同一个临界区

      只需要使用:spin_lock()\spin_unlock(),虽然同一时间一个 cpu 只能执行一个软中断,但其它的 cpu 还是可以并发执行相同的软中断的。

      三、workqueue、waitqueue、completion 用锁规则

      workqueue(工作队列)是基于内核线程实现、waitqueue(等待队列)工作在用户进程上下文、completion(完成量)是基于等待队列实现也是工作在用户进程上下文,因此它们的用锁规则等同于用户进程。

      自旋锁的代码分析

      自旋锁在不同的硬件环境的实现不一样,此处分析以最复杂的环境下自旋锁的实现原理,即:SMP 下支持任务抢占的硬件环境。

      一、自旋锁初始化

      锁的数据结构定义,这个数据结构用的是结构体内嵌共用体设计,理解这点非常重要。

      #define TICKET_SHIFT	16    // 指明 owner 和 next 的位宽
      
      typedef struct {
      	union {                
      		u32 slock;        // 32 位
      		struct __raw_tickets {
      			u16 owner;    // 16 位 解锁计数
      			u16 next;     // 16 位 上锁计数
      		} tickets;
      	};
      } arch_spinlock_t;
      锁的初始化 spin_lock_init() :lock->raw_lock 成员变量被设置为 __ARCH_SPIN_LOCK_UNLOCKED    { { 0 } },即 lock->tickets.owner = 0, lock->tickets.next = 0
      
      #define spin_lock_init(_lock)				\
      do {							\
      	spinlock_check(_lock);				\
      	raw_spin_lock_init(&(_lock)->rlock);		\
      } while (0)
      
      # define raw_spin_lock_init(lock)				\
      	do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)
      
      #define __RAW_SPIN_LOCK_UNLOCKED(lockname)	\
      	(raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)
      
      #define __RAW_SPIN_LOCK_INITIALIZER(lockname)	\
      	{					\
      	.raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,	\
      	SPIN_DEBUG_INIT(lockname)		\
      	SPIN_DEP_MAP_INIT(lockname) }
      
      #define __ARCH_SPIN_LOCK_UNLOCKED	{ { 0 } }
      

      二、自旋锁上锁的分析

      static __always_inline void spin_lock(spinlock_t *lock)
      {
      	raw_spin_lock(&lock->rlock);   // 调用宏
      }
      
      #define raw_spin_lock(lock)	_raw_spin_lock(lock)  // 调用  _raw_spin_lock
      
      void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
      {
      	__raw_spin_lock(lock); // 继续调用 __raw_spin_lock
      }
      
      static inline void __raw_spin_lock(raw_spinlock_t *lock)
      {
          // 设置当前进程不可被抢占
      	preempt_disable();
          spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
          // 调用 do_raw_spin_lock
      	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
      }
      

      上面的代码使用 preempt_disable() 设置当前进程不可被抢占,此举可避免在持锁期间被高优先级进程抢占当前进程去访问临界区。

      void do_raw_spin_lock(raw_spinlock_t *lock)
      {
      	debug_spin_lock_before(lock);
      	arch_spin_lock(&lock->raw_lock);  // 调用 arch_spin_lock
      	debug_spin_lock_after(lock);
      }
      
      static inline void arch_spin_lock(arch_spinlock_t *lock)
      {
      	unsigned long tmp;
      	u32 newval;
      	arch_spinlock_t lockval;
      
      	prefetchw(&lock->slock);
      	__asm__ __volatile__(
          "1:	ldrex	%0, [%3]\n"          // 4 -------------------------
          "	add	%1, %0, %4\n"            // 5 -------------------------
          "	strex	%2, %1, [%3]\n"      // 6 -------------------------
          "	teq	%2, #0\n"                // 7 -------------------------
          "	bne	1b"                         
      	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp) // 1 -----------
      	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT) // 2 ------------
      	: "cc");  // 3 指明上述汇编指令会改变条件寄存器
      
          // 8 ---------------------------------------------
      	while (lockval.tickets.next != lockval.tickets.owner) {
      		wfe(); // 让当前 cpu 进入低功耗模式
      		lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
      	}
      
          // 9 ---------------------------------------------
      	smp_mb();
      }
      

      这部分的代码是实现自旋锁的核心,是通过内嵌汇编指令实现,使用比较关键的汇编指令 ldrex\strex 实现原子访问:

      • ldrex Rx, [Ry]:读取寄存器Ry指向的4字节内存值,将其保存到Rx寄存器中,同时标记对Ry指向内存区域的独占访问

      • strex Rx, Ry, [Rz]:如果发现已经被标记为独占访问了,则将寄存器Ry中的值更新到寄存器Rz指向的内存,并将寄存器Rx设置成0。指令执行成功后,会将独占访问标记位清除。

      • ldrex 负责拷贝数据和独占访问标记,strex 在根据标记存在与否拷贝数据和清除标记的过程是原子操作。

      以下为 arch_spin_lock() 的代码进行逐行解释:

      1. 将 lockval、newval、tmp 局部变量分别与 %0、%1、%2 编号关联(编号对应由编译器指定 CPU 的寄存器)

      2. 将 &lock->slock 、(1 << TICKET_SHIFT) 分别与 %3、%4 编号关联(编号会由编译器指定 CPU 的寄存器),这里的 TICKET_SHIFT  含义是指明内部变量位宽,代码定义的是 16,表面是 16 位宽的变量,也就是说 %4 是与 1 << 16 关联,便于汇编指令计算。

      3. 指定此内嵌的汇编指令会修改条件寄存器

      4. ldrex 指令实现读取 %3(lock->slock) 里面的数据到 %0(lockval),也就是将入参 lock 的数据拷贝到局部变量 lockval 中,并标记 lock 所在内存的独占访问标记。这一步主要是记录当前锁的计数。

      5. add 指令实现将 %0(lockval) + %4(1 << 16) ,结果放到 %1(newval),实现的效果等同于 newval= lockval.slock + (1 << 16)。这个步骤是根据 arch_spinlock_t  数据结构设计的,它内部是一个共同体,slock 与 tickets 使用的是相同的内存空间,slock 的低 16 位等同于 tickets.owner,高 16 位宽等同于 tickets.next。

      6. strex 会先检查 %3(lock->slock) 这块内存的独占标记是否还在,如果不在则设置 %2(tmp) 为 1,说明已经被其它线程修改了,如果还在的话设置为 0,将 %1(newval) 数据覆盖到  %3(lock->slock) ,再将独占访问标记清除。这两步主要是借助中间变量 newval 对 next 计数进行自增后,更新到原 lock 里面。

      7. teq 和 bne 指令实现判断如果 %2(tmp) 不等于 0,说明已经被其它线程修改了,重新再跳转到标签 1 执行,也就是重新跳转回第 4 个步骤继续执行,否则就继续往下执行。

      8. 这里主要是不停的判断当前的当前锁的 owen 是否与当前的 next 相等,如果不相等则一直循环检查,这个步骤就是实现了我们所说的自旋功能。当其它持有锁的线程要对该锁进行解锁,解锁的操作会将 owen 自增,当 owen 与当前记录的 next 相等,就会让当前线程退出自旋。

      9. 自旋退出了,就意味着拿到锁了,就可以访问临界区了。

      三、自旋锁解锁的分析

      static __always_inline void spin_unlock(spinlock_t *lock)
      {
      	raw_spin_unlock(&lock->rlock);
      }
      
      #define raw_spin_unlock(lock)		_raw_spin_unlock(lock)
      
      void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
      {
      	__raw_spin_unlock(lock);
      }
      
      static inline void __raw_spin_unlock(raw_spinlock_t *lock)
      {
      	spin_release(&lock->dep_map, 1, _RET_IP_);
      	do_raw_spin_unlock(lock);
           // 2 -------------------------------------
      	preempt_enable();
      }
      
      void do_raw_spin_unlock(raw_spinlock_t *lock)
      {
      	debug_spin_unlock(lock);
      	arch_spin_unlock(&lock->raw_lock);
      }
      
      static inline void arch_spin_unlock(arch_spinlock_t *lock)
      {
      	smp_mb();
          // 1 -------------------------------------
      	lock->tickets.owner++;
      	dsb_sev();
      }
      
      1. 对 lock->tickets.owner 进行自增,这样可以让等待锁的线程退出自旋

      2. 恢复内核对当前线程的抢占

      推演自旋锁的工作过程:

      从自旋锁刚进行初始化的状态来推演:lock->tickets.owner = 0,lock->tickets.next = 0

      1. 线程A:开始申请锁:读取锁 lock 到临时变量 A.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0

      1. 线程B:开始申请锁:读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner =0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0

      1. 线程A:计算锁序号:对 A.newval =  A.lockval.slock + (1<<16) = 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval = 0x10000

      4. 线程C:开始申请锁:读取锁 lock 到临时变量 lockvalC 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0

      5. 线程B:计算锁序号:对 B.newval =  B.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0,B.newval = 0x10000

      1. 线程A:更新锁计数:检查该内存区域的独占访问标记存在,将 A.newval 覆盖到 lock->slock 并清除独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval​​​​​​​ = 0x10000

      1. 线程A:锁持有检测:由于 A.lockval.tickets.next == A.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

      lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval​​​​​​​ = 0x10000

      1. 线程B:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner =0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1​​​​​​​

      9. 线程B:计算锁序号:对 B.newval​​​​​​​ =  B.lockval.slock + (1<<16) = 0x10000 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

      1. 线程B:更新锁计数:检查该内存区域的独占访问标记存在,将 B.newval​​​​​​​ 覆盖到 lock->slock 并清除独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

      1. 线程B:锁持有检测:由于 B.lockval.tickets.next != B.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner

      lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval​​​​​​​ = 0x20000

      12. 线程C:计算锁序号:对 C.newval​​​​​​​ =  C.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0,C.newval = 0x10000

      1. 线程C:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 C.lockval 并设置该内存区域独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2

      14. 线程C:计算锁序号:对 C.newval​​​​​​​ =  C.lockval.slock + (1<<16) = 0x20000 + 0x10000,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval​​​​​​​ = 0x30000​​​​​​​

      1. 线程C:更新锁计数:检查该内存区域的独占访问标记存在,将 C.newval​​​​​​​ 覆盖到 lock->slock ​​​​​​​并清除独占标记,此时的状态:

      lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval​​​​​​​ = 0x30000

      1. 线程C:锁持有检测:由于 C.lockval.tickets.next != C.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner

      lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2

      至此,线程B 一直在等待 lock->tickets.owner ​​​​​​​等于 1,而 线程C 则一直在等待  lock->tickets.owner ​​​​​​​等于 2

      1. 线程A:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1

      lock->tickets.owner = 1,lock->tickets.next = 3

      1. 线程B:锁持有检测:读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner,由于 B.lockval.tickets.next == B.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

      lock->tickets.owner = 1,lock->tickets.next = 3,B.lockval.tickets.owner = 1,B.lockval.tickets.next = 1

      1. 线程C:锁持有检测:继续读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next != C.lockval.tickets.owner,继续循环自旋。

      lock->tickets.owner = 1,lock->tickets.next = 3,C.lockval.tickets.owner = 1,C.lockval.tickets.next = 2​​​​​​​

      1. 线程B:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1

      lock->tickets.owner = 2,lock->tickets.next = 3

      1. 线程C:锁持有检测:读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next == C.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。

      lock->tickets.owner = 2,lock->tickets.next = 3,C.lockval.tickets.owner = 2,C.lockval.tickets.next = 2​​​​​​​

      整个过程推演完毕,很巧妙的借助两个计数器和局部变量,实现等锁线程的有序排队,该思路也适用于应用程序开发。

      自旋锁的原理总结

      1. 自旋锁是通过 ldrex、strex 来确保读写锁的计数器是原子操作的,这是 arm 芯片级实现的。

      2. 在上锁的过程中,会有两处循环,第一处是汇编指令循环, 第二处是 C 语言的 while 循环,两个循环的意义不一样:

      • 汇编指令循环:实现对锁计数器的原子读写,确保得到的锁数据是最新的,锁的计数更新是准确无误的。

      • C语言的循环:实现锁在等待时的自旋功能,通过比较计数器,实现谁先等待锁,谁就先得到锁的有序排队。

      以上两个循环的协同工作,前者实现原子操作和记录,后者实现了有序排队,完成了自旋锁的核心互斥功能。

      posted in Linux
      D
      dream
    • 【分析笔记】NXP PCF85263 设备驱动分析笔记

      驱动移植

      供应商无法提供相应的驱动程序,不过在 linux 最新的内核倒是有一份 pcf85363 的驱动,看代码并核对寄存器功能,是可以兼容 pcf85263 芯片。只是我们用的内核比较老 linux 4.9,rtc 子系统的接口有些变化,不能直接拿来用。根据 Linux 4.9 现有的驱动程序,修改了 pcf85363 驱动,可以正常的使用。

      https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/drivers/rtc/rtc-pcf85363.c?h=v6.0

      驱动框架

      RTC 设备驱动

      RTC 子系统的设备驱动还是非常简单的,只要按要求实现五个接口就能使用。

      struct rtc_class_ops {
      	......
      	int (*read_time)(struct device *dev, struct rtc_time *tm);
      	int (*set_time)(struct device *dev, struct rtc_time *tm);
      	int (*read_alarm)(struct device *dev, struct rtc_wkalrm *alrm);
      	int (*set_alarm)(struct device *dev, struct rtc_wkalrm *alrm);
      	int (*alarm_irq_enable)(struct device *dev, unsigned int enabled);
      	......
      };
      

      通过以下接口即可注册到系统内:
      devm_rtc_device_register(&client->dev, client->name, &rtc_ops, THIS_MODULE);

      rtc_class_ops->read_time(struct device *dev, struct rtc_time *tm)
      1. 应用层通过 ioctl(fd, RTC_RD_TIME, &rtc_tm) 读取时间的回调接口。
      2. 驱动层需要读取芯片里面的时间日期寄存器组,转换为十进制更新到 rtc_tm。
      
      rtc_class_ops->set_time(struct device *dev, struct rtc_time *tm)
      1. 应用层通过 ioctl(fd, RTC_SET_TIME, &rtc_tm) 设置时间的回调接口。
      2. 驱动层需要停止芯片计时,并将 tm 转换为 BCD 更新到时间日期寄存器组。
      
      rtc_class_ops->read_alarm(struct device *dev, struct rtc_wkalrm *alrm)
      1. 应用层通过 ioctl(fd, RTC_ALM_READ, &rtc_tm) 读取闹钟的回调接口。
      2. 驱动层需要读取芯片里面的 alarm 寄存器组,转换为十进制更新到 alrm->time。
      3. 驱动层需要读取芯片里面的 alarm 控制寄存器,更新到 alrm->enabled。
      
      rtc_class_ops->set_alarm(struct device *dev, struct rtc_wkalrm *alrm)
      1. 应用层通过 ioctl(fd, RTC_ALM_SET, &rtc_tm) 设置闹钟的回调接口。
      
      rtc_class_ops->alarm_irq_enable(struct device *dev, unsigned int enabled)
      1. 应用层通过 ioctl(fd, RTC_AIE_ON/RTC_AIE_OFF, 0) 命令开启关闭闹钟中断的回调接口。
      2. 应用层通过 read(fd, &data, sizeof(unsigned long))/select() 等待被中断唤醒。
      3. 驱动层需要通过 GPIO 注册中断,并在中断里读取中断标志位,通过标志位来确定调用如下接口,唤醒应用程序。
      	rtc_update_irq(pcf85x63->rtc, 1, RTC_IRQF | RTC_AF);
      

      应用层访问设备驱动的流程:

      APP ---> rtc/rtc-dev.c(/dev/rtcX) ---> rtc/interface.c ---> pcf85263.c
      
      RTC 设备驱动验证

      方法一:使用现成的命令 hwclock

      root@localhost:~# hwclock -f /dev/rtc0 --show
      2022-10-20 09:30:12.679335+0800
      

      方法二:自己编写应用程序验证

      参考:linux-4.9\tools\testing\selftests\timers\rtctest.c
      

      RTC 调试过程

      一、测量硬件电压和时钟晶体
      1. 根据芯片手册,测量供电是否正常。
      2. 测量晶振频率是否为 32768 Hz。
      3. 测量 I2C 总线的上拉是否正常。
      二、测试芯片能不能正常工作

      查看寄存器手册,只要启动 RTC 时钟,再读取秒数寄存器,有累加即可确认正常。
      在这里插入图片描述
      在这里插入图片描述

      root@localhost:~# i2cset -f -y 1 0x51 0x2e 0x00
      root@localhost:~# i2cget -f -y 1 0x51 0x01
      0x15
      root@localhost:~# i2cget -f -y 1 0x51 0x01
      0x16
      root@localhost:~# i2cget -f -y 1 0x51 0x01
      0x17
      
      三、驱动移植
      1. 如果供应商有现成的驱动程序,当然是最快的。
      2. 如果供应商没有,则看看最新的内核有没有。
      3. 如果都没有,就拿相似的驱动程序根据芯片手册编写。
      四、时间同步
      1. 设置系统时间从RTC启动和恢复
      2. 通过NTP同步方式设置RTC时间
      make ARCH=arm64 menuconfig
      
      Device Drivers -->
      	[*] Real Time Clock -->
      		[*]   Set system time from RTC on startup and resume
      		(rtc0)  RTC used to set the system time           
      		[*]   Set the RTC time based on NTP synchronization 
      		(rtc0)  RTC used to synchronize NTP adjustment 
      

      驱动分析

      一、初始化流程
      1. 从 DTS 获取配置的中断引脚,85263 通过引脚产生中断通知 SOC。
      2. 配置 0x2B 寄存器为 0x00, 清除所有的中断标志
      3. 配置 0x27 寄存器, 设置 INTA(7pin) 引脚作为中断输出引脚
      4. 申请中断并设置为低电平触发, 且在中断响应期间不重复触发(IRQF_ONESHOT)
      5. 按要求实现 rtc 的五个基本回调接口,并调用 devm_rtc_device_register() 注册到 RTC 子系统。
        在这里插入图片描述
      // 获取中断引脚
      pcf85x63->irq_number = 0;
      gpio_config.gpio = of_get_named_gpio_flags(np, "int_port", 0, (enum of_gpio_flags *)(&gpio_config));
      if (gpio_is_valid(gpio_config.gpio)){
      	pcf85x63->irq_gpio = gpio_config.gpio;
      	pcf85x63->irq_number = gpio_to_irq(pcf85x63->irq_gpio);
      }
      
      if(0 == pcf85x63->irq_number){
      	dev_err(&client->dev, "get int gpio failed....\n");
      	return -EINVAL;
      }
      
      // 配置 0x2B 寄存器为 0x00, 清除所有的中断标志
      regmap_write(pcf85x63->regmap, CTRL_FLAGS, 0);
      // 配置 0x27 寄存器, 设置 INTA(7pin) 引脚作为中断输出引脚
      regmap_update_bits(pcf85x63->regmap, CTRL_PIN_IO, PIN_IO_INTA_OUT, PIN_IO_INTAPM);
      // 申请中断并设置为低电平触发(IRQF_TRIGGER_LOW), 且在中断响应期间不重复触发(IRQF_ONESHOT)
      ret = devm_request_threaded_irq(&client->dev, pcf85x63->irq_number, NULL, pcf85x63_rtc_handle_irq, IRQF_TRIGGER_LOW | IRQF_ONESHOT, client->name, client);
      if (ret) {
      	dev_warn(&client->dev, "unable to request irq, alarms disabled\n");
      	return -EINVAL;
      }
      
      // 注册 RTC 设备
      pcf85x63->client = client;
      i2c_set_clientdata(client, pcf85x63);
      pcf85x63->rtc = devm_rtc_device_register(&client->dev, client->name, &rtc_ops, THIS_MODULE);
      if (IS_ERR(pcf85x63->rtc)){
      	dev_err(&client->dev, "register rtc device failed....\n");
      	return PTR_ERR(pcf85x63->rtc);
      }
      
      二、读取时间的实现
      1. 一次性读出时间日期寄存器组(00h ~ 07h)。
      2. 由于芯片寄存器是以 BCD 方式存储,所以需要转换为十进制,并复制到 tm。
        在这里插入图片描述
      static int pcf85x63_rtc_read_time(struct device *dev, struct rtc_time *tm)
      {
      	struct pcf85x63 *pcf85x63 = dev_get_drvdata(dev);
      	unsigned char buf[DT_YEARS + 1];
      	int ret, len = sizeof(buf);
      	
      	// 一次性读出时间日期寄存器组(00h ~ 07h)
      	if ((ret = regmap_bulk_read(pcf85x63->regmap, DT_100THS, buf, len))) {
      		dev_err(dev, "%s: error %d\n", __func__, ret);
      		return ret;
      	}
      	
      	// 通过 BCD 转换
      	tm->tm_year = bcd2bin(buf[DT_YEARS]);
      	tm->tm_year += 100; // adjust for 1900 base of rtc_time
      	tm->tm_wday = buf[DT_WEEKDAYS] & 7;
      	buf[DT_SECS] &= 0x7F;
      	tm->tm_sec = bcd2bin(buf[DT_SECS]);
      	buf[DT_MINUTES] &= 0x7F;
      	tm->tm_min = bcd2bin(buf[DT_MINUTES]);
      	tm->tm_hour = bcd2bin(buf[DT_HOURS]);
      	tm->tm_mday = bcd2bin(buf[DT_DAYS]);
      	tm->tm_mon = bcd2bin(buf[DT_MONTHS]) - 1;
      	return 0;
      }
      
      三、设置时间的实现
      1. 通过设置 0x2E 寄存器来切断外部时钟的分配器,实现停止计时。
      2. 通过设置 0x2F 寄存器重置预分频器。
      3. 将应用层传下的时间转换为 BCD 更新到时间日期寄存器组(00h ~ 07h)
      4. 配置 0x2E 寄存器为 0x00, 开始计时。
        在这里插入图片描述
        在这里插入图片描述
      static int pcf85x63_rtc_set_time(struct device *dev, struct rtc_time *tm)
      {
      	struct pcf85x63 *pcf85x63 = dev_get_drvdata(dev);
      	unsigned char tmp[11] = {0};
      	unsigned char *buf = &tmp[2];
      	int ret;
      	
      	// 要设置时间之前需要做的事情
      	tmp[0] = STOP_EN_STOP; 	// 配置 0x2E 寄存器为 0x01, 切断时钟, 停止计数( RTC clock is stopped)
      	tmp[1] = RESET_CPR;		// 配置 0x2F 寄存器为 0xA4,重置预分频器
      	if((ret = regmap_bulk_write(pcf85x63->regmap, CTRL_STOP_EN, tmp, 2)))
      		return ret;
      
      	// 将时间转换为 BCD 更新到时间日期寄存器组(00h ~ 07h)
      	buf[DT_100THS] = 0;
      	buf[DT_SECS] = bin2bcd(tm->tm_sec);
      	buf[DT_MINUTES] = bin2bcd(tm->tm_min);
      	buf[DT_HOURS] = bin2bcd(tm->tm_hour);
      	buf[DT_DAYS] = bin2bcd(tm->tm_mday);
      	buf[DT_WEEKDAYS] = tm->tm_wday;
      	buf[DT_MONTHS] = bin2bcd(tm->tm_mon + 1);
      	buf[DT_YEARS] = bin2bcd(tm->tm_year % 100);
      	if(regmap_bulk_write(pcf85x63->regmap, DT_100THS, buf, sizeof(tmp) - 2))
      		return ret;
      	
      	// 配置 0x2E 寄存器为 0x00, 开始计时( RTC clock runs)
      	return regmap_write(pcf85x63->regmap, CTRL_STOP_EN, 0);
      }
      
      四、读取闹钟的回调
      1. 一次性读出闹钟寄存器组(08h ~ 0Ch)。
      2. 将寄存器的 BCD 数据转换为十进制复制到 alrm->time。
      3. 读取 0x29 闹钟寄存器, 查询闹钟中断是否被使能。
      4. 如果闹钟中断被使能则通过更新 alrm->enabled 成员让应用层知道。
        在这里插入图片描述
      static int pcf85x63_rtc_read_alarm(struct device *dev, struct rtc_wkalrm *alrm)
      {
      	struct pcf85x63 *pcf85x63 = dev_get_drvdata(dev);
      	unsigned char buf[DT_MONTH_ALM1 - DT_SECOND_ALM1 + 1];
      	unsigned int val;
      	int ret;
      	
      	// 一次性读出闹钟寄存器组(08h ~ 0Ch)
      	if ((ret = regmap_bulk_read(pcf85x63->regmap, DT_SECOND_ALM1, buf, sizeof(buf))))
      		return ret;
      	
      	alrm->time.tm_sec = bcd2bin(buf[0]);
      	alrm->time.tm_min = bcd2bin(buf[1]);
      	alrm->time.tm_hour = bcd2bin(buf[2]);
      	alrm->time.tm_mday = bcd2bin(buf[3]);
      	alrm->time.tm_mon = bcd2bin(buf[4]) - 1;
      	
      	// 读取 0x29 闹钟寄存器, 查询闹钟中断是否被使能
      	if ((ret = regmap_read(pcf85x63->regmap, CTRL_INTA_EN, &val)))
      		return ret;
      	
      	// 如果闹钟中断被使能则通过更新 alrm->enabled 成员让应用层知道
      	alrm->enabled =  !!(val & INT_A1IE);
      	return 0;
      }
      
      五、设置闹钟的回调
      1. 关闭闹钟中断,避免设置过程中触发中断。
      2. 将应用层传下来的时间转化为 BCD,并更新到闹钟寄存器组(08h ~ 0Ch)。
      3. 根据应用层传下来的 alrm->enabled 决定是否再次打开闹钟中断。
      static int pcf85x63_rtc_set_alarm(struct device *dev, struct rtc_wkalrm *alrm)
      {
      	struct pcf85x63 *pcf85x63 = dev_get_drvdata(dev);
      	unsigned char buf[DT_MONTH_ALM1 - DT_SECOND_ALM1 + 1];
      	int ret;
      	
      	// 转换为 BCD 
      	buf[0] = bin2bcd(alrm->time.tm_sec);
      	buf[1] = bin2bcd(alrm->time.tm_min);
      	buf[2] = bin2bcd(alrm->time.tm_hour);
      	buf[3] = bin2bcd(alrm->time.tm_mday);
      	buf[4] = bin2bcd(alrm->time.tm_mon + 1);
      
      	 // 在设置时间之前先把中断关闭, 避免误触发中断
      	if ((ret = _pcf85x63_rtc_alarm_irq_enable(pcf85x63, 0)))
      		return ret;
      	
      	// 将时更新到闹钟寄存器组(08h ~ 0Ch)
      	if ((ret = regmap_bulk_write(pcf85x63->regmap, DT_SECOND_ALM1, buf, sizeof(buf))))
      		return ret;
      	
      	// 根据应用层的设置, 决定是否启用闹钟中断
      	return _pcf85x63_rtc_alarm_irq_enable(pcf85x63, alrm->enabled);
      }
      
      六、启用关闭闹钟中断
      1. 配置 0x10 寄存器, 启用/关闭 时、分、秒、日、月、的闹钟功能
      2. 配置 0x29 闹钟寄存器, 启用/关闭 闹钟中断, 上述月、日、时、分、秒有闹钟事件会触发中断
      3. 清除闹钟中断标志
        在这里插入图片描述
      static int _pcf85x63_rtc_alarm_irq_enable(struct pcf85x63 *pcf85x63, unsigned int enabled)
      {
      	int ret;
      	unsigned int alarm_flags = ALRM_SEC_A1E | ALRM_MIN_A1E | ALRM_HR_A1E | ALRM_DAY_A1E | ALRM_MON_A1E;
      	
      	// 配置 0x10 寄存器, 启用/关闭 时、分、秒、日、月、的闹钟功能
      	ret = regmap_update_bits(pcf85x63->regmap, DT_ALARM_EN, alarm_flags, enabled ? alarm_flags : 0);
      	if (ret){
      		return ret;
      	}
      	
      	// 配置 0x29 闹钟寄存器,  启用/关闭 闹钟中断, 上述月、日、时、分、秒有闹钟事件会触发中断
      	ret = regmap_update_bits(pcf85x63->regmap, CTRL_INTA_EN, INT_A1IE, enabled ? INT_A1IE : 0);
      	if (ret || enabled){
      		return ret;
      	}
      	
      	// 清除闹钟中断标志
      	return regmap_update_bits(pcf85x63->regmap, CTRL_FLAGS, FLAGS_A1F, 0);
      }
      
      static int pcf85x63_rtc_alarm_irq_enable(struct device *dev, unsigned int enabled)
      {
      	struct pcf85x63 *pcf85x63 = dev_get_drvdata(dev);
      	return _pcf85x63_rtc_alarm_irq_enable(pcf85x63, enabled);
      }
      
      七、闹钟中断服务程序
      1. 读取 0x2B 寄存器, 得到所有的中断标志。
      2. 如果是闹钟的中断(FLAGS_A1F),则调用 rtc_update_irq() 唤醒应用层,并清除闹钟中断标志位。
        在这里插入图片描述
      static irqreturn_t pcf85x63_rtc_handle_irq(int irq, void *dev_id)
      {
      	struct pcf85x63 *pcf85x63 = i2c_get_clientdata(dev_id);
      	unsigned int flags;
      	
      	// 读取 0x2B 寄存器, 得到所有的中断标志
      	if (regmap_read(pcf85x63->regmap, CTRL_FLAGS, &flags))
      		return IRQ_NONE;
      	
      	// 如果是闹钟的中断
      	if (flags & FLAGS_A1F) 
      	{
      		// 通知应用层有闹钟
      		rtc_update_irq(pcf85x63->rtc, 1, RTC_IRQF | RTC_AF);
      		// 清除闹钟中断标志
      		regmap_update_bits(pcf85x63->regmap, CTRL_FLAGS, FLAGS_A1F, 0);
      		return IRQ_HANDLED;
      	}
      
      	return IRQ_NONE;
      }
      
      posted in Linux
      D
      dream
    • Reply: 移植Qt 5.12.9 到 H616 上,支持 GPU 加速与 QtWebEngine

      @yuzukitsuru 感谢大佬分享

      posted in H616系列-OTT
      D
      dream
    • Reply: 分析Tina打包流程,并在buildroot上来实现打包生成tina镜像,支持PhoenixSuit烧写。

      @allwinnertech 好贴,感谢分享。

      posted in D1系列-RISC-V
      D
      dream
    • Reply: D1H支持1920*1200分辨率,但是1080*1920竖屏支持吗

      @xiaoxiao 我遇过类似的问题,开机 logo 的分辨力如果比显示屏大,就会出现问题,当时在 UBOOT 似乎未实现图片缩放的功能。(现在好像也没有)

      posted in D1系列-RISC-V
      D
      dream
    • Reply: 全志v831 以太网经常会断开又连上,现象跟拔掉网线又插上一样

      @weipengyao 还得改走线,才能彻底解决问题。

      posted in V853系列-AI视觉
      D
      dream