go语言中的5种并发模式浅析及代码示例

2023-06-01 00:00:00 示例 并发 浅析

1、done-channel 模式

由于goroutine不会被垃圾回收,因此很可能导致内存泄漏,为了避免内存泄漏,goroutine 应该有被触发取消的机制。父Goroutine需要通过一个名为done的只读通道向其子Goroutine发送取消信号。

按照惯例,它被设置为第一个参数。

这种模式在其他模式中也被大量使用。

package main

import (
"fmt"
)

func main() {
jobs := make(chan int, 5)
done := make(chan bool)

go doWork(done, jobs)

for j := 1; j <= 3; j++ {

fmt.Println("sent job", j)
jobs <- j
}
close(jobs)
fmt.Println("sent all jobs")

// 任务结束
done <- true
}

func doWork(done chan bool, jobs chan int) {
for {
select {
case j, more := <-jobs:
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
}
case <-done: // 任务结束,关闭子协程
return
default:
}
}
}


2、for- select 模式

这种模式通常用在从多个通道读取数据

package main

import (
"fmt"
"time"
)

func main() {
ch1, ch2 := make(chan int), make(chan int)

// 每2秒不断往通道1写数据
go func() {
i := 0
for {
i += 2
ch1 <- i
time.Sleep(2 * time.Second)
}
}()

// 每2秒不断往通道2写数据
go func() {
i := 1
for {
i += 2
ch2 <- i
time.Sleep(2 * time.Second)
}
}()

// 不断从通道读数据
for {
select {
case v := <-ch1:
fmt.Println("ch1:", v)
time.Sleep(time.Second)
case v := <-ch2:
fmt.Println("ch2:", v)
time.Sleep(time.Second)
default:
fmt.Println("default")
time.Sleep(time.Second)
}
}

}

如果 ch1 和 ch2 没数据,会走 default

如果 ch1 和 ch2 都有数据会随机选择一个执行,之所以随机是为了避免只执行第一个 case 导致饥饿


3、or-done 模式

该模式旨在将多个完成通道组合成一个 agg_done;

这意味着如果一个 done 通道发出信号,则整个agg_done通道也将关闭。然而,我们不知道在运行时完成通道的数量。

or-done 模式可以通过使用goroutine和递归来实现。

示例中使上下递归函数像树一样相互依赖,上部将自己的orDone通道注入下部,然后下层也将自己的orDone返回给上层,如果任何orDone通道关闭,则通知上层和下层。


这点和上面 done-channel 模式是不同的,上面是所有 goroutine 完成任务,这里是只要有 1 个 goroutine 完成就结束所有 goroutine。

就好比发送一个请求到多个微服务节点,只要有 1 个返回就算完成。

package main

import (
"fmt"
"time"
)

func main() {
var or func(channels ...<-chan interface{}) <-chan interface{}
// 只要有1个结束阻塞,关闭orDone并返回
or = func(channels ...<-chan interface{}) <-chan interface{} {
// 小于2个通道直接返回
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
// 声明一个orDone
orDone := make(chan interface{})
go func() {
// 完成关闭orDone
defer close(orDone)
switch len(channels) {
case 2: // 如果是2个channel,只需要监听这两个
select {
case <-channels[0]:
case <-channels[1]:
}
default:
// 二分法递归
m := len(channels) / 2
select {
case <-or(channels[:m]...):
case <-or(channels[m:]...):
}
}
}()
return orDone
}

// 传入一个时间模拟请求时长,时间到了就close掉,结束当前channel的阻塞
sig := func(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}

start := time.Now()
// 这里orDone开始是阻塞的,里面开了5个channel
<-or(
sig(2*time.Hour),
sig(5*time.Minute),
sig(1*time.Second),
)
fmt.Printf("done after %v\n", time.Since(start))
}


4、fanout-channel 模式

意思是只有 1 个输入 channel,有多个输出 channel,经常用在设计模式中的观察者模式。

观察者模式中,当数据发生变动后,多个观察者都会收到这个信号。

package main

import (
"fmt"
"time"
)

func main() {
// 输入的channel,相当于被观察者
ch := make(chan interface{})
go func() {
for {
ch <- time.Now()
time.Sleep(3 * time.Second)
}
}()

// 观察者
out := make([]chan interface{}, 2)
for k := range out {
out[k] = make(chan interface{})
}

go fanout(ch, out)

// 是否观察到数据变化
for {
select {
case res := <-out[0]:
fmt.Println(res)
case res := <-out[1]:
fmt.Println(res)
}
}
}

func fanout(ch <-chan interface{}, out []chan interface{}) {
defer func() {
for i := 0; i < len(out); i++ {
close(out[i])
}
}()

// 订阅被观察者
for v := range ch {
v := v
for i := 0; i < len(out); i++ {
i := i
out[i] <- v
}
}

}


5、fan-in-channel 模式

和上面的相反,这个是指多个源 channel 输入,一个目标 channel 输出的情况。

package main

import (
"fmt"
"time"
)

func main() {
// 输入的channel
in := make([]chan interface{}, 2)
in2 := make([]<-chan interface{}, 2)

for k := range in {
k := k
in[k] = make(chan interface{})
var inin <-chan interface{} = in[k]
in2[k] = inin

go func() {
for {
in[k] <- time.Now()
time.Sleep(3 * time.Second)
}
}()

}

// 打印输出的channel
for v := range fanIn(in2...) {
fmt.Println(v)
}

}

func fanIn(chans ...<-chan interface{}) <-chan interface{} {
switch len(chans) {
case 0:
c := make(chan interface{})
close(c)
return c
case 1:
return chans[0]
case 2:
return mergeTwo(chans[0], chans[1])
default: // 多个channel二分法
m := len(chans) / 2
return mergeTwo(fanIn(chans[:m]...), fanIn(chans[m:]...))
}

}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
// 针对2个channel输出
c := make(chan interface{})
go func() {
defer close(c)
for a != nil || b != nil {
select {
case v, ok := <-a:
if !ok {
a = nil
continue
}
c <- v
case v, ok := <-b:
if !ok {
b = nil
continue
}
c <- v
}

}
}()
return c
}


相关文章