The mutex structure
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
- count: the lock counter; 1 means unlocked, 0 means locked, and a negative value means locked with possible waiters.
- wait_lock: a spinlock protecting the wait list.
- wait_list: the queue of tasks waiting for the mutex.
- owner: the task currently holding the mutex (used by debugging and by optimistic spinning).
- osq: the MCS lock that serializes optimistic spinners.
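For orientation, here is a minimal usage sketch. The lock name my_lock and the function my_update are hypothetical, but DEFINE_MUTEX, mutex_lock, and mutex_unlock are the actual kernel API:
#include <linux/mutex.h>

static DEFINE_MUTEX(my_lock);     /* hypothetical lock, statically initialized to "unlocked" */

static void my_update(void)       /* hypothetical function */
{
	mutex_lock(&my_lock);     /* may sleep, so never call from atomic context */
	/* ... touch the shared state this mutex protects ... */
	mutex_unlock(&my_lock);
}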
Locking a mutex
A mutex is locked by calling mutex_lock.
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
/*
* The locking fastpath is the 1->0 transition from
* 'unlocked' into 'locked' state.
*/
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
mutex_set_owner(lock);
}
mutex_lock first calls __mutex_fastpath_lock to attempt fast locking; if that fails, __mutex_lock_slowpath performs the slow locking. Let's first walk through the fast path.
__mutex_fastpath_lock
static inline void
__mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_dec_return_acquire(count) < 0))
fail_fn(count);
}
__mutex_fastpath_lock ultimately operates on count through atomic_dec_return.
#define atomic_dec_return_acquire atomic_dec_return
This function is architecture-specific; each architecture provides its own implementation. The code below is the x86 version, located in arch/x86/include/asm/atomic.h.
/**
* atomic_add_return - add integer and return
* @i: integer value to add
* @v: pointer of type atomic_t
*
* Atomically adds @i to @v and returns @i + @v
*/
static __always_inline int atomic_add_return(int i, atomic_t *v)
{
return i + xadd(&v->counter, i);
}
/**
* atomic_sub_return - subtract integer and return
* @v: pointer of type atomic_t
* @i: integer value to subtract
*
* Atomically subtracts @i from @v and returns @v - @i
*/
static __always_inline int atomic_sub_return(int i, atomic_t *v)
{
return atomic_add_return(-i, v);
}
#define atomic_inc_return(v) (atomic_add_return(1, v))
#define atomic_dec_return(v) (atomic_sub_return(1, v))
#define __xchg_op(ptr, arg, op, lock) \
({ \
__typeof__ (*(ptr)) __ret = (arg); \
switch (sizeof(*(ptr))) { \
case __X86_CASE_B: \
asm volatile (lock #op "b %b0, %1\n" \
: "+q" (__ret), "+m" (*(ptr)) \
: : "memory", "cc"); \
break; \
case __X86_CASE_W: \
asm volatile (lock #op "w %w0, %1\n" \
: "+r" (__ret), "+m" (*(ptr)) \
: : "memory", "cc"); \
break; \
case __X86_CASE_L: \
asm volatile (lock #op "l %0, %1\n" \
: "+r" (__ret), "+m" (*(ptr)) \
: : "memory", "cc"); \
break; \
case __X86_CASE_Q: \
asm volatile (lock #op "q %q0, %1\n" \
: "+r" (__ret), "+m" (*(ptr)) \
: : "memory", "cc"); \
break; \
default: \
__ ## op ## _wrong_size(); \
} \
__ret; \
})
/*
* xadd() adds "inc" to "*ptr" and atomically returns the previous
* value of "*ptr".
*
* xadd() is locked when multiple CPUs are online
*/
#define __xadd(ptr, inc, lock) __xchg_op((ptr), (inc), xadd, lock)
#define xadd(ptr, inc) __xadd((ptr), (inc), LOCK_PREFIX)
The operation finally comes down to the xadd instruction. Its semantics are (http://x86.renejeschke.de/html/file_module_x86_id_327.html):
Exchanges the first operand (destination operand) with the second operand (source operand), then loads the sum of the two values into the destination operand. The destination operand can be a register or a memory location; the source operand is a register. This instruction can be used with a LOCK prefix to allow the instruction to be executed atomically.
In pseudocode:
Temporary = Source + Destination;
Source = Destination;
Destination = Temporary;
Returning to the code above,
asm volatile (lock #op "w %w0, %1\n" \
: "+r" (__ret), "+m" (*(ptr)) \
: : "memory", "cc");
Here __ret starts out as -1 and ptr points to mutex->count. After the instruction executes, __ret holds the old value of mutex->count while *ptr holds mutex->count - 1; that is, the counter has been decremented by 1. The volatile qualifier prevents the compiler from optimizing away or reordering the asm statement, and the lock prefix makes the read-modify-write atomic. In xadd "w %w0, %1", the w suffix selects a 2-byte operand (and %w0 prints the 16-bit register name); likewise l selects a 4-byte operand. To verify this behavior we can write a small example:
#include <stdio.h>
#include <stdlib.h>
int main() {

    int i = 10, j = 11;

    asm("xadd %0, %1\n" : "+q" (i), "+m" (j) : : "memory", "cc");

    printf("i=%d j=%d\n", i, j);   /* expected output: i=11 j=21 */
}
Compile the code above with gcc and debug it with gdb:
gcc -g -o xadd xadd.c
gdb xadd
Use disas to inspect the mapping between source and assembly:
(gdb) disas /m main
Dump of assembler code for function main:
3 main() {
0x0000000000400596 <+0>: push %rbp
0x0000000000400597 <+1>: mov %rsp,%rbp
0x000000000040059a <+4>: sub $0x10,%rsp
0x000000000040059e <+8>: mov %fs:0x28,%rax
0x00000000004005a7 <+17>: mov %rax,-0x8(%rbp)
0x00000000004005ab <+21>: xor %eax,%eax
4
5 int i=10,j=11;
0x00000000004005ad <+23>: movl $0xa,-0xc(%rbp)
0x00000000004005b4 <+30>: movl $0xb,-0x10(%rbp)
6
7 asm("xadd %0, %1\n": "+q" (i), "+m" (j): : "memory", "cc");
0x00000000004005bb <+37>: mov -0xc(%rbp),%eax
0x00000000004005be <+40>: xadd %eax,-0x10(%rbp)
=> 0x00000000004005c2 <+44>: mov %eax,-0xc(%rbp)
8
9 printf("i=%d j=%d\n",i,j);
0x00000000004005c5 <+47>: mov -0x10(%rbp),%edx
0x00000000004005c8 <+50>: mov -0xc(%rbp),%eax
0x00000000004005cb <+53>: mov %eax,%esi
0x00000000004005cd <+55>: mov $0x400684,%edi
0x00000000004005d2 <+60>: mov $0x0,%eax
0x00000000004005d7 <+65>: callq 0x400470 <printf@plt>
0x00000000004005dc <+70>: mov $0x0,%eax
10 }
0x00000000004005e1 <+75>: mov -0x8(%rbp),%rcx
0x00000000004005e5 <+79>: xor %fs:0x28,%rcx
0x00000000004005ee <+88>: je 0x4005f5 <main+95>
0x00000000004005f0 <+90>: callq 0x400460 <__stack_chk_fail@plt>
0x00000000004005f5 <+95>: leaveq
0x00000000004005f6 <+96>: retq
The statement asm("xadd %0, %1\n": "+q" (i), "+m" (j): : "memory", "cc"); is compiled into three instructions:
## assignment
0x00000000004005ad <+23>: movl $0xa,-0xc(%rbp)    # 10 (variable i) stored at %rbp-0xc
0x00000000004005b4 <+30>: movl $0xb,-0x10(%rbp)   # 11 (variable j) stored at %rbp-0x10
## execution starts
0x00000000004005bb <+37>: mov -0xc(%rbp),%eax     # %eax holds 10, the first operand
0x00000000004005be <+40>: xadd %eax,-0x10(%rbp)   # %eax now holds 11, %rbp-0x10 (variable j) holds 21
0x00000000004005c2 <+44>: mov %eax,-0xc(%rbp)     # %rbp-0xc (variable i) now holds 11
## execution ends
Returning to the __xchg_op macro: it ultimately returns the original value of mutex->count; atomic_dec_return then adds -1, so __mutex_fastpath_lock sees the new, decremented value.
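Putting the fastpath pieces together, here is a minimal, non-atomic user-space model of the 1 -> 0 and 0 -> -1 transitions. The helper fastpath_lock is purely illustrative, not a kernel function:
#include <stdio.h>

/* models atomic_dec_return plus the "< 0 means contended" test;
 * in the kernel the decrement is a locked xadd, here it is plain C */
static int fastpath_lock(int *count)
{
    int old = *count;        /* what xadd would return */
    *count = old - 1;        /* atomic_dec_return's result is old - 1 */
    return (old - 1) >= 0;   /* new value < 0 => fall into the slowpath */
}

int main() {
    int count = 1;  /* 1: unlocked */
    printf("first lock: %s, count=%d\n",
           fastpath_lock(&count) ? "fastpath" : "slowpath", count);  /* fastpath, 0 */
    printf("second lock: %s, count=%d\n",
           fastpath_lock(&count) ? "fastpath" : "slowpath", count);  /* slowpath, -1 */
}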
__mutex_lock_slowpath
__mutex_lock_slowpath eventually lands in the following code:
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct task_struct *task = current;
struct mutex_waiter waiter;
unsigned long flags;
int ret;
if (use_ww_ctx) {
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
}
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
/* got the lock, yay! */
preempt_enable();
return 0;
}
spin_lock_mutex(&lock->wait_lock, flags);
/*
* Once more, try to acquire the lock. Only try-lock the mutex if
* it is unlocked to reduce unnecessary xchg() operations.
*/
if (!mutex_is_locked(lock) &&
(atomic_xchg_acquire(&lock->count, 0) == 1))
goto skip_wait;
debug_mutex_lock_common(lock, &waiter);
debug_mutex_add_waiter(lock, &waiter, task);
/* add waiting tasks to the end of the waitqueue (FIFO): */
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;
lock_contended(&lock->dep_map, ip);
for (;;) {
/*
* Lets try to take the lock again - this is needed even if
* we get here for the first time (shortly after failing to
* acquire the lock), to make sure that we get a wakeup once
* it's unlocked. Later on, if we sleep, this is the
* operation that gives us the lock. We xchg it to -1, so
* that when we release the lock, we properly wake up the
* other waiters. We only attempt the xchg if the count is
* non-negative in order to avoid unnecessary xchg operations:
*/
if (atomic_read(&lock->count) >= 0 &&
(atomic_xchg_acquire(&lock->count, -1) == 1))
break;
/*
* got a signal? (This code gets eliminated in the
* TASK_UNINTERRUPTIBLE case.)
*/
if (unlikely(signal_pending_state(state, task))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx->acquired > 0) {
ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
if (ret)
goto err;
}
__set_task_state(task, state);
/* didn't get the lock, go to sleep: */
spin_unlock_mutex(&lock->wait_lock, flags);
schedule_preempt_disabled();
spin_lock_mutex(&lock->wait_lock, flags);
}
__set_task_state(task, TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, task);
/* set it to 0 if there are no waiters left: */
if (likely(list_empty(&lock->wait_list)))
atomic_set(&lock->count, 0);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
mutex_set_owner(lock);
if (use_ww_ctx) {
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
ww_mutex_set_context_slowpath(ww, ww_ctx);
}
spin_unlock_mutex(&lock->wait_lock, flags);
preempt_enable();
return 0;
err:
mutex_remove_waiter(lock, &waiter, task);
spin_unlock_mutex(&lock->wait_lock, flags);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
The function above breaks down into the following steps:
- Optimistic spinning, mutex_optimistic_spin. If the current owner of the lock is running on another CPU, spin while repeatedly trying to acquire the lock. The spin is abandoned when the owner is no longer running on any CPU or the current task needs to yield. If the owner releases the lock, the inner spin ends and the task attempts the acquisition; on failure, it reads the new owner and starts the next round of spinning.
static bool mutex_optimistic_spin(struct mutex *lock,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
while (true) {
struct task_struct *owner;
......
// take a one-shot snapshot of the current lock owner
owner = READ_ONCE(lock->owner);
if (owner && !mutex_spin_on_owner(lock, owner))
// exit the loop if the owner yields the cpu or the current task needs to be rescheduled
break;
/* Try to acquire the mutex if it is unlocked. */
// try to acquire the lock
if (mutex_try_to_acquire(lock)) {
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx) {
struct ww_mutex *ww;
ww = container_of(lock, struct ww_mutex, base);
ww_mutex_set_context_fastpath(ww, ww_ctx);
}
mutex_set_owner(lock);
osq_unlock(&lock->osq);
return true;
}
......
}
}
static noinline
bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
bool ret = true;
rcu_read_lock();
while (lock->owner == owner) {
/*
* Ensure we emit the owner->on_cpu, dereference _after_
* checking lock->owner still matches owner. If that fails,
* owner might point to freed memory. If it still matches,
* the rcu_read_lock() ensures the memory stays valid.
*/
barrier();
// the owner yielded the cpu or the current task needs to be rescheduled
if (!owner->on_cpu || need_resched()) {
ret = false;
break;
}
cpu_relax_lowlatency();
}
rcu_read_unlock();
return ret;
}
- Take the wait_lock spinlock (spin_lock_mutex).
- Try once more to acquire the lock. On success, jump past the wait queue, set the current task as the owner, and release the spinlock.
if (!mutex_is_locked(lock) &&
    (atomic_xchg_acquire(&lock->count, 0) == 1))
	goto skip_wait;
- Add the task to the tail of the wait queue (FIFO).
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;
- Enter the wait loop. If the lock is found to have been released, break out. Otherwise drop the spinlock and sleep until woken; after waking, retake the spinlock and try again (see the count-protocol sketch after this list).
- After breaking out of the loop, remove the task from the wait queue.
mutex_remove_waiter(lock, &waiter, task);
/* set it to 0 if there are no waiters left: */
if (likely(list_empty(&lock->wait_list)))
	atomic_set(&lock->count, 0);
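As referenced above, here is a minimal, non-atomic user-space sketch of the slowpath's count protocol. The xchg helper is a plain-C stand-in for atomic_xchg_acquire, not the kernel API:
#include <stdio.h>

/* plain-C stand-in for atomic_xchg_acquire: returns the old value; not actually atomic */
static int xchg(int *p, int v) { int old = *p; *p = v; return old; }

int main() {
    int count = 0;  /* 0: locked, no waiters recorded yet */

    /* waiter loop body: only attempt the xchg when count is non-negative,
     * and only old == 1 means the lock was actually free */
    if (count >= 0 && xchg(&count, -1) == 1)
        printf("acquired the lock\n");
    else
        printf("still locked; count=%d now flags possible waiters\n", count);  /* count=-1 */

    /* when the last waiter dequeues and wait_list is empty,
     * count is reset to 0 so the unlock fastpath keeps working */
    count = 0;
    printf("no waiters left; count=%d\n", count);
}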