Go 并发模式实战:Channel、Context 与 Worker Pool

Go 的并发模型是它最大的卖点之一。goroutine 很轻量,channel 很优雅,但用不好就会写出比线程池更难调试的代码。

一、goroutine 不是免费的

每个 goroutine 初始栈只有 2-8KB,确实很轻。但如果你不加限制地启动:

for _, url := range urls {
    go func(u string) {
        resp, _ := http.Get(u)
        // 处理响应...
    }(url)
}

10 万个 URL 就是 10 万个 goroutine 同时发 HTTP 请求。即使内存扛得住,文件描述符和连接池也会先崩。

二、Worker Pool 模式

控制并发数的标准做法:

func workerPool(jobs <-chan string, results chan<- string, workers int) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                result := process(job)
                results <- result
            }
        }()
    }
    go func() {
        wg.Wait()
        close(results)
    }()
}

用法:

jobs := make(chan string, 100)
results := make(chan string, 100)

go workerPool(jobs, results, 10) // 10 个 worker

for _, url := range urls {
    jobs <- url
}
close(jobs)

for result := range results {
    fmt.Println(result)
}

三、Context 控制生命周期

context.Context 是 Go 并发控制的核心。它解决三个问题:

  1. **取消传播**:父任务取消时,所有子任务自动取消
  2. **超时控制**:防止任务无限挂起
  3. **值传递**:在 goroutine 链中传递请求级数据
func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    return string(body), nil
}

四、Select 多路复用

select 让一个 goroutine 同时等待多个 channel:

select {
case msg := <-ch1:
    fmt.Println("从 ch1 收到:", msg)
case msg := <-ch2:
    fmt.Println("从 ch2 收到:", msg)
case <-time.After(5 * time.Second):
    fmt.Println("超时")
case <-ctx.Done():
    fmt.Println("被取消:", ctx.Err())
}

五、errgroup 批量并发

golang.org/x/sync/errgroup 是处理"并发执行多个任务,任一失败则全部取消"的标准方案:

g, ctx := errgroup.WithContext(context.Background())

for _, url := range urls {
    url := url
    g.Go(func() error {
        return fetch(ctx, url)
    })
}

if err := g.Wait(); err != nil {
    log.Printf("任务失败: %v", err)
}

六、常见陷阱

  • **goroutine 泄漏**:忘记关闭 channel 或没有退出条件,goroutine 永远不会结束
  • **竞态条件**:多个 goroutine 读写同一个变量,必须用 `sync.Mutex` 或 channel 保护
  • **死锁**:所有 goroutine 都在等待对方发送数据,程序卡死

Go 的并发哲学是"不要通过共享内存来通信,而要通过通信来共享内存"。channel 是第一选择,mutex 是最后手段。