日常开发中,最常见的就是处理并发问题,比如要完成一个业务操作,需要读取多个数据源(Map), 得到所有数据之后再进行汇总(Reduce)操作,最终完成业务操作.如果某个数据源读取操作失败, 整个业务操作当失败处理.

例如,需要同时拿到 redis 和 mysql 的数据才能继续业务流程,常见写法如下:

func queryRedisOld(value *int, err chan error) {
    if value == nil {
        err<-fmt.Errorf("null pointer")
        return
    }
    *value = 10
    err<-nil
}

func queryMySQLOld(value *int, err chan error) {
    if value == nil {
        err<-fmt.Errorf("null pointer")
        return
    }
    *value = 20
    err<-nil
}

func oldFashion() {
    var r, m int
    errR, errM := make(chan error), make(chan error)
    go queryRedisOld(&r, errR)
    go queryMySQLOld(&m, errM)

    if err := <-errR; err != nil {
        fmt.Println(err)
        return
    }

    if err := <-errM; err != nil {
        fmt.Println(err)
        return
    }

    fmt.Printf("r = %d, m = %d\n", r, m)
}

func main() {
    oldFashion()
}

tomb

tomb 能更优雅的处理这种情况, tomb.Gos(...)能并行执行多个 操作:

package main

import (
    "fmt"

    "github.com/xsyr/tomb"
)

func queryRedis(value *int) error {
    if value == nil {
        return fmt.Errorf("null pointer")
    }
    *value = 10
    return nil
}

func queryMySQL(value *int) error {
    if value == nil {
        return fmt.Errorf("null pointer")
    }
    *value = 20
    return nil
}

func newFashion() {
    var t tomb.Tomb

    var r, m int
    t.Gos(func() error { return queryRedis(&r) },
          func() error { return queryMySQL(&m) })
    err := t.Wait()
    if err != nil {
        fmt.Println(err)
    } else {
        // do something ...
        fmt.Printf("r = %d, m = %d\n", r, m)
    }
}

func main() {
    newFashion()
}

tomb.Go(…) 的坑

tomb.Go(...) 实现如下:

func (t *Tomb) Go(f func() error) {
	t.init()
	t.m.Lock()
	defer t.m.Unlock()
	select {
	case <-t.dead:
		panic("tomb.Go called after all goroutines terminated")
	default:
	}
	t.alive++
	go t.run(f)
}

留意 panic这句, 接口文档也说了Calling the Go method after all tracked goroutines return causes a runtime panic,但在实际编码过程中,根本就很难保证在调用 tomb.Go(...)的时候之前提交的操作没有完全执行完,所以在使用这个库的时候会存在 panic 的风险. 虽然可以用 recover 捕获panic,但会增加代码量和代码复杂度.所以库里新增 tomb.Gos(...) 接口,用于同时提交多个并发执行的操作,保证不会出现 panic:


// Gos runs multiple f at a time and tracks its termination.
func (t *Tomb) Gos(fs ...Fn) {
	t.init()
	t.m.Lock()
	defer t.m.Unlock()
	select {
	case <-t.dead:
		panic("tomb.Go called after all goroutines terminated")
	default:
	}

    for _, f := range fs {
        t.alive++
        go t.run(f)
    }
}