利⽤⽆锁队列实现的协程池,简约⽽不简单

编程入门 行业动态 更新时间:2024-10-11 23:22:22

利⽤⽆锁<a href=https://www.elefans.com/category/jswz/34/1771257.html style=队列实现的协程池,简约⽽不简单"/>

利⽤⽆锁队列实现的协程池,简约⽽不简单

前言

众所周知,golang协程的创建、销毁、调度是非常轻量的,但是即便再轻量,规模大了开销也不能忽视的。比如利用协程处理http请求,每个请求用一个协程处理,当QPS上万的时候,资源消耗还是比较大的。

协程池和线程池一样,池子中都是热协程,需要的时候取出来,用完后归还,避免了高频率的创建和销毁。同时协程池还能将空闲超时的协程销毁来释放资源,并且还有一定保护能力,通过设定协程最大数量避免无休止的创建协程把系统资源消耗殆尽。

总之,golang虽然提供了非常轻量且容易使用的协程编程环境,但是不同的应用场景对于协程的使用需求也是不同的,协程池就是一种非常通用的应用场景。

无锁队列

在介绍协程池的实现之前需要简单说明一下无锁队列,关于无锁队列的实现网上有很多文章,此处只简单的说一些根本文实现有关的重点内容:CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,就可以用其来实现各种无锁(lock free)的数据结构,本文使用的就是atomic.CompareAndSwapPointer。

实现

本文实现的源码开源连接为:.git,接下来进入CAAD(代码即文档)模式,通篇只有代码和注释!

接口

// Pool定义了协程池的接口
type Pool interface {Name() string             // 获取协程池的名字,当有多个协程池对象的时候可以用名字区分不同的协程池Capacity() int32          // 获取协程池的容量,即最大协程数量Tune(size int32)          // 修改协程池的容量Status() Status           // 获取协程池的状态,关于状态下面有定义Go(Routine) error         // 执行(阻塞)Routine,关于Routine下面也有定义GoNonblock(Routine) error // 非阻塞执行Routine,当协程数量达到最大值且无空闲协程时立刻返回Close()                   // 关闭协程池
}// Routine定义了协程池执行的函数,context避免协程池关闭的时候协程被阻塞,也就是说协程池的使用者
// 需要将函数实现成Routine形式才能被协程池调用,
type Routine func(context.Context)// Status定义了协程池的状态
type Status struct {Runnings int32 // 运行中的协程数量Idles    int32 // 空闲的协程数量
}

协程

// coroutine 定义了协程
type coroutine struct {rc     chan Routine  // Routine的chan,Pool.Go(Routine)通过rc传递给协程执行pool   *pool         // 协程池指针,每个协程通过pool指向协程池(pool是Pool的实现)active time.Time     // 活跃时间,最后一次执行完Routine的时间,用于清理空闲超时的协程next   *coroutine    // 下一个协程,所谓无锁队列就是用这个变量将协程形成了队列
}// run是协程的运行函数
func (c *coroutine) run() {// 此处只需要知道pool有一个sync.WaitGroup的成员变量wg,用来等待所有协程退出,// 所以协程退出的时候需要调动Donedefer c.pool.wg.Done()// 前面提到了,通过chan Routine获取函数for r := range c.rc {// 空指针表示协程需要退出,比如协程池关闭或者协程空闲超时都会收到nilif r == nil {return}// 执行Routine,此处传入了协程池的context,建议Routine的实现select该contextr(c.pool.ctx)// 执行完函数,将该协程放到协程池的空闲队列,此处开始进入本文的核心内容了c.pool.pushIdle(c)}
}

无锁队列

