星澜

天接云涛连晓雾,星河欲转千帆舞

Go 并发控制模式与实战


前言

在实际的 Go 开发中,我们经常会遇到需要精细化控制并发的场景:防止缓存击穿、控制并发数量、资源复用等。本文将介绍几种常见的并发控制模式,并使用纯 Go 标准库实现,不依赖任何第三方框架。

一、SingleFlight - 防止缓存击穿

问题场景

当多个并发请求同时访问一个不存在的缓存时,所有请求都会穿透到数据库,造成数据库压力骤增:

// ❌ 问题代码
func GetUser(userID string) (*User, error) {
    // 1. 先查缓存
    user, err := cache.Get(userID)
    if err == nil {
        return user, nil
    }

    // 2. 缓存未命中,查询数据库
    // 如果有 100 个并发请求,会同时查询数据库 100 次!
    user, err = db.Query(userID)
    if err != nil {
        return nil, err
    }

    // 3. 写入缓存
    cache.Set(userID, user)
    return user, nil
}

解决方案

SingleFlight 模式:多个并发请求访问同一个资源时,只让第一个请求真正执行,其他请求等待并共享第一个请求的结果。

package main

import (
    "fmt"
    "sync"
    "time"
)

// call 代表一个正在执行的调用
type call struct {
    wg  sync.WaitGroup
    val interface{}
    err error
}

// SingleFlight 实现请求合并
type SingleFlight struct {
    mu    sync.Mutex
    calls map[string]*call
}

func NewSingleFlight() *SingleFlight {
    return &SingleFlight{
        calls: make(map[string]*call),
    }
}

// Do 执行函数,相同 key 的并发请求会共享结果
func (sf *SingleFlight) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    sf.mu.Lock()

    // 检查是否已经有请求在执行
    if c, ok := sf.calls[key]; ok {
        sf.mu.Unlock()
        c.wg.Wait() // 等待第一个请求完成
        return c.val, c.err
    }

    // 第一个请求,创建 call
    c := &call{}
    c.wg.Add(1)
    sf.calls[key] = c
    sf.mu.Unlock()

    // 执行函数
    c.val, c.err = fn()

    // 清理并通知等待的协程
    sf.mu.Lock()
    delete(sf.calls, key)
    sf.mu.Unlock()
    c.wg.Done()

    return c.val, c.err
}

// DoEx 执行函数并返回是否是首次执行(fresh = true 表示当前请求执行了函数)
func (sf *SingleFlight) DoEx(key string, fn func() (interface{}, error)) (val interface{}, fresh bool, err error) {
    sf.mu.Lock()

    if c, ok := sf.calls[key]; ok {
        sf.mu.Unlock()
        c.wg.Wait()
        return c.val, false, c.err // 共享结果
    }

    c := &call{}
    c.wg.Add(1)
    sf.calls[key] = c
    sf.mu.Unlock()

    c.val, c.err = fn()

    sf.mu.Lock()
    delete(sf.calls, key)
    sf.mu.Unlock()
    c.wg.Done()

    return c.val, true, c.err // 新鲜结果
}

使用示例

type UserService struct {
    sf *SingleFlight
    db *DB
    cache *Cache
}

func NewUserService(db *DB, cache *Cache) *UserService {
    return &UserService{
        sf:    NewSingleFlight(),
        db:    db,
        cache: cache,
    }
}

func (s *UserService) GetUser(userID string) (*User, error) {
    // 先查缓存
    if user, err := s.cache.Get(userID); err == nil {
        return user, nil
    }

    // 使用 SingleFlight 查询数据库
    // 即使有 1000 个并发请求,也只会查询数据库 1 次
    val, err := s.sf.Do(userID, func() (interface{}, error) {
        user, err := s.db.Query(userID)
        if err != nil {
            return nil, err
        }

        // 写入缓存
        s.cache.Set(userID, user)
        return user, nil
    })

    if err != nil {
        return nil, err
    }

    return val.(*User), nil
}

