buffered queues in go
Jul 14, 2019

This is a simple yet powerful pattern for limiting concurrency in Go with a buffered channel used as a queue. It is useful when you need to run operations concurrently over a list of unknown length without starting them all at once. Here's some sample code with a walk-through:

// fetch takes a list of urls and the concurrency number
func fetch(urls []string, concurrency int) {
        // create one fixed length buffered channel of empty structs
        // for the queue and one for us to know when it all ended;
        // finish has one slot per url, so a worker never blocks on it
        queue := make(chan struct{}, concurrency)
        finish := make(chan struct{}, len(urls))

        for _, url := range urls {
                // fill one slot of the buffer, if the buffer is full
                // the code will block until one slot gets empty
                queue <- struct{}{}

                // run the goroutine
                go func(url string) {
                        // download() the actual url

                        // when finished, remove the item from the queue
                        <-queue

                        // every goroutine reports that its url is done;
                        // an empty queue alone is not a reliable signal,
                        // because it can be empty while urls are still pending
                        finish <- struct{}{}
                }(url)
        }

        // finally, wait for one finish signal per url
        for range urls {
                <-finish
        }
}

urls := []string{"https://...", "..."}
fetch(urls, 3)

Another approach to achieve the same result is to use WaitGroup from the sync package. The changes are:

  • replace the finish channel with a var wg sync.WaitGroup
  • add a wg.Add(1) before starting each goroutine
  • replace the finish <- struct{}{} with a wg.Done()
  • the final wait on finish becomes a wg.Wait()

The previous function now becomes:

import "sync"

func fetch(urls []string, concurrency int) {
        var wg sync.WaitGroup
        queue := make(chan struct{}, concurrency)

        for _, url := range urls {
                queue <- struct{}{}

                // count the goroutine before it starts
                wg.Add(1)
                go func(url string) {
                        // mark this url as done when the goroutine returns
                        defer wg.Done()

                        // download() the actual url

                        // free the slot for the next url
                        <-queue
                }(url)
        }

        // block until every goroutine has called wg.Done()
        wg.Wait()
}

urls := []string{"https://...", "..."}
fetch(urls, 3)

Notes on empty structs can be found here.
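
The short version of why the queue carries struct{} values: an empty struct occupies no memory, so the channel is used purely for signalling. A small check, as a sketch (the helper names emptyStructSize and boolSize are made up for this example):

```go
package main

import (
	"fmt"
	"unsafe"
)

// emptyStructSize reports how many bytes a struct{} value occupies.
func emptyStructSize() uintptr {
	return unsafe.Sizeof(struct{}{})
}

// boolSize reports how many bytes a bool value occupies, for comparison.
func boolSize() uintptr {
	return unsafe.Sizeof(true)
}

func main() {
	fmt.Println(emptyStructSize()) // prints 0
	fmt.Println(boolSize())        // prints 1
}
```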