// pushIdle把协程放入空闲协程队列
func (p *pool) pushIdle(c *coroutine) {// 此时协程已经执行完Routine,需要记录一下最后的活跃时间c.active = time.Now()for {// 获取空闲队列的第一个协程,即队列头,clean表示协程池是否正在清理空闲队列head, clean := p.idleHead()if clean {// 如果协程池正在清理空闲协程,需要等清理完毕后再把协程放入到空闲队列中,// 如何才能知道协程池清理完了呢?chan或者sync.Cond应该是比较容易想到的方案,// 笔者采用了自旋的方案,因为清理空闲协程非常快且不频繁,自旋是性能最好的方法。// 此处使用了runtime.Gosched()实现自旋,此时立查询清理是否完成多半还是在清理中,// 倒不如把时间片让出来给其他协程,实在没事干了再去查询清理状态会更有效的利用CPU。// runtime.Gosched()会让协程释放CPU时间片,笔者此处问一个问题,如果不调用该函数,// 采用死循环的方式自旋查询清理状态(即把runtime.Gosched()注释掉)是否可行,// 答案是不行的,原因读者应该能够想明白。runtime.Gosched()continue}// 到这里说明协程池不在清理状态,c.storeNext(head)是用c.next->head(当前),// p.casIdleHead(head, unsafe.Pointer(c))利用CAS操作实现p.idles->c,// 相当于把c放入了队列头,c.next指向了以前的队列头。因为CAS是原子操作,无需用锁互斥// 就可以把协程放入队列,这也是无锁队列的由来if c.storeNext(head); p.casIdleHead(head, unsafe.Pointer(c)) {// 运行中的协程数量-1,通过原子操作计数,因为执行上面是多个协程并发执行的// 此处需要注意,在清理超时协程的时候会插入cleaning协程,不能计为运行中的协程if c != cleaning {atomic.AddInt64(&p.count, int64(-1)<<32)}break}}
}// casIdleHead利用CAS实现协程池头指针的操作,casIdleHead不仅可以实现插入协程到队列头,
// 同时可以将队列头协程弹出,详情见下面的popIdle()
func (p *pool) casIdleHead(o, n unsafe.Pointer) bool {// 实现非常简单,就是利用了atomic.CompareAndSwapPointer()函数,p.idles指向了第一个协程,// 目标是让p.idles指向n,o是以前的队列头,CAS就是如果p.idles==o则p.idles=n,否则返回falsereturn atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&p.idles)), o, n)
}// popIdle弹出队列的第一个协程
func (p *pool) popIdle() *coroutine {for {// 和插入队列一样,都要判断是否为清理状态,此处就不多解释了。因为队列可能为空,// 所以要判断是否为空head, cleaning := p.idleHead()if nil == head {return nil} else if cleaning {runtime.Gosched()continue}// 下面的操作让p.idles指向head.next,等同于将head从队列中移除c := (*coroutine)(head)if next := c.loadNext(); p.casIdleHead(head, next) {// 返回队列头部的协程,不难发现协程队列其实是个栈(FILO),而队列应该是FIFO,// 其实是栈还是队列并不重要,重要的是无锁队列是一个广为人知的名字,熟悉无锁队列// 的读者可以立刻想象到本文所描述的实现方案。c.storeNext(nil)return c}}
}var cleaning = &coroutine{}// idleHead返回队列第一个协程,即队列头
func (p *pool) idleHead() (unsafe.Pointer, bool) {// p.idles指向了第一个协程,用原子的方式读取队列头,因为多个协程都在操作p.idles实现// 队列的push和pop操作head := atomicLoadCoroutine(&p.idles)// 这句就是本文标题中简约而不简单的部分了,cleaning是全局变量,上面有定义,如果队列头指向// cleaning表示协程池正在执行清理函数。那么问题来了,为什么要用这种方式?因为所有的协程// 都在用CAS的方式操作队列头,也就是说只有队列头实现了全局状态的一致性,但凡引入任何其他变量,// 都无法通过原子的方式同时操作队列和该变量,此时就必须要加锁,这不是笔者想要的。有的同学可能// 会问,空闲协程已经通过队列的方式组织起来了,直接遍历不就完了?答案肯定是不行的,因为遍历// 队列中的任何一个协程都需要多个操作,当判断协程超时的时候可能已经被调度了新的Routine,此时又// 要重头开始。前面也提到了,多协程并发的操作队列头,队列的前排是状态变化最频繁的,遍历队列的// 过程中可能一直在前排绕圈,因为他们一直都在变化。而在队列头部插入一个特殊的协程,那么所有// 操作队列头的协程(清理协程除外)都会进入等待状态(就是前面提到的自旋),直到这个特殊的协程// 被弹出。return head, cleaning == (*coroutine)(head)
}

清理

// 终于到了清理函数了,来看看清理函数有没有前面提到的非常快
func (p *pool) clean(now time.Time) {// 经过无锁队列章节的说明,这句就非常好理解了,把cleaning这个特殊的协程放入队列头部p.pushIdle(cleaning)// 从cleaning.next开始遍历,cleaning.next就是pushIdle之前的队列头,此处需要注意,// from是c的前一个协程,即from.next==c.// 需要了解一点,下面这个for循环相当于只有一个清理协程在工作,其他的协程都在自旋状态,// 理论上可以不用原子操作。for from, c := cleaning, cleaning.next; nil != c; from, c = c, c.next {// 这个应该不用解释了,判断空闲时间是不是超时?if now.Sub(c.active) >= p.IdleTimeout {// from之后的所有协程全部被删除,为什么?前面提到过,协程池的数据结构是栈,越// 靠后面的协程是越先被插入,也就是空闲的时间越长,所以只要某一个协程超时,那么// 该协程后面的所有协程肯定都超时。from.storeNext(nil)// 遍历所有超时协程并通知退出var count int32for c != nil {c.rc <- nilc, c.next = c.next, nilcount++}// 从协程的总数中减去已经退出的协程数量atomic.AddInt64(&p.count, -int64(count))break}}// 把cleaning协程从队列中弹出,恢复状态,不难看出,清理协程的函数最多就是遍历一次所有空闲// 协程,总体来看是比较快的atomicStoreCoroutine(&p.idles, unsafe.Pointer(cleaning.next))
}

利用无锁队列实现的协程池,简约而不简单

前言

众所周知,golang协程的创建、销毁、调度是非常轻量的,但是即便再轻量,规模大了开销也不能忽视的。比如利用协程处理http请求,每个请求用一个协程处理,当QPS上万的时候,资源消耗还是比较大的。

协程池和线程池一样,池子中都是热协程,需要的时候取出来,用完后归还,避免了高频率的创建和销毁。同时协程池还能将空闲超时的协程销毁来释放资源,并且还有一定保护能力,通过设定协程最大数量避免无休止的创建协程把系统资源消耗殆尽。

总之,golang虽然提供了非常轻量且容易使用的协程编程环境,但是不同的应用场景对于协程的使用需求也是不同的,协程池就是一种非常通用的应用场景。

无锁队列

在介绍协程池的实现之前需要简单说明一下无锁队列,关于无锁队列的实现网上有很多文章,此处只简单的说一些根本文实现有关的重点内容:CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,就可以用其来实现各种无锁(lock free)的数据结构,本文使用的就是atomic.CompareAndSwapPointer。

实现

本文实现的源码开源连接为:.git,接下来进入CAAD(代码即文档)模式,通篇只有代码和注释!需要解释一点,开源代码是笔者兴趣使然用业余时间写的,并不涉及到任何工作相关的内容。

接口

// Pool定义了协程池的接口
type Pool interface {Name() string             // 获取协程池的名字,当有多个协程池对象的时候可以用名字区分不同的协程池Capacity() int32          // 获取协程池的容量,即最大协程数量Tune(size int32)          // 修改协程池的容量Status() Status           // 获取协程池的状态,关于状态下面有定义Go(Routine) error         // 执行(阻塞)Routine,关于Routine下面也有定义GoNonblock(Routine) error // 非阻塞执行Routine,当协程数量达到最大值且无空闲协程时立刻返回Close()                   // 关闭协程池
}// Routine定义了协程池执行的函数,context避免协程池关闭的时候协程被阻塞,也就是说协程池的使用者
// 需要将函数实现成Routine形式才能被协程池调用,
type Routine func(context.Context)// Status定义了协程池的状态
type Status struct {Runnings int32 // 运行中的协程数量Idles    int32 // 空闲的协程数量
}

协程

// coroutine 定义了协程
type coroutine struct {rc     chan Routine  // Routine的chan,Pool.Go(Routine)通过rc传递给协程执行pool   *pool         // 协程池指针,每个协程通过pool指向协程池(pool是Pool的实现)active time.Time     // 活跃时间,最后一次执行完Routine的时间,用于清理空闲超时的协程next   *coroutine    // 下一个协程,所谓无锁队列就是用这个变量将协程形成了队列
}// run是协程的运行函数
func (c *coroutine) run() {// 此处只需要知道pool有一个sync.WaitGroup的成员变量wg,用来等待所有协程退出,// 所以协程退出的时候需要调动Donedefer c.pool.wg.Done()// 前面提到了,通过chan Routine获取函数for r := range c.rc {// 空指针表示协程需要退出,比如协程池关闭或者协程空闲超时都会收到nilif r == nil {return}// 执行Routine,此处传入了协程池的context,建议Routine的实现select该contextr(c.pool.ctx)// 执行完函数,将该协程放到协程池的空闲队列,此处开始进入本文的核心内容了c.pool.pushIdle(c)}
}

无锁队列

// pushIdle把协程放入空闲协程队列
func (p *pool) pushIdle(c *coroutine) {// 此时协程已经执行完Routine,需要记录一下最后的活跃时间c.active = time.Now()for {// 获取空闲队列的第一个协程,即队列头,clean表示协程池是否正在清理空闲队列head, clean := p.idleHead()if clean {// 如果协程池正在清理空闲协程,需要等清理完毕后再把协程放入到空闲队列中,// 如何才能知道协程池清理完了呢?chan或者sync.Cond应该是比较容易想到的方案,// 笔者采用了自旋的方案,因为清理空闲协程非常快且不频繁,自旋是性能最好的方法。// 此处使用了runtime.Gosched()实现自旋,此时立查询清理是否完成多半还是在清理中,// 倒不如把时间片让出来给其他协程,实在没事干了再去查询清理状态会更有效的利用CPU。// runtime.Gosched()会让协程释放CPU时间片,笔者此处问一个问题,如果不调用该函数,// 采用死循环的方式自旋查询清理状态(即把runtime.Gosched()注释掉)是否可行,// 答案是不行的,原因读者应该能够想明白。runtime.Gosched()continue}// 到这里说明协程池不在清理状态,c.storeNext(head)是用c.next->head(当前),// p.casIdleHead(head, unsafe.Pointer(c))利用CAS操作实现p.idles->c.next,// 相当于把c放入了队列头,c.next指向了以前的队列头。因为CAS是原子操作,无需用锁互斥// 就可以把协程放入队列,这也是无锁队列的由来if c.storeNext(head); p.casIdleHead(head, unsafe.Pointer(c)) {// 运行中的协程数量-1,通过源自操作计数,因为执行上面是多个协程并发执行的// 此处需要注意,在清理超时协程的时候回插入cleaning协程,不能计为运行中的协程if c != cleaning {atomic.AddInt64(&p.count, int64(-1)<<32)}break}}
}// casIdleHead利用CAS实现协程池头指针的操作,casIdleHead不仅可以实现插入协程到队列头,
// 同时可以将队列头协程弹出,详情见下面的popIdle()
func (p *pool) casIdleHead(o, n unsafe.Pointer) bool {// 实现非常简单,就是利用了atomic.CompareAndSwapPointer()函数,p.idles指向了第一个协程,// 目标是让p.idles指向n,o是以前的队列头,CAS就是如果p.idles==o则p.idles=n,否则返回falsereturn atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&p.idles)), o, n)
}// popIdle弹出队列的第一个协程
func (p *pool) popIdle() *coroutine {for {// 和插入队列一样,都要判断是否为清理状态,此处就不多解释了。因为队列可能为空,// 所以要判断是否为空head, cleaning := p.idleHead()if nil == head {return nil} else if cleaning {runtime.Gosched()continue}// 下面的操作让p.idles指向head.next,等同于将head从队列中移除c := (*coroutine)(head)if next := c.loadNext(); p.casIdleHead(head, next) {// 返回队列头部的协程,不难发现协程队列其实是个栈(FILO),而队列应该是FIFO,// 其实是栈还是队列并不重要,重要的是无锁队列是一个广为人知的名字,熟悉无锁队列// 的读者可以立刻想象到本文所描述的实现方案。c.storeNext(nil)return c}}
}var cleaning = &coroutine{}// idleHead返回队列第一个协程,即队列头
func (p *pool) idleHead() (unsafe.Pointer, bool) {// p.idles指向了第一个协程,用原子的方式读取队列头,因为多个协程都在操作p.idles实现// 队列的push和pop操作head := atomicLoadCoroutine(&p.idles)// 这句就是本文标题中简约而不简单的部分了,cleaning是全局变量,上面有定义,如果队列头指向// cleaning表示协程池正在执行清理函数。那么问题来了,为什么要用这种方式?因为所有的协程// 都在用CAS的方式操作队列头,也就是说只有队列头实现了全局状态的一致性,但凡引入任何其他变量,// 都无法通过原子的方式同时操作队列和该变量,此时就必须要加锁,这不是笔者想要的。有的同学可能// 会问,空闲协程已经通过队列的方式组织起来了,直接遍历不就完了?答案肯定是不行的,因为遍历// 队列中的任何一个协程都需要多个操作,当判断协程超时的时候可能已经被调度了新的Routine,此时又// 要重头开始。前面也提到了,多协程并发的操作队列头,队列的前排是状态变化最频繁的,遍历队列的// 过程中可能一直在前排绕圈,因为他们一直都在变化。而在队列头部插入一个特殊的协程,那么所有// 操作队列头的协程(清理协程除外)都会进入等待状态(就是前面提到的自旋),直到这个特殊的协程// 被弹出。return head, cleaning == (*coroutine)(head)
}

清理

// 终于到了清理函数了,来看看清理函数有没有前面提到的非常快
func (p *pool) clean(now time.Time) {// 经过无锁队列章节的说明,这句就非常好理解了,把cleaning这个特殊的协程放入队列头部p.pushIdle(cleaning)// 从cleaning.next开始遍历,cleaning.next就是pushIdle之前的队列头,此处需要注意,// from是c的前一个协程,即from.next==c.// 需要了解一点,下面这个for循环相当于只有一个清理协程在工作,其他的协程都在自旋状态,// 理论上可以不用原子操作。for from, c := cleaning, cleaning.next; nil != c; from, c = c, c.next {// 这个应该不用解释了,判断空闲时间是不是超时?if now.Sub(c.active) >= p.IdleTimeout {// from之后的所有协程全部被删除,为什么?前面提到过,协程池的数据结构是栈,越// 靠后面的协程是越先被插入,也就是空闲的时间越长,所以只要某一个协程超时,那么// 该协程后面的所有协程肯定都超时。from.storeNext(nil)// 遍历所有超时协程并通知退出var count int32for c != nil {c.rc <- nilc, c.next = c.next, nilcount++}// 从协程的总数中减去已经退出的协程数量atomic.AddInt64(&p.count, -int64(count))break}}// 把cleaning协程从队列中弹出,恢复状态,不难看出,清理协程的函数最多就是遍历一次所有空闲// 协程,总体来看是比较快的atomicStoreCoroutine(&p.idles, unsafe.Pointer(cleaning.next))
}

Go

// 无论是Go还是GoNonblock,最终调用的都是goRoutine,无非是nonblocking是true还是false
func (p *pool) goRoutine(r Routine, nonblocking bool) error {// 如果协程池不在运行状态,返回协程池已关闭错误if !p.state.is(stateRunning) {return ErrPoolClosed}// 从空闲队列中弹出第一个协程var c *coroutinefor c = p.popIdle(); nil == c; c = p.popIdle() {// 无空闲协程,就需要创建新的协程了,前提条件是协程数量没有超过最大值,if count := atomic.LoadInt64(&p.count); int32(count) >= p.Capacity() {// 如果协程总量已经达到最大值,如果是nonblock则直接返回协程满错误if nonblocking {return ErrPoolFull}// 否则自旋的方式再尝试获取空闲协程runtime.Gosched()// atomic.CompareAndSwapInt64(&p.count, count, count+1)就是协程总数+1,// 下面的语句如果执行失败,说明其他人抢在前面创建或者有新的空闲协程,因为协程// 计数发生变化,需要重新循环判断} else if atomic.CompareAndSwapInt64(&p.count, count, count+1) {// 创建新协程,此处cache是sync.Pool,可以避免频繁的申请和释放内存c = p.cache.Get().(*coroutine)// 创建了新协程,wg就要+1p.wg.Add(1)go c.run()break}}// 增加运行中的协程计数并把Routine传给协程atomic.AddInt64(&p.count, int64(1)<<32)c.rc <- rreturn nil
}

总结

以上就是利用无锁队列实现的协程池的重点代码,其他的代码主要是辅助作用,此处就不一一解释了,有问题的可以联系笔者。先做一个简单的总结:

  1. 无锁队列只是个代名词,真实是一个无锁栈;
  2. 为什么是单向指针(next)而不是双向指针(prev,next),因为CAS只能操作一个指针,熟悉LevelDB的同学应该知道,MemTable使用的是跳跃表(SkipList)而不是map,其原因是跳跃表的指针也是单向指针,LevelDB使用的是内存屏障技术而不是CAS,这样就避免锁的操作,因为map是线程不安全的;
  3. 所有的等待其实是自旋,包括等待清理,等待空闲协程,看似无锁,其实是自旋锁。

其实本文的方案存在一个缺点,那就是自旋(成也萧何败也萧何)。等待清理的自旋并没有什么,毕竟清理非常快,而且清理周期相对协程的调度周期大很多。笔者指的是等待空闲协程的自旋,当协程池满且所有运行协程都被某些事件阻塞,此时所有等待空闲协程的请求都在自旋的查询队列,相当于空转。此时本应CPU使用率很低但因为这些自旋的协程导致CPU使用率非常高,但是这并不会对程序有什么影响,但凡有任何协程被唤醒他们都会让出CPU时间片。说简单点,就是这些等待的协程在利用别人不用的CPU自旋,对于完美主义者的笔者来说虽然有一些遗憾,但是能接受。

笔者做过简单的测试,本文提到的协程池方案调度性能还是比较高的(比ants还要高10%)。至于协程池能用来干什么,以后会陆续介绍较大规模任务调度中使用协程池的方法,敬请期待。

更多推荐

利⽤⽆锁队列实现的协程池,简约⽽不简单

本文发布于:2024-03-13 11:48:57,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1733925.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   不简单   简约   协程池

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!