// 使用 DoEx 区分是否为新鲜结果
func (s *UserService) GetUserEx(userID string) (*User, bool, error) {
    if user, err := s.cache.Get(userID); err == nil {
        return user, false, nil
    }

    val, fresh, err := s.sf.DoEx(userID, func() (interface{}, error) {
        return s.db.Query(userID)
    })

    if fresh {
        fmt.Printf("从数据库查询用户: %s\n", userID)
    }

    if err != nil {
        return nil, false, err
    }

    return val.(*User), fresh, nil
}

性能对比

// 测试:100 个协程同时查询同一个用户
func BenchmarkWithoutSingleFlight(b *testing.B) {
    for i := 0; i < b.N; i++ {
        var wg sync.WaitGroup
        for j := 0; j < 100; j++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // 每个协程都查询数据库
                db.Query("user123")
            }()
        }
        wg.Wait()
    }
}

func BenchmarkWithSingleFlight(b *testing.B) {
    sf := NewSingleFlight()
    for i := 0; i < b.N; i++ {
        var wg sync.WaitGroup
        for j := 0; j < 100; j++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // 只有第一个协程查询数据库,其他共享结果
                sf.Do("user123", func() (interface{}, error) {
                    return db.Query("user123")
                })
            }()
        }
        wg.Wait()
    }
}

// 结果:
// BenchmarkWithoutSingleFlight    100     50000000 ns/op   (100 次 DB 查询)
// BenchmarkWithSingleFlight       10000   500000 ns/op     (1 次 DB 查询,性能提升 100 倍)

注意事项

二、LockedCalls - 按 Key 串行化执行

与 SingleFlight 的区别

特性 SingleFlight LockedCalls
执行次数 只执行一次 每个请求都执行
结果 共享第一个请求的结果 各自独立的结果
使用场景 读操作 写操作
// SingleFlight: 多个请求共享结果
A ---> 执行 fn() --------> 返回 result_A
B ---> 等待 ------------> 返回 result_A (共享)
C ---> 等待 ------------> 返回 result_A (共享)

// LockedCalls: 每个请求独立执行
A ---> 执行 fn() --------> 返回 result_A
B ---> 等待 ---> 执行 fn() --> 返回 result_B (独立)
C ---> 等待 -----------> 执行 fn() -> 返回 result_C (独立)

实现代码

package main

import "sync"

type LockedCalls struct {
    mu sync.Mutex
    m  map[string]*sync.WaitGroup
}

func NewLockedCalls() *LockedCalls {
    return &LockedCalls{
        m: make(map[string]*sync.WaitGroup),
    }
}

func (lc *LockedCalls) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
begin:
    lc.mu.Lock()

    // 检查该 key 是否正在被处理
    if wg, ok := lc.m[key]; ok {
        lc.mu.Unlock()
        wg.Wait() // 等待其完成
        goto begin // 重新尝试获取锁
    }

    // 当前协程获得执行权
    wg := &sync.WaitGroup{}
    wg.Add(1)
    lc.m[key] = wg
    lc.mu.Unlock()

    // 执行函数
    val, err := fn()

    // 清理并通知
    lc.mu.Lock()
    delete(lc.m, key) // 注意:必须先 delete 再 Done
    lc.mu.Unlock()
    wg.Done()

    return val, err
}

使用示例 1:用户账户操作

type AccountService struct {
    lc *LockedCalls
    db *DB
}

func NewAccountService(db *DB) *AccountService {
    return &AccountService{
        lc: NewLockedCalls(),
        db: db,
    }
}

// 同一用户的充值操作必须串行执行
func (s *AccountService) Recharge(userID string, amount int64) error {
    _, err := s.lc.Do(userID, func() (interface{}, error) {
        // 1. 查询当前余额
        var balance int64
        err := s.db.QueryRow("SELECT balance FROM accounts WHERE user_id = ?", userID).Scan(&balance)
        if err != nil {
            return nil, err
        }

        // 2. 更新余额
        newBalance := balance + amount
        _, err = s.db.Exec("UPDATE accounts SET balance = ? WHERE user_id = ?", newBalance, userID)
        if err != nil {
            return nil, err
        }

        // 3. 记录日志
        _, err = s.db.Exec("INSERT INTO recharge_logs (user_id, amount) VALUES (?, ?)", userID, amount)
        return nil, err
    })

    return err
}

