标签:Group errGroup sync golang 并发 cancel 超时
需求
在并发控制中,想实现以下功能
1、并发超时控制
2、一个出错,主程序退出
3、兼容errGroup
然后对errGroup进行一次改写
package utils import ( "context" "errors" "fmt" "sync" "time" ) type token struct{} // A Group is a collection of goroutines working on subtasks that are part of // the same overall task. // // A zero Group is valid, has no limit on the number of active goroutines, // and does not cancel on error. type Group struct { cancel func() wg sync.WaitGroup sem chan token errOnce sync.Once err error unsafe bool ctx context.Context } var errTimeout = errors.New("超时了") func (g *Group) done() { if g.sem != nil { <-g.sem } g.wg.Done() } // WithContext returns a new Group and an associated Context derived from ctx. // // The derived Context is canceled the first time a function passed to Go // returns a non-nil error or the first time Wait returns, whichever occurs // first. func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel, unsafe: true, ctx: ctx}, ctx } // GroupTimeoutContext 超时设置 func GroupTimeoutContext(parent context.Context, timeout time.Duration) (*Group, context.Context) { ctx, cancel := context.WithTimeout(parent, timeout) return &Group{cancel: cancel, unsafe: true, ctx: ctx}, ctx } // 超时设置 func GroupTimeout(parent context.Context, timeout time.Duration) *Group { ctx, cancel := context.WithTimeout(parent, timeout) return &Group{cancel: cancel, unsafe: true, ctx: ctx} } // UnsafeGroup 获取不安全的同步锁(出错就退出) func UnsafeGroup() *Group { var g Group g.unsafe = true g.ctx, g.cancel = context.WithCancel(context.Background()) return &g } // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { if g.unsafe { monitor := make(chan struct{}) go func() { g.wg.Wait() close(monitor) }() select { case <-g.ctx.Done(): g.errOnce.Do(func() { if nil == g.err { g.err = errTimeout } }) case <-monitor: } } else { g.wg.Wait() } if g.cancel != nil { g.cancel() } return g.err } // Go calls the given function in a new goroutine. // It blocks until the new goroutine can be added without the number of // active goroutines in the group exceeding the configured limit. // // The first call to return a non-nil error cancels the group; its error will be // returned by Wait. func (g *Group) Go(f func() (err error)) { if g.sem != nil { g.sem <- token{} } if g.unsafe && g.cancel == nil { g.ctx, g.cancel = context.WithCancel(context.Background()) } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { if nil == g.err { g.err = err } if g.cancel != nil { g.cancel() } }) } }() } // TryGo calls the given function in a new goroutine only if the number of // active goroutines in the group is currently below the configured limit. // // The return value reports whether the goroutine was started. func (g *Group) TryGo(f func() error) bool { if g.sem != nil { select { case g.sem <- token{}: // Note: this allows barging iff channels in general allow barging. default: return false } } if g.unsafe && g.cancel == nil { g.ctx, g.cancel = context.WithCancel(context.Background()) } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { if nil == g.err { g.err = err } if g.cancel != nil { g.cancel() } }) } }() return true } // SetLimit limits the number of active goroutines in this group to at most n. // A negative value indicates no limit. // // Any subsequent call to the Go method will block until it can add an active // goroutine without exceeding the configured limit. // // The limit must not be modified while any goroutines in the group are active. func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) } g.sem = make(chan token, n) } // IsErr 判断是否执行出错 func (g *Group) IsErr(err error, message ...interface{}) bool { // 有错误直接返回 if g.err != nil { return true } // 无错误直接返回 if err == nil { return false } switch len(message) { case 0: g.errOnce.Do(func() { if nil == g.err { g.err = err } if g.cancel != nil { g.cancel() } }) case 1: g.errOnce.Do(func() { if nil == g.err { g.err = fmt.Errorf("%v:%w", message[0], err) } if g.cancel != nil { g.cancel() } }) default: // 格式化输出 g.errOnce.Do(func() { if nil == g.err { s, _ := message[0].(string) g.err = fmt.Errorf("%s:%w", fmt.Sprintf(s, message[1:]...), err) } if g.cancel != nil { g.cancel() } }) } return true } func IsTimeout(err error) bool { if err == nil { return false } return errors.Is(err, errTimeout) }
标签:Group,errGroup,sync,golang,并发,cancel,超时 来源: https://www.cnblogs.com/hardykay/p/16597300.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。