深入了解glibc的互斥锁的加锁过程

编程入门 行业动态 更新时间:2024-10-18 03:37:07

深入了解glibc的互斥锁的<a href=https://www.elefans.com/category/jswz/34/1766067.html style=加锁过程"/>

深入了解glibc的互斥锁的加锁过程

深入了解glibc的互斥锁

互斥锁是多线程同步时常用的手段,使用互斥锁可以保护对共享资源的操作。共享资源也被称为临界区,当一个线程对一个临界区加锁后,其他线程就不能进入该临界区,直到持有临界区锁的线程释放该锁。

本文以glibc中mutex的实现为例,讲解其背后的实现原理。

glibc mutex类型

glibc的互斥锁的类型名称为pthread_mutex_t,其结构可以用下面的结构体表示:

typedef struct {int __lock;int __count;int __owner;int __nusers;int __kind;// other ignore
} pthread_mutex_t;

其中:

  • __lock表示当前mutex的状态,0表示没有被加锁,1表示mutex已经被加锁,2表示mutex被某个线程持有并且有另外的线程在等待它的释放。
  • __count表示mutex被加锁的次数,对于不可重入锁,该值为0或者1,对于可重入锁,count可以大于1。
  • __owner用来记录持有当前mutex的线程id
  • __nusers用于记录多少个线程持有该互斥锁,一般来说该值只能是0或者1,但是对于读写锁,多个读线程可以共同持有锁,因此nusers通常用于读写锁的场景下。
  • __kind表示锁的类型

pthread_mutex_t锁可以是如下的类型:

  • PTHREAD_MUTEX_TIMED_NP: 普通锁,当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。当锁unlock时,会唤醒等待队列中的一个线程。
  • PTHREAD_MUTEX_RECURSIVE_NP: 可重入锁,如果线程没有获得该mutex的情况下,争用该锁,那么与PTHREAD_MUTEX_TIMED_NP一样。如果一个线程已经获取锁,其可以再次获取锁,并通过多次unlock解锁。
  • PTHREAD_MUTEX_ERRORCHECK_NP: 检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,而不是死锁,其他点和PTHREAD_MUTEX_TIMED_NP相同。
  • PTHREAD_MUTEX_ADAPTIVE_NP: 自适应锁,此锁在多核处理器下首先进行自旋获取锁,如果自旋次数超过配置的最大次数,则也会陷入内核态挂起。

mutex的加锁过程

本文使用的源码是glibc-2.34版本,.34.tar.gz。

本文主要侧重于讲解互斥锁从用户态到内核态的加锁过程,而不同类型锁的实现细节,本文不重点讨论。后续将在其他文章中做探讨。

下面就以最简单的类型PTHREAD_MUTEX_TIMED_NP来跟踪加锁过程,从___pthread_mutex_lock开始看起,其定义在pthread_mutex_lock.c中。

如下所示,PTHREAD_MUTEX_TIMED_NP的锁会调用lll_mutex_lock_optimized方法进行加锁,如下所示:

  if (__builtin_expect (type & ~(PTHREAD_MUTEX_KIND_MASK_NP| PTHREAD_MUTEX_ELISION_FLAGS_NP), 0))return __pthread_mutex_lock_full (mutex);if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP)){FORCE_ELISION (mutex, goto elision);simple:/* Normal mutex.  */LLL_MUTEX_LOCK_OPTIMIZED (mutex);assert (mutex->__data.__owner == 0);}

lll_mutex_lock_optimized也定义在nptl/pthread_mutex_lock.c文件中,从注释了解到,这是为单线程进行的优化,如果是单线程,则直接将mutex的__lock的值修改为1(因为不存在竞争),如果不是单线程,则调用lll_lock方法。

