golang笔记18

编程入门 行业动态 更新时间:2024-10-26 14:27:48

golang<a href=https://www.elefans.com/category/jswz/34/1770047.html style=笔记18"/>

golang笔记18

golang笔记18--go并发多线程

  • 介绍
  • 核心用法
    • Mutex
    • RWMutex
    • WaitGroup
    • Cond
    • Once
    • map
    • Pool
    • Context
    • select
  • 注意事项
  • 参考文档

介绍

大家都知道go语言近年来越来越火了,其中有一个要点是go语言在并发场景有很高的性能,比如可以通过启动很多个 goroutine 来执行并发任务,通过Channel 类型实现 goroutine 之间的数据交流。当我们想用go实现高并发的时候,我们要了解常见的并发源语,以便于开发的时候做出最优选择。
本文基于较新版本的go1.20.7, 介绍了go并发多线场景常用的源语和方法案例…

核心用法

Mutex

多线程场景下为了解决资源竞争问题,通常会使用互斥锁,限定临界区只能同时由一个线程持有。
在go语言中是通过Mutex来实现的。

案例见:

package mainimport ("fmt""sync"
)func noLock() {var count = 0var wg sync.WaitGroupwg.Add(10)for i := 0; i < 10; i++ {go func() {defer wg.Done()for j := 0; j < 10000; j++ {count++}}()}wg.Wait()fmt.Printf("count=%v\n", count)
}func hasLock() {var count = 0var wg sync.WaitGroupwg.Add(10)var mu sync.Mutexfor i := 0; i < 10; i++ {go func() {defer wg.Done()for j := 0; j < 10000; j++ {mu.Lock()count++mu.Unlock()}}()}wg.Wait()fmt.Printf("count=%v\n", count)
}func main() {fmt.Println("no lock:")for i := 0; i < 10; i++ {noLock()}fmt.Println("has lock:")for i := 0; i < 10; i++ {hasLock()}
}

输出:

no lock:
count=53430
count=42448
count=47531
count=57758
count=50497
count=44185
count=41547
count=33113
count=35673
count=31391
has lock:
count=100000
count=100000
count=100000
count=100000
count=100000
count=100000
count=100000
count=100000
count=100000
count=100000

很多时候无法快速看出程序是否有竞争问题,此时可以使用race来检查是否有竞争关系

$ go run -race 4.1.go

常见错误:

  • 不成对出现
  • copy已经使用的mutex
    使用 go vet 检查
  • 重入
  • 死锁

RWMutex

当有大量读写操作的时候,若仅仅使用Mutex会影响性能,此时可以使用读写锁来将读写区分开来;goroutine A持有读锁的时候,其它goroutine也可以继续读操作,写操作goroutine A持有锁的时候,它就是一个排它锁,其它读写操作会阻塞等待锁被释放。
RWMutex是一个reader/writer互斥锁,某一时刻能由任意的reader持有,或只能被单个writer持有。

适用于读多、写少的场景。

案例见:

package mainimport ("fmt""sync""time"
)// Counter 线程安全的计数器
type Counter struct {mu    sync.RWMutexcount uint64
}func (c *Counter) Incr() {c.mu.Lock()c.count++c.mu.Unlock()
}func (c *Counter) Count() uint64 {c.mu.RLock()defer c.mu.RUnlock()return c.count
}func main() {var count Counterfor i := 0; i < 10; i++ {go func(i int) {for {ret := count.Count()fmt.Printf("reader %v, count=%v\n", i, ret)time.Sleep(time.Second * 2)}}(i)}for {count.Incr()fmt.Printf("writer, count=%v\n", count.count)time.Sleep(time.Second * 5)}
}

输出:

writer, count=1
reader 3, count=1
reader 1, count=1
reader 2, count=1
...
reader 0, count=1
reader 3, count=1
reader 5, count=1
reader 4, count=1
reader 9, count=1
reader 7, count=1
writer, count=2
reader 3, count=2
reader 7, count=2
reader 8, count=2
...

