Go关闭goroutine协程的方法

2023-06-27 08:38:51 方法 关闭 goroutine

这篇文章主要介绍了Go关闭goroutine协程的方法,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。


1.简介

本文将介绍首先为什么需要主动关闭

goroutine
,并介绍如何在Go语言中关闭
goroutine
的常见套路,包括传递终止信号和协程内部捕捉终止信号。之后,文章列举了需要主动关闭协程运行的常见场景,如启动一个协程执行一个不断重复的任务。希望通过本文的介绍,读者能够掌握如何在适当的时候关闭
goroutine
,以及了解关闭
goroutine
的常见套路。

2.为什么需要关闭goroutine

2.1 协程的生命周期

了解协程的生命周期是优雅地关闭协程的前提,因为在关闭协程之前需要知道协程的当前状态,以便采取相应的措施。所以这里我们需要先了解下

goroutine
的生命周期。

Go
语言中,协程(goroutine)是一种轻量级的线程,可以在一个程序中同时运行多个协程,提高程序的并发性能。协程的生命周期包括创建、运行和结束三个阶段。

首先需要创建一个协程,协程的创建可以通过关键字 go 来实现,例如:

go func() {
    // 协程执行的代码
}()

上面的代码会启动一个新的协程,同时在新的协程中执行匿名函数,此时协程便已被创建了。

一旦协程被创建,它就会在新的线程中运行。协程的运行状态可以由 Go 运行时(goroutine scheduler)来管理,它会自动将协程调度到适当的

P
中运行,并确保协程的公平调度和平衡负载。

在运行阶段,协程会不断地执行任务,直到任务完成或者遇到终止条件。在终止阶段,协程将会被回收,从而完成其整个生命周期。

综上所述,协程由

go
关键字启动,在协程中执行其业务逻辑,直到最后遇到终止条件,此时代表着协程的任务已经结束了,将进入终止阶段。最终协程将会被回收。

2.2 协程的终止条件

正常来说,都是协程任务执行完成之后,此时协程自动退出,例如:

func main() {
   var wg sync.WaitGroup
   wg.Add(1)
   go func() {
      defer wg.Done()
      // 协程执行的代码
      fmt.Println("协程执行完毕")
   }()
   wg.Wait()
   // 等待协程执行完毕
   fmt.Println("主程序结束")

上面的代码中,我们使用 

WaitGroup
 等待协程执行完毕。在协程执行完毕后,程序会输出协程执行完毕和主程序结束两条信息。

还有一种情况是协程发生panic,它将会自动退出。例如:

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        // 协程执行的代码
        panic("协程发生错误")
    }()
    // 等待协程执行完毕
    wg.Wait()
    fmt.Println("主程序结束")
}

在这种情况下,协程也会自动退出,不会再占用系统资源。

综合看来,协程的终止条件,其实就是协程中的任务执行完成了,或者是执行过程中发生了panic,协程将满足终止条件,退出执行。

2.3 为什么需要主动关闭goroutine

从上面协程的终止条件来看,正常情况下,协程只要将任务正常处理完成,协程自动退出,此时并不需要主动关闭

goroutine

这里先举一个生产者消费者的例子,在这个例子中,我们创建了一个生产者和一个消费者,它们之间通过一个

channel
进行通信。生产者生产数据并发送到一个
channel
中,消费者从这个
channel
中读取数据并进行处理。代码示例如下:

