Code first, story later.
package main
import (
"encoding/json"
"fmt"
"log"
"math"
"math/rand"
"sync"
"time"
)
// inputType describes one unit of work for a worker: which page offset
// to request from the fake API and how many records to ask for per call.
type inputType struct {
	offset    int // page offset to request
	batchSize int // records per request (always 1 in this demo)
}

// fakeAPI simulates a slow, flaky third-party API call. It sleeps for a
// random 0-9ms to mimic network latency, fails whenever the offset is
// divisible by 11 (so callers can exercise their error paths), and
// otherwise returns a small JSON payload echoing the offset.
//
// NOTE(review): the original re-seeded the global source with
// rand.Seed(time.Now().UnixNano()) on every call. That is deprecated
// (the global source is seeded automatically since Go 1.20) and harmful
// under concurrency: workers seeding within the same nanosecond draw
// identical sequences. The per-call seeding has been removed.
func fakeAPI(input inputType) (json.RawMessage, error) {
	// Simulate variable network latency.
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
	// math.Mod on float64 is equivalent to input.offset%11 for these
	// small non-negative offsets; kept so the file's math import stays used.
	if math.Mod(float64(input.offset), 11) == 0 {
		return json.RawMessage{}, fmt.Errorf("error: divisible by 11: %d", input.offset)
	}
	return json.RawMessage(fmt.Sprintf(`{"offset": %d}`, input.offset)), nil
}
// main fans 50 jobs out to a fixed pool of 10 workers, limiting
// concurrency against the (simulated) API, and pumps results and errors
// off their channels until every worker has finished.
func main() {
	queue := make(chan inputType)      // jobs for the workers
	outs := make(chan json.RawMessage) // successful responses
	errs := make(chan error)           // failed requests
	done := make(chan struct{})        // closed once all workers exit
	var wg sync.WaitGroup

	// Producer: enqueue 50 jobs, then close the queue so the workers'
	// range loops terminate.
	go func() {
		defer close(queue)
		for i := 1; i <= 50; i++ {
			queue <- inputType{offset: i, batchSize: 1}
		}
	}()

	// Start exactly 10 workers. (The original loop condition was
	// `i <= 10`, which started 11 workers — one more than intended.)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Drain the queue until it is closed and empty.
			for input := range queue {
				out, err := fakeAPI(input)
				if err != nil {
					errs <- err
					continue
				}
				outs <- out
			}
		}()
	}

	// Close done once every worker has returned. Because outs and errs
	// are unbuffered, a worker cannot exit until its final send has been
	// received below, so no message is lost when done closes.
	go func() {
		wg.Wait()
		close(done)
	}()

	// Consume results until the pool shuts down.
	for {
		select {
		case err := <-errs:
			// Errors go to stderr.
			log.Println(err)
		case response := <-outs:
			// Successful payloads go to stdout.
			fmt.Println(string(response))
		case <-done:
			return
		}
	}
}
I will need this in the future, and I don’t want to scroll past the story to find the code.
Today I needed to speed up a list of queries against a third-party API. The API is quite slow to respond with any size of request, and I needed to follow the pagination through to the end. Querying each page of results serially was taking almost half an hour.
My script is already written in Go, so concurrency should be a breeze. The problem is that the API returns HTTP 429 without any of the headers you might expect to tell you when to retry. But that’s to be expected when I’m potentially sending 1,000 requests to the API at one time.
So I need to limit the number of concurrent requests to a more reasonable number.
I’ve home-grown a funky solution to handle this in the past, but today I found a nicer solution.