Using Goroutines, Channels, Contexts, Timers, WaitGroups and Errgroups
You may one day find yourself needing to perform an ETL job from a file to a remote database. Go is well suited to this sort of job because goroutines and channels make concurrent work easy to express.
Getting multiple goroutines to coordinate reads from the same file is quite convoluted. The most robust solution might involve a consensus algorithm, which feels like over-engineering. Instead I went with a simpler design: a single parser goroutine reads from the file, and multiple sender goroutines batch the data and send it to the remote database.
The rest of this post is notes on what I learned while implementing the design outlined above.
All running code samples can be found at https://github.com/ankur22/go-chan-ctx-timer-waitgroup-errgroup.
Apart from the built-in goroutines and channels, I needed a way to shut the goroutines down cleanly, both when the job completes and when an error occurs that can't be resolved on the spot and requires the application to shut down immediately, such as an HTTP 4xx or 5xx response.
The file could be very large, which means that processing and sending the data could take some time. We want to be able to shut the job down part-way through, and to do this we can listen for SIGTERM by creating a channel and registering that channel with signal.Notify.
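As a minimal sketch of that idea (not the code from the repository), the snippet below registers a channel with signal.Notify and cancels a context when a signal arrives. The helper names cancelOnSignal and simulatedShutdown are my own, and the 100ms timer is only a stand-in for the real job finishing.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// cancelOnSignal (hypothetical helper) returns a context that is cancelled
// as soon as a value arrives on sigs, or when the returned cancel is called.
func cancelOnSignal(parent context.Context, sigs <-chan os.Signal) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	go func() {
		select {
		case <-sigs:
			cancel()
		case <-ctx.Done(): // cancelled elsewhere, nothing left to watch
		}
	}()
	return ctx, cancel
}

// simulatedShutdown puts a SIGTERM on the channel by hand, so the behaviour
// can be checked without sending a real signal to the process.
func simulatedShutdown() bool {
	sigs := make(chan os.Signal, 1)
	ctx, cancel := cancelOnSignal(context.Background(), sigs)
	defer cancel()
	sigs <- syscall.SIGTERM
	<-ctx.Done()
	return ctx.Err() == context.Canceled
}

func main() {
	// Buffered: signal.Notify does not block, so an unbuffered channel
	// could miss a signal that arrives while nobody is receiving.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, os.Interrupt)
	defer signal.Stop(sigs)

	ctx, cancel := cancelOnSignal(context.Background(), sigs)
	defer cancel()

	select {
	case <-ctx.Done():
		fmt.Println("signal received, shutting down")
	case <-time.After(100 * time.Millisecond): // stand-in for the ETL job
		fmt.Println("job finished")
	}
}
```

A real job would pass ctx to the parser and sender goroutines so that they can stop when it is cancelled.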
Using the ok Variable from Channels
Now that we can shut down our application, we want to coordinate actions between the reader and the sender: when the reader completes the parsing, it should let the sender know so that the sender can finish sending and exit.
Firstly, when the reader completes it closes the lines channel (in the repository's code the close is deferred). Secondly, the sender needs to know when the lines channel is closed, which it can do by checking the ok value returned when receiving from the lines channel. If ok is false, the channel is closed and drained, and the goroutine can exit.
We also need to inform the “main” goroutine that the sender has completed. For now the sender can do this by calling the context's cancel function, which closes the context's Done channel.
I think cancelling the context should be a job for something external to the goroutines, and fortunately the goroutines can be coordinated without relying on the context.
From the sync package we can use WaitGroup, which waits for all the goroutines to complete before moving on. For Wait() to do its job, we need to increment the WaitGroup counter by calling Add() just before starting each goroutine. When a goroutine completes it needs to call Done(), which decrements the counter. Once the counter reaches 0, Wait() unblocks.
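Here is a small sketch of the close-then-check-ok handshake. The function names parse and send and the in-memory input slice are placeholders of mine; the real parser reads from a file.

```go
package main

import "fmt"

// parse sends every line on the returned channel and closes the channel
// once there is nothing left to send.
func parse(input []string) <-chan string {
	lines := make(chan string)
	go func() {
		defer close(lines) // tells the sender that no more lines are coming
		for _, l := range input {
			lines <- l
		}
	}()
	return lines
}

// send receives until the channel is closed and returns what it received.
func send(lines <-chan string) []string {
	var sent []string
	for {
		line, ok := <-lines
		if !ok {
			return sent // ok is false: the channel is closed and drained
		}
		sent = append(sent, line)
	}
}

func main() {
	fmt.Println(send(parse([]string{"a", "b", "c"}))) // prints [a b c]
}
```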
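A rough sketch of this first approach is below. It assumes the sender is handed the cancel function from context.WithCancel; the run helper and the sent counter are mine, added only so the example has something to return.

```go
package main

import (
	"context"
	"fmt"
)

// send counts lines until the channel is closed, then cancels the context
// so that whoever waits on ctx.Done() knows the sender has finished.
func send(ctx context.Context, cancel context.CancelFunc, lines <-chan string, sent *int) {
	defer cancel()
	for {
		select {
		case <-ctx.Done():
			return
		case _, ok := <-lines:
			if !ok {
				return
			}
			*sent++
		}
	}
}

// run (hypothetical helper) wires a parser and a sender together and
// blocks on the context's Done channel until the sender cancels it.
func run(input []string) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	lines := make(chan string)
	sent := 0

	go func() {
		defer close(lines)
		for _, l := range input {
			select {
			case lines <- l:
			case <-ctx.Done():
				return
			}
		}
	}()
	go send(ctx, cancel, lines, &sent)

	<-ctx.Done() // closed by the sender's deferred cancel
	return sent
}

func main() {
	fmt.Println(run([]string{"a", "b", "c"})) // prints 3
}
```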
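The sketch below shows the Add/Done/Wait pattern with one parser and several senders. The process function and the mutex-protected counter are illustrative choices of mine, not taken from the repository.

```go
package main

import (
	"fmt"
	"sync"
)

// process starts one parser and the given number of senders, waits for all
// of them with a WaitGroup, and returns how many lines were handled.
func process(input []string, senders int) int {
	lines := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0

	wg.Add(1) // increment before starting the goroutine
	go func() {
		defer wg.Done()
		defer close(lines)
		for _, l := range input {
			lines <- l
		}
	}()

	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // decrement when this sender exits
			for {
				_, ok := <-lines
				if !ok {
					return
				}
				mu.Lock()
				total++
				mu.Unlock()
			}
		}()
	}

	wg.Wait() // unblocks once the counter is back to 0
	return total
}

func main() {
	fmt.Println(process([]string{"a", "b", "c", "d"}, 3)) // prints 4
}
```

Calling Add() inside the new goroutine instead of before it would be a bug: Wait() could run before the counter is incremented and return too early.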
What if one of the goroutines had to stop early due to an error? How would it communicate that to the other goroutines? We could keep using WaitGroup and add our own error plumbing, but there is a package in golang.org/x/sync called errgroup which handles this scenario, and better yet, we don't need to remember to increment or decrement a counter!
Wait() blocks until every goroutine started with Go() has returned, and then returns the first non-nil error (if any) returned by one of them. If the group was created with errgroup.WithContext, the derived context is cancelled as soon as any goroutine returns a non-nil error, or when Wait() returns, so the other goroutines need to watch that context in order to stop early. Cancelling the parent context cancels the derived context too.
We may want to do something if reading off a channel takes longer than we expected, such as log an error, check the state of something, or abort. A timer that fires after a specified duration helps us deal with this case.
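A minimal sketch of a receive with a timeout follows. The receive function and the two sentinel errors are names I made up for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errTimedOut = errors.New("timed out waiting for a line")
	errClosed   = errors.New("lines channel closed")
)

// receive waits for one line, giving up once timeout has elapsed.
func receive(lines <-chan string, timeout time.Duration) (string, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if a line arrived first

	select {
	case line, ok := <-lines:
		if !ok {
			return "", errClosed
		}
		return line, nil
	case <-timer.C:
		return "", errTimedOut
	}
}

func main() {
	quiet := make(chan string) // nothing is ever sent on this channel
	_, err := receive(quiet, 50*time.Millisecond)
	fmt.Println(err) // prints the timeout error

	ready := make(chan string, 1)
	ready <- "row"
	line, err := receive(ready, time.Second)
	fmt.Println(line, err) // prints row <nil>
}
```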
Range over Channel
We can clean up the reading off the channel, and avoid explicitly checking whether the channel is closed, by ranging over the channel. The downside is that range blocks while waiting for the next value, so a timer can't be used to time out the goroutine.
If you still want to check the context inside the range loop, use a select with a default case. Without the default case the goroutine would block on ctx.Done().
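Something along these lines shows the range loop with a non-blocking context check. The send and run functions are placeholders of mine, and the channel is pre-filled only to keep the example short.

```go
package main

import (
	"context"
	"fmt"
)

// send ranges over lines and checks the context between lines. The default
// case makes the check non-blocking; without it the select would wait on
// ctx.Done() and never get around to handling the line.
func send(ctx context.Context, lines <-chan string) (int, error) {
	sent := 0
	for range lines { // the loop ends when lines is closed and drained
		select {
		case <-ctx.Done():
			return sent, ctx.Err()
		default:
		}
		sent++
	}
	return sent, nil
}

// run (hypothetical helper) pre-fills a buffered channel so that no parser
// goroutine is left blocked when the sender returns early.
func run(input []string, cancelled bool) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelled {
		cancel()
	}

	lines := make(chan string, len(input))
	for _, l := range input {
		lines <- l
	}
	close(lines)

	return send(ctx, lines)
}

func main() {
	fmt.Println(run([]string{"a", "b", "c"}, false)) // prints 3 <nil>
	fmt.Println(run([]string{"a", "b", "c"}, true))  // prints 0 context canceled
}
```

Note that the context is only checked when a line arrives, so a cancelled context is not noticed while the loop is blocked waiting on an idle channel.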
Simple Tricks for Efficient Code
Context’s Done Channel
Calling ctx.Done() is not free: depending on the Go version and the kind of context, it takes a mutex lock and unlock, or performs an atomic load, to retrieve the done channel. We can call it once before the loop, keep the returned channel in a variable, and avoid the repeated work.
We can reuse the same timer instead of creating a new one every time it fires; we just need to remember to reset the timer when it fires.
Go has some amazing tools that make concurrency a little simpler, especially compared with other languages. These tools still need to be used with care, and what might help is https://the-zen-of-go.netlify.app/, specifically for the examples I've given above: know how we're going to shut down a goroutine before we start one.
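As a sketch, the only change is to fetch the channel once before the loop. The send and run names are again mine.

```go
package main

import (
	"context"
	"fmt"
)

// send fetches the done channel once and reuses it on every iteration
// instead of calling ctx.Done() inside the loop.
func send(ctx context.Context, lines <-chan string) int {
	done := ctx.Done()
	sent := 0
	for {
		select {
		case <-done:
			return sent
		case _, ok := <-lines:
			if !ok {
				return sent
			}
			sent++
		}
	}
}

// run (hypothetical helper) feeds send from a pre-filled, closed channel.
func run(input []string) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	lines := make(chan string, len(input))
	for _, l := range input {
		lines <- l
	}
	close(lines)

	return send(ctx, lines)
}

func main() {
	fmt.Println(run([]string{"a", "b", "c"})) // prints 3
}
```

Whether the saving is measurable depends on how hot the loop is, so it is worth benchmarking before relying on it.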