WaitGroup

WaitGroup就是package sync用来做任务编排的一个并发原语。它要解决的就是并发-等待的问题:现在有一个goroutine A 在检查点(checkpoint)等待一组goroutine全部完成,如果在执行任务的这些goroutine还没全部完成,那么goroutine A就会阻塞在检查点,直到所有goroutine都完成后才能继续执行。

它有如下三个方法:

  • Add,用来设置WaitGroup的计数值;
  • Done,用来将WaitGroup的计数值减1,其实就是调用了Add(-1);
  • Wait,调用这个方法的goroutine会一直阻塞,直到WaitGroup的计数值变为0。

案例见:

package mainimport ("fmt""sync""time"
)type Counter struct {mu    sync.Mutexcount uint64
}// Incr 计数器加1
func (c *Counter) Incr() {c.mu.Lock()c.count++c.mu.Unlock()
}// Count 获取count值
func (c *Counter) Count() uint64 {c.mu.Lock()defer c.mu.Unlock()return c.count
}// worker sleep 1s后加1
func worker(c *Counter, w *sync.WaitGroup, i int) {defer w.Done()time.Sleep(time.Second)c.Incr()fmt.Printf("worker %v add 1\n", i)
}func main() {var counter Countervar wg sync.WaitGroupwg.Add(10)for i := 0; i < 10; i++ {go worker(&counter, &wg, i)}wg.Wait()fmt.Printf("finished, count=%v\n", counter.count)
}

案例中10个worker分别对count+1, 10个worker完成后才输出最终的count。

输出:

worker 8 add 1
worker 6 add 1
worker 3 add 1
worker 1 add 1
worker 2 add 1
worker 4 add 1
worker 5 add 1
worker 7 add 1
worker 9 add 1
worker 0 add 1
finished, count=10

Cond

Go 标准库提供 Cond 原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond 通常应用于等待某个条件的一组 goroutine,等条件变为 true 的时候,其中一个 goroutine 或者所有的 goroutine 都会被唤醒执行

案例见:

package mainimport ("log""math/rand""sync""time"
)func main() {c := sync.NewCond(&sync.Mutex{})var ready intfor i := 0; i < 10; i++ {go func(i int) {time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)c.L.Lock()ready++c.L.Unlock()log.Printf("运动员 %v 已经就绪", i)c.Broadcast()}(i)}c.L.Lock()for ready != 10 {c.Wait()log.Printf("裁判员被唤醒一次")}c.L.Unlock()log.Printf("所有运动员就绪,开始比赛 3,2,1...")
}

输出:

2023/10/11 23:52:41 运动员 4 已经就绪
2023/10/11 23:52:41 裁判员被唤醒一次
2023/10/11 23:52:43 运动员 7 已经就绪
2023/10/11 23:52:43 裁判员被唤醒一次
2023/10/11 23:52:43 运动员 5 已经就绪
2023/10/11 23:52:43 裁判员被唤醒一次
2023/10/11 23:52:44 运动员 6 已经就绪
2023/10/11 23:52:44 裁判员被唤醒一次
2023/10/11 23:52:44 运动员 3 已经就绪
2023/10/11 23:52:44 裁判员被唤醒一次
2023/10/11 23:52:45 运动员 8 已经就绪
2023/10/11 23:52:45 裁判员被唤醒一次
2023/10/11 23:52:47 运动员 0 已经就绪
2023/10/11 23:52:47 裁判员被唤醒一次
2023/10/11 23:52:48 运动员 1 已经就绪
2023/10/11 23:52:48 裁判员被唤醒一次
2023/10/11 23:52:49 运动员 9 已经就绪
2023/10/11 23:52:49 裁判员被唤醒一次
2023/10/11 23:52:49 运动员 2 已经就绪
2023/10/11 23:52:49 裁判员被唤醒一次
2023/10/11 23:52:49 所有运动员就绪,开始比赛 3,2,1...

Once