为什么不能用 SingleFlight?

// ❌ 错误示例:使用 SingleFlight
用户 A 发起充值 100 元(请求1)
用户 A 发起充值 200 元(请求2,几乎同时)

使用 SingleFlight:
- 请求1 执行,充值 100 元
- 请求2 等待,共享请求1 的结果
- 结果:只充值了 100 元!请求2 的 200 元丢失了!

使用 LockedCalls:
- 请求1 执行,充值 100 元
- 请求2 等待请求1 完成后,独立执行,充值 200 元
- 结果:正确充值了 300 元 ✅

使用示例 2:文件操作互斥

type FileService struct {
    lc *LockedCalls
}

func (s *FileService) AppendToFile(filename string, content string) error {
    _, err := s.lc.Do(filename, func() (interface{}, error) {
        // 同一文件的多个写操作串行化
        f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
        if err != nil {
            return nil, err
        }
        defer f.Close()

        _, err = f.WriteString(content + "\n")
        return nil, err
    })

    return err
}

性能对比

// 测试:1000 个并发请求,操作 100 个不同的 key,每个操作耗时 10ms

// 方案1:使用全局锁
var globalMutex sync.Mutex
func updateWithGlobalLock(key string) {
    globalMutex.Lock()
    defer globalMutex.Unlock()
    // 所有 key 的更新都被串行化
    time.Sleep(10 * time.Millisecond)
}
// 总耗时:10,000ms (完全串行)

// 方案2:使用 LockedCalls
lc := NewLockedCalls()
func updateWithLockedCalls(key string) {
    lc.Do(key, func() (interface{}, error) {
        time.Sleep(10 * time.Millisecond)
        return nil, nil
    })
}
// 总耗时:100ms (100 个 key 并行执行,每个 key 内部串行)
// 性能提升:100 倍

三、并发数限制 - Limit

问题场景

有时候我们需要限制同时执行的任务数量,避免资源耗尽:

// ❌ 问题代码:批量处理 10000 个任务
func ProcessTasks(tasks []Task) {
    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        go func(t Task) {
            defer wg.Done()
            t.Process() // 可能创建 10000 个协程,资源耗尽
        }(task)
    }
    wg.Wait()
}

解决方案

使用 channel 实现信号量,控制并发数:

package main

type Limit struct {
    pool chan struct{}
}

func NewLimit(n int) *Limit {
    return &Limit{
        pool: make(chan struct{}, n),
    }
}

// Borrow 获取许可(阻塞式)
func (l *Limit) Borrow() {
    l.pool <- struct{}{}
}

// TryBorrow 尝试获取许可(非阻塞式)
func (l *Limit) TryBorrow() bool {
    select {
    case l.pool <- struct{}{}:
        return true
    default:
        return false
    }
}

// Return 归还许可
func (l *Limit) Return() {
    <-l.pool
}

使用示例 1:限制并发请求数

type APIClient struct {
    client *http.Client
    limit  *Limit
}

func NewAPIClient(maxConcurrent int) *APIClient {
    return &APIClient{
        client: &http.Client{Timeout: 10 * time.Second},
        limit:  NewLimit(maxConcurrent),
    }
}

func (c *APIClient) BatchRequest(urls []string) []Result {
    results := make([]Result, len(urls))
    var wg sync.WaitGroup

    for i, url := range urls {
        wg.Add(1)
        go func(index int, u string) {
            defer wg.Done()

            // 1. 获取许可(如果超过限制,会在此阻塞)
            c.limit.Borrow()
            defer c.limit.Return()

            // 2. 发起请求
            resp, err := c.client.Get(u)
            if err != nil {
                results[index] = Result{Error: err}
                return
            }
            defer resp.Body.Close()

            // 3. 处理响应
            body, _ := io.ReadAll(resp.Body)
            results[index] = Result{Data: body}
        }(i, url)
    }

    wg.Wait()
    return results
}

// 使用
client := NewAPIClient(10) // 最多 10 个并发请求
urls := make([]string, 100) // 100 个 URL
results := client.BatchRequest(urls) // 实际会分 10 批执行

