Practical Go concurrency
- tags: #Golang #Concurrency
- published
- reading time: 9 minutes
Simply relying on chan and sync.WaitGroup to build a scalable and complex Go program is often not enough. In this post, we will cover several tools and techniques for better control over resources.
Before we start
Readers should be familiar with Go’s basic concurrency concepts, for example the ideas in Concurrency is not Parallelism.
Pipelining: Jobs and workers
Suppose we have a number of jobs, n_jobs, and we can only afford to spawn n_workers goroutines (where n_workers < n_jobs) to process the jobs concurrently. We can use shared channels to achieve this.
Each worker takes a job from the jobs inbound channel and sends the processed result to the results outbound channel.
func worker(jobs <-chan int, results chan<- int) {
	for job := range jobs {
		// do some work
		result := job
		// send the result when it's done
		results <- result
	}
}
We then construct the pipeline (job distribution, worker processing, result handling) in the parent function. In practice, the number of items sent to the results channel may not always equal the number of jobs (e.g. a failure during worker processing), so we need a signal indicating that all the workers are finished and no more results will be sent, in order to close the results channel. We will use sync.WaitGroup for this purpose.
func main() {
	...
	n_jobs := 10
	n_workers := 3
	jobs := make(chan int)
	results := make(chan int)

	// distribute the jobs
	go func() {
		for j := 0; j < n_jobs; j++ {
			jobs <- j
		}
		// close the jobs channel after all jobs are distributed
		close(jobs)
	}()

	// spawn the workers
	var wg sync.WaitGroup
	for w := 0; w < n_workers; w++ {
		wg.Add(1)
		// idiomatic way to record that a worker has finished
		go func() {
			defer wg.Done()
			worker(jobs, results)
		}()
	}

	// monitor the workers' status to close the results channel
	go func() {
		wg.Wait()
		close(results)
	}()

	// process the results
	for result := range results {
		// do something with the result
		fmt.Println(result)
	}
	...
}
Closing channels from earlier stages to signal that they are finished, rather than relying on counters, not only reduces the complexity of the code, it also helps maintain the correct execution order so we don’t accidentally exit the program prematurely, for example while the workers have finished processing but the results are still being consumed.
Rate limiter
Suppose that we have a number of jobs to process, but we want to make sure they are distributed at a manageable rate.
Constant interval
The simplest rate limiter can be implemented with time.Ticker (or time.Tick, time.Sleep).
func main() {
	...
	limiter := time.NewTicker(200 * time.Millisecond)
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		<-limiter.C // wait for the next tick (200ms apart)
		fmt.Println(j)
	}
	limiter.Stop()
	...
}
The main difference between time.Tick and time.NewTicker is that time.Tick provides no way to stop the underlying ticker.
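For completeness, a minimal sketch of the time.Tick variant (same 200ms interval as above); since the underlying ticker cannot be stopped, this pattern is best reserved for tickers that should last for the lifetime of the program:
func main() {
	...
	// time.Tick returns a receive-only channel with no Stop method
	tick := time.Tick(200 * time.Millisecond)
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		<-tick // wait for the next tick
		fmt.Println(j)
	}
	...
}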
Token bucket algorithm
Suppose we have a bucket with a maximum capacity of t tokens; tokens are added to the bucket at a constant rate r, and each job submission consumes a token. A basic token bucket algorithm allows us to submit as many jobs as there are tokens in the bucket. The supplementary Go package golang.org/x/time/rate implements this algorithm.
func main() {
	...
	// r = 1/second, t = 4
	limiter := rate.NewLimiter(1, 4)
	ctx := context.Background()
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		if err := limiter.Wait(ctx); err == nil {
			fmt.Println(j)
		}
	}
	...
}
The benefit of this algorithm is that it gives us more control over job submissions: we can either submit jobs as soon as tokens become available or submit a number of jobs simultaneously as long as there are enough tokens. However, when used in a multi-user setting (multiple users, each with multiple jobs to submit), this flexibility could allow a greedy user to drain the tokens and block others from submitting. It is also undesirable for workers that require a steady flow of inputs.
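For instance, the same limiter (r = 1/second, burst t = 4) can hand out several tokens at once with WaitN; a small sketch, where the batch size of 4 is chosen arbitrarily:
func main() {
	...
	// r = 1/second, t = 4
	limiter := rate.NewLimiter(1, 4)
	ctx := context.Background()
	// submit jobs in batches of 4; each batch waits until
	// 4 tokens have accumulated in the bucket
	for batch := 0; batch < 3; batch++ {
		if err := limiter.WaitN(ctx, 4); err == nil {
			fmt.Printf("submitting batch %v\n", batch)
		}
	}
	...
}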
Leaky bucket algorithm
With a leaky bucket algorithm, we submit a number of jobs to a bucket and it leaks the jobs at a constant rate to the workers. It can be seen as a token bucket algorithm with a bucket capacity of 1 (no burst allowed). The go.uber.org/ratelimit package implements a slightly more efficient version than the official golang.org/x/time/rate.
The benefit of the leaky bucket algorithm is that the jobs are submitted at a constant rate, so processing is more reliable. However, we may be handling the jobs sub-optimally (not maximizing the system throughput).
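A minimal sketch using go.uber.org/ratelimit (the rate of 5 jobs per second is arbitrary); Take blocks just long enough to keep consecutive submissions evenly spaced:
func main() {
	...
	// leak at most 5 jobs per second, i.e. one every 200ms
	limiter := ratelimit.New(5)
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		limiter.Take() // blocks until the next slot is available
		fmt.Println(j)
	}
	...
}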
Synchronization
In concurrent processing, we might read and write data simultaneously. To avoid unexpected behavior (e.g. overwriting each other's updates), Go provides several synchronization methods.
Atomic operations
The standard library's sync/atomic package provides a basic interface for synchronizing concurrent operations on a variable.
func main() {
	...
	var wg sync.WaitGroup
	var shared atomic.Uint64
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// each worker does some work on the shared variable
			for i := 0; i < 200; i++ {
				shared.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("%v\n", shared.Load())
	...
}
Be aware that Go encourages minimal usage of such manipulations:
Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don’t communicate by sharing memory.
There is a legitimate use case for atomic variables when implementing a pub/sub system, which we will cover in a future post.
Mutex
A mutex is a lock that limits access to some particular data to a single thread/goroutine to achieve synchronized computation. It can be seen as a more powerful version of the sync/atomic primitives since it allows us to work with more complex data structures.
type ComplexData struct {
	mu  sync.Mutex
	ctr uint64
}
func main() {
	...
	var wg sync.WaitGroup
	shared := ComplexData{ctr: 0}
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 200; i++ {
				shared.mu.Lock() // other goroutines block here
				shared.ctr += 1
				shared.mu.Unlock() // release the lock
			}
		}()
	}
	wg.Wait()
	fmt.Printf("%v\n", shared.ctr)
	...
}
Similar to sync.WaitGroup, a mutex should not be copied; instead, it should be passed by pointer.
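For example, putting the locking behind a method with a pointer receiver keeps the mutex shared; a value receiver (or passing ComplexData by value) would copy the lock and silently break the synchronization:
// pointer receiver: the mutex is shared, not copied
func (d *ComplexData) Increment() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.ctr += 1
}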
Semaphore
Broadly speaking, a semaphore is a more general mutex. Instead of restricting access to some data to a single thread/goroutine (as a mutex does), a semaphore allows access by at most a predetermined number of threads/goroutines. When this number is equal to 1, a semaphore is effectively a mutex.
There are two ways to implement a semaphore: a naive version where a simple buffered channel suffices, and a more general solution using the golang.org/x/sync/semaphore package.
Semaphore with a buffered channel
We will leverage the fact that a send on a buffered channel blocks when the buffer is full. An empty struct is the most memory-efficient element type for this purpose.
func main() {
	...
	// allow at most 3 concurrent goroutines (workers)
	sem := make(chan struct{}, 3)
	for j := 0; j < 10; j++ {
		// occupy a spot in the buffered channel
		sem <- struct{}{} // blocks if the buffer is full
		ji := j           // explicit loop-body-local variable
		go func() {
			fmt.Println(ji)
			time.Sleep(100 * time.Millisecond) // simulate work
			<-sem                              // release a spot
		}()
	}
	...
}
Semaphore with golang.org/x/sync/semaphore
This package offers weighted concurrency. That is, we have the flexibility to decide whether a particular thread/goroutine should take up more units of concurrency given a total budget. The following snippet implements an equivalent version to the example above.
func main() {
	...
	// allow at most 3 units of concurrency (workers)
	sem := semaphore.NewWeighted(3)
	ctx := context.Background()
	for j := 0; j < 10; j++ {
		// each goroutine takes up 1 unit
		if err := sem.Acquire(ctx, 1); err != nil {
			break
		}
		ji := j // explicit loop-body-local variable
		go func() {
			defer sem.Release(1) // release 1 unit
			fmt.Println(ji)
			time.Sleep(100 * time.Millisecond) // simulate work
		}()
	}
	...
}
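The weights become useful when jobs are not equally heavy. A hypothetical sketch where a heavy job takes 2 of the 3 available units, so at most one light job can run alongside it (the weights are chosen purely for illustration):
func main() {
	...
	sem := semaphore.NewWeighted(3)
	ctx := context.Background()
	// the heavy job occupies 2 units
	if err := sem.Acquire(ctx, 2); err == nil {
		go func() {
			defer sem.Release(2)
			fmt.Println("heavy job")
			time.Sleep(200 * time.Millisecond) // simulate heavy work
		}()
	}
	// a light job occupies 1 unit; a second light job would block
	// until the heavy job releases its units
	if err := sem.Acquire(ctx, 1); err == nil {
		go func() {
			defer sem.Release(1)
			fmt.Println("light job")
			time.Sleep(100 * time.Millisecond) // simulate light work
		}()
	}
	...
}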
Managing goroutines
Whether it’s a user cancellation, a deadline/timeout, or an error, we need a way to relay that information to the running goroutines so we can stop them properly.
Context
A context acts as a global signal distributor. It can be created with terminating conditions such as custom user cancellations, deadlines, or timeouts. When a context is terminated (cancelled), the ctx.Done() channel is closed and the receiving ends are immediately notified. Typically, goroutines use a select statement to monitor the status of the ctx.Done() channel.
The following is an example of handling timeouts gracefully; play around with the simulated workload (the sleep duration, currently 100ms) to see the effect.
func main() {
	...
	var wg sync.WaitGroup
	timeout := 300 * time.Millisecond
	ctx, cancel := context.WithTimeout(
		context.Background(), timeout,
	)
	defer cancel() // cancel as soon as we are finished
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		wi := w // explicit loop-body-local variable
		go func() {
			defer wg.Done()
			// simulate multiple jobs
			for j := 0; j < 8; j++ {
				select {
				case <-ctx.Done():
					fmt.Printf("Worker %v timed out\n", wi)
					return // do not leak the goroutine
				default:
					fmt.Printf("Worker %v doing %v\n", wi, j)
				}
				// simulate work
				time.Sleep(100 * time.Millisecond)
			}
			fmt.Printf("Worker %v is done\n", wi)
		}()
	}
	wg.Wait()
	...
}
A context can also carry global values (key-value pairs) so that all goroutines can read them.
...
ctx = context.WithValue(ctx, "theKey", "theValue")
...
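On the receiving end, a goroutine reads the value back with ctx.Value followed by a type assertion. A minimal sketch matching the snippet above; in real code the context documentation recommends an unexported key type rather than a plain string to avoid collisions between packages:
func worker(ctx context.Context) {
	// ctx.Value returns any, so a type assertion is needed
	if v, ok := ctx.Value("theKey").(string); ok {
		fmt.Println("got value:", v)
	}
}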
Errgroup
We have seen how a context can be used to send signals to all the goroutines. This is exactly what we need when an error occurs in a goroutine: we want to gracefully shut down the other running goroutines (e.g. store their state) and not leak resources. Notice that in the previous example, we still need sync.WaitGroup to synchronize the goroutines in the normal case. Combining sync.WaitGroup, context, and the need for error handling, the golang.org/x/sync/errgroup package was born.
The following is an extension to the above example.
func main() {
	...
	timeout := 300 * time.Millisecond
	ctx, cancel := context.WithTimeout(
		context.Background(), timeout,
	)
	defer cancel() // cancel as soon as we are finished
	g, ctx := errgroup.WithContext(ctx)
	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wi := w // explicit loop-body-local variable
		g.Go(func() error {
			// simulate multiple jobs
			for j := 0; j < 8; j++ {
				select {
				case <-ctx.Done():
					// could be a timeout or an error
					fmt.Printf("Worker %v cancelled\n", wi)
					return ctx.Err()
				default:
					fmt.Printf("Worker %v doing %v\n", wi, j)
					// simulate an error
					if wi == 1 && j == 4 {
						return fmt.Errorf("simulated error")
					}
				}
				// simulate work
				time.Sleep(10 * time.Millisecond)
			}
			fmt.Printf("Worker %v is done\n", wi)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	...
}
Play around with the simulated workload or the simulated error to see the effect.
Closing remarks
Go’s concurrency primitives are very powerful and flexible, but that same flexibility can lead to leaked resources, for example by spawning an uncontrolled number of goroutines or failing to stop them when their computations are no longer needed. The ideas introduced in this post can largely mitigate these problems and make programs scale more easily.