func main() {
    // 生产者代码
    go func(out chan<- int) {
        for i := 0; ; i++ {
            select {
            case out <- i:
                fmt.Printf("producer: produced %d
", i)
            time.Sleep(time.Second)
        }
    }
    // 消费者逻辑
    go func(in <-chan int) {
        for {
            select {
            case i := <-in:
                fmt.Printf("consumer: consumed %d
", i)
            }
        }
    }
    // 让生产者协程和消费者协程一直执行下去
    time.Sleep(100000000)
}

在这个例子中,我们使用了两个

goroutine
:生产者和消费者。生产者向
channel
中生产数据,消费者从
channel
中消费数据。

但是,假如生产者出现了问题,此时生产者的协程将会被退出,不再执行。而消费者仍然在等待数据的输入。此时消费者协程已经没有存在的必要了,其实是需要退出执行。

因此,对于一些虽然没有达到终止条件的协程,但是其又没有再继续执行下去的必要,此时主动关闭其执行,从而保证程序的健壮性和性能。

3.如何优雅得关闭goroutine

优雅得关闭

goroutine
的执行,我们可以遵循以下三个步骤。首先是传递关闭协程的信号,其次是协程内部需要能够到关闭信号,最后是协程退出时,能够正确释放其所占据的资源。通过以上步骤,可以保在需要时优雅地停止
goroutine
的执行。下面对这三个步骤详细进行讲解。

3.1 传递关闭终止信号

首先是通过给

goroutine
传递关闭协程的信号,从而让协程进行退出操作。这里可以使用
context.Context
来传递信号,具体实现可以通过调用
WithCancel
,
WithDeadline
,
WithTimeout
等方法来创建一个带有取消功能的
Context
,并在需要关闭协程时调用
Cancel
方法来向
Context
发送取消信号。示例代码如下:

ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
    for {
        select {
        // 调用cancel函数后,这里将能够收到通知
        case <-ctx.Done():
            return
        default:
            // do something
        }
    }
}(ctx)
// 在需要关闭协程时调用cancel方法发送取消信号
cancel()

这里,当我们想要终止协程的执行时,只需要调用可取消

context
对象的
Cancel
方法,协程内部将能够通过
context
对象接收到终止协程执行的通知。

3.2 协程内部捕捉终止信号

  协程内部也需要在取消信号传递过来时,能够正确被捕捉到,才能够正常终止流程。这里我们可以使用

select
语句来监听取消信号。
select
语句可以有多个
case
子句,可以同时监听多个
channel
,当
select
语句执行时,它会一直阻塞,直到有一个
case
子句可以执行。
select
语句也可以包含default子句,这个子句在所有的case子句都不能执行时会被执行,通常用于防止select语句的阻塞。如下:

select {
case <-channel:
    // channel有数据到来时执行的代码
default:
    // 所有channel都没有数据时执行的代码
}

context
对象的
Done
方法刚好也是返回一个
channel
,取消信号便是通过该
channel
来进行传递的。所以我们可以在协程内部,通过
select
语句,在其中一个
case
分支来监听取消信号;同时使用一个
default
分支在协程中执行具体的业务逻辑。在终止信号没有到来时,就执行业务逻辑;在收到协程终止信号后,也能够及时终止协程的执行。如下:

go func(ctx context.Context) {
    for {
        select {
        // 调用cancel函数后,这里将能够收到通知
        case <-ctx.Done():
            return
        default:
            // 执行业务逻辑
        }
    }
}(ctx)

3.3 回收协程资源

最后,当协程被终止执行时,需要释放占用的资源,包括文件句柄、内存等,以便其他程序可以继续使用这些资源。在Go语言中,可以使用

defer
语句来确保协程在退出时能够正确地释放资源。比如协程中打开了一个文件,此时可以通过defer语句来关闭,避免资源的泄漏。代码示例如下:

func doWork() {
    file, err := os.Open("test.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // Do some work
}

在这个例子中,我们在文件打开之后使用

defer
语句注册了一个函数,当协程结束时会自动调用该函数来关闭文件。这样协程无论在何时退出,我们都可以确保文件被正确关闭,避免资源泄漏和其他问题。

3.4 关闭goroutine示例

下面展示一个简单的例子,结合

Context
对象,
select
语句以及
defer
语句这三部分内容,优雅得终止一个协程的运行,具体代码示例如下:

package main

import (
    "context"
    "fmt"
    "time"
)

func worker(ctx context.Context) {
    // 最后,在协程退出前,释放资源.
    defer fmt.Println("worker stopped")

    for {
        // 通过select语句监听取消信号,取消信号没到达,则执行业务逻辑,等下次循环检查
        select {
        default:
            fmt.Println("working")
        case <-ctx.Done():
            return
        }
        time.Sleep(time.Second)
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    // 启动一个协程执行任务
    go worker(ctx)
    // 执行5s后,调用cancel函数终止协程
    time.Sleep(5 * time.Second)
    cancel()

    time.Sleep(2 * time.Second)
}

main
函数中,我们使用
context.WithCancel
函数创建了一个新的
context
,并将其传递给
worker
函数,同时启动协程运行
worker
函数。

worker
函数执行5s后,主协程调用
cancel
函数来终止
worker
协程。之后,
worker
协程中监听取消信号的
select
语句,将能够捕捉到这个信号,执行终止协程操作。

最后,在退出协程时,通过

defer
语句实现资源的释放。综上,我们实现了协程的优雅关闭,同时也正确回收了资源。

4. 需要主动关闭协程运行的常见场景

4.1 协程在执行一个不断重复的任务

协程在执行一个不断重复的任务时,此时协程是不会主动终止运行的。但是在某个时刻之后,不需要再继续执行该任务了,需要主动关闭

goroutine
的执行,释放协程的资源。

这里以

etcd
为例来进行说明。
etcd
主要用于在分布式系统中存储配置信息、元数据和一些小规模的共享数据。也就是说,我们可以在
etcd
当中存储一些键值对。那么,如果我们想要设置键值对的有效期,那该如何实现呢?

etcd
中存在一个租约的概念,租约可以看作是一个时间段,该时间段内某个键值对的存在是有意义的,而在租约到期后,该键值对的存在便没有意义,可以被删除,同时一个租约可以作用于多个键值对。下面先展示如何将一个租约和一个key进行关联的示例:

// client 为 etcd客户端的连接,基于此建立一个Lease实例
// Lease示例提供一些api,能过创建租约,取消租约,续约租约
lease := clientv3.NewLease(client)

// 创建一个租约,同时租约时间为10秒
grantResp, err := lease.Grant(context.Background(), 10)
if err != nil {
    log.Fatal(err)
}
// 租约ID,每一个租约都有一个唯一的ID
leaseID := grantResp.ID

// 将租约与key进行关联,此时该key的有效期,也就是该租约的有效期
_, err = kv.Put(context.Background(), "key1", "value1", clientv3.WithLease(leaseID))
if err != nil {
    log.Fatal(err)
}

以上代码演示了如何在

etcd
中创建一个租约并将其与一个键值对进行关联。首先,通过
etcd
客户端的连接创建了一个
Lease
实例,该实例提供了一些api,可以创建租约、取消租约和续约租约。然后使用
Grant
函数创建了一个租约并指定了租约的有效期为10秒。接下来,获取租约ID,每个租约都有一个唯一的ID。最后,使用
Put
函数将租约与key进行关联,从而将该key的有效期设定为该租约的有效期。

所以,我们如果想要操作

etcd
中键值对的有效期,只需要操作租约的有效期即可。

而刚好,

etcd
其实定义了一个
Lease
接口,该接口定义了对租约的一些操作,能过创建租约,取消租约,同时也支持续约租约,获取过期时间等内容,具体如下:

type Lease interface {
   // 1. 创建一个新的租约
   Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
   // 2. 取消租约
   Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
   // 3. 获取租约的剩余有效期
   TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
   // 4. 获取所有的租约
   Leases(ctx context.Context) (*LeaseLeasesResponse, error)
   // 5. 不断对租约进行续约,这里假设10s后过期,此时大概的含义为每隔10s续约一次租约,调用该方法后,租约将永远不会过期  
   KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
   // 6. 续约一次租约
   KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
   // 7. 关闭Lease实例 
   Close() error
}

到此为止,我们引出了

Lease
接口,而其中
KeepAlive
方法便是我们今日的主角,从该方法定义可以看出,当调用
KeepAlive
方法对某个租约进行续约后,其每隔一段时间都会执行对目标租约的续约操作。这个时候一般都是启动一个协程,由协程来完成对租约的续约操作。

此时协程其实就是在执行一个不断重复的任务,那如果

Lease
接口的实例调用了
Close
方法,想要回收掉
Lease
实例,不会再通过该实例对租约进行操作,回收掉
Lease
所有占据的资源,那么
KeepAlive
方法创建的协程,此时也应该被主动关闭,不应该再继续执行下去。

事实上,当前

etcd
Lease
接口中
KeepAlive
方法的默认实现也是如此。并且对主动关闭协程运行的实现,也是通过
context
传递对象,
select
获取取消信号,最后通过
defer
来回收资源这三者组合起来实现的。

下面来看看执行续约操作的函数,会启动一个协程在后台不断执行,具体实现如下:

func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
   for {
      var tosend []LeaseID
      
      now := time.Now()
      l.mu.Lock()
      // keepAlives 是保存了所有待续约的 租约ID
      for id, ka := range l.keepAlives {
         // 然后nextKeepAlive为下次续约的时间,如果超过该时间,则执行续约操作
         if ka.nextKeepAlive.Before(now) {
            tosend = append(tosend, id)
         }
      }
      l.mu.Unlock()
      // 发送续约请求
      for _, id := range tosend {
         r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
         // 向etcd集群发送续约请求
         if err := stream.Send(r); err != nil {
            return
         }
      }

      select {
      // 每隔500ms执行一次
      case <-time.After(500 * time.Millisecond):
      // 如果接收到终止信号,则直接终止
      case <-l.stopCtx.Done():
         return
      }
   }
}

可以看到,其会不断循环,首先会检查当前时间是否超过了所有租约的下次续约时间,如果超过了,则会将这些租约的 ID 放入

tosend
数组中,并在循环的下一步中向
etcd
集群发送续约请求。接着会等待 500 毫秒,然后再次执行上述操作。正常情况下,其不会退出循环,会一直向
etcd
集群发送续约请求。除非收到了终止信号,其才会退出,从而正常结束协程。

stopCtx
则是
lessor
实例的变量,用于传递取消信号。在创建
lessor
实例时,
stopCtx
是由
context.WithCancel()
函数创建的。这个函数会返回两个对象:一个带有取消方法的
context.Context
对象(即
stopCtx
),以及一个函数对象
stopCancel
,调用这个函数会取消上下文对象。具体如下:

// 创建Lease实例
func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
   // ...省略一些无关内容
   reqLeaderCtx := WithRequireLeader(context.Background())
   // 通过withCancel函数创建cancelCtx对象
   l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
   return l
}

在 

lessor.Close()
 函数中,我们调用 
stopCancel()
 函数来发送取消信号。

func (l *lessor) Close() error {
   l.stopCancel()
   // close for synchronous teardown if stream goroutines never launched
   // 省略无关内容
   return nil
}

因为

sendKeepAliveLoop()
协程会在
stopCtx
上等待信号,所以一旦调用了
stopCancel()
,协程会收到信号并退出。这个机制非常灵活,因为
stopCtx
是实例的成员变量,所以
lessor
实例创建的所有协程,都可以通过监听
stopCtx
来决定是否要退出执行。

5.总结

这篇文章主要介绍了为什么需要主动关闭

goroutine
,以及在Go语言中关闭
goroutine
的常见套路。

文章首先介绍了为什么需要主动关闭

goroutine
。接下来,文章详细介绍了
Go
语言中关闭
goroutine
的常见套路,包括传递终止信号和协程内部捕捉终止信号。在传递终止信号的方案中,文章介绍了如何使用
context
对象传递信号,并使用
select
语句等待信号。在协程内部捕捉终止信号的方案中,文章介绍了如何使用
defer
语句来回收资源。

最后,文章列举了需要主动关闭协程运行的常见场景,如协程在执行一个不断重复的任务,在不再需要继续执行下去的话,就需要主动关闭协程的执行。希望通过本文的介绍,读者能够掌握如何在适当的时候关闭

goroutine
,从而避免资源浪费的问题。

相关文章