Skip to content

Commit

Permalink
prepare for release
Browse files Browse the repository at this point in the history
  • Loading branch information
CAFxX committed Dec 8, 2022
1 parent d11ef4d commit d54975c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 3 deletions.
60 changes: 57 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# fwp (fast worker pool)
# fwp

A simple, fast bounded worker pool with unlimited work queue.
[![GoDoc](https://godoc.org/github.com/CAFxX/fwp?status.svg)](https://godoc.org/github.com/CAFxX/fwp)

`fwp` (fast worker pool) is a simple, very fast bounded worker pool with an unlimited work queue.

When the worker pool is idle it consumes no memory (or goroutines).

## Usage

```go
// A worker pool with up to 1000 workers.
p := fwp.WorkerPool{Max: 1000}

p.Go(func() {
// ...
})
p.Go(func() {
// ...
})
// ...
```

If you need to wait for completion:
Expand All @@ -27,8 +36,42 @@ p.Go(func() {
wg.Wait()
```

It is possible to submit tasks from inside other tasks:

```go
p := fwp.WorkerPool{Max: 1000}

p.Go(func() {
p.Go(func() {
// ...
})
// ...
})
```

If tasks depend on each other it is recommended, to prevent deadlocks
that may be caused by `Max` tasks becoming blocked at the same time,
to resubmit tasks (instead of blocking) in case a task is executed
before its dependencies are ready:

```go
p := fwp.WorkerPool{Max: 1000}

var fn func()
fn = func() {
if some_precondition_is_not_yet_met {
p.Go(fn)
return
}
// ...
}
p.Go(fn)
```

## Performance

`fwp` is pretty fast. Indeed it is faster than any other workerpool tested, and for high volumes of short tasks it can even be faster than spawning goroutines without a semaphore:

```
name time/op
FastWorkerPool-6 242ns ± 4%
Expand All @@ -43,6 +86,17 @@ GoroutineXSyncSemaphore-6 1.66µs ± 5%
Goroutine-6 266ns ±18%
```

The performance is due to three factors:

- Goroutines are reused to process multiple tasks (this minimizes
allocation of new goroutines as well as stack growths).
- The length of critical sections is kept as short as possible
(this minimizes contention on the mutex that guards the internals
of the worker pool).
- The internal behavior of the pool is adaptive to the workload,
with 2 different regimes selected automatically based on the
number and duration of tasks submitted.

## License

MIT
[MIT](LICENSE)
54 changes: 54 additions & 0 deletions fast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,35 @@ import (
"sync"
)

// WorkerPool is a worker pool with bounded workers (up to Max tasks
// can be running concurrently) and unbounded queue (no limit to the
// number of tasks waiting for a worker to become available). This is
// similar to how the native `go` construct operates, but with an
// (optional) bound to the number of tasks executed concurrently.
type WorkerPool struct {
// Max is the maximum number of tasks being executed concurrently.
// A value of 0 means no limit, i.e. the WorkerPool behaves exactly
// like the native `go` construct.
Max int
m sync.Mutex
n int
q cbuf
}

// Go submits a task for asynchronous execution by the worker
// pool. It is similar to the native `go` construct.
//
// The task will be processed by one the pool workers as
// soon as one becomes available. WorkerPool (similarly to Go
// goroutines) provides no guarantees about the order in which
// tasks are executed (if you need such guarantees, use external
// synchronization mechanisms, but taking care to not cause
// deadlocks; you can do this by resubmitting a task to be
// executed later in case some resources can not be acquired).
//
// To wait for one or more tasks to complete use an explicit
// synchronization mechanism such as channels, sync.WaitGroup,
// or similar.
func (s *WorkerPool) Go(fn func()) {
if s.Max <= 0 {
go fn()
Expand Down Expand Up @@ -52,6 +74,31 @@ func (s *WorkerPool) worker(fn func()) {
}
}

// Stats returns statistics about the worker pool.
func (s *WorkerPool) Stats() Stats {
s.m.Lock()
r := Stats{
Running: s.n,
Queued: s.q.len(),
}
s.m.Unlock()
return r
}

// Stats contains statistics about the worker pool.
type Stats struct {
// Running is the number of tasks currently being run.
// It is never greater than the number of Max workers.
Running int
// Queued is the number of tasks currently queued, waiting
// for a worker to become available for processing.
// This number is only bound by the amount of memory available.
//
// The total number of tasks in the worker pool is therefore
// Queued+Running.
Queued int
}

type cbuf struct {
e []func()
r int
Expand Down Expand Up @@ -91,6 +138,13 @@ func (c *cbuf) get() (func(), bool) {
return v, true
}

func (c *cbuf) len() int {
if c.w >= c.r {
return c.w - c.r
}
return len(c.e) - c.r + c.w
}

func (c *cbuf) reset() {
*c = cbuf{}
}
Expand Down

0 comments on commit d54975c

Please sign in to comment.