Hello! 欢迎来到小浪云!


带你走进Linux内核源码中最常见的数据结构之「mutex」


avatar
小浪云 2025-01-02 13

1 定义

互斥锁(mutex)是一种用于多线程编程的机制,用于防止多条线程同时对同一公共资源进行读写操作。

为了达到这个目的,互斥锁将代码划分为临界区域(critical section),这部分代码涉及对公共资源的读写操作。一个程序、进程或线程可以拥有多个临界区域,但并不一定都需要应用互斥锁。

举例来说,如果一条线程正在修改数据,而另一条线程被唤醒并尝试读取这些数据,那么就会导致数据的状态不确定,甚至可能导致数据损坏。为了保护多个线程共享的数据,必须确保同一时间只有一个临界区域处于运行状态,其他的临界区域必须被挂起并无法获得运行机会。

互斥锁实现多线程同步的核心思想是,当一个线程访问公共资源时,这个线程会执行“加锁”操作,阻止其他线程访问该资源。访问完成后,线程会执行“解锁”操作,让其他线程可以访问资源。当多个线程竞争访问资源时,只有最先执行“加锁”操作的线程可以访问资源。

在多个线程竞争访问已被锁定的公共资源时,其他线程只能等待资源解锁,形成一个等待队列。一旦资源被解锁,操作系统会唤醒等待队列中的线程,第一个获得资源锁的线程将开始访问资源,而其他线程则继续等待。

带你走进Linux内核源码中最常见的数据结构之「mutex」

mutex有什么缺点?

不同于mutex最初的设计与目的,现在的struct mutex是内核中最大的锁之一,比如在x86-64上,它差不多有32bytes的大小,而Struct samaphore是24bytes,rw_semaphore为40bytes,更大的数据结构意味着占用更多的CPU缓存和更多的内存占用。

什么时候应该使用mutex?

除非mutex的严格语义要求不合适或者临界区域阻止锁的共享,否则相较于其他锁原语来说更倾向于使用mutex

mutex与spinlock的区别?

带你走进Linux内核源码中最常见的数据结构之「mutex」

spinlock是让一个尝试获取它的线程在一个循环中等待的锁,线程在等待时会一直查看锁的状态。而mutex是一个可以让多个进程轮流分享相同资源的机制
spinlock通常短时间持有,mutex可以长时间持有
spinlock任务在等待锁释放时不可以睡眠,mutex可以
看到一个非常有意思的解释:

spinlock就像是坐在车后座的熊孩子,一直问”到了吗?到了吗?到了吗?…“

mutex就像一个司机返回的信号,说”我们到了!“

2 实现

看一下Linux kernel-5.8是如何实现mutex的2 实现

struct mutex {   atomic_long_t    owner;   spinlock_t    wait_lock; #ifdef CONFIG_MUTEX_SPIN_ON_OWNER   struct optimistic_spin_queue osq; /* Spinner MCS lock */ #endif   struct list_head  wait_list; #ifdef CONFIG_DEBUG_MUTEXES   void      *magic; #endif #ifdef CONFIG_DEBUG_LOCK_ALLOC   struct lockdep_map  dep_map; #endif }; 

可以看到,mutex使用了原子变量owner来追踪锁的状态,owner实际上是指向当前mutex锁拥有者的struct task_struct *指针,所以当锁没有被持有时,owner为NULL

/*  * This is the control structure for tasks blocked on mutex,  * which resides on the blocked task's kernel stack:  * 表示等待队列wait_list中进程的结构体  */ struct mutex_waiter {   struct list_head  list;   struct task_struct  *task;   struct ww_acquire_ctx  *ww_ctx; #ifdef CONFIG_DEBUG_MUTEXES   void      *magic; #endif }; 

上锁

当要获取mutex时,通常有三种路径方式

fastpath:通过 cmpxchg() 当前任务与所有者来尝试原子性的获取锁。这仅适用于无竞争的情况(cmpxchg() 检查 0UL,因此上面的所有 3 个状态位都必须为 0)。如果锁被争用,它会转到下一个可能的路径。
midpath:又名乐观旋转(optimistic spinning)—在锁的持有者正在运行并且没有其他具有更高优先级(need_resched)的任务准备运行时,通过旋转来获取锁。理由是如果锁的所有者正在运行,它很可能很快就会释放锁。mutex spinner使用 MCS 锁排队,因此只有一个spinner可以竞争mutex。
MCS 锁(由 Mellor-Crummey 和 Scott 提出)是一个简单的自旋锁,具有公平的理想属性,每个 cpu 都试图获取在本地变量上旋转的锁,排队采用的是链表实现的FIFO。它避免了常见的test-and-set自旋锁实现引起的昂贵的cacheline bouncing。类似MCS的锁是专门为睡眠锁的乐观旋转而量身定制的(毕竟如果只是短暂的自旋比休眠效率要高)。自定义 MCS 锁的一个重要特性是它具有额外的属性,即当spinner需要重新调度时,它们能够直接退出 MCS 自旋锁队列。这有助于避免需要重新调度的 MCS spinner持续在mutex持有者上自旋,而仅需直接进入慢速路径获取MCS锁。
slowpath:最后的手段,如果仍然无法获得锁,则将任务添加到等待队列并休眠,直到被解锁路径唤醒。在正常情况下它阻塞为 TASK_UNINTERRUPTIBLE。
虽然正式的内核互斥锁是可休眠的锁,但midpath路径 (ii) 使它们更实际地成为混合类型。通过简单地不中断任务并忙于等待几个周期而不是立即休眠,此锁的性能已被视为显着改善了许多工作负载。请注意,此技术也用于 rw 信号量。

具体代码调用链很长…

/*不可中断的获取锁*/ void __sched mutex_lock(struct mutex *lock) {     might_sleep();     /*fastpath*/     if (!__mutex_trylock_fast(lock))         /*midpath and slowpath*/         __mutex_lock_slowpath(lock); }  __mutex_trylock_fast(lock) -> atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr) -> atomic64_try_cmpxchg_acquire(v, (s64 *)old, new);  __mutex_lock_slowpath(lock)->__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_) ->  __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false)       /*可中断的获取锁*/ int mutex_lock_interruptible(struct mutex *lock); 

