-
Notifications
You must be signed in to change notification settings - Fork 25
/
parallel.go
122 lines (109 loc) · 2.63 KB
/
parallel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package parallel
import (
"sync"
"time"
)
// Parallel executes a set of pipelines concurrently, after first running any
// child Parallel instances registered on it.
type Parallel struct {
// wg counts the registered pipelines: Add increments it, secure decrements
// it once per executed pipeline, and Run waits on it.
wg *sync.WaitGroup
// pipes holds the pipelines that do executes.
pipes []*Pipeline
// wgChild counts the registered children: AddChildren increments it, and Run
// decrements it as each child's Run returns.
wgChild *sync.WaitGroup
// children are the Parallel instances that Run executes before this
// instance's own pipelines.
children []*Parallel
// exception, when non-nil, receives panic values recovered in secure. It is
// set by Except and copied into children created with NewChild.
exception *Handler
}
// NewParallel returns an empty Parallel with both wait groups allocated and
// capacity reserved for ten pipelines. No exception handler is installed.
func NewParallel() *Parallel {
	return &Parallel{
		wg:      new(sync.WaitGroup),
		wgChild: new(sync.WaitGroup),
		pipes:   make([]*Pipeline, 0, 10),
	}
}
// Except installs the exception handler for p and returns it. The handler is
// built from f and args via NewHandler and replaces any handler set earlier.
//
// NewChild copies the handler that is current at the time the child is
// created, so children created before this call keep their previous handler.
func (p *Parallel) Except(f interface{}, args ...interface{}) *Handler {
	p.exception = NewHandler(f, args...)
	return p.exception
}
// Register creates a new pipeline on p, registers f with args on that
// pipeline, and returns the resulting handler.
func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler {
	pipe := p.NewPipeline()
	return pipe.Register(f, args...)
}
// NewPipeline creates a fresh pipeline, attaches it to p through Add, and
// returns it so the caller can register handlers on it.
func (p *Parallel) NewPipeline() *Pipeline {
	created := NewPipeline()
	p.Add(created)
	return created
}
// Add attaches the given pipelines to p and raises the pipeline wait-group
// count by the same amount. It returns p so calls can be chained.
func (p *Parallel) Add(pipes ...*Pipeline) *Parallel {
	added := len(pipes)
	p.pipes = append(p.pipes, pipes...)
	p.wg.Add(added)
	return p
}
// NewChild creates a new Parallel, gives it p's current exception handler,
// registers it as a child of p, and returns it. Run on p executes children
// before p's own pipelines.
func (p *Parallel) NewChild() *Parallel {
	sub := NewParallel()
	sub.exception = p.exception
	p.AddChildren(sub)
	return sub
}
// AddChildren registers the given Parallel instances as children of p and
// raises the child wait-group count accordingly. Run executes children before
// p's own pipelines. It returns p so calls can be chained.
func (p *Parallel) AddChildren(children ...*Parallel) *Parallel {
	added := len(children)
	p.children = append(p.children, children...)
	p.wgChild.Add(added)
	return p
}
// Run executes the whole tree rooted at p and blocks until it is finished.
//
// Every child is run in its own goroutine; once all children have returned,
// p's own pipelines are started and Run waits for them to complete.
//
// NOTE(review): the wait-group counts are raised only in Add/AddChildren and
// consumed here, so calling Run a second time on the same instance looks like
// it would drive the counters negative — confirm before reusing an instance.
func (p *Parallel) Run() {
	for i := range p.children {
		go func(sub *Parallel) {
			sub.Run()
			p.wgChild.Done()
		}(p.children[i])
	}

	// Children must be completely done before this level starts.
	p.wgChild.Wait()

	p.do()
	p.wg.Wait()
}
// do starts p's pipelines through secure. A lone pipeline is executed
// synchronously on the calling goroutine; otherwise each pipeline gets its
// own goroutine and do returns without waiting for them.
func (p *Parallel) do() {
	pipes := p.pipes
	if len(pipes) != 1 {
		for i := range pipes {
			go p.secure(pipes[i])
		}
		return
	}
	// Exactly one pipeline: no goroutine needed.
	p.secure(pipes[0])
}
// secure runs pipe.Do, recovers any panic it raises, and always marks the
// pipeline as finished on p.wg.
//
// A recovered value equal to one of the package sentinels ErrArgNotFunction,
// ErrInArgLenNotMatch, ErrOutArgLenNotMatch, ErrRecvArgTypeNotPtr or
// ErrRecvArgNil is re-panicked (presumably these signal misuse of the API —
// confirm against their definitions). Any other recovered value is passed to
// the exception handler when one is installed, and dropped otherwise.
func (p *Parallel) secure(pipe *Pipeline) {
	// Registered first so it runs last: the wait group is released even when
	// the handler below re-panics or the exception handler itself panics.
	// Previously Done was skipped on those paths, leaving the counter stuck.
	defer p.wg.Done()

	defer func() {
		err := recover()
		if err == nil {
			return
		}
		if err == ErrArgNotFunction ||
			err == ErrInArgLenNotMatch ||
			err == ErrOutArgLenNotMatch ||
			err == ErrRecvArgTypeNotPtr ||
			err == ErrRecvArgNil {
			panic(err)
		}
		if p.exception != nil {
			p.exception.OnExcept(err)
		}
	}()

	pipe.Do()
}
// RunWithTimeOut starts Run in a background goroutine and returns when Run
// finishes or when d has elapsed, whichever happens first.
//
// The caller is not told which of the two occurred. On timeout the jobs are
// not cancelled: Run keeps executing in the background goroutine.
func (p *Parallel) RunWithTimeOut(d time.Duration) {
	// Buffered so the goroutine can deliver its signal and exit even if this
	// function has already returned because of the timeout.
	done := make(chan struct{}, 1)
	go func() {
		p.Run()
		done <- struct{}{}
	}()

	// An explicit timer (rather than time.After) can be stopped as soon as
	// Run completes instead of lingering until d expires.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-done:
	case <-timer.C:
	}
}