Always have x number of goroutines running at any time


Go Problem Overview


I see lots of tutorials and examples on how to make Go wait for x number of goroutines to finish, but what I'm trying to do is ensure there are always x goroutines running, so a new goroutine is launched as soon as one ends.

Specifically, I have a few hundred thousand 'things to do', each of which processes some data coming out of MySQL. So it works like this:

db, err := sql.Open("mysql", connection_string)
checkErr(err)
defer db.Close()

rows, err := db.Query(`SELECT id FROM table`)
checkErr(err)
defer rows.Close()

var id uint
for rows.Next() {
	err := rows.Scan(&id)
	checkErr(err)
	go processTheThing(id)
}
checkErr(rows.Err()) // report any error hit while iterating over the rows

Currently that launches several hundred thousand goroutines running processTheThing(). What I need is for at most x (we'll call it 20) goroutines to run at once. So it starts by launching 20 for the first 20 rows, and from then on it launches a new goroutine for the next id the moment one of the current goroutines finishes. So at any point in time there are always 20 running.

I'm sure this is quite simple/standard, but I can't seem to find a good explanation in any of the tutorials or examples of how this is done.

Go Solutions


Solution 1 - Go

You may find the Go Concurrency Patterns article interesting, especially the Bounded parallelism section; it explains the exact pattern you need.

You can use a channel of empty structs as a limiting guard to control the number of concurrent worker goroutines:

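package main

import (
	"fmt"
	"sync"
)

func main() {
	maxGoroutines := 10
	guard := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup

	for i := 0; i < 30; i++ {
		guard <- struct{}{} // would block if guard channel is already filled
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			worker(n)
			<-guard // free a slot so the next goroutine can start
		}(i)
	}
	wg.Wait() // without this, main could return before the last workers finish
}

func worker(i int) { fmt.Println("doing work on", i) }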

Solution 2 - Go

I think something simple like this will work:

package main

import "fmt"

const MAX = 20

func main() {
	sem := make(chan int, MAX)
	for {
		sem <- 1 // will block if there are already MAX ints in sem
		go func() {
			fmt.Println("hello again, world")
			<-sem // removes an int from sem, allowing another to proceed
		}()
	}
}
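
To check that a semaphore channel like the one above really caps concurrency, the following sketch runs a finite number of jobs and records the highest number seen running at once. The function name runJobs, the counters, and the one-millisecond sleep standing in for real work are assumptions made for this illustration; they are not part of the original answer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const MAX = 20

// runJobs (hypothetical helper) starts one goroutine per job but lets at most
// MAX run at once. It returns the highest number of jobs observed running
// at the same time.
func runJobs(jobs int) int64 {
	sem := make(chan int, MAX)
	var wg sync.WaitGroup
	var running, peak int64

	for i := 0; i < jobs; i++ {
		sem <- 1 // blocks while MAX jobs are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			n := atomic.AddInt64(&running, 1)
			// Record a new peak if this is the most jobs seen so far.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(time.Millisecond) // placeholder for real work
			atomic.AddInt64(&running, -1)
		}()
	}
	wg.Wait() // wait for the remaining jobs before reading the peak
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runJobs(200))
}
```

Because a job only starts after its slot has been taken and only gives the slot back after it has decremented the counter, the reported peak cannot exceed MAX.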

Solution 3 - Go

Thanks to everyone for helping me out with this. However, I don't feel that anyone really provided something that both worked and was simple/understandable, although you did all help me understand the technique.

What I did in the end is, I think, much more understandable and practical as an answer to my specific question, so I will post it here in case anyone else has the same question.

Somehow this ended up looking a lot like what OneOfOne posted, which is great because now I understand that answer. But I found OneOfOne's code very difficult to understand at first, because passing functions to functions made it hard to tell which part did what. I think this way makes a lot more sense:

package main

import (
	"fmt"
	"sync"
)

const xthreads = 5 // Total number of worker goroutines to use, excluding the main() goroutine

func doSomething(a int) {
	fmt.Println("My job is", a)
}

func main() {
	var ch = make(chan int, 50) // The buffer size is arbitrary; the pattern also works with an unbuffered channel
	var wg sync.WaitGroup

	// This starts xthreads number of goroutines that wait for something to do
	wg.Add(xthreads)
	for i := 0; i < xthreads; i++ {
		go func() {
			defer wg.Done()
			for {
				a, ok := <-ch
				if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
					return
				}
				doSomething(a) // do the thing
			}
		}()
	}

	// Now the jobs can be added to the channel, which is used as a queue
	for i := 0; i < 50; i++ {
		ch <- i // add i to the queue
	}

	close(ch) // This tells the goroutines there's nothing else to do
	wg.Wait() // Wait for the goroutines to finish
}

Solution 4 - Go

  1. Create a channel for passing data to the goroutines.
  2. Start 20 goroutines that process the data from the channel in a loop.
  3. Send the data to the channel instead of starting a new goroutine.
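
The three steps above can be sketched roughly as follows. This is a minimal illustration rather than the answerer's own code: the function processAll, the body of processTheThing, and the slice of ids standing in for the rows read from MySQL are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processTheThing stands in for the real per-id work from the question.
// Its body is an assumption: it only counts how many ids were handled.
func processTheThing(id uint, handled *int64) {
	atomic.AddInt64(handled, 1)
}

// processAll (hypothetical helper) runs at most `workers` goroutines and
// returns how many ids were handled. The ids slice is a stand-in for the
// rows coming out of the database.
func processAll(ids []uint, workers int) int64 {
	var handled int64
	var wg sync.WaitGroup

	// Step 1: create the channel that carries ids to the workers.
	ch := make(chan uint)

	// Step 2: start a fixed number of goroutines that read from the channel in a loop.
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for id := range ch { // the loop ends once ch is closed and drained
				processTheThing(id, &handled)
			}
		}()
	}

	// Step 3: send each id to the channel instead of starting a new goroutine.
	for _, id := range ids {
		ch <- id
	}

	close(ch) // no more work is coming
	wg.Wait() // wait for work that is still in flight
	return atomic.LoadInt64(&handled)
}

func main() {
	ids := make([]uint, 1000)
	for i := range ids {
		ids[i] = uint(i)
	}
	fmt.Println("processed:", processAll(ids, 20))
}
```

In the question's setting, the loop in step 3 would be the rows.Next() loop, sending each scanned id on the channel.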

Solution 5 - Go

Grzegorz Żur's answer is the most efficient way to do it, but for a newcomer it could be hard to implement without reading code, so here's a very simple implementation:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

type idProcessor func(id uint)

// SpawnStuff starts limit worker goroutines and returns the channel that feeds them.
func SpawnStuff(limit uint, proc idProcessor) chan<- uint {
	ch := make(chan uint)
	for i := uint(0); i < limit; i++ {
		go func() {
			for {
				id, ok := <-ch
				if !ok {
					return
				}
				proc(id)
			}
		}()
	}
	return ch
}

func main() {
	runtime.GOMAXPROCS(4) // optional; since Go 1.5 the default is the number of CPUs
	var wg sync.WaitGroup //this is just for the demo, otherwise main will return
	fn := func(id uint) {
		fmt.Println(id)
		wg.Done()
	}
	wg.Add(1000)
	ch := SpawnStuff(10, fn)
	for i := uint(0); i < 1000; i++ {
		ch <- i
	}
	close(ch) //should do this to make all the goroutines exit gracefully
	wg.Wait()
}


Solution 6 - Go

This is a simple producer-consumer problem, which in Go can be easily solved using channels to pass the work items.

Put simply: create a channel that accepts your IDs. Start a number of goroutines that read from the channel in a loop and process each ID. Then run the loop that feeds IDs into the channel.

Example:

// IDs is assumed to be a []uint defined elsewhere.
func producer() {
    var buffer = make(chan uint)

    for i := 0; i < 20; i++ {
        go consumer(buffer)
    }

    for _, id := range IDs {
        buffer <- id
    }
}

func consumer(buffer chan uint) {
    for {
        id := <-buffer
        _ = id // Do your things here
    }
}

Things to know:

  • Unbuffered channels are blocking: if an item written to the channel has not been received yet, the goroutine sending it blocks until it is.
  • My example lacks a closing mechanism: you must find a way to make the producer wait for all consumers to finish their loops before returning. The simplest way to do this is with another channel. I'll let you think about it.
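
One possible way to fill in the closing mechanism mentioned above is sketched here, using a second channel as the answer suggests. It is only an illustration under assumptions: the done channel, the ids and consumers parameters, and the per-consumer count are names introduced for this example, not part of the original answer.

```go
package main

import "fmt"

// consumer handles ids until buffer is closed, then reports on done.
// The count it sends is only there to make the result observable.
func consumer(buffer <-chan uint, done chan<- int) {
	handled := 0
	for id := range buffer { // exits once buffer is closed and empty
		_ = id // do your things here
		handled++
	}
	done <- handled // tell the producer this consumer has finished
}

// producer feeds ids to a fixed number of consumers and waits for all of
// them to finish. It returns the total number of ids handled.
func producer(ids []uint, consumers int) int {
	buffer := make(chan uint)
	done := make(chan int)

	for i := 0; i < consumers; i++ {
		go consumer(buffer, done)
	}

	for _, id := range ids {
		buffer <- id
	}
	close(buffer) // no more ids: lets every consumer loop end

	total := 0
	for i := 0; i < consumers; i++ {
		total += <-done // blocks until each consumer has signalled
	}
	return total
}

func main() {
	ids := []uint{10, 11, 12, 13, 14}
	fmt.Println("handled:", producer(ids, 20))
}
```

Closing the buffer channel ends each consumer's range loop, and receiving once per consumer on done guarantees the producer does not return while work is still in progress. A sync.WaitGroup would serve the same purpose.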

Solution 7 - Go

I've written a simple package to handle concurrency in Go. It helps you limit the number of goroutines that are allowed to run concurrently: https://github.com/zenthangplus/goccm

Example:

package main

import (
    "fmt"
    "time"

    "github.com/zenthangplus/goccm"
)

func main() {
    // Limit 3 goroutines to run concurrently.
    c := goccm.New(3)

    for i := 1; i <= 10; i++ {

        // This function must be called before starting each goroutine
        c.Wait()

        go func(i int) {
            fmt.Printf("Job %d is running\n", i)
            time.Sleep(2 * time.Second)

            // This function must be called when a goroutine has finished
            // Or you can use `defer c.Done()` at the top of goroutine.
            c.Done()
        }(i)
    }

    // This function must be called to ensure all goroutines have finished
    // before the main program exits.
    c.WaitAllDone()
}

Solution 8 - Go

You can also take a look here: https://github.com/LiangfengChen/goutil/blob/main/concurrent.go

The test case below serves as a usage example:

func TestParallelCall(t *testing.T) {
	format := "test:%d"
	data := make(map[int]bool)
	mutex := sync.Mutex{}
	val, err := ParallelCall(1000, 10, func(pos int) (interface{}, error) {
		mutex.Lock()
		defer mutex.Unlock()
		data[pos] = true
		return pos, errors.New(fmt.Sprintf(format, pos))
	})

	for i := 0; i < 1000; i++ {
		if _, ok := data[i]; !ok {
			t.Errorf("TestParallelCall pos not found: %d", i)
		}
		if val[i] != i {
			t.Errorf("TestParallelCall return value is not right (%d,%v)", i, val[i])
		}
		if err[i].Error() != fmt.Sprintf(format, i) {
			t.Errorf("TestParallelCall error msg is not correct (%d,%v)", i, err[i])
		}
	}
}

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Alasdair | View Question on Stackoverflow
Solution 1 - Go | artyom | View Answer on Stackoverflow
Solution 2 - Go | Emil Davtyan | View Answer on Stackoverflow
Solution 3 - Go | Alasdair | View Answer on Stackoverflow
Solution 4 - Go | Grzegorz Żur | View Answer on Stackoverflow
Solution 5 - Go | OneOfOne | View Answer on Stackoverflow
Solution 6 - Go | Elwinar | View Answer on Stackoverflow
Solution 7 - Go | Zen Thắng | View Answer on Stackoverflow
Solution 8 - Go | Liangfeng Chan | View Answer on Stackoverflow