#ifndef LLL_MUTEX_LOCK
/* lll_lock with single-thread optimization.  */
static inline void
lll_mutex_lock_optimized (pthread_mutex_t *mutex)
{/* The single-threaded optimization is only valid for privatemutexes.  For process-shared mutexes, the mutex could be in ashared mapping, so synchronization with another process is neededeven without any threads.  If the lock is already marked asacquired, POSIX requires that pthread_mutex_lock deadlocks fornormal mutexes, so skip the optimization in that case aswell.  */int private = PTHREAD_MUTEX_PSHARED (mutex);if (private == LLL_PRIVATE && SINGLE_THREAD_P && mutex->__data.__lock == 0)mutex->__data.__lock = 1;elselll_lock (mutex->__data.__lock, private);
}# define LLL_MUTEX_LOCK(mutex)						\lll_lock ((mutex)->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex))

lll_lock定义在sysdeps/nptl/lowlevellock.h文件中,又会调用到**__lll_lock方法,由于存在竞争,因此在__lll_lock方法中使用了CAS方法**尝试对mutex的__lock值进行修改。

CAS是compare-and-swap的含义,其是原子变量的实现的基础,其伪代码如下所示,即当内存mem出的值如果等于old_value,则将其替换为new_value,这个过程是原子的,底层由CMPXCHG指令保证。

bool CAS(T* mem, T new_value, T old_value) {if (*mem == old_value) {*mem = new_value;return true;} else {return false;}
}

__lll_lock中的atomic_compare_and_exchange_bool_acq就是上述所说的CAS方法,如果futex = 0,则尝试将其修改为1,表示加锁成功, 如果futex >= 1,则会调用**__lll_lock_wait_private或者__lll_lock_wait**。注意这里的futex其实就是mutex结构体中的__lock。

#define __lll_lock(futex, private)                                      \((void)                                                               \({                                                                   \int *__futex = (futex);                                            \if (__glibc_unlikely                                               \(atomic_compare_and_exchange_bool_acq (__futex, 1, 0)))        \{                                                                \if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \__lll_lock_wait_private (__futex);                           \else                                                           \__lll_lock_wait (__futex, private);                          \}                                                                \}))
#define lll_lock(futex, private)	\__lll_lock (&(futex), private)

__lll_lock_wait_private和__lll_lock_wait是类似的,这里首先会调用atomic_exchange_acquire将futex的旧值和2进行交换,返回值是futex的旧值

因此如果其返回值不为0,代表当前锁还是加锁状态,可能需要进入内核态等待(调用futex_wait)。如果其返回0,则代表,当前锁已经被释放,加锁成功,退出循环。

注意futex值修改为2的目的是为了提高pthread_mutex_unlock的效率。在pthread_mutex_unlock中,会调用atomic_exchange_rel()无条件的把mutex->__lock的值更新为0,并且检查mutex->__lock的原始值,如果原始值为0或者1,表示没有竞争发生,自然也就没有必要调用futex系统调用,浪费时间。只有检查到mutex->__lock的值大于1的时候,才需要调用futex系统调用,唤醒等待该锁上的线程。

void
__lll_lock_wait_private (int *futex)
{if (atomic_load_relaxed (futex) == 2)goto futex;while (atomic_exchange_acquire (futex, 2) != 0){futex:LIBC_PROBE (lll_lock_wait_private, 1, futex);futex_wait ((unsigned int *) futex, 2, LLL_PRIVATE); /* Wait if *futex == 2.  */}
}
libc_hidden_def (__lll_lock_wait_private)void
__lll_lock_wait (int *futex, int private)
{if (atomic_load_relaxed (futex) == 2)goto futex;while (atomic_exchange_acquire (futex, 2) != 0){futex:LIBC_PROBE (lll_lock_wait, 1, futex);futex_wait ((unsigned int *) futex, 2, private); /* Wait if *futex == 2.  */}
}

__lll_lock_wait_private和__lll_lock_wait调用了futex_wait,该函数相对简单,其内部将会调用lll_futex_timed_wait方法。

