【分析笔记】Linux 内核自旋锁的理解和使用原则
-
自旋锁简单说明自旋锁主要解决在竞态并发下,保护执行时间很短的临界区。它只允许一个执行单位进入临界区,在该执行单位离开前,其它的执行单位将会在进入临界区前不停的循环等待(即所谓的自旋),直至该执行单位离开临界区后,最先等待的一个执行单位会立即进入临界区。此方式不涉及到上下文切换,因此效率极高。
出现并发的场景
硬中断触发打断当前进程、softirq、tasklet、timer等形成的并发
softirq(软中断)、tasklet(小任务)、timer(内核定时器) 触发打断 当前进程(或内核线程)形成的并发
在 SMP 系统下,多次触发 softirq 之间形成的并发(同一个 softirq 可在多个 cpu 并发执行)
在 SMP 系统下,不同 tasklet、timer 之间的并发(同一个 tasklet 和 timer 不会并发执行)
在内核抢占的调度机制形成高低优先级进程之间(或内核线程)的并发额外的注意事项
一、软中断在同一个cpu下并不会并发,但是在多个cpu下是可以并发的,因此性能很高。
如网卡接受数据,产生一个中断后,被 cpu0 处理,关闭中断后,将数据从网卡的 fifo 拷贝到 ram 之后触发软中断,再打开中断,基于谁触发谁处理原则,cpu0 会继续执行软中断服务函数。此时网卡又再次产生中断,会被 cpu1 处理,同样是关闭中断后拷贝数据再开启中断,再去触发和执行软中断进行网卡数据包处理。若此时 cpu0\cpu1 都还在软中断处理数据,网卡再次产生中断,那么 cpu2 就会继续参与,由此可见,软中断充分利用的多 cpu 进行并发处理,因此性能非常高,但也同时因为并发的存在,就需要考虑临界区的问题。
二、同一个 tasklet、timer 在同一时间,只会在一个cpu上运行,是为了易用性做出的牺牲。
由于 tasklet,timer 都是基于 softirq 的基础实现,为了易用性考虑,与 softirq 不同的是,同一种tasklet、timer 在多个cpu上也不会并行执行,因此不存在并发问题。
三、新版本的 Linux 内核不再支持中断嵌套(不确定是从哪个版本开始,以下为内核补丁说明)
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=e58aa3d2d0cc自旋锁的种类说明
最基础的自旋锁有三个版本:
- spin_lock()\spin_unlock()
这是最基础的自旋锁,也是对系统影响最小的自旋锁,在未获得锁时,会自旋等待进入临界区。
- spin_lock_bh()\spin_unlock_bh()
这是在最基础的自旋锁上获取锁之前,先关闭中断底半部,明确的来说就是关闭软中断(包含基于软中断实现的 tasklet 和 timer),主要影响系统的软中断类的并发。
- spin_lock_irq()\spin_unlock_irq()、spin_lock_irqsave()\spin_unlock_irqrestore()
这是在最基础的自旋锁上获取锁之前,先屏蔽当前 cpu 的中断,禁止内核抢占当前进程,主要用于防止软硬件中断并发,影响最大,它影响了当前 CPU 的软硬中断和进程调度。
spin_lock_irq() 是会屏蔽当前 cpu 所有的中断,spin_unlock_irq() 会开启当前 cpu 所有的中断。spin_lock_irqsave() 是现将当前 cpu 的中断使能位取出来,然后在屏蔽当前 cpu 所有中断,spin_unlock_irqrestore() 再恢复之前的中断使能位。
凡是用到 spin_lock_irq()\spin_unlock_irq() 都可以用 spin_lock_irqsave()\spin_unlock_irqrestore() 替换,根据使用情况决定选择哪种方式即可。例如希望中断执行完成后,所有的中断都要开启,那就选择 spin_lock_irq()\spin_unlock_irq(),如果希望中断执行完成后,只需要恢复执行前的中断开关状态,那么就选择 spin_lock_irqsave()\spin_unlock_irqrestore(),如执行前 A中断 本来就要求关闭的,那么执行完之后,还是希望 A中断 仍处于关闭状态。
使用自旋锁的原则
首先要先明确硬件中断的优先级最高,它可以随时打断软中断和内核线程与用户进程,他们之间的优先级如下:
硬中断 >>> 软中断(含基于软中断实现的 tasklet、timer) >>> 内核线程\用户进程
然后需要确定谁可能会并发访问临界区,然后遵循如下规则,选择合适的锁即可:
- 低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁
- 同等级要防着同等级的, 就使用最简单自旋锁
一、低优先级要防着高优先级的,用能禁止高优先级的自旋锁,而高优先级的只需最简单的锁
例1:用户进程上下文或内核线程 和 硬件中断 都会访问同一个临界区
用户进程:使用 spin_lock_irq()\spin_unlock_irq()
硬件中断:使用 spin_lock()\spin_unlock()
进程上下文访问临界区要防止被硬件中断打断侵入,就需要通过调用 spin_lock_irq()\spin_unlock_irq() 禁止当前 CPU 的中断再去获取锁,那么临界区内就不会被硬件中断访问。但它也只能关闭当前 cpu 的中断,此时其它 cpu 还能继续响应中断,所以中断内部还是需要加上 spin_lock()\spin_unlock() 来保护临界区,即使该中断未拿到锁而持续自旋,也不会影响进程上下文继续执行,顶多就自旋等待一会就能获得锁。
这里也能说明,被自旋锁保护的临界区代码不能太过复杂,不然在这种场景下,就会导致中断自旋时间过长,在该中断自旋期间就无法响应其它的中断,如 tick 心跳中断,最终可能导致系统异常死机。
例2:软中断(softirq、tasklet、timer) 和 硬件中断 都会访问同一个临界区
软件中断:使用 spin_lock_irq()\spin_unlock_irq()
硬件中断:使用 spin_lock()\spin_unlock()
例3:用户进程上下文或内核线程 和 软中断(softirq、tasklet、timer) 都会访问同一个临界区
用户进程:使用 spin_lock_bh()\spin_unlock_bh()
软件中断:使用 spin_lock()\spin_unlock()
进程上下文访问临界区要防止被软中断打断侵入,就需要使用 spin_lock_bh()\spin_unlock_bh() 禁用软中断,但只能关闭当前 cpu 的软中断,其它 cpu 依然能响应软中断,因此还需在软中断中使用 spin_lock()\spin_unlock() 来保护临界区。
二、同等级要防着同等级的, 就使用最简单自旋锁
例1:用户进程上下文或内核线程 和 用户进程上下文和内核线程 即多个进程会访问同一个临界区
只需要使用:spin_lock()\spin_unlock(),因为内核支持抢占调度,所以需要上锁。
例2:不同的 硬件中断 都会访问同一个临界区
只需要使用: spin_lock()\spin_unlock(),不同的硬件中断是可以同时被多颗 cpu 响应处理的,因此需要使用自旋锁进行保护。
如果是旧版内核支持中断嵌套的,则应该使用 spin_lock_irq()\spin_unlock_irq(),以避免被高优先级中断抢占,从而导致出现死锁情况。
例3:不同的 tasklet、timer 会访问同一个临界区
只需要使用:spin_lock()\spin_unlock(),因为不同的 tasklet 或 timer 是可以在不同的 cpu 并发执行。
注意,如果只有相同的 tasklet 或者 timer 访问临界区,是不需要加锁的,因为相同的 tasklet 或 timer 不会并发,即使是有多个 cpu 也不会。
例4:在一个或者多个软中断(softirq) 中会访问同一个临界区
只需要使用:spin_lock()\spin_unlock(),虽然同一时间一个 cpu 只能执行一个软中断,但其它的 cpu 还是可以并发执行相同的软中断的。
三、workqueue、waitqueue、completion 用锁规则
workqueue(工作队列)是基于内核线程实现、waitqueue(等待队列)工作在用户进程上下文、completion(完成量)是基于等待队列实现也是工作在用户进程上下文,因此它们的用锁规则等同于用户进程。
自旋锁的代码分析
自旋锁在不同的硬件环境的实现不一样,此处分析以最复杂的环境下自旋锁的实现原理,即:SMP 下支持任务抢占的硬件环境。
一、自旋锁初始化
锁的数据结构定义,这个数据结构用的是结构体内嵌共用体设计,理解这点非常重要。
#define TICKET_SHIFT 16 // 指明 owner 和 next 的位宽 typedef struct { union { u32 slock; // 32 位 struct __raw_tickets { u16 owner; // 16 位 解锁计数 u16 next; // 16 位 上锁计数 } tickets; }; } arch_spinlock_t; 锁的初始化 spin_lock_init() :lock->raw_lock 成员变量被设置为 __ARCH_SPIN_LOCK_UNLOCKED { { 0 } },即 lock->tickets.owner = 0, lock->tickets.next = 0 #define spin_lock_init(_lock) \ do { \ spinlock_check(_lock); \ raw_spin_lock_init(&(_lock)->rlock); \ } while (0) # define raw_spin_lock_init(lock) \ do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0) #define __RAW_SPIN_LOCK_UNLOCKED(lockname) \ (raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname) #define __RAW_SPIN_LOCK_INITIALIZER(lockname) \ { \ .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED, \ SPIN_DEBUG_INIT(lockname) \ SPIN_DEP_MAP_INIT(lockname) } #define __ARCH_SPIN_LOCK_UNLOCKED { { 0 } }
二、自旋锁上锁的分析
static __always_inline void spin_lock(spinlock_t *lock) { raw_spin_lock(&lock->rlock); // 调用宏 } #define raw_spin_lock(lock) _raw_spin_lock(lock) // 调用 _raw_spin_lock void __lockfunc _raw_spin_lock(raw_spinlock_t *lock) { __raw_spin_lock(lock); // 继续调用 __raw_spin_lock } static inline void __raw_spin_lock(raw_spinlock_t *lock) { // 设置当前进程不可被抢占 preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); // 调用 do_raw_spin_lock LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
上面的代码使用 preempt_disable() 设置当前进程不可被抢占,此举可避免在持锁期间被高优先级进程抢占当前进程去访问临界区。
void do_raw_spin_lock(raw_spinlock_t *lock) { debug_spin_lock_before(lock); arch_spin_lock(&lock->raw_lock); // 调用 arch_spin_lock debug_spin_lock_after(lock); } static inline void arch_spin_lock(arch_spinlock_t *lock) { unsigned long tmp; u32 newval; arch_spinlock_t lockval; prefetchw(&lock->slock); __asm__ __volatile__( "1: ldrex %0, [%3]\n" // 4 ------------------------- " add %1, %0, %4\n" // 5 ------------------------- " strex %2, %1, [%3]\n" // 6 ------------------------- " teq %2, #0\n" // 7 ------------------------- " bne 1b" : "=&r" (lockval), "=&r" (newval), "=&r" (tmp) // 1 ----------- : "r" (&lock->slock), "I" (1 << TICKET_SHIFT) // 2 ------------ : "cc"); // 3 指明上述汇编指令会改变条件寄存器 // 8 --------------------------------------------- while (lockval.tickets.next != lockval.tickets.owner) { wfe(); // 让当前 cpu 进入低功耗模式 lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner); } // 9 --------------------------------------------- smp_mb(); }
这部分的代码是实现自旋锁的核心,是通过内嵌汇编指令实现,使用比较关键的汇编指令 ldrex\strex 实现原子访问:
-
ldrex Rx, [Ry]:读取寄存器Ry指向的4字节内存值,将其保存到Rx寄存器中,同时标记对Ry指向内存区域的独占访问
-
strex Rx, Ry, [Rz]:如果发现已经被标记为独占访问了,则将寄存器Ry中的值更新到寄存器Rz指向的内存,并将寄存器Rx设置成0。指令执行成功后,会将独占访问标记位清除。
-
ldrex 负责拷贝数据和独占访问标记,strex 在根据标记存在与否拷贝数据和清除标记的过程是原子操作。
以下为 arch_spin_lock() 的代码进行逐行解释:
-
将 lockval、newval、tmp 局部变量分别与 %0、%1、%2 编号关联(编号对应由编译器指定 CPU 的寄存器)
-
将 &lock->slock 、(1 << TICKET_SHIFT) 分别与 %3、%4 编号关联(编号会由编译器指定 CPU 的寄存器),这里的 TICKET_SHIFT 含义是指明内部变量位宽,代码定义的是 16,表面是 16 位宽的变量,也就是说 %4 是与 1 << 16 关联,便于汇编指令计算。
-
指定此内嵌的汇编指令会修改条件寄存器
-
ldrex 指令实现读取 %3(lock->slock) 里面的数据到 %0(lockval),也就是将入参 lock 的数据拷贝到局部变量 lockval 中,并标记 lock 所在内存的独占访问标记。这一步主要是记录当前锁的计数。
-
add 指令实现将 %0(lockval) + %4(1 << 16) ,结果放到 %1(newval),实现的效果等同于 newval= lockval.slock + (1 << 16)。这个步骤是根据 arch_spinlock_t 数据结构设计的,它内部是一个共同体,slock 与 tickets 使用的是相同的内存空间,slock 的低 16 位等同于 tickets.owner,高 16 位宽等同于 tickets.next。
-
strex 会先检查 %3(lock->slock) 这块内存的独占标记是否还在,如果不在则设置 %2(tmp) 为 1,说明已经被其它线程修改了,如果还在的话设置为 0,将 %1(newval) 数据覆盖到 %3(lock->slock) ,再将独占访问标记清除。这两步主要是借助中间变量 newval 对 next 计数进行自增后,更新到原 lock 里面。
-
teq 和 bne 指令实现判断如果 %2(tmp) 不等于 0,说明已经被其它线程修改了,重新再跳转到标签 1 执行,也就是重新跳转回第 4 个步骤继续执行,否则就继续往下执行。
-
这里主要是不停的判断当前的当前锁的 owen 是否与当前的 next 相等,如果不相等则一直循环检查,这个步骤就是实现了我们所说的自旋功能。当其它持有锁的线程要对该锁进行解锁,解锁的操作会将 owen 自增,当 owen 与当前记录的 next 相等,就会让当前线程退出自旋。
-
自旋退出了,就意味着拿到锁了,就可以访问临界区了。
三、自旋锁解锁的分析
static __always_inline void spin_unlock(spinlock_t *lock) { raw_spin_unlock(&lock->rlock); } #define raw_spin_unlock(lock) _raw_spin_unlock(lock) void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock) { __raw_spin_unlock(lock); } static inline void __raw_spin_unlock(raw_spinlock_t *lock) { spin_release(&lock->dep_map, 1, _RET_IP_); do_raw_spin_unlock(lock); // 2 ------------------------------------- preempt_enable(); } void do_raw_spin_unlock(raw_spinlock_t *lock) { debug_spin_unlock(lock); arch_spin_unlock(&lock->raw_lock); } static inline void arch_spin_unlock(arch_spinlock_t *lock) { smp_mb(); // 1 ------------------------------------- lock->tickets.owner++; dsb_sev(); }
-
对 lock->tickets.owner 进行自增,这样可以让等待锁的线程退出自旋
-
恢复内核对当前线程的抢占
推演自旋锁的工作过程:
从自旋锁刚进行初始化的状态来推演:lock->tickets.owner = 0,lock->tickets.next = 0
- 线程A:开始申请锁:读取锁 lock 到临时变量 A.lockval 并设置该内存区域独占标记,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0
- 线程B:开始申请锁:读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:
lock->tickets.owner =0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0
- 线程A:计算锁序号:对 A.newval = A.lockval.slock + (1<<16) = 0x10000,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 0,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval = 0x10000
4. 线程C:开始申请锁:读取锁 lock 到临时变量 lockvalC 并设置该内存区域独占标记,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0
5. 线程B:计算锁序号:对 B.newval = B.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 0,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 0,B.newval = 0x10000
- 线程A:更新锁计数:检查该内存区域的独占访问标记存在,将 A.newval 覆盖到 lock->slock 并清除独占标记,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval = 0x10000
- 线程A:锁持有检测:由于 A.lockval.tickets.next == A.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。
lock->tickets.owner = 0,lock->tickets.next = 1,A.lockval.tickets.owner = 0,A.lockval.tickets.next = 0,A.newval = 0x10000
- 线程B:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 B.lockval 并设置该内存区域独占标记,此时的状态:
lock->tickets.owner =0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1
9. 线程B:计算锁序号:对 B.newval = B.lockval.slock + (1<<16) = 0x10000 + 0x10000,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 1,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval = 0x20000
- 线程B:更新锁计数:检查该内存区域的独占访问标记存在,将 B.newval 覆盖到 lock->slock 并清除独占标记,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval = 0x20000
- 线程B:锁持有检测:由于 B.lockval.tickets.next != B.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner
lock->tickets.owner = 0,lock->tickets.next = 2,B.lockval.tickets.owner = 0,B.lockval.tickets.next = 1,B.newval = 0x20000
12. 线程C:计算锁序号:对 C.newval = C.lockval.slock + (1<<16) = 0x00 + 0x10000,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 0,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 0,C.newval = 0x10000
- 线程C:更新锁计数:检查该内存区域的独占访问标记已被清除,重新读取锁 lock 到临时变量 C.lockval 并设置该内存区域独占标记,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2
14. 线程C:计算锁序号:对 C.newval = C.lockval.slock + (1<<16) = 0x20000 + 0x10000,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 2,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval = 0x30000
- 线程C:更新锁计数:检查该内存区域的独占访问标记存在,将 C.newval 覆盖到 lock->slock 并清除独占标记,此时的状态:
lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2,C.newval = 0x30000
- 线程C:锁持有检测:由于 C.lockval.tickets.next != C.lockval.tickets.owner,因此进入 while 循环,不停的读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner
lock->tickets.owner = 0,lock->tickets.next = 3,C.lockval.tickets.owner = 0,C.lockval.tickets.next = 2
至此,线程B 一直在等待 lock->tickets.owner 等于 1,而 线程C 则一直在等待 lock->tickets.owner 等于 2
- 线程A:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1
lock->tickets.owner = 1,lock->tickets.next = 3
- 线程B:锁持有检测:读取 lock->tickets.owner 并覆盖到 B.lockval.tickets.owner,由于 B.lockval.tickets.next == B.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。
lock->tickets.owner = 1,lock->tickets.next = 3,B.lockval.tickets.owner = 1,B.lockval.tickets.next = 1
- 线程C:锁持有检测:继续读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next != C.lockval.tickets.owner,继续循环自旋。
lock->tickets.owner = 1,lock->tickets.next = 3,C.lockval.tickets.owner = 1,C.lockval.tickets.next = 2
- 线程B:访问完临界区后,调用 spin_unlock() 释放锁,lock->tickets.owner = lock->tickets.owner + 1
lock->tickets.owner = 2,lock->tickets.next = 3
- 线程C:锁持有检测:读取 lock->tickets.owner 并覆盖到 C.lockval.tickets.owner,由于 C.lockval.tickets.next == C.lockval.tickets.owner 相等则跳出 while 循环,正式持有锁返回了。
lock->tickets.owner = 2,lock->tickets.next = 3,C.lockval.tickets.owner = 2,C.lockval.tickets.next = 2
整个过程推演完毕,很巧妙的借助两个计数器和局部变量,实现等锁线程的有序排队,该思路也适用于应用程序开发。
自旋锁的原理总结
-
自旋锁是通过 ldrex、strex 来确保读写锁的计数器是原子操作的,这是 arm 芯片级实现的。
-
在上锁的过程中,会有两处循环,第一处是汇编指令循环, 第二处是 C 语言的 while 循环,两个循环的意义不一样:
-
汇编指令循环:实现对锁计数器的原子读写,确保得到的锁数据是最新的,锁的计数更新是准确无误的。
-
C语言的循环:实现锁在等待时的自旋功能,通过比较计数器,实现谁先等待锁,谁就先得到锁的有序排队。
以上两个循环的协同工作,前者实现原子操作和记录,后者实现了有序排队,完成了自旋锁的核心互斥功能。
Copyright © 2024 深圳全志在线有限公司 粤ICP备2021084185号 粤公网安备44030502007680号