From 6252cc02e00309feec3ca053728afd95895330a4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 17 Nov 2019 18:44:40 -0800 Subject: [PATCH] Implement exponential backoff retry --- asynq.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/asynq.go b/asynq.go index 9d4f4df..620373e 100644 --- a/asynq.go +++ b/asynq.go @@ -15,6 +15,8 @@ import ( "encoding/json" "fmt" "log" + "math" + "math/rand" "strconv" "time" @@ -59,7 +61,7 @@ type taskMessage struct { // queue name this message should be enqueued to Queue string - // remainig retry count + // max number of retry for this task. Retry int // number of times we've retried so far @@ -151,16 +153,17 @@ func (w *Workers) Run(handler TaskHandler) { go func(task *Task) { err := handler(task) if err != nil { - if msg.Retry == 0 { + if msg.Retried >= msg.Retry { // TODO(hibiken): Add the task to "dead" collection fmt.Println("Retry exausted!!!") return } fmt.Println("RETRY!!!") - delay := 10 * time.Second // TODO(hibiken): Implement exponential backoff. - msg.Retry-- + retryAt := time.Now().Add(delaySeconds((msg.Retried))) + fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now())) + msg.Retried++ msg.ErrorMsg = err.Error() - if err := zadd(w.rdb, retry, float64(time.Now().Add(delay).Unix()), &msg); err != nil { + if err := zadd(w.rdb, retry, float64(retryAt.Unix()), &msg); err != nil { // TODO(hibiken): Not sure how to handle this error log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) return @@ -243,3 +246,11 @@ func zadd(rdb *redis.Client, zset string, zscore float64, msg *taskMessage) erro func listQueues(rdb *redis.Client) []string { return rdb.SMembers(allQueues).Val() } + +// delaySeconds returns a number seconds to delay before retrying. +// Formula taken from https://github.com/mperham/sidekiq. +func delaySeconds(count int) time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) + return time.Duration(s) * time.Second +}