变量"/>
[源码解读]深入理解pthread中的condition条件变量
深入理解pthread中的condition条件变量
文章目录
- 深入理解pthread中的condition条件变量
- pthread_cond_init.c
- pthread_cond_wait.c
- pthread_cond_signal.c
- pthread_cond_broadcast.c
1.下面情况适合用pthread_cond_broadcast
- 一个生产者多消费者,生产者能一次产生多个产品的情况。
- 多生产者多消费者
- 读写锁实现(写入之后,通知所有读者)
2.下面情况适合pthread_cond_signal的情况
- 单一生产者,生产者一次生产一个产品的情况,最好一个消费者
注意:pthread_cond_signal在单一异步唤醒的处理线程的情况时,是不安全的
pthread_cond_wait() 用于阻塞当前线程,等待别的线程使用 pthread_cond_signal() 或pthread_cond_broadcast来唤醒它 。 pthread_cond_wait() 必须与pthread_mutex 配套使用。pthread_cond_wait() 函数一进入wait状态就会自动release mutex。当其他线程通过 pthread_cond_signal() 或pthread_cond_broadcast ,把该线程唤醒,使 pthread_cond_wait()通过(返回)时,该线程又自动获得该mutex 。
pthread_cond_signal 函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行.如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。
使用pthread_cond_signal一般不会有“惊群现象”产生,他最多只给一个线程发信号。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal调用最多发信一次。
但是 pthread_cond_signal 在多处理器上可能同时唤醒多个线程,当你只能让一个线程处理某个任务时,其它被唤醒的线程就需要继续 wait。
下面主要从源码级别来讨论一下condition的初始化和唤醒操作
参考源码为 glibc2.17,下载链接
pthread_cond_init.c
condition的初始化
int __pthread_cond_init (cond, cond_attr)pthread_cond_t *cond;const pthread_condattr_t *cond_attr;
{struct pthread_condattr *icond_attr = (struct pthread_condattr *) cond_attr;cond->__data.__lock = LLL_LOCK_INITIALIZER; /// 0cond->__data.__futex = 0;/// icond_attr->value的bit0标识是否进程间共享, Bit 1-7: clock IDcond->__data.__nwaiters = (icond_attr != NULL? ((icond_attr->value >> 1)& ((1 << COND_NWAITERS_SHIFT) - 1)): CLOCK_REALTIME); /// COND_NWAITERS_SHIFT 1, CLOCK_REALTIME 0cond->__data.__total_seq = 0;cond->__data.__wakeup_seq = 0;cond->__data.__woken_seq = 0;cond->__data.__mutex = (icond_attr == NULL || (icond_attr->value & 1) == 0? NULL : (void *) ~0l);cond->__data.__broadcast_seq = 0;LIBC_PROBE (cond_init, 2, cond, cond_attr);return 0;
}
pthreadtypes.h , 数据类型定义
typedef union
{char __size[__SIZEOF_PTHREAD_MUTEXATTR_T];///__SIZEOF_PTHREAD_MUTEXATTR_T 4int __align;
} pthread_mutexattr_t;/* Data structure for conditional variable handling. The structure ofthe attribute type is deliberately not exposed. */
typedef union
{struct{int __lock;unsigned int __futex;__extension__ unsigned long long int __total_seq;__extension__ unsigned long long int __wakeup_seq;__extension__ unsigned long long int __woken_seq;void *__mutex;unsigned int __nwaiters;unsigned int __broadcast_seq;} __data;char __size[__SIZEOF_PTHREAD_COND_T]; ///__SIZEOF_PTHREAD_COND_T 48__extension__ long long int __align;
} pthread_cond_t;typedef union
{char __size[__SIZEOF_PTHREAD_CONDATTR_T]; /// __SIZEOF_PTHREAD_CONDATTR_T 4int __align;
} pthread_condattr_t;/* Keys for thread-specific data */
typedef unsigned int pthread_key_t;/* Once-only execution */
typedef int pthread_once_t;/* Conditional variable attribute data structure. */
struct pthread_condattr
{/* Combination of values:Bit 0 : flag whether coditional variable will be shareable betweenprocesses.Bit 1-7: clock ID. */int value;
};
我们经常看到条件变量的初始化:
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Conditional variable handling. */
#define PTHREAD_COND_INITIALIZER { { 0, 0, 0, 0, 0, (void *) 0, 0, 0 } }typedef union
{struct{int __lock;unsigned int __futex;__extension__ unsigned long long int __total_seq;__extension__ unsigned long long int __wakeup_seq;__extension__ unsigned long long int __woken_seq;void *__mutex;unsigned int __nwaiters;unsigned int __broadcast_seq;} __data;char __size[__SIZEOF_PTHREAD_COND_T]; ///__SIZEOF_PTHREAD_COND_T 48__extension__ long long int __align;
} pthread_cond_t;
atomic.c
atomic_compare_and_exchange_val_acq(mem, newval, oldval)
如果,mem的数值等于oldval, 则mem=newvalatomic_compare_and_exchange_bool_acq(mem, newval, oldval)
如果,mem的数值等于oldval, 则mem=newval,返回为0
/* We have by default no support for atomic operations. So definethem non-atomic. If this is a problem somebody will have to comeup with real definitions. *//* The only basic operation needed is compare and exchange. */
#define atomic_compare_and_exchange_val_acq(mem, newval, oldval) \({ __typeof (mem) __gmemp = (mem); \__typeof (*mem) __gret = *__gmemp; \__typeof (*mem) __gnewval = (newval); \\if (__gret == (oldval)) \*__gmemp = __gnewval; \__gret; })#define atomic_compare_and_exchange_bool_acq(mem, newval, oldval) \({ __typeof (mem) __gmemp = (mem); \__typeof (*mem) __gnewval = (newval); \\*__gmemp == (oldval) ? (*__gmemp = __gnewval, 0) : 1; })
pthread_cond_wait.c
1. 首先加条件变量内部的自旋锁
2. 释放外部的互斥量
3. __total_seq自增,__futex自增,__nwaiters+=1 << COND_NWAITERS_SHIFT,因为_nwaiters有个掩码
/* We have one new user of the condvar. */
++cond->__data.__total_seq;
++cond->__data.__futex;
cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;
4. do{}while (val == seq || cond->__data.__woken_seq == val);val == seq防止被虚假唤醒,由于__futex的改变导致被唤醒,如果有多个消费者处在wait端,signal会唤醒一次,多核CPU中可能会唤醒多次。应用层需要进行再次判断条件是否满足。cond->__data.__woken_seq == val表示有多个线程被唤醒,当一个线程执行了
++cond->__data.__woken_seq后cond->__data.__woken_seq == val成立,下一个线程就不会执行。
struct _condvar_cleanup_buffer
{int oldtype;pthread_cond_t *cond;pthread_mutex_t *mutex;unsigned int bc_seq;
};void
__attribute__ ((visibility ("hidden")))
__condvar_cleanup (void *arg)
{struct _condvar_cleanup_buffer *cbuffer =(struct _condvar_cleanup_buffer *) arg;unsigned int destroying;int pshared = (cbuffer->cond->__data.__mutex == (void *) ~0l)? LLL_SHARED : LLL_PRIVATE;/* We are going to modify shared data. */lll_lock (cbuffer->cond->__data.__lock, pshared);if (cbuffer->bc_seq == cbuffer->cond->__data.__broadcast_seq){/* This thread is not waiting anymore. Adjust the sequence countersappropriately. We do not increment WAKEUP_SEQ if this wouldbump it over the value of TOTAL_SEQ. This can happen if a threadwas woken and then canceled. */if (cbuffer->cond->__data.__wakeup_seq< cbuffer->cond->__data.__total_seq){++cbuffer->cond->__data.__wakeup_seq;++cbuffer->cond->__data.__futex;}++cbuffer->cond->__data.__woken_seq;}cbuffer->cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;/* If pthread_cond_destroy was called on this variable already,notify the pthread_cond_destroy caller all waiters have leftand it can be successfully destroyed. */destroying = 0;if (cbuffer->cond->__data.__total_seq == -1ULL&& cbuffer->cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT)){lll_futex_wake (&cbuffer->cond->__data.__nwaiters, 1, pshared);destroying = 1;}/* We are done. */lll_unlock (cbuffer->cond->__data.__lock, pshared);/* Wake everybody to make sure no condvar signal gets lost. */if (! destroying)lll_futex_wake (&cbuffer->cond->__data.__futex, INT_MAX, pshared);/* Get the mutex before returning unless asynchronous cancellationis in effect. */__pthread_mutex_cond_lock (cbuffer->mutex);
}int __pthread_cond_wait (cond, mutex)pthread_cond_t *cond;pthread_mutex_t *mutex;
{struct _pthread_cleanup_buffer buffer;struct _condvar_cleanup_buffer cbuffer;int err;int pshared = (cond->__data.__mutex == (void *) ~0l)? LLL_SHARED : LLL_PRIVATE;/*Values for 'private' parameter of locking macros. Yes, thedefinition seems to be backwards. But it is not. The bit will bereversed before passing to the system call. #define LLL_PRIVATE 0#define LLL_SHARED FUTEX_PRIVATE_FLAG //128*/LIBC_PROBE (cond_wait, 2, cond, mutex);/* Make sure we are alone. */lll_lock (cond->__data.__lock, pshared);/* Now we can release the mutex. */err = __pthread_mutex_unlock_usercnt (mutex, 0);if (__builtin_expect (err, 0)){lll_unlock (cond->__data.__lock, pshared);return err;}/* We have one new user of the condvar. */++cond->__data.__total_seq;++cond->__data.__futex;cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;/* Remember the mutex we are using here. If there is already adifferent address store this is a bad user bug. Do not storeanything for pshared condvars. */if (cond->__data.__mutex != (void *) ~0l)cond->__data.__mutex = mutex;/* Prepare structure passed to cancellation handler. */cbuffer.cond = cond;cbuffer.mutex = mutex;/* Before we block we enable cancellation. Therefore we have toinstall a cancellation handler. */__pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);/* The current values of the wakeup counter. The "woken" countermust exceed this value. */unsigned long long int val;unsigned long long int seq;val = seq = cond->__data.__wakeup_seq;/* Remember the broadcast counter. */cbuffer.bc_seq = cond->__data.__broadcast_seq;do{unsigned int futex_val = cond->__data.__futex;/* Prepare to wait. Release the condvar futex. */lll_unlock (cond->__data.__lock, pshared);/* Enable asynchronous cancellation. Required by the standard. */cbuffer.oldtype = __pthread_enable_asynccancel ();/* Wait until woken by signal or broadcast. */lll_futex_wait (&cond->__data.__futex, futex_val, pshared);/* Disable asynchronous cancellation. */__pthread_disable_asynccancel (cbuffer.oldtype);/* We are going to look at shared data again, so get the lock. */lll_lock (cond->__data.__lock, pshared);/* If a broadcast happened, we are done. */if (cbuffer.bc_seq != cond->__data.__broadcast_seq)goto bc_out;/* Check whether we are eligible for wakeup. */val = cond->__data.__wakeup_seq;}while (val == seq || cond->__data.__woken_seq == val);/* Another thread woken up. */++cond->__data.__woken_seq;bc_out:cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;/* If pthread_cond_destroy was called on this varaible already,notify the pthread_cond_destroy caller all waiters have leftand it can be successfully destroyed. */if (cond->__data.__total_seq == -1ULL&& cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);/* We are done with the condvar. */lll_unlock (cond->__data.__lock, pshared);/* The cancellation handling is back to normal, remove the handler. */__pthread_cleanup_pop (&buffer, 0);/* Get the mutex before returning. */return __pthread_mutex_cond_lock (mutex);
}
pthread_cond_signal.c
int
__pthread_cond_signal (cond)pthread_cond_t *cond;
{int pshared = (cond->__data.__mutex == (void *) ~0l)? LLL_SHARED : LLL_PRIVATE;LIBC_PROBE (cond_signal, 1, cond);/* Make sure we are alone. *//// 加锁lll_lock (cond->__data.__lock, pshared);/* Are there any waiters to be woken? *//// 判断消费者有人在waitif (cond->__data.__total_seq > cond->__data.__wakeup_seq){/* Yes. Mark one of them as woken. *//// 自增唤醒序列的长度++cond->__data.__wakeup_seq;/// 自增__futex++cond->__data.__futex;/* Wake one. */if (! __builtin_expect (lll_futex_wake_unlock (&cond->__data.__futex, 1,1, &cond->__data.__lock,pshared), 0))return 0;/// 唤醒1一个&cond->__data.__futexlll_futex_wake (&cond->__data.__futex, 1, pshared);}/* We are done. *//// 解锁lll_unlock (cond->__data.__lock, pshared);return 0;
}
pthread_cond_broadcast.c
int
__pthread_cond_broadcast (cond)pthread_cond_t *cond;
{LIBC_PROBE (cond_broadcast, 1, cond);int pshared = (cond->__data.__mutex == (void *) ~0l)? LLL_SHARED : LLL_PRIVATE;/* Make sure we are alone. */lll_lock (cond->__data.__lock, pshared);/* Are there any waiters to be woken? */if (cond->__data.__total_seq > cond->__data.__wakeup_seq){/* Yes. Mark them all as woken. */cond->__data.__wakeup_seq = cond->__data.__total_seq;cond->__data.__woken_seq = cond->__data.__total_seq;cond->__data.__futex = (unsigned int) cond->__data.__total_seq * 2;int futex_val = cond->__data.__futex;/* Signal that a broadcast happened. */++cond->__data.__broadcast_seq;/* We are done. */lll_unlock (cond->__data.__lock, pshared);/* Do not use requeue for pshared condvars. */if (cond->__data.__mutex == (void *) ~0l)goto wake_all;/* Wake everybody. */pthread_mutex_t *mut = (pthread_mutex_t *) cond->__data.__mutex;/* XXX: Kernel so far doesn't support requeue to PI futex. *//* XXX: Kernel so far can only requeue to the same type of futex,in this case private (we don't requeue for pshared condvars). */if (__builtin_expect (mut->__data.__kind& (PTHREAD_MUTEX_PRIO_INHERIT_NP| PTHREAD_MUTEX_PSHARED_BIT), 0))goto wake_all;/* lll_futex_requeue returns 0 for success and non-zerofor errors. */if (__builtin_expect (lll_futex_requeue (&cond->__data.__futex, 1,INT_MAX, &mut->__data.__lock,futex_val, LLL_PRIVATE), 0)){/* The requeue functionality is not available. */wake_all:/// 唤醒INT_MAX个人lll_futex_wake (&cond->__data.__futex, INT_MAX, pshared);}/* That's all. */return 0;}/* We are done. */lll_unlock (cond->__data.__lock, pshared);return 0;
}
更多推荐
[源码解读]深入理解pthread中的condition条件变量
发布评论