Practical Go concurrency

Relying on chan and sync.WaitGroup alone is often not enough to build a scalable and complex Go program. In this post, we will cover several tools and techniques for finer control over resources.

Outline

  1. Pipelining: Jobs and workers
  2. Rate limiter
    1. Constant interval
    2. Token bucket algorithm
    3. Leaky bucket algorithm
  3. Synchronization
    1. Atomic operations
    2. Mutex
    3. Semaphore
  4. Managing goroutines
    1. Context
    2. Errgroup
  5. Closing remarks

Before we start

Readers should be familiar with Go’s basic concurrency concepts, for example Rob Pike’s talk Concurrency is not Parallelism.

Pipelining: Jobs and workers

Suppose we have a number of jobs, n_jobs, and we can only afford to spawn n_workers goroutines where n_workers < n_jobs to process the jobs concurrently. We can use shared channels to achieve this.

Each worker pulls a job from the inbound jobs channel and sends the processed result to the outbound results channel.

func worker(jobs <-chan int, results chan<- int) {
	for job := range jobs {
		// do some work
		result := job
		// send the result when it's done
		results <- result
	}
}

We then construct the pipeline (job distribution, worker processing, result handling) in the parent function. In practice, the number of items sent to the results channel may not always equal the number of jobs (e.g. a failure during worker processing), so we need a signal that all workers have finished and no more results will be sent before we can close the results channel. We will use sync.WaitGroup for this purpose.

func main() {
	...
	n_jobs := 10
	n_workers := 3
	jobs := make(chan int)
	results := make(chan int)

	// distribute the jobs
	go func() {
		for j := 0; j < n_jobs; j++ {
			jobs <- j
		}
		// close the jobs channel after all jobs are distributed
		close(jobs)
	}()

	// spawn the workers
	var wg sync.WaitGroup
	for w := 0; w < n_workers; w++ {
		wg.Add(1)
		// idiomatic way to record that a worker has finished
		go func() {
			defer wg.Done()
			worker(jobs, results)
		}()
	}

	// monitor the workers status to close the results channel
	go func() {
		wg.Wait()
		close(results)
	}()

	// process the results
	for result := range results {
		// do something with the result
		fmt.Println(result)
	}
	...
}

Closing channels from earlier stages to signal completion, rather than relying on counters, not only reduces the complexity of the code, it also preserves the correct execution order so we don’t accidentally exit the program prematurely, for example when the workers have finished processing but their results have not all been consumed yet.

Rate limiter

Suppose that we have a number of jobs to process, but we want to make sure they are distributed at a manageable rate.

Constant interval

The simplest rate limiter can be implemented with time.Ticker (or time.Tick, time.Sleep).

func main() {
	...
	limiter := time.NewTicker(200 * time.Millisecond)
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		<-limiter.C // wait for the next tick (fires every 200ms)
		fmt.Println(j)
	}
	limiter.Stop()
	...
}

The main difference between time.Tick and time.NewTicker is that time.Tick provides no way to shut the underlying Ticker down, so it should only be used when the ticker is needed for the lifetime of the program.

Token bucket algorithm

Suppose we have a bucket with a maximum capacity of t tokens; tokens are added to the bucket at a constant rate r, and each job submission consumes one token. A basic token bucket algorithm allows us to submit as many jobs as there are tokens in the bucket. The supplementary Go package golang.org/x/time/rate implements this algorithm.

func main() {
	...
	// r = 1/second, t = 4
	limiter := rate.NewLimiter(1, 4)
	ctx := context.Background()
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		if err := limiter.Wait(ctx); err == nil {
			fmt.Println(j)
		}
	}
	...
}
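The limiter also exposes WaitN (and a non-blocking AllowN), which lets us reserve several tokens at once and submit a batch of jobs. A minimal sketch, reusing limiter and ctx from the example above (the batch size of 3 is just an illustrative value):

	...
	// block until 3 tokens are available, then submit a batch of 3 jobs
	if err := limiter.WaitN(ctx, 3); err == nil {
		for j := 0; j < 3; j++ {
			fmt.Println(j)
		}
	}
	...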

The benefit of this algorithm is that it gives us more control over job submissions: we can either submit jobs as soon as tokens become available or, as sketched above, submit several jobs simultaneously as long as there are enough tokens. However, in a multi-user setting (multiple users, each with multiple jobs to submit), this flexibility could allow a greedy user to drain the tokens and block others from submitting; it is also undesirable for workers that require a steady flow of inputs.

Leaky bucket algorithm

With a leaky bucket algorithm, we submit a number of jobs to a bucket and it leaks the jobs at a constant rate to the workers. It can be seen as a token bucket algorithm with a bucket capacity of 1 (no burst allowed). The go.uber.org/ratelimit package implements a slightly more efficient version than the official golang.org/x/time/rate.
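A minimal sketch of how go.uber.org/ratelimit can be used (the rate of 4 jobs per second is just an illustrative value):

func main() {
	...
	// leak at most 4 jobs per second to the workers
	limiter := ratelimit.New(4)
	// submit 10 jobs
	for j := 0; j < 10; j++ {
		limiter.Take() // block until the next slot is available
		fmt.Println(j)
	}
	...
}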

The benefit of the leaky bucket algorithm is that the jobs are submitted at a constant rate, so the processing load is more predictable. However, we may be handling the jobs sub-optimally (not maximizing the system throughput).

Synchronization

In concurrent processing, we might read and write data simultaneously. To avoid unexpected behaviors (e.g. one write overwriting another), Go provides several synchronization methods.

Atomic operations

The official sync/atomic package provides low-level primitives for safely performing concurrent operations on a single variable.

func main() {
	...
	var wg sync.WaitGroup
	var shared atomic.Uint64

	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// each worker does some work on the shared variable
			for i := 0; i < 200; i++ {
				shared.Add(1)
			}
		}()
	}

	wg.Wait()
	fmt.Printf("%v\n", shared.Load())
	...
}

Be aware that the sync/atomic documentation itself encourages minimal usage of these primitives:

Except for special, low-level applications, synchronization is better done with channels or the facilities of the sync package. Share memory by communicating; don’t communicate by sharing memory.

There is a legitimate use case for atomic variables when implementing a pub/sub system, which we will cover in a future post.

Mutex

A mutex is a lock that limits access to a particular piece of data to a single thread/goroutine at a time to achieve synchronized computation. It can be seen as a more powerful version of the sync/atomic primitives since it allows us to work with more complex data structures.

type ComplexData struct {
	mu  sync.Mutex
	ctr uint64
}
func main() {
	...
	var wg sync.WaitGroup
	shared := ComplexData{ctr: 0}

	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 200; i++ {
				shared.mu.Lock() // start blocking
				shared.ctr += 1
				shared.mu.Unlock() // release resource
			}
		}()
	}

	wg.Wait()
	fmt.Printf("%v\n", shared.ctr)
	...
}

Similar to sync.WaitGroup, a mutex must not be copied after first use; pass it by pointer instead.
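For example, if we factor the critical section out into a helper, the struct holding the mutex should be passed as a pointer (increment here is a hypothetical helper, not part of the example above):

// pass ComplexData by pointer so the same mutex, not a copy, is locked
func increment(d *ComplexData) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.ctr++
}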

Semaphore

Broadly speaking, a semaphore is a more general mutex. Instead of restricting access to some data to a single thread/goroutine (as a mutex does), a semaphore allows access by at most a predetermined number of threads/goroutines. When this number equals 1, a semaphore is effectively a mutex.

There are two common ways to implement a semaphore: a naive version, where a simple buffered channel suffices, and a more general solution using the golang.org/x/sync/semaphore package.

Semaphore with a buffered channel

We will leverage the fact that a send on a buffered channel blocks when the buffer is full, until space becomes available. An empty struct is the most efficient element type for this purpose since it occupies zero bytes.

func main() {
	...
	// allow at most 3 concurrent goroutines (workers)
	sem := make(chan struct{}, 3)

	for j := 0; j < 10; j++ {
		// occupy a spot in the buffered channel
		sem <- struct{}{}  // block if the buffer is full
		ji := j // explicit loop-body-local variable
		go func() {
			fmt.Println(ji)
			time.Sleep(100 * time.Millisecond) // simulate work
			<-sem // release a spot
		}()
	}
	...
}
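One caveat with this sketch: nothing waits for the last few goroutines to finish before main moves on. A common trick is to fill the channel back up to its capacity at the end, which blocks until every worker has released its spot:

	...
	// wait for all remaining workers by occupying every spot in the buffer
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
	...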

Semaphore with golang.org/x/sync/semaphore

This package offers weighted concurrency. That is, we have the flexibility to decide whether a particular thread/goroutine should take up more units of concurrency given a total budget. The following snippet implements an equivalent version to the example above.

func main() {
	...
	// allow at most 3 units of concurrent goroutines (workers)
	sem := semaphore.NewWeighted(3)
	ctx := context.Background()

	for j := 0; j < 10; j++ {
		// each goroutine takes up 1 unit; Acquire blocks until a unit is free
		if err := sem.Acquire(ctx, 1); err != nil {
			break // the context was cancelled
		}
		ji := j // explicit loop-body-local variable
		go func() {
			defer sem.Release(1) // release 1 spot
			fmt.Println(ji)
			time.Sleep(100 * time.Millisecond) // simulate work
		}()
	}
	...
}
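The weighting becomes useful when some jobs are heavier than others; such a job can simply acquire (and later release) more units out of the same budget, for example:

	...
	// a heavier job takes up 2 of the 3 available units
	if err := sem.Acquire(ctx, 2); err == nil {
		go func() {
			defer sem.Release(2)
			// do the heavy work
		}()
	}
	...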

Managing goroutines

Whether it’s a user cancellation, a deadline or timeout being reached, or an error, we need a way to relay that information to the running goroutines so we can stop them properly.

Context

A context acts as a global signal distributor; it can be created with terminating conditions such as custom user cancellations, deadlines, or timeouts. When a context is terminated (cancelled), its ctx.Done() channel is closed and the receiving ends are immediately notified. Typically, goroutines use a select statement to monitor the status of the ctx.Done() channel.

The following is an example of handling timeouts gracefully; play around with the simulated workload (the sleep duration, currently 100ms) to see the effect.

func main() {
	...
	var wg sync.WaitGroup
	timeout := 300 * time.Millisecond
	ctx, cancel := context.WithTimeout(
		context.Background(), timeout,
	)
	defer cancel() // cancel as soon as we are finished

	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wg.Add(1)
		wi := w // explicit loop-body-local variable
		go func() {
			defer wg.Done()
			// simulate multiple jobs
			for j := 0; j < 8; j++ {
				select {
				case <-ctx.Done():
					fmt.Printf("Worker %v timed out\n", wi)
					return // not to leak the goroutine
				default:
					fmt.Printf("Worker %v doing %v\n", wi, j)
				}
				// simulate work
				time.Sleep(100 * time.Millisecond)
			}
			fmt.Printf("Worker %v is done\n", wi)
		}()
	}

	wg.Wait()
	...
}

A context can also carry global variables (key-value pairs) so that all goroutines can read them.

	...
	ctx = context.WithValue(ctx, "theKey", "theValue")
	...
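Note that the context documentation recommends using an unexported key type rather than a plain string, so that values set by different packages cannot collide; a minimal sketch:

	...
	// an unexported key type prevents collisions with other packages
	type ctxKey string
	ctx = context.WithValue(ctx, ctxKey("theKey"), "theValue")
	if v, ok := ctx.Value(ctxKey("theKey")).(string); ok {
		fmt.Println(v)
	}
	...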

Errgroup

We have seen how a context can be used to send signals to all the goroutines. This is exactly what we need when an error occurs in a goroutine: we want to gracefully shut down the other running goroutines (e.g. store their state) and not leak resources. Notice that in the previous example, we still need sync.WaitGroup to synchronize the goroutines. Combining sync.WaitGroup, context, and the need for error handling, the golang.org/x/sync/errgroup package was born.

The following is an extension to the above example.

func main() {
	...
	timeout := 300 * time.Millisecond
	ctx, cancel := context.WithTimeout(
		context.Background(), timeout,
	)
	defer cancel() // cancel as soon as we are finished
	g, ctx := errgroup.WithContext(ctx)

	// simulate 3 concurrent workers
	for w := 0; w < 3; w++ {
		wi := w // explicit loop-body-local variable
		g.Go(func() error {
			// simulate multiple jobs
			for j := 0; j < 8; j++ {
				select {
				case <-ctx.Done():
					// could be a timeout or an error
					fmt.Printf("Worker %v cancelled\n", wi)
					return ctx.Err()
				default:
					fmt.Printf("Worker %v doing %v\n", wi, j)
					// simulate an error
					if wi == 1 && j == 4 {
						return fmt.Errorf("simulated error")
					}
				}
				// simulate work
				time.Sleep(10 * time.Millisecond)
			}
			fmt.Printf("Worker %v is done\n", wi)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	...
}

Play around with the simulated workload or the simulated error to see the effect.

Closing remarks

Go’s concurrency patterns are very powerful and flexible, but that same flexibility can lead to leaked resources, for example by spawning an uncontrolled number of goroutines or failing to stop them when their computations are no longer needed. The ideas introduced in this post can go a long way toward reducing these problems and making programs scale more easily.