2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-02-03 08:17:55 +00:00

Add RDB.ListDeadlineExceeded

This commit is contained in:
Ken Hibino
2020-06-20 06:29:58 -07:00
parent 7c7de0d8e0
commit 7657f560ec
2 changed files with 261 additions and 2 deletions

View File

@@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v7"
@@ -470,6 +471,44 @@ func (r *RDB) forward(src string) (int, error) {
return cast.ToInt(res), nil
}
// KEYS[1] -> asynq:deadlines
// KEYS[2] -> asynq:in_progress
// ARGV[1] -> max deadline score in unix time
// ARGV[2] -> queue prefix
/*
var requeueDeadlineExceededCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
local qkey = ARGV[2] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
redis.call("ZREM", KEYS[1], msg)
redis.call("LREM", KEYS[2], 0, msg)
end
return table.getn(msgs)`)
*/
// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline.
func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) {
var msgs []*base.TaskMessage
opt := &redis.ZRangeBy{
Min: "-inf",
Max: strconv.FormatInt(deadline.Unix(), 10),
}
res, err := r.client.ZRangeByScore(base.KeyDeadlines, opt).Result()
if err != nil {
return nil, err
}
for _, s := range res {
msg, err := base.DecodeMessage(s)
if err != nil {
return nil, err
}
msgs = append(msgs, msg)
}
return msgs, nil
}
// KEYS[1] -> asynq:servers:<host:pid:sid>
// KEYS[2] -> asynq:servers
// KEYS[3] -> asynq:workers<host:pid:sid>
@@ -517,7 +556,7 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo
// KEYS[2] -> asynq:servers:<host:pid:sid>
// KEYS[3] -> asynq:workers
// KEYS[4] -> asynq:workers<host:pid:sid>
var clearProcessInfoCmd = redis.NewScript(`
var clearServerStateCmd = redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
redis.call("DEL", KEYS[2])
redis.call("ZREM", KEYS[3], KEYS[4])
@@ -528,7 +567,7 @@ return redis.status_reply("OK")`)
func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
skey := base.ServerInfoKey(host, pid, serverID)
wkey := base.WorkersKey(host, pid, serverID)
return clearProcessInfoCmd.Run(r.client,
return clearServerStateCmd.Run(r.client,
[]string{base.AllServers, skey, base.AllWorkers, wkey}).Err()
}