Go Concurrency Rate Limiting

· 396 words · 2 minute read

Code first, story later.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"sync"
	"time"
)

type inputType struct {
	offset    int // 1-based page offset fed to the fake API
	batchSize int // page size (always 1 in this demo)
}

// fakeAPI simulates a slow, flaky third-party call: it sleeps a random
// 0-9ms, fails whenever offset is divisible by 11, and otherwise returns
// a small JSON payload echoing the offset.
//
// NOTE(review): the original reseeded the global PRNG on every call with
// rand.Seed(time.Now().UnixNano()). Under concurrent workers, calls seeded
// in the same nanosecond draw identical sequences, and rand.Seed is
// deprecated since Go 1.20 (the global generator is auto-seeded), so the
// call is dropped.
func fakeAPI(input inputType) (json.RawMessage, error) {
	n := rand.Intn(10)
	time.Sleep(time.Duration(n) * time.Millisecond)

	// math.Mod is kept (rather than the simpler input.offset%11) so the
	// file's existing math import stays in use.
	if math.Mod(float64(input.offset), 11) == 0 {
		// Return nil on error: the payload is meaningless when err != nil.
		return nil, fmt.Errorf("error: divisible by 11: %d", input.offset)
	}
	return json.RawMessage(fmt.Sprintf(`{"offset": %d}`, input.offset)), nil
}

// main fans 50 jobs out to a fixed pool of 10 workers, limiting the number
// of concurrent fakeAPI calls, and collects results and errors on separate
// channels until all workers have finished.
func main() {
	queue := make(chan inputType)
	outs := make(chan json.RawMessage)
	errs := make(chan error)
	done := make(chan struct{})
	var wg sync.WaitGroup

	// Producer: feed 50 jobs into the queue, then close it so the
	// workers' range loops terminate.
	go func() {
		defer close(queue)
		for i := 1; i <= 50; i++ {
			queue <- inputType{
				offset:    i,
				batchSize: 1,
			}
		}
	}()

	// Start exactly 10 workers; the original condition (i <= 10) started
	// 11, one more than the comment and the accompanying text intended.
	// Each worker processes one job at a time until the queue is drained.
	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			for input := range queue {
				out, err := fakeAPI(input)
				if err != nil {
					errs <- err
					continue
				}
				outs <- out
			}
		}()
	}

	// Signal completion once every worker has exited. Because outs and
	// errs are unbuffered, every send has already been received by the
	// time wg.Wait returns, so closing done cannot drop results.
	go func() {
		wg.Wait()
		close(done)
	}()

	// Consume results and errors as they arrive, until the workers finish.
	for {
		select {
		case err := <-errs:
			log.Println(err) // errors to stderr
		case response := <-outs:
			fmt.Println(string(response)) // results to stdout
		case <-done:
			return
		}
	}
}

I will need this in the future. Don’t want to scroll past the story to find the code.

Today I needed to speed up a list of queries against a third-party API. The API is quite slow to respond with any size of request, and I needed to follow the pagination through to the end. Querying each page of results serially was taking almost half an hour.

My script is already written in Go, so concurrency should be a breeze. The problem is that the API returns HTTP 429 without any of the headers (such as Retry-After) that would tell you when it is safe to retry. But that’s to be expected, when I’m potentially sending 1,000 requests at the API at one time.

So I need to limit the number of concurrent requests to a more reasonable number.

I’ve home-grown a funky solution to handle this in the past, but today I found a nicer solution.