static __always_inline int
futex_wait (unsigned int *futex_word, unsigned int expected, int private)
{int err = lll_futex_timed_wait (futex_word, expected, NULL, private);switch (err){case 0:case -EAGAIN:case -EINTR:return -err;case -ETIMEDOUT: /* Cannot have happened as we provided no timeout.  */case -EFAULT: /* Must have been caused by a glibc or application bug.  */case -EINVAL: /* Either due to wrong alignment or due to the timeout notbeing normalized.  Must have been caused by a glibc orapplication bug.  */case -ENOSYS: /* Must have been caused by a glibc bug.  *//* No other errors are documented at this time.  */default:futex_fatal_error ();}
}

lll_futex_timed_wait方法其实是对sys_futex系统调用的封装,其最终将调用sys_futex方法。

# define lll_futex_timed_wait(futexp, val, timeout, private)     \lll_futex_syscall (4, futexp,                                 \__lll_private_flag (FUTEX_WAIT, private),  \val, timeout)# define lll_futex_syscall(nargs, futexp, op, ...)                      \({                                                                    \long int __ret = INTERNAL_SYSCALL (futex, nargs, futexp, op, 	\__VA_ARGS__);                    \(__glibc_unlikely (INTERNAL_SYSCALL_ERROR_P (__ret))         	\? -INTERNAL_SYSCALL_ERRNO (__ret) : 0);                     	\})#define __NR_futex 202
#undef INTERNAL_SYSCALL
#define INTERNAL_SYSCALL(name, nr, args...)				\internal_syscall##nr (SYS_ify (name), args)#undef SYS_ify
#define SYS_ify(syscall_name)	__NR_##syscall_name#undef internal_syscall4
#define internal_syscall4(number, arg1, arg2, arg3, arg4)		\
({									\unsigned long int resultvar;					\TYPEFY (arg4, __arg4) = ARGIFY (arg4);			 	\TYPEFY (arg3, __arg3) = ARGIFY (arg3);			 	\TYPEFY (arg2, __arg2) = ARGIFY (arg2);			 	\TYPEFY (arg1, __arg1) = ARGIFY (arg1);			 	\register TYPEFY (arg4, _a4) asm ("r10") = __arg4;			\register TYPEFY (arg3, _a3) asm ("rdx") = __arg3;			\register TYPEFY (arg2, _a2) asm ("rsi") = __arg2;			\register TYPEFY (arg1, _a1) asm ("rdi") = __arg1;			\asm volatile (							\"syscall\n\t"							\: "=a" (resultvar)							\: "0" (number), "r" (_a1), "r" (_a2), "r" (_a3), "r" (_a4)		\: "memory", REGISTERS_CLOBBERED_BY_SYSCALL);			\(long int) resultvar;						\
})

sys_futex的函数原型如下所示:

int sys_futex (int *uaddr, int op, int val, const struct timespec *timeout);

其作用是原子性的检查uaddr中计数器的值是否为val,如果是则让进程休眠,直到FUTEX_WAKE或者超时(time-out)。也就是把进程挂到uaddr相对应的等待队列上去。

这里实际上就是检查mutex的**__lock是否等于2**。

  • 如果不等于2,意味着,锁可能已经被释放,不需要将线程添加到sleep队列,sys_futex直接返回,重新尝试加锁。
  • 如果等于2,则意味着用户态到内核段的这段时间内,锁的值没有发生变化,于是将线程添加到sleep队列,等待其他线程释放锁。

glibc的mutex的加锁是用户态的原子操作和内核态sys_futex共同作用的结果,上述过程可以用下面这张流程图来概括:

mutex的解锁过程

同样从最简单的PTHREAD_MUTEX_TIMED_NP看起,其调用了lll_mutex_unlock_optimized方法进行unlock。

  if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)== PTHREAD_MUTEX_TIMED_NP){/* Always reset the owner field.  */normal:mutex->__data.__owner = 0;if (decr)/* One less user.  */--mutex->__data.__nusers;/* Unlock.  */lll_mutex_unlock_optimized (mutex);LIBC_PROBE (mutex_release, 1, mutex);return 0;}

