mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-16 23:55:51 +08:00
Rate limit error logs
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type processor struct {
|
||||
@@ -30,6 +31,9 @@ type processor struct {
|
||||
// channel via which to send sync requests to syncer.
|
||||
syncRequestCh chan<- *syncRequest
|
||||
|
||||
// rate limiter to prevent spamming logs with a bunch of errors.
|
||||
errLogLimiter *rate.Limiter
|
||||
|
||||
// sema is a counting semaphore to ensure the number of active workers
|
||||
// does not exceed the limit.
|
||||
sema chan struct{}
|
||||
@@ -66,6 +70,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
|
||||
orderedQueues: orderedQueues,
|
||||
retryDelayFunc: fn,
|
||||
syncRequestCh: syncRequestCh,
|
||||
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
|
||||
sema: make(chan struct{}, n),
|
||||
done: make(chan struct{}),
|
||||
abort: make(chan struct{}),
|
||||
@@ -136,7 +141,9 @@ func (p *processor) exec() {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
logger.error("Dequeue error: %v", err)
|
||||
if p.errLogLimiter.Allow() {
|
||||
logger.error("Dequeue error: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user