日常工作中时常需要写一些一次性并发执行的任务,但又需要根据相关资源的负载情况对并发任务数进行增减, 通常最简单的方法就是停止进程,修改配置,再重启.这样做有点麻烦的是需要记录当前处理进度,避免再次重启 的时候重复执行已完成的操作.
limitworker 用于动态的控制并发的任务数量, 可通过对 fifo 文件的操作增减并发的任务量.
demo
package main
import (
"fmt"
"time"
"github.com/xsyr/limitworker"
)
func foo(id int, dying <-chan struct{}) error {
i := 0
for {
quit := false
select {
case <-dying: quit = true
default:
}
if quit { break }
fmt.Printf("[%d] foo\n", id)
time.Sleep(1 * time.Second)
i++
if i == 20 { break }
}
fmt.Printf("[%d] foo quit\n", id)
return nil
}
func main() {
lw, err := limitworker.New(2, "ctrl", "ctrl.log", foo)
if err != nil {
fmt.Println(err)
return
}
lw.Wait()
lw.Close()
}
增加并发任务
$ echo '+2' > ctrl
ctrl.log
记录并发任务的变化情况:
2017/09/24 10:53:43 limitworker.go:206: delta: +2
2017/09/24 10:53:43 limitworker.go:106: [3]+ (running: 3, termErr: 0, termOk: 0)
2017/09/24 10:53:43 limitworker.go:106: [4]+ (running: 4, termErr: 0, termOk: 0)
减少并发任务
$ echo '-3' > ctrl
ctrl.log
记录并发任务的变化情况:
2017/09/24 10:53:46 limitworker.go:126: [3]- (running: 3, termErr: 0, termOk: 1)
2017/09/24 10:53:46 limitworker.go:126: [1]- (running: 2, termErr: 0, termOk: 2)
2017/09/24 10:53:46 limitworker.go:126: [2]- (running: 1, termErr: 0, termOk: 3)
2017/09/24 10:53:46 limitworker.go:206: delta: -3