Once 可以用来执行且仅仅执行一次动作,常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源,或者在测试的时候初始化一次测试资源。

sync.Once 只暴露了一个方法 Do,你可以多次调用 Do 方法,但是只有第一次调用 Do 方法时 f 参数才会执行,这里的 f 是一个无参数无返回值的函数。

func (o *Once) Do(f func())

案例见:

package mainimport ("fmt""net""runtime""sync""time"
)func runFuncName() string {pc := make([]uintptr, 1)runtime.Callers(2, pc)f := runtime.FuncForPC(pc[0])return f.Name()
}
func onceCase1() {fmt.Printf("this is %v \n", runFuncName())var once sync.Oncef1 := func() {fmt.Println("this is f1")}f2 := func() {fmt.Println("this is f2")}once.Do(f1)once.Do(f2)
}var conn net.Conn
var once sync.Oncefunc onceGetConn() net.Conn {fmt.Printf("this is %v \n", runFuncName())addr := "baidu"once.Do(func() {fmt.Println("this is once.Do")conn, _ = net.DialTimeout("tcp", addr+":80", time.Second*10)})if conn != nil {return conn} else {return nil}
}func main() {onceCase1()conn = onceGetConn()conn = onceGetConn()fmt.Println("conn=", conn)
}

onceCase1 中可以看到once.Do 中的函数只执行第一次;

onceGetConn 中可以看到单例函数只执行一次初始化;

输出:

this is main.onceCase1 
this is f1
this is main.onceGetConn 
this is once.Do
this is main.onceGetConn 
conn= &{{0xc0000a6180}}

map

Go 内建的 map 对象不是线程(goroutine)安全的,并发读写的时候运行时会有检查,遇到并发问题就会导致 panic。

案例1:

package mainfunc main() {var m = make(map[int]int, 10)go func() {for {m[1] = 1}}()go func() {for {_ = m[2]}}()select {}
}

输出:

fatal error: concurrent map read and map writegoroutine 6 [running]:
main.main.func2()/home/xg/files/code/1cc/study/zhangxing12/go/src/chapter04/4.6.go:12 +0x2e
created by main.main/home/xg/files/code/1cc/study/zhangxing12/go/src/chapter04/4.6.go:10 +0x8agoroutine 1 [select (no cases)]:
main.main()/home/xg/files/code/1cc/study/zhangxing12/go/src/chapter04/4.6.go:15 +0x8fgoroutine 5 [runnable]:
main.main.func1()/home/xg/files/code/1cc/study/zhangxing12/go/src/chapter04/4.6.go:7 +0x2e
created by main.main/home/xg/files/code/1cc/study/zhangxing12/go/src/chapter04/4.6.go:5 +0x5dProcess finished with the exit code 2

为解决该问题,可以重写线程安全的map,使用第三方的发片式map,或者使用Go 官方线程安全 map 的标准实现 sync.Map, 其使用场景:

  • 只会增长的缓存系统中,一个 key 只写入一次而被读很多次;
  • 多个 goroutine 为不相交的键集读、写和重写键值对。

案例2:
使用sync.Map 后,并发读写正常

package mainimport ("fmt""sync"
)func main() {m := sync.Map{}go func() {for {for i := 0; i < 10; i++ {m.Store(i, i*10)}}}()go func() {for {v, _ := m.Load(2)fmt.Println(v)}}()select {}
}

输出:

20
20
20
...

Pool

Go 的自动垃圾回收机制还是有一个 STW(stop-the-world,程序暂停)的时间,而且,大量地创建在堆上的对象,也会影响垃圾回收标记的时间。

Go 标准库中提供了一个通用的 Pool 数据结构,也就是 sync.Pool,我们使用它可以创建池化的对象。这个类型也有一些使用起来不太方便的地方,就是它池化的对象可能会被垃圾回收掉,这对于数据库长连接等场景是不合适的。

sync.Pool 本身就是线程安全的,多个 goroutine 可以并发地调用它的方法存取对象;