lll_mutex_unlock_optimized是对单线程解锁进行优化的函数。如果是单线程,意味着没有竞争,则可以直接将锁的值__lock修改为0。如果是多线程,则调用lll_unlock方法。

static inline void
lll_mutex_unlock_optimized (pthread_mutex_t *mutex)
{/* The single-threaded optimization is only valid for privatemutexes.  For process-shared mutexes, the mutex could be in ashared mapping, so synchronization with another process is neededeven without any threads.  */int private = PTHREAD_MUTEX_PSHARED (mutex);if (private == LLL_PRIVATE && SINGLE_THREAD_P)mutex->__data.__lock = 0;elselll_unlock (mutex->__data.__lock, private);
}

lll_unlock定义在sysdeps/nptl/lowlevellock.h中。这里调用atomic_exchange_rel原子性地将__futex与0进行交换,并将原始值存放在__oldval中。

如果__oldval小于等于1,意味着没有竞争,不需要唤醒其他线程。如果__oldval大于1,则意味着需要唤醒其他线程,这样就会调用__lll_lock_wake_private或__lll_lock_wake进行线程唤醒。

#define __lll_unlock(futex, private)					\((void)								\({									\int *__futex = (futex);						\int __private = (private);						\int __oldval = atomic_exchange_rel (__futex, 0);			\if (__glibc_unlikely (__oldval > 1))				\{								\if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \__lll_lock_wake_private (__futex);                           \else                                                           \__lll_lock_wake (__futex, __private);			\}								\}))
#define lll_unlock(futex, private)	\__lll_unlock (&(futex), private)

__lll_lock_wake_private和__lll_lock_wake是相似的方法,其都将调用lll_futex_wake。

void
__lll_lock_wake_private (int *futex)
{lll_futex_wake (futex, 1, LLL_PRIVATE);
}
libc_hidden_def (__lll_lock_wake_private)void
__lll_lock_wake (int *futex, int private)
{lll_futex_wake (futex, 1, private);
}

lll_futex_wake实际上也是对sys_futex系统调用的封装,其将传入FUTEX_WAKE参数以唤醒一个线程。

/* Wake up up to NR waiters on FUTEXP.  */
# define lll_futex_wake(futexp, nr, private)                             \lll_futex_syscall (4, futexp,                                         \__lll_private_flag (FUTEX_WAKE, private), nr, 0)
# define lll_futex_syscall(nargs, futexp, op, ...)                      \({                                                                    \long int __ret = INTERNAL_SYSCALL (futex, nargs, futexp, op, 	\__VA_ARGS__);                    \(__glibc_unlikely (INTERNAL_SYSCALL_ERROR_P (__ret))         	\? -INTERNAL_SYSCALL_ERRNO (__ret) : 0);                     	\})
#undef internal_syscall4
#define internal_syscall4(number, arg1, arg2, arg3, arg4)		\
({									\unsigned long int resultvar;					\TYPEFY (arg4, __arg4) = ARGIFY (arg4);			 	\TYPEFY (arg3, __arg3) = ARGIFY (arg3);			 	\TYPEFY (arg2, __arg2) = ARGIFY (arg2);			 	\TYPEFY (arg1, __arg1) = ARGIFY (arg1);			 	\register TYPEFY (arg4, _a4) asm ("r10") = __arg4;			\register TYPEFY (arg3, _a3) asm ("rdx") = __arg3;			\register TYPEFY (arg2, _a2) asm ("rsi") = __arg2;			\register TYPEFY (arg1, _a1) asm ("rdi") = __arg1;			\asm volatile (							\"syscall\n\t"							\: "=a" (resultvar)							\: "0" (number), "r" (_a1), "r" (_a2), "r" (_a3), "r" (_a4)		\: "memory", REGISTERS_CLOBBERED_BY_SYSCALL);			\(long int) resultvar;						\
})

