mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-16 11:45:51 +08:00
Use command BRPUSHLPOP to move from queue to in_progress in redis
This commit is contained in:
@@ -59,11 +59,10 @@ func (p *processor) start() {
|
||||
// exec pulls a task out of the queue and starts a worker goroutine to
|
||||
// process the task.
|
||||
func (p *processor) exec() {
|
||||
// NOTE: BLPOP needs to timeout to avoid blocking forever
|
||||
// NOTE: dequeue needs to timeout to avoid blocking forever
|
||||
// in case of a program shutdown or additon of a new queue.
|
||||
const timeout = 5 * time.Second
|
||||
// TODO(hibiken): sort the list of queues in order of priority
|
||||
msg, err := p.rdb.dequeue(timeout, p.rdb.listQueues()...)
|
||||
msg, err := p.rdb.dequeue(defaultQueue, timeout)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case errQueuePopTimeout:
|
||||
@@ -82,8 +81,8 @@ func (p *processor) exec() {
|
||||
p.sema <- struct{}{} // acquire token
|
||||
go func(task *Task) {
|
||||
defer func() {
|
||||
if err := p.rdb.srem(inProgress, msg); err != nil {
|
||||
log.Printf("[SERVER ERROR] SREM failed: %v\n", err)
|
||||
if err := p.rdb.lrem(inProgress, msg); err != nil {
|
||||
log.Printf("[SERVER ERROR] LREM failed: %v\n", err)
|
||||
}
|
||||
<-p.sema // release token
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user