twitter

Fan in and Fan out Pattern with Go Part I

December 01, 2020

At Foodnome, I was tasked to create a KPI(key performance metrics) dashboard to better understand how our business is doing. Many of the KPIs are calculated month by month, and some of the KPIs require data from adjacent months. We quickly thought of using Go with the fan-out and fan-in pattern to solve the problem concurrently. This post uses simple examples explaining how we build out the concurrency modal.

What is fan-in and fan-out

In our original problem, we want to calculate one set of metrics for each month, a total of 12 months. An easy way to solve this is to assign one worker and have it work out all 12 metrics. If each month takes about one min to calculate, it will take 12 mins.

We can agree this is way too slow. In Go, we can use concurrent programming to dedicate 12 workers (fan-out) and each worker can calculate one of the 12 month’s metrics.

Once all 12 workers finish their calculation, we could regroup their data (fan-in). Before implementing this pattern, we need to understand channels since all communications between Goroutines is done through Channels.

Channels

Channel facilitates all the communication between Goroutines. A few key ideas are very important to understand about channels.

  • Sending and receiving of an unbuffered channel is blocking
  • You cannot send to a closed channel
  • You can receive from a closed channel via comma ok
  • It is ok to have an unequal amount of senders and receivers if they are within a goroutine
  • range will stop receiving only when that channel is closed

These are rather confusing at first, but it will make more sense through these examples.

- Sending and receiving of an unbuffered channel is blocking

Unbuffered channel gives you the guarantee between the sender and receiver, but it has the unknown latency. Buffered channel does not offer the guarantee, but it reduces that latency. It also comes with the risk. You could potentially send to a buffered channel with no receiver.

It is very common to see error messages like fatal error: all goroutines are asleep - deadlock!. This is caused by sending to the channel without a receiver or visa versa.

In the program below, c is a channel that takes type int. We are sending 0 to that channel, and hoping to get it to print with fmt.Println(<-c). It will cause an error deaklock! because c <- 0 is a sender, and sending to a channel blocks the program. The program will wait at line for a receiver.

func main() {
    c := make(chan int)
    c <- 0
    fmt.Println(<-c)
}
// output:
fatal error: all goroutines are asleep - deadlock!

Code: https://play.golang.org/p/LIw9Wvf9CEH

There are a few ways to solve this, but the idea is the same which is to make sure there is a receiver when the sender sends the data.

Here is option 1. Using anonymous Goroutines function go func(){...}(), we are sending the receiver out. By the time the program reaches the sender c <- 0, the receiver is ready.

func main() {
    c := make(chan int)
    go func (){
        fmt.Println(<-c)
    }()
    c <- 0
}
// output:
0

Code: https://play.golang.org/p/aC8TcH-BKCO

Option 2. Similar to option 1, but instead we put the sender into goroutine. By the time program reaches the receiver, the receiver waits until the sender sends the data.

func main() {
    c := make(chan int)
    go func() {
        c <- 0
    }()
    fmt.Println(<-c)
}
// output:
0

Code: https://play.golang.org/p/LQsFPnv4dlr

Option 3. Use a buffered channel. Again, buffered channel losses that guarantee. You could remove line 10 and the program still runs with no error, because it is not blocking at line 4.

func main() {
	c := make(chan int, 1)

	c <- 0

	ms := time.Duration(rand.Intn(1e3)) * time.Millisecond
	fmt.Printf("Working %v \n", ms)
	time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)

	fmt.Println(<-c)
}
// output:
0

Code: https://play.golang.org/p/GDuYt8xoLCd

Option 4. The last option is a bit more interesting. We could send both sender and receiver out. The reason we added time.Sleep is because the program would otherwise exit.

func main() {
    c := make(chan int)
    go func() {
        c <- 0
    }()
    go func() {
        fmt.Println(<-c)
    }()

    time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    fmt.Println("Done")
}
// output:
0
Done

Code: https://play.golang.org/p/uh3jROTIDue

Sending and receiving on a channel blocks its Goroutine. If that Goroutine happens to be the main Goroutine, the program will cause a fatal error.

- You cannot send to a closed channel

When a channel is closed, sending to it would cause the program to panic. General principle of using Go channels is don’t close a channel from a receiver and close a channel in a sender.

func main() {
    c := make(chan int)

    go func() {
        fmt.Println(<-c)
    }()
    c <- 0

    close(c)
    c <- 1
}
// output
0
panic: send on closed channel

Code: https://play.golang.org/p/2jMjFMv1198

- You can receive from a closed channel and check via comma ok

When receiving from a closed channel, there will not be panic. The value returned from a closed channel is the zero value of that type. You can also verify if the channel is closed with , ok

func main() {
    c := make(chan int)
    go func() {
        fmt.Println(<-c)
    }()
    c <- 42
    close(c)

    fmt.Println(<-c)

    v, ok := <-c
    fmt.Println(v, ok)
}
// output:
42
0
0 false

Code: https://play.golang.org/p/AfbmQEZvkua

- It is ok to have an unequal amount of senders and receivers if they are within a goroutine

This is similar to the first point. Once the main goroutine exits, all the other goroutines exit as well. You don’t have to worry about the unpaired senders or receivers in its goroutine. This would not be the case if either one is in the main goroutine(the one that runs the function main).

Here we have four receivers and one sender. One of the receivers will receive the data from the sender c <- 0, then the program exists with the three other receivers.

func main() {
    c := make(chan int)

    go func() {
        fmt.Println(<-c, "e")
    }()
    go func() {
        fmt.Println(<-c, "l")
    }()
    go func() {
        fmt.Println(<-c, "o")
    }()
    go func() {
        fmt.Println(<-c, "p")
    }()
    c <- 0
}
// output
0 p

Code: https://play.golang.org/p/Q8jGkGUu2wa

- range will NOT stop recieving util channel is closed

range is commonly used since you do not know how many receivers you typically would need. When using range, remember the range will always create one more receiver until its channel is closed.

Running the following example will produce the deadlock error. At the third iteration of range, it creates another <-c and waits for the sender. Since it is in the main goroutine, the program deadlocks.

func main() {
    c := make(chan int)

    go func() {
        for i := 0; i < 3; i++ {
            c <- i
        }
    }()

    for v := range c {
        fmt.Println(v)
    }

}
// output
0
1
2
fatal error: all goroutines are asleep - deadlock!

Code: https://play.golang.org/p/o3a9xvFf9fv

You can solve this in two ways. Using rule number 4 or close the channel. Here is the program using rule number 4. There are two goroutines, sender and receiver. Sender is sending 3 integer data i and receiver is receving as many as sender’s plus one. This is not causing deadlock is because receiver is only blocking its own goroutine not the main goroutine (rule 4). time.Sleep will block the program from exiting.

func main() {
    c := make(chan int)

    // sender
    go func() {
        for i := 0; i < 3; i++ {
            c <- i
        }
    }()

    // receiver
    go func() {
        for v := range c {
            fmt.Println(v)
        }
    }()

    time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    fmt.Println("Done")
}
// output
0
1
2
Done

Code: https://play.golang.org/p/CAbj07fzlyK

Another option is to close the channel. Remember once the channel is closed, you could no longer send data into it.

func main() {
    c := make(chan int)

    // sender
    go func() {
        for i := 0; i < 3; i++ {
            c <- i
        }
        // close channel at sender
        close(c)
    }()

    // receiver
    for v := range c {
        fmt.Println(v)
    }
}
// output
0
1
2

Code: https://play.golang.org/p/muIb10SrdIM

That’s it for all my key take aways on channels. Next we can talk about the fan-in and fan-out pattern using channels.