glibc不同属性的mutex的实现

上面的章节主要聚焦在PTHREAD_MUTEX_TIMED_NP的实现,下面将对不同属性的mutex的实现进行讲解。

PTHREAD_MUTEX_TIMED_NP

PTHREAD_MUTEX_TIMED_NP在上面已经有了大幅章节进行讲解,这里再回顾一下,其行为相对简单,直接调用LLL_MUTEX_LOCK_OPTIMIZED去竞争锁

if (__glibc_likely (type == PTHREAD_MUTEX_TIMED_NP))
{FORCE_ELISION (mutex, goto elision);
simple:/* Normal mutex.  */LLL_MUTEX_LOCK_OPTIMIZED (mutex);assert (mutex->__data.__owner == 0);
}

PTHREAD_MUTEX_RECURSIVE_NP

可重入锁意味着同一个线程可以对互斥锁加锁多次。下面是源码的实现部分。首先获取了线程的id,判断锁的owner的线程id和当前线程的id是否相等,如果相等则将锁的count值加1。如果锁的owner的线程id和当前线程的id不相等,则使用LLL_MUTEX_LOCK_OPTIMIZED进行加锁(这里就和普通锁一样了)。

  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)== PTHREAD_MUTEX_RECURSIVE_NP, 1)){/* Recursive mutex.  */pid_t id = THREAD_GETMEM (THREAD_SELF, tid);/* Check whether we already hold the mutex.  */if (mutex->__data.__owner == id){/* Just bump the counter.  */if (__glibc_unlikely (mutex->__data.__count + 1 == 0))/* Overflow of the counter.  */return EAGAIN;++mutex->__data.__count;return 0;}LLL_MUTEX_LOCK_OPTIMIZED (mutex);assert (mutex->__data.__owner == 0);mutex->__data.__count = 1;}

PTHREAD_MUTEX_ADAPTIVE_NP

自适应锁在没有加锁成功时,会进行自旋,当自旋超过一定次数时,将调用LLL_MUTEX_LOCK尝试加锁,加锁失败将睡眠。

  else if (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex)== PTHREAD_MUTEX_ADAPTIVE_NP, 1)){if (LLL_MUTEX_TRYLOCK (mutex) != 0){int cnt = 0;int max_cnt = MIN (max_adaptive_count (),mutex->__data.__spins * 2 + 10);do{if (cnt++ >= max_cnt){LLL_MUTEX_LOCK (mutex);break;}atomic_spin_nop ();}while (LLL_MUTEX_TRYLOCK (mutex) != 0);mutex->__data.__spins += (cnt - mutex->__data.__spins) / 8;}assert (mutex->__data.__owner == 0);}

PTHREAD_MUTEX_ERRORCHECK_NP

检错锁的作用是避免同一个线程对同一个互斥锁加锁多次导致死锁。这里就是获取了锁的owner的线程id,并于当前的线程id进行对比,如果相等,则返回EDEADLK错误。

  else{pid_t id = THREAD_GETMEM (THREAD_SELF, tid);assert (PTHREAD_MUTEX_TYPE (mutex) == PTHREAD_MUTEX_ERRORCHECK_NP);/* Check whether we already hold the mutex.  */if (__glibc_unlikely (mutex->__data.__owner == id))return EDEADLK;goto simple;}

总结

本文从glic源码去分析了互斥锁的底层实现原理,其实现包含了用户态到内核态的过程,利用了CAS技术和sys_futex系统调用。互斥锁也有多种属性,构成了普通锁/可重入锁/检错锁/自适应锁等类型,每种类型都有其各自的特性,开发时,需要结合场景选择合适的属性。

更多推荐

深入了解glibc的互斥锁的加锁过程

本文发布于:2024-02-25 14:19:09,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1699258.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:加锁   过程   互斥   glibc

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!