From d54975cf3059dc28600d37e7fcfc8c15acb390da Mon Sep 17 00:00:00 2001 From: Carlo Alberto Ferraris Date: Thu, 8 Dec 2022 10:55:25 +0900 Subject: [PATCH] prepare for release --- README.md | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- fast.go | 54 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index aacd7d5..b8f0f9e 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,24 @@ -# fwp (fast worker pool) +# fwp -A simple, fast bounded worker pool with unlimited work queue. +[![GoDoc](https://godoc.org/github.com/CAFxX/fwp?status.svg)](https://godoc.org/github.com/CAFxX/fwp) + +`fwp` (fast worker pool) is a simple, very fast bounded worker pool with an unlimited work queue. + +When the worker pool is idle it consumes no memory (or goroutines). ## Usage ```go +// A worker pool with up to 1000 workers. p := fwp.WorkerPool{Max: 1000} p.Go(func() { // ... }) +p.Go(func() { + // ... +}) +// ... ``` If you need to wait for completion: @@ -27,8 +36,42 @@ p.Go(func() { wg.Wait() ``` +It is possible to submit tasks from inside other tasks: + +```go +p := fwp.WorkerPool{Max: 1000} + +p.Go(func() { + p.Go(func() { + // ... + }) + // ... +}) +``` + +If tasks depend on each other it is recommended, to prevent deadlocks +that may be caused by `Max` tasks becoming blocked at the same time, +to resubmit tasks (instead of blocking) in case a task is executed +before its dependencies are ready: + +```go +p := fwp.WorkerPool{Max: 1000} + +var fn func() +fn = func() { + if some_precondition_is_not_yet_met { + p.Go(fn) + return + } + // ... +} +p.Go(fn) +``` + ## Performance +`fwp` is pretty fast. 
Indeed it is faster than any other worker pool tested, and for high volumes of short tasks it can even be faster than spawning goroutines without a semaphore: + ``` name time/op FastWorkerPool-6 242ns ± 4% GCWorkerPool-6 556ns ± 8% AlittoPond-6 453ns ± 8% GammazeroWorkerpool-6 623ns ±19% PanjfWorkerpoolAnts-6 689ns ± 9% PanjfAntsPool-6 393ns ± 4% GoroutineCond-6 679ns ± 4% GoroutineChan-6 894ns ± 5% GoroutineXSyncMPMC-6 777ns ± 2% GoroutineXSyncSemaphore-6 1.66µs ± 5% Goroutine-6 266ns ±18% ``` +The performance is due to three factors: + +- Goroutines are reused to process multiple tasks (this minimizes + allocation of new goroutines as well as stack growths). +- The length of critical sections is kept as short as possible + (this minimizes contention on the mutex that guards the internals + of the worker pool). +- The internal behavior of the pool is adaptive to the workload, + with 2 different regimes selected automatically based on the + number and duration of tasks submitted. + ## License -MIT +[MIT](LICENSE) diff --git a/fast.go b/fast.go index afc78df..e4b8762 100644 --- a/fast.go +++ b/fast.go @@ -4,13 +4,35 @@ import ( "sync" ) +// WorkerPool is a worker pool with bounded workers (up to Max tasks +// can be running concurrently) and unbounded queue (no limit to the +// number of tasks waiting for a worker to become available). This is +// similar to how the native `go` construct operates, but with an +// (optional) bound to the number of tasks executed concurrently. type WorkerPool struct { + // Max is the maximum number of tasks being executed concurrently. + // A value of 0 means no limit, i.e. the WorkerPool behaves exactly + // like the native `go` construct. Max int m sync.Mutex n int q cbuf } +// Go submits a task for asynchronous execution by the worker +// pool. It is similar to the native `go` construct. +// +// The task will be processed by one of the pool workers as +// soon as one becomes available. 
WorkerPool (similarly to Go +// goroutines) provides no guarantees about the order in which +// tasks are executed (if you need such guarantees, use external +// synchronization mechanisms, but taking care to not cause +// deadlocks; you can do this by resubmitting a task to be +// executed later in case some resources can not be acquired). +// +// To wait for one or more tasks to complete use an explicit +// synchronization mechanism such as channels, sync.WaitGroup, +// or similar. func (s *WorkerPool) Go(fn func()) { if s.Max <= 0 { go fn() @@ -52,6 +74,31 @@ func (s *WorkerPool) worker(fn func()) { } } +// Stats returns statistics about the worker pool. +func (s *WorkerPool) Stats() Stats { + s.m.Lock() + r := Stats{ + Running: s.n, + Queued: s.q.len(), + } + s.m.Unlock() + return r +} + +// Stats contains statistics about the worker pool. +type Stats struct { + // Running is the number of tasks currently being run. + // It is never greater than Max. + Running int + // Queued is the number of tasks currently queued, waiting + // for a worker to become available for processing. + // This number is only bound by the amount of memory available. + // + // The total number of tasks in the worker pool is therefore + // Queued+Running. + Queued int +} + type cbuf struct { e []func() r int @@ -91,6 +138,13 @@ func (c *cbuf) get() (func(), bool) { return v, true } +func (c *cbuf) len() int { + if c.w >= c.r { + return c.w - c.r + } + return len(c.e) - c.r + c.w +} + func (c *cbuf) reset() { *c = cbuf{} }