mirror of
https://github.com/hibiken/asynq.git
synced 2026-01-30 17:54:23 +00:00
Update RDB.Requeue to remove message from deadlines set
This commit is contained in:
@@ -194,12 +194,10 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err
|
||||
// ARGV[3] -> task ID
|
||||
// Note: LREM count ZERO means "remove all elements equal to val"
|
||||
var doneCmd = redis.NewScript(`
|
||||
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
if x == 0 then
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
x = redis.call("ZREM", KEYS[2], ARGV[1])
|
||||
if x == 0 then
|
||||
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
local n = redis.call("INCR", KEYS[3])
|
||||
@@ -228,12 +226,18 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:in_progress
|
||||
// KEYS[2] -> asynq:queues:<qname>
|
||||
// KEYS[2] -> asynq:deadlines
|
||||
// KEYS[3] -> asynq:queues:<qname>
|
||||
// ARGV[1] -> base.TaskMessage value
|
||||
// Note: Use RPUSH to push to the head of the queue.
|
||||
var requeueCmd = redis.NewScript(`
|
||||
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
redis.call("RPUSH", KEYS[2], ARGV[1])
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
redis.call("RPUSH", KEYS[3], ARGV[1])
|
||||
return redis.status_reply("OK")`)
|
||||
|
||||
// Requeue moves the task from in-progress queue to the specified queue.
|
||||
@@ -243,7 +247,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
|
||||
return err
|
||||
}
|
||||
return requeueCmd.Run(r.client,
|
||||
[]string{base.InProgressQueue, base.QueueKey(msg.Queue)},
|
||||
[]string{base.InProgressQueue, base.KeyDeadlines, base.QueueKey(msg.Queue)},
|
||||
encoded).Err()
|
||||
}
|
||||
|
||||
@@ -324,12 +328,10 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
|
||||
// ARGV[3] -> retry_at UNIX timestamp
|
||||
// ARGV[4] -> stats expiration timestamp
|
||||
var retryCmd = redis.NewScript(`
|
||||
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
if x == 0 then
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
x = redis.call("ZREM", KEYS[2], ARGV[1])
|
||||
if x == 0 then
|
||||
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
|
||||
@@ -383,12 +385,10 @@ const (
|
||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||
// ARGV[6] -> stats expiration timestamp
|
||||
var killCmd = redis.NewScript(`
|
||||
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
if x == 0 then
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
x = redis.call("ZREM", KEYS[2], ARGV[1])
|
||||
if x == 0 then
|
||||
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
|
||||
|
||||
Reference in New Issue
Block a user