尝试上锁

int __sched mutex_trylock(struct mutex *lock) {   bool locked;  #ifdef CONFIG_DEBUG_MUTEXES   DEBUG_LOCKS_WARN_ON(lock->magic != lock); #endif    locked = __mutex_trylock(lock);   if (locked)     mutex_acquire(&lock->dep_map, 0, 1, _RET_IP_);    return locked; }  static inline bool __mutex_trylock(struct mutex *lock) {   return !__mutex_trylock_or_owner(lock); } 

释放锁

void __sched mutex_unlock(struct mutex *lock) { #ifndef CONFIG_DEBUG_LOCK_ALLOC   if (__mutex_unlock_fast(lock))     return; #endif   __mutex_unlock_slowpath(lock, _RET_IP_); } 

跟加锁对称,也有fastpath, midpath, slowpath三条路径。

判断锁状态

bool mutex_is_locked(struct mutex *lock) {   return __mutex_owner(lock) != NULL; } 

很显而易见,mutex持有者不为NULL即表示锁定状态。

3 实际案例

实验:

#include  #include   #define LOOP 1000000  int cnt = 0; int cs1 = 0, cs2 = 0;  void* task(void* args) {     while(1)     {         if(cnt >= LOOP)         {             break;         }         cnt++;         if((int)args == 1) cs1 ++; else cs2++;     }     return NULL; }  int main() {     pthread_t tid1;     pthread_t tid2;     /* create the thread */     pthread_create(&tid1, NULL, task, (void*)1);     pthread_create(&tid2, NULL, task, (void*)2);     /* wait for thread to exit */     pthread_join(tid1, NULL);     pthread_join(tid2, NULL);      printf("cnt = %d cs1=%d cs2=%d total=%d ", cnt,cs1,cs2,cs1+cs2);     return 0; } 

输出:

cnt = 1000000 cs1=958560 cs2=1520226 total=2478786 

正确结果不应该是1000000吗?为什么会出错呢,我们可以从汇编角度来分析一下。

$> g++ -E test.c -o test.i $> g++ -S test.i -o test.s $> vim test.s    .file "test.c"   .globl  _cnt   .bss   .align 4 _cnt:   .space 4   .text   .globl  __Z5task1Pv   .def  __Z5task1Pv;  .scl  2;  .type 32; .endef __Z5task1Pv:   ... 

我们可以看到一个简单的cnt++,对应

movl  _cnt, %eax addl  $1, %eax movl  %eax, _cnt 

CPU先将cnt的值读到寄存器eax中,然后将[eax] + 1,最后将eax的值返回到cnt中,这些操作不是**原子性质(atomic)**的,这就导致cnt被多个线程操作时,+1过程会被打断。

加入mutex保护临界资源

#include  #include   #define LOOP 1000000  pthread_mutex_t mutex; int cnt = 0; int cs1 = 0, cs2 = 0;  void* task(void* args) {     while(1)     {         pthread_mutex_lock(&mutex);         if(cnt >= LOOP)         {             pthread_mutex_unlock(&mutex);             break;         }         cnt++;         pthread_mutex_unlock(&mutex);         if((int)args == 1) cs1 ++; else cs2++;     }     return NULL; }  int main() {     pthread_mutex_init(&mutex , NULL);     pthread_t tid1;     pthread_t tid2;     /* create the thread */     pthread_create(&tid1, NULL, task, (void*)1);     pthread_create(&tid2, NULL, task, (void*)2);     /* wait for thread to exit */     pthread_join(tid1, NULL);     pthread_join(tid2, NULL);      printf("cnt = %d cs1=%d cs2=%d total=%d ", cnt,cs1,cs2,cs1+cs2);     return 0; }  输出: cnt = 1000000 cs1=517007 cs2=482993 total=1000000 

相关阅读