使用示例 2:非阻塞式流量控制

func RateLimitedHandler(w http.ResponseWriter, r *http.Request) {
    // 尝试获取许可,不阻塞
    if !limit.TryBorrow() {
        http.Error(w, "Too many concurrent requests", http.StatusTooManyRequests)
        return
    }
    defer limit.Return()

    // 处理请求
    // ...
}

使用示例 3:结合 Context 的超时控制

func RequestWithTimeout(ctx context.Context, url string) (*Response, error) {
    // 创建一个带超时的 channel
    done := make(chan struct{})

    go func() {
        limit.Borrow()
        close(done)
    }()

    // 等待获取许可或超时
    select {
    case <-done:
        defer limit.Return()
        return doRequest(ctx, url)
    case <-ctx.Done():
        return nil, ctx.Err()
    }
}

四、对象池 - Pool

与 sync.Pool 的区别

Go 标准库的 sync.Pool 很强大,但有局限性:

特性 sync.Pool 自定义 Pool
容量限制 无限制 可设置最大容量
对象过期 不支持 支持最大空闲时间
自定义销毁 不支持 支持 destroy 回调
阻塞获取 不支持 支持阻塞等待
GC 行为 会被 GC 清空 不会被 GC 清空

实现代码

package main

import (
    "sync"
    "time"
)

type Pool struct {
    limit   int                        // 最大容量
    created int                        // 已创建的对象数
    maxAge  time.Duration              // 最大空闲时间
    lock    sync.Mutex
    cond    *sync.Cond                 // 条件变量
    head    *node                      // 空闲对象链表
    create  func() interface{}         // 创建函数
    destroy func(interface{})          // 销毁函数
}

type node struct {
    item     interface{}
    next     *node
    lastUsed time.Time
}

func NewPool(limit int, create func() interface{}, destroy func(interface{}), maxAge time.Duration) *Pool {
    p := &Pool{
        limit:   limit,
        maxAge:  maxAge,
        create:  create,
        destroy: destroy,
    }
    p.cond = sync.NewCond(&p.lock)
    return p
}

func (p *Pool) Get() interface{} {
    p.lock.Lock()
    defer p.lock.Unlock()

    for {
        // case 1: 有空闲对象
        if p.head != nil {
            head := p.head
            p.head = head.next

            // 检查是否过期
            if p.maxAge > 0 && time.Since(head.lastUsed) > p.maxAge {
                p.created--
                p.destroy(head.item)
                continue // 继续寻找
            }

            return head.item
        }

        // case 2: 可以创建新对象
        if p.created < p.limit {
            p.created++
            return p.create()
        }

        // case 3: 池已满,等待其他协程归还
        p.cond.Wait()
    }
}

func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }

    p.lock.Lock()
    defer p.lock.Unlock()

    // 将对象放回链表头部
    p.head = &node{
        item:     x,
        next:     p.head,
        lastUsed: time.Now(),
    }

    // 唤醒一个等待的协程
    p.cond.Signal()
}

使用示例:数据库连接池

type DBPool struct {
    pool *Pool
}

func NewDBPool(maxConns int, connStr string) *DBPool {
    return &DBPool{
        pool: NewPool(
            maxConns,
            // 创建连接
            func() interface{} {
                conn, err := sql.Open("mysql", connStr)
                if err != nil {
                    fmt.Printf("创建连接失败: %v\n", err)
                    return nil
                }
                return conn
            },
            // 销毁连接
            func(x interface{}) {
                if conn, ok := x.(*sql.DB); ok {
                    conn.Close()
                }
            },
            5*time.Minute, // 5 分钟空闲超时
        ),
    }
}

func (p *DBPool) Exec(query string, args ...interface{}) error {
    conn := p.pool.Get().(*sql.DB)
    defer p.pool.Put(conn)

    _, err := conn.Exec(query, args...)
    return err
}

func (p *DBPool) Use(fn func(*sql.DB) error) error {
    conn := p.pool.Get().(*sql.DB)
    defer p.pool.Put(conn)
    return fn(conn)
}

