// dataChan 值接收结构体 type dataChan struct { Data interface{} Err error }
// DataFn 值方法 type DataFn func()(interface{}, error)
// GetDataChansWithCtx 根据值方法列表返回值接收通道, 附带超时控制 funcGetDataChansWithCtx(ctx context.Context, fns []DataFn)chan *dataChan { result := make(chan *dataChan, len(fns)) for _, fn := range fns { gofunc(fn DataFn) { var resp interface{} var err error done := make(chanstruct{}) // create channel with buffer size 1 to avoid goroutine leak panicChan := make(chaninterface{}, 1)
gofunc() { deferfunc() { if p := recover(); p != nil { panicChan <- p } }()
resp, err = fn() close(done) }()
select { case p := <-panicChan: panic(p) case <-ctx.Done(): result <- &dataChan{ Data: nil, Err: ctx.Err(), } case <-done: result <- &dataChan{ Data: resp, Err: err, } } }(fn) } return result }
// GetDataChans 根据值方法列表返回值接收通道 funcGetDataChans(fns []DataFn)chan *dataChan { result := make(chan *dataChan, len(fns)) for _, fn := range fns { gofunc(fn DataFn) { data, err := fn() result <- &dataChan{ Data: data, Err: err, } }(fn) } return result }
funcrun(ans string, datas chan *dataChan, n int)(string, error) { for i := 0; i < n; i++ { data, ok := <-datas if !ok { return"", errors.New("chan close") }
if err := data.Err; err != nil { return"", err }
switch data.Data.(type) { case A: ans += string(data.Data.(A)) case B: ans += string(data.Data.(B)) case C: ans += string(data.Data.(C)) } }