mirror of
https://github.com/hibiken/asynq.git
synced 2026-05-07 08:25:56 +08:00
Implement UpdateTaskPayload method for inspector (#1042)
Co-authored-by: Aziz Aliyev <aziz.aliyev@idda.az>
This commit is contained in:
@@ -1412,6 +1412,93 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Input:
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// --
|
||||
// ARGV[1] -> task message data
|
||||
//
|
||||
// Output:
|
||||
// Numeric code indicating the status:
|
||||
// Returns 1 if task is successfully updated.
|
||||
// Returns 0 if task is not found.
|
||||
// Returns -1 if task is not in scheduled state.
|
||||
var updateTaskPayloadCmd = redis.NewScript(`
|
||||
-- Check if given taks exists
|
||||
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||
return 0
|
||||
end
|
||||
local state, pending_since, group, unique_key = unpack(redis.call("HMGET", KEYS[1], "state", "pending_since", "group", "unique_key"))
|
||||
if state ~= "scheduled" then
|
||||
return -1
|
||||
end
|
||||
local redis_call_args = {"state", state}
|
||||
|
||||
if pending_since then
|
||||
table.insert(redis_call_args, "pending_since")
|
||||
table.insert(redis_call_args, pending_since)
|
||||
end
|
||||
if group then
|
||||
table.insert(redis_call_args, "group")
|
||||
table.insert(redis_call_args, group)
|
||||
end
|
||||
if unique_key then
|
||||
table.insert(redis_call_args, "unique_key")
|
||||
table.insert(redis_call_args, unique_key)
|
||||
end
|
||||
redis.call("HSET", KEYS[1], "msg", ARGV[1], unpack(redis_call_args))
|
||||
return 1
|
||||
`)
|
||||
|
||||
// UpdateTaskPayload finds a task that matches the id from the given queue and updates it's payload.
|
||||
// It returns nil if it successfully updated the task payload.
|
||||
//
|
||||
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
|
||||
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
|
||||
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
|
||||
func (r *RDB) UpdateTaskPayload(qname, id string, payload []byte) error {
|
||||
var op errors.Op = "rdb.UpdateTask"
|
||||
if err := r.checkQueueExists(qname); err != nil {
|
||||
return errors.E(op, errors.CanonicalCode(err), err)
|
||||
}
|
||||
|
||||
taskInfo, err := r.GetTaskInfo(qname, id)
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
|
||||
taskInfo.Message.Payload = payload
|
||||
|
||||
encoded, err := base.EncodeMessage(taskInfo.Message)
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(qname, id),
|
||||
}
|
||||
argv := []interface{}{
|
||||
encoded,
|
||||
}
|
||||
|
||||
res, err := updateTaskPayloadCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
n, ok := res.(int64)
|
||||
if !ok {
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: updateTaskCmd script returned unexported value %v", res))
|
||||
}
|
||||
switch n {
|
||||
case 1:
|
||||
return nil
|
||||
case 0:
|
||||
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
|
||||
case -1:
|
||||
return errors.E(op, errors.FailedPrecondition, "cannot update task that is not in scheduled state.")
|
||||
default:
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from updateTaskCmd script: %d", n))
|
||||
}
|
||||
}
|
||||
|
||||
// Input:
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// KEYS[2] -> asynq:{<qname>}:groups
|
||||
|
||||
Reference in New Issue
Block a user