sync.Pool 不可在使用之后再复制使用。

案例见:

package mainimport ("bytes""fmt""io""math/rand""os""sync""time"
)var bufPool = sync.Pool{New: func() any {return new(bytes.Buffer)},
}func Log(w io.Writer, key, val string) {b := bufPool.Get().(*bytes.Buffer)b.Reset()b.WriteString(time.Now().Local().Format(time.RFC3339))b.WriteByte(' ')b.WriteString(key)b.WriteByte('=')b.WriteString(val)b.WriteByte('\n')w.Write(b.Bytes())bufPool.Put(b)
}func main() {rand.New(rand.NewSource(time.Now().UnixNano()))for i := 0; i < 10; i++ {time.Sleep(time.Second)valStr := fmt.Sprintf("/search?=q=flowers %v", rand.Int63n(100))Log(os.Stdout, "path", valStr)}
}

输出:

2023-10-16T14:16:15+08:00 path=/search?=q=flowers 71
2023-10-16T14:16:16+08:00 path=/search?=q=flowers 51
2023-10-16T14:16:17+08:00 path=/search?=q=flowers 21
2023-10-16T14:16:18+08:00 path=/search?=q=flowers 14
2023-10-16T14:16:19+08:00 path=/search?=q=flowers 42
2023-10-16T14:16:20+08:00 path=/search?=q=flowers 15
2023-10-16T14:16:21+08:00 path=/search?=q=flowers 19
2023-10-16T14:16:22+08:00 path=/search?=q=flowers 53
2023-10-16T14:16:23+08:00 path=/search?=q=flowers 45
2023-10-16T14:16:24+08:00 path=/search?=q=flowers 60

Context

Go 标准库的 Context 不仅提供了上下文传递的信息,还提供了 cancel、timeout 等其它信息; 它比较适合使用在如下场景:

  • 上下文信息传递 (request-scoped),比如处理 http 请求、在请求处理链路上传递信息;
  • 控制子 goroutine 的运行;
  • 超时控制的方法调用;
  • 可以取消的方法调用。

案例见:

package mainimport ("context""fmt""runtime""time"
)var neverReady = make(chan struct{})const shortDuration = 1 * time.Microsecondfunc runFuncName() string {pc := make([]uintptr, 1)runtime.Callers(2, pc)f := runtime.FuncForPC(pc[0])return f.Name()
}func case1WithCancel() {fmt.Printf("this is %v\n", runFuncName())gen := func(ctx context.Context) <-chan int {dst := make(chan int)n := 1go func() {for {select {case <-ctx.Done():return // returning not to leak the goroutinecase dst <- n:n++}}}()return dst}ctx, cancel := context.WithCancel(context.Background())defer cancel() // cancel when we are finished consuming integersfor n := range gen(ctx) {fmt.Println(n)if n == 5 {break}}
}func case2WithDeadline() {fmt.Printf("this is %v\n", runFuncName())d := time.Now().Add(shortDuration)ctx, cancel := context.WithDeadline(context.Background(), d)defer cancel()select {case <-neverReady:fmt.Println("ready")case <-time.After(2 * time.Second):fmt.Printf("overslept %v\n", 2*time.Second)case <-ctx.Done():fmt.Println("ctx.Done:", ctx.Err())}
}func case3WithTimeout() {ctx, cancel := context.WithTimeout(context.Background(), shortDuration)defer cancel()select {case <-neverReady:fmt.Println("ready")case <-ctx.Done():fmt.Println("ctx.Done:", ctx.Err())}
}func main() {fmt.Println(time.Now().Local())case1WithCancel()fmt.Println(time.Now().Local())case2WithDeadline()fmt.Println(time.Now().Local())case3WithTimeout()fmt.Println(time.Now().Local())
}

输出:

2023-10-16 16:41:32.05194173 +0800 CST
this is main.case1WithCancel
1
2
3
4
5
2023-10-16 16:41:32.052263636 +0800 CST
this is main.case2WithDeadline
ctx.Done: context deadline exceeded
2023-10-16 16:41:32.052326891 +0800 CST
ctx.Done: context deadline exceeded
2023-10-16 16:41:32.052351282 +0800 CST

select

select 是 Go 中的一个控制结构,类似于 switch 语句。
select 语句只能用于通道操作,每个 case 必须是一个通道操作,要么是发送要么是接收。
select 语句会监听所有指定的通道上的操作,一旦其中一个通道准备好就会执行相应的代码块。
如果多个通道都准备好,那么 select 语句会随机选择一个通道执行。如果所有通道都没有准备好,那么执行 default 块中的代码。

Go 编程语言中 select 语句的语法如下:

select {
case <- channel1:
// 执行的代码
case value := <- channel2:
// 执行的代码
case channel3 <- value:
// 执行的代码
// 你可以定义任意数量的 case
default:
// 所有通道都没有准备好,执行的代码
}

上述语法中:
每个 case 都必须是一个通道
所有 channel 表达式都会被求值
所有被发送的表达式都会被求值
如果任意某个通道可以进行,它就执行,其他被忽略。
如果有多个 case 都可以运行,select 会随机公平地选出一个执行,其他不会执行。
否则:

  • 如果有 default 子句,则执行该语句。
  • 如果没有 default 子句,select 将阻塞,直到某个通道可以运行;Go 不会重新对 channel 或值进行求值

如下,两个 goroutine 定期分别输出 one two到通道c1 c2中, 通过 select 来接受数据

package mainimport ("fmt""time"
)func main() {c1 := make(chan string)c2 := make(chan string)go func() {for {time.Sleep(1 * time.Second)c1 <- fmt.Sprint("one", time.Now().Local())}}()go func() {for {time.Sleep(2 * time.Second)c2 <- fmt.Sprint("two", time.Now().Local())}}()for {select {case msg1 := <-c1:fmt.Println("received", msg1)case msg2 := <-c2:fmt.Println("received", msg2)}}
}

输出:

received one2023-10-16 17:27:50.605975411 +0800 CST
received two2023-10-16 17:27:51.606263901 +0800 CST
received one2023-10-16 17:27:51.607610553 +0800 CST
received one2023-10-16 17:27:52.608383998 +0800 CST
received two2023-10-16 17:27:53.606825344 +0800 CST
received one2023-10-16 17:27:53.609350218 +0800 CST
...

注意golang中select为空的话会导致语法检测为死锁,因此要禁止如下写法
案例见: src/chapter04/4.9.go

package mainimport "fmt"func foo() {fmt.Printf("hi this is foo\n")
}func bar() {fmt.Printf("hi this is bar\n")
}func main() {go foo()go bar()select {}
}

输出:

hi this is bar
hi this is foo
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
main.main()/home/xg/files/code/1cc/study/zhangxing12/go/src/chapter04/4.9.go:19 +0x3f

go 中select为了避免饥饿会随机执行case, 具体见如下案例:
案例中既不全Receive C也不全 Receive S

package mainimport ("fmt""time"
)func genInt(ch chan int, stopCh chan bool) {for j := 0; j < 10; j++ {ch <- jtime.Sleep(time.Second)}stopCh <- true
}func main() {ch := make(chan int)c := 0stopCh := make(chan bool)go genInt(ch, stopCh)for {select {case c = <-ch:fmt.Println("Receive C", c)case s := <-ch:fmt.Println("Receive S", s)case _ = <-stopCh:goto end}}
end:
}

输出:

Receive C 0
Receive S 1
Receive C 2
Receive S 3
Receive C 4
Receive C 5
Receive S 6
Receive S 7
Receive S 8
Receive S 9

注意事项

待补充

参考文档

深度解密Go语言之sync.map
go 并发变成实战课
Go 语言 select 语句
Concurrency in Go
Go 语言条件语句

更多推荐

golang笔记18

本文发布于:2023-12-07 03:10:31,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1669896.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:笔记   golang

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!