“Go play with some concurrency”

Gaurav Kumar
5 min read · Dec 30, 2023


Preface: This blog is based on just two weeks of experience handling parallelism and concurrency in Go, and it does not address a lot of things like mutexes, errgroups, or anything like Java's thread management, so kindly proceed with caution.

Moving on to what is important here: as backend engineers we often need to process things concurrently. For instance, we may have a simple job that reads data from a source, does some processing, and writes it to some sink.

package main

import (
    "fmt"
    "time"
)

func read() {
    for i := 0; i < 15; i++ {
        fmt.Println("read some data from file: ", i)
        time.Sleep(1 * time.Second) // sleeps for 1s
    }
}

func process() {
    for i := 0; i < 15; i++ {
        // we only process records matching some logic
        if i%3 == 0 {
            fmt.Println("process: ", i)
        }
        time.Sleep(1 * time.Second) // sleeps for 1s
    }
}

func write() {
    for i := 0; i < 15; i++ {
        fmt.Println("write: ", i)
        time.Sleep(1 * time.Second) // sleeps for 1s
    }
}

func main() {
    fmt.Println("Hello, main func")

    // read some file
    read()

    time.Sleep(1 * time.Second)

    // process the data
    process()

    time.Sleep(1 * time.Second)

    // write to some file
    write()
}

This will not work if I have to read the entire file, store it in memory, process the entire batch, and only then write it. So what is the solution?

Concurrency? Absolutely not: you can just read the file line by line -> process it -> write it. Go Playground sample
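A minimal sketch of that sequential line-by-line approach (this is my own illustration, not the playground sample; processLine is a made-up stand-in for whatever the real processing step is):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// processLine is a hypothetical processing step: here it just tags the line.
func processLine(line string) string {
	return "processed: " + line
}

func main() {
	// stand-in for a real file; a file would be opened with os.Open instead
	src := strings.NewReader("alpha\nbeta\ngamma\n")
	sc := bufio.NewScanner(src)
	for sc.Scan() {
		out := processLine(sc.Text()) // process one line at a time
		fmt.Println(out)              // write it to the sink (stdout here)
	}
	if err := sc.Err(); err != nil {
		fmt.Println("scan error:", err)
	}
}
```

Only one line is ever held in memory, which is exactly why this simple loop already beats the read-everything-first version for large files.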

We may find this second version perfectly adequate, especially for a small use case, and that is where you should stop! But the above code can still go wrong in many ways:

  • what if process() is resource-heavy but runs only for some events? It will block the read and write of other events that are not supposed to be processed at all.
  • is this really an efficient way to do it? Are we utilising the resources we have at our disposal?

Well, concurrency is about making progress on a lot of different processes together. I also get confused between concurrency, async functions, parallelism, and similar terms, so as far as I know:

  • Parallelism draws on more resources (typically multiple CPU cores) to execute processes simultaneously.
  • Asynchronous functions/tasks, in the Go context, are what allow you to run a process in a separate goroutine while the main goroutine carries on. One might use a sync.WaitGroup to handle these, so that all the desired goroutines complete their execution before the program terminates.

Concurrency, to me, is the broader concept where we have a lot of processes which may or may not be running in parallel, some in sync, some not.
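As a tiny illustration of the WaitGroup usage mentioned above (a sketch of my own, not the blog's final code), squaring numbers in separate goroutines and waiting for all of them:

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll squares each input in its own goroutine and waits for all of
// them with a sync.WaitGroup before returning.
func squareAll(nums []int) []int {
	var wg sync.WaitGroup
	out := make([]int, len(nums))
	for i, n := range nums {
		wg.Add(1) // register the goroutine before starting it
		go func(i, n int) {
			defer wg.Done() // signal completion even if the work panics
			out[i] = n * n  // each goroutine writes a distinct index: no race
		}(i, n)
	}
	wg.Wait() // block until every registered goroutine has called Done
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```

Note that i and n are passed as arguments rather than captured, which keeps the closure correct even on Go versions before 1.22 changed loop-variable scoping.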

Before we write the final block of code (not actually final, it can be improved), we might want to go through a few common problems we may run into while writing a concurrent pipeline.

  • Deadlock: usually occurs when two concurrent processes are each waiting on the other before they can proceed. You might take a look at the four Coffman conditions that lead to deadlock.
  • Race condition: this occurs quite often, because the same variable might be written and read at the same time. Hence it is necessary to understand the order in which we want the pipeline to run. Maybe use channels to pass data through the various stages!
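To make the race-condition bullet concrete, here is a sketch of my own: instead of many goroutines doing total++ on a shared variable (a data race), the increments are funnelled through a channel so only one goroutine ever touches the total.

```go
package main

import (
	"fmt"
	"sync"
)

// countViaChannel serialises all writes through one channel, so only the
// receiving loop below ever touches total: no race, no mutex needed.
func countViaChannel(workers, perWorker int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				ch <- 1 // send the increment instead of mutating shared state
			}
		}()
	}
	go func() {
		wg.Wait()
		close(ch) // closed at the sender side, once all writes are done
	}()
	total := 0
	for v := range ch { // the single owner of total
		total += v
	}
	return total
}

func main() {
	fmt.Println(countViaChannel(5, 100)) // always 500; a bare total++ across goroutines would race
}
```

Running the racy version under `go run -race` would flag it; this version is clean because the channel imposes the ordering.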

There are also other conditions, like starvation, livelock, and memory-access synchronisation, that need to be taken care of while designing the pipeline. It also helps to design the processes so that two different processes depend on each other as little as possible. They are obviously going to have some sort of data stream/channel passed between them, but it is usually better to bundle everything other than the data itself (errors, etc.) together, instead of passing multiple channels and handling them in a select/case manner.
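One way to do that bundling (the Result type and stage function are my own naming, just to sketch the idea): carry the value and its error in a single struct over one channel, so the consumer never has to select across a data channel and an error channel.

```go
package main

import "fmt"

// Result bundles a value with the error that produced it, so a stage needs
// only one output channel instead of a data channel plus an error channel
// juggled with select/case.
type Result struct {
	Value int
	Err   error
}

// stage multiplies even inputs by 10 and reports odd inputs as errors,
// all on the same channel.
func stage(in <-chan int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out) // the sender closes its own channel
		for v := range in {
			if v%2 != 0 {
				out <- Result{Err: fmt.Errorf("odd input %d", v)}
				continue
			}
			out <- Result{Value: v * 10}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range []int{2, 3, 4} {
			in <- v
		}
	}()
	for r := range stage(in) {
		if r.Err != nil {
			fmt.Println("error:", r.Err)
			continue
		}
		fmt.Println("value:", r.Value)
	}
}
```

The consumer's loop stays a single range, and per-item errors travel with the item instead of arriving out of band.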

Also, declaring in a function's signature whether data is only written to or only read from a channel (chan<- and <-chan) helps prevent us making any mistakes with it. I also try to close the channel once the write operation is complete, i.e., at the sender's end.
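A tiny sketch of why those directional types help: with chan<- the compiler rejects an accidental receive (and a consumer holding <-chan cannot even close the channel), and closing at the sender's end is what lets the receiver's range loop terminate.

```go
package main

import "fmt"

// produce only sends: the chan<- string type means any attempt to receive
// from out inside this function would be a compile-time error.
func produce(out chan<- string) {
	defer close(out) // closed at the sender's end, as suggested above
	out <- "a"
	out <- "b"
}

// consume only receives; calling close(in) here would not compile,
// because in is a receive-only channel.
func consume(in <-chan string) []string {
	var got []string
	for s := range in { // range ends only because the sender closed the channel
		got = append(got, s)
	}
	return got
}

func main() {
	ch := make(chan string, 2) // small buffer so produce can run first here
	produce(ch)
	fmt.Println(consume(ch)) // [a b]
}
```

In real pipeline code produce would usually run in its own goroutine over an unbuffered channel; the buffer here just keeps the example sequential and deterministic.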

Below is one sample I wrote with some of these considerations in mind. After going through it once, I can already see a few things that need to be fixed, so I hope anyone reading this finds them too!!


package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

// the waitgroup can be declared globally,
// or passed through functions as a pointer and used there
var wg sync.WaitGroup

type (
    Input struct {
        record Data
    }

    Data struct {
        ID    int
        name  string
        Title string
    }
)

func read(
    ch chan<- Input,
) {
    defer wg.Done()
    // generally a good practice to close the channel at the sender's end
    defer close(ch)
    for i := 0; i < 15; i++ {
        fmt.Println("read: ", i)

        rec := Data{
            ID:    i,
            name:  "dummy" + strconv.Itoa(i),
            Title: "dummy" + strconv.Itoa(i),
        }
        ch <- Input{
            record: rec,
        }
        time.Sleep(1 * time.Second)
    }
    fmt.Println("done reading, closing gen")
}

func process(
    inpChan <-chan Input,
) error {
    for in := range inpChan {
        fmt.Println("process: ", in)
        if in.record.ID%6 == 0 {
            return fmt.Errorf("error at multiple of 6")
        }
        time.Sleep(1 * time.Second)
    }

    return nil
}

func main() {
    fmt.Println("Hello main func")
    err := run()
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("done executing")
}

func run() error {
    wg.Add(1) // the goroutine where read takes place

    // guess why the error channel has a buffer limit
    // but the input channel is unbuffered?
    errChan := make(chan *error, 3)
    inpChan := make(chan Input)

    go read(inpChan) // function called in a goroutine

    // process is assumed to be time-consuming,
    // hence there are multiple goroutines running it -> parallelism
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            err := process(inpChan)
            if err != nil {
                errChan <- &err
            }
            fmt.Println("Done should have been hit here")
        }()
    }

    wg.Wait()

    close(errChan)
    failedCount := 0
    // see how logging err could have been done earlier
    for err := range errChan {
        failedCount++
        fmt.Println("printing error: ", *err)
    }
    if failedCount > 0 {
        return fmt.Errorf("failed to read/process: %v records", failedCount)
    }
    return nil
}

TLDR: If I am not mistaken, one of the purposes of Go was to abstract away some aspects of concurrency while still giving you more control. And it is better to read a proper book, rather than this blog, to get all the concepts!

You have my appreciation if you read this till the end!