// 使用
dbPool := NewDBPool(20, "user:pass@tcp(127.0.0.1:3306)/db")

err := dbPool.Use(func(conn *sql.DB) error {
    return conn.QueryRow("SELECT * FROM users WHERE id = ?", 123).Scan(&user)
})

性能对比

// 测试:1000 次创建 1MB 缓冲区

type LargeBuffer struct {
    data [1024 * 1024]byte // 1MB
}

// 不使用对象池
func BenchmarkWithoutPool(b *testing.B) {
    for i := 0; i < b.N; i++ {
        for j := 0; j < 1000; j++ {
            buf := &LargeBuffer{}
            _ = buf
        }
    }
}

// 使用对象池
func BenchmarkWithPool(b *testing.B) {
    pool := NewPool(
        100, // 最多保留 100 个缓冲区
        func() interface{} { return &LargeBuffer{} },
        func(x interface{}) {},
        5*time.Minute,
    )

    for i := 0; i < b.N; i++ {
        for j := 0; j < 1000; j++ {
            buf := pool.Get().(*LargeBuffer)
            pool.Put(buf)
        }
    }
}

// 结果:
// BenchmarkWithoutPool    内存分配: 1GB,GC: ~50 次,耗时: ~500ms
// BenchmarkWithPool       内存分配: 100MB,GC: ~5 次,耗时: ~50ms
// 性能提升:10 倍,内存占用降低 90%

五、其他实用模式

5.1 Barrier - 简化锁操作

type Barrier struct {
    lock sync.Mutex
}

func (b *Barrier) Guard(fn func()) {
    b.lock.Lock()
    defer b.lock.Unlock()
    fn()
}

// 使用示例
var counter int
var barrier Barrier

// 传统方式
mutex.Lock()
counter++
mutex.Unlock()

// 使用 Barrier(更简洁,不会忘记 Unlock)
barrier.Guard(func() {
    counter++
})

5.2 带超时的条件变量

type Cond struct {
    signal chan struct{}
}

func NewCond() *Cond {
    return &Cond{
        signal: make(chan struct{}, 1),
    }
}

func (c *Cond) Signal() {
    select {
    case c.signal <- struct{}{}:
    default:
    }
}

func (c *Cond) Wait(timeout time.Duration) bool {
    timer := time.NewTimer(timeout)
    defer timer.Stop()

    select {
    case <-c.signal:
        return true // 收到信号
    case <-timer.C:
        return false // 超时
    }
}

// 使用示例
cond := NewCond()

// 协程1:等待条件
go func() {
    if cond.Wait(5 * time.Second) {
        fmt.Println("收到信号")
    } else {
        fmt.Println("等待超时")
    }
}()

// 协程2:发送信号
time.Sleep(2 * time.Second)
cond.Signal()

总结

模式 使用场景 核心特点
SingleFlight 防缓存击穿、请求合并 多个请求共享一次执行结果
LockedCalls 写操作、顺序保证 按 key 串行化,每个请求独立执行
Limit 并发数控制 基于 channel 的信号量实现
Pool 资源复用 限制容量、支持过期、自定义生命周期

最佳实践

  1. 选择合适的模式

    • 读操作 + 防缓存击穿 → SingleFlight
    • 写操作 + 顺序保证 → LockedCalls
    • 并发限制 → Limit
    • 资源复用 → Pool
  2. 正确处理错误

    // ✅ 记录错误日志
    val, err := sf.Do(key, func() (interface{}, error) {
        data, err := queryDB(key)
        if err != nil {
            log.Printf("查询失败: %v", err)
            return nil, err
        }
        return data, nil
    })
    
  3. 使用 defer 确保资源归还

    // ✅ 推荐模式
    func useResource() error {
        resource := pool.Get()
        defer pool.Put(resource)
        return doWork(resource)
    }
    
  4. 设置合理的参数

    • Pool 容量 = 并发请求数 × 平均处理时间 / 单次操作时间
    • 过期时间:数据库连接 5-10 分钟,临时对象 1-2 分钟

参考资料

  1. Go 并发编程实战
  2. Go 语言设计与实现
  3. golang.org/x/sync