2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-01-30 17:54:23 +00:00

Update RDB.Done to remove message from deadlines set

This commit is contained in:
Ken Hibino
2020-06-18 07:10:57 -07:00
parent 9b05dea394
commit 7b9119c703
3 changed files with 97 additions and 13 deletions

View File

@@ -186,8 +186,9 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err e
}
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
// KEYS[3] -> unique key in the format <type>:<payload>:<qname>
// KEYS[2] -> asynq:deadlines
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key in the format <type>:<payload>:<qname>
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
@@ -197,12 +198,16 @@ local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if x == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[2])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[2], ARGV[2])
x = redis.call("ZREM", KEYS[2], ARGV[1])
if x == 0 then
return redis.error_reply("NOT FOUND")
end
if string.len(KEYS[3]) > 0 and redis.call("GET", KEYS[3]) == ARGV[3] then
redis.call("DEL", KEYS[3])
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
end
if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then
redis.call("DEL", KEYS[4])
end
return redis.status_reply("OK")
`)
@@ -218,7 +223,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
processedKey := base.ProcessedKey(now)
expireAt := now.Add(statsTTL)
return doneCmd.Run(r.client,
[]string{base.InProgressQueue, processedKey, msg.UniqueKey},
[]string{base.InProgressQueue, base.KeyDeadlines, processedKey, msg.UniqueKey},
encoded, expireAt.Unix(), msg.ID.String()).Err()
}