mirror of
https://github.com/hibiken/asynq.git
synced 2026-05-22 10:37:07 +08:00
Compare commits
20 Commits
sohail/v0.
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
785bb7208c | ||
|
|
3a7ec97482 | ||
|
|
d671a4c427 | ||
|
|
f81d452160 | ||
|
|
1af926b147 | ||
|
|
23905a286f | ||
|
|
a32ac05d09 | ||
|
|
68f03688e3 | ||
|
|
06a06970d6 | ||
|
|
5586efeae7 | ||
|
|
dd3c923f44 | ||
|
|
f8d6677814 | ||
|
|
07898eade0 | ||
|
|
5216f1c3be | ||
|
|
dbfdfbac5a | ||
|
|
5c391f3ffb | ||
|
|
7ae0b3fe22 | ||
|
|
71ebcfa129 | ||
|
|
4e62d7e29d | ||
|
|
f919a605d5 |
14
asynq.go
14
asynq.go
@@ -40,6 +40,7 @@ type Task struct {
|
|||||||
func (t *Task) Type() string { return t.typename }
|
func (t *Task) Type() string { return t.typename }
|
||||||
func (t *Task) Payload() []byte { return t.payload }
|
func (t *Task) Payload() []byte { return t.payload }
|
||||||
func (t *Task) Headers() map[string]string { return t.headers }
|
func (t *Task) Headers() map[string]string { return t.headers }
|
||||||
|
func (t *Task) Options() []Option { return t.opts }
|
||||||
|
|
||||||
// ResultWriter returns a pointer to the ResultWriter associated with the task.
|
// ResultWriter returns a pointer to the ResultWriter associated with the task.
|
||||||
//
|
//
|
||||||
@@ -471,7 +472,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
|
|||||||
// redis://[:password@]host[:port][/dbnumber]
|
// redis://[:password@]host[:port][/dbnumber]
|
||||||
// rediss://[:password@]host[:port][/dbnumber]
|
// rediss://[:password@]host[:port][/dbnumber]
|
||||||
// redis-socket://[:password@]path[?db=dbnumber]
|
// redis-socket://[:password@]path[?db=dbnumber]
|
||||||
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
|
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][/dbnumber][?master=masterName]
|
||||||
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
||||||
u, err := url.Parse(uri)
|
u, err := url.Parse(uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -545,11 +546,20 @@ func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
|
|||||||
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
|
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
|
||||||
addrs := strings.Split(u.Host, ",")
|
addrs := strings.Split(u.Host, ",")
|
||||||
master := u.Query().Get("master")
|
master := u.Query().Get("master")
|
||||||
|
var db int
|
||||||
|
var err error
|
||||||
|
if len(u.Path) > 0 {
|
||||||
|
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||||
|
db, err = strconv.Atoi(xs[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("asynq: could not parse redis sentinel uri: database number should be the first segment of the path")
|
||||||
|
}
|
||||||
|
}
|
||||||
var password string
|
var password string
|
||||||
if v, ok := u.User.Password(); ok {
|
if v, ok := u.User.Password(); ok {
|
||||||
password = v
|
password = v
|
||||||
}
|
}
|
||||||
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil
|
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password, DB: db}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResultWriter is a client interface to write result data for a task.
|
// ResultWriter is a client interface to write result data for a task.
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
@@ -148,6 +149,23 @@ func TestParseRedisURI(t *testing.T) {
|
|||||||
SentinelPassword: "mypassword",
|
SentinelPassword: "mypassword",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002/3?master=mymaster",
|
||||||
|
RedisFailoverClientOpt{
|
||||||
|
MasterName: "mymaster",
|
||||||
|
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
|
||||||
|
DB: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002/7?master=mymaster",
|
||||||
|
RedisFailoverClientOpt{
|
||||||
|
MasterName: "mymaster",
|
||||||
|
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
|
||||||
|
SentinelPassword: "mypassword",
|
||||||
|
DB: 7,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
@@ -188,6 +206,10 @@ func TestParseRedisURIErrors(t *testing.T) {
|
|||||||
"non integer for db numbers for socket",
|
"non integer for db numbers for socket",
|
||||||
"redis-socket:///some/path/to/redis?db=one",
|
"redis-socket:///some/path/to/redis?db=one",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"non integer for db number for sentinel",
|
||||||
|
"redis-sentinel://localhost:5000/abc?master=mymaster",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
@@ -198,3 +220,30 @@ func TestParseRedisURIErrors(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTaskOptions(t *testing.T) {
|
||||||
|
opts := []Option{
|
||||||
|
MaxRetry(3),
|
||||||
|
Queue("critical"),
|
||||||
|
Timeout(5 * time.Minute),
|
||||||
|
}
|
||||||
|
task := NewTask("mytask", []byte("payload"), opts...)
|
||||||
|
|
||||||
|
got := task.Options()
|
||||||
|
if len(got) != len(opts) {
|
||||||
|
t.Fatalf("task.Options() returned %d options, want %d", len(got), len(opts))
|
||||||
|
}
|
||||||
|
for i, o := range opts {
|
||||||
|
if got[i].String() != o.String() {
|
||||||
|
t.Errorf("task.Options()[%d] = %v, want %v", i, got[i], o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTaskOptionsNil(t *testing.T) {
|
||||||
|
task := NewTask("mytask", []byte("payload"))
|
||||||
|
got := task.Options()
|
||||||
|
if got != nil {
|
||||||
|
t.Errorf("task.Options() = %v, want nil for task with no options", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
174
client.go
174
client.go
@@ -6,7 +6,9 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"maps"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -60,6 +62,7 @@ const (
|
|||||||
TaskIDOpt
|
TaskIDOpt
|
||||||
RetentionOpt
|
RetentionOpt
|
||||||
GroupOpt
|
GroupOpt
|
||||||
|
HeaderOpt
|
||||||
)
|
)
|
||||||
|
|
||||||
// Option specifies the task processing behavior.
|
// Option specifies the task processing behavior.
|
||||||
@@ -86,6 +89,7 @@ type (
|
|||||||
processInOption time.Duration
|
processInOption time.Duration
|
||||||
retentionOption time.Duration
|
retentionOption time.Duration
|
||||||
groupOption string
|
groupOption string
|
||||||
|
headerOption [2]string
|
||||||
)
|
)
|
||||||
|
|
||||||
// MaxRetry returns an option to specify the max number of times
|
// MaxRetry returns an option to specify the max number of times
|
||||||
@@ -217,6 +221,27 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
|
|||||||
func (name groupOption) Type() OptionType { return GroupOpt }
|
func (name groupOption) Type() OptionType { return GroupOpt }
|
||||||
func (name groupOption) Value() interface{} { return string(name) }
|
func (name groupOption) Value() interface{} { return string(name) }
|
||||||
|
|
||||||
|
// Header returns an option to associate the key-value header to the task.
|
||||||
|
//
|
||||||
|
// This option is composable with other Client options and can be used together
|
||||||
|
// with other options like MaxRetry, Queue, etc. For use cases where headers
|
||||||
|
// need to be combined with other options, using Header option is recommended.
|
||||||
|
//
|
||||||
|
// Alternatively, NewTaskWithHeaders can be used to create a task with headers
|
||||||
|
// directly, which may be preferable when headers are an intrinsic part of the
|
||||||
|
// task definition rather than enqueue-time configuration.
|
||||||
|
func Header(key, value string) Option {
|
||||||
|
return headerOption{key, value}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h headerOption) String() string {
|
||||||
|
var bytes []byte
|
||||||
|
bytes, _ = json.Marshal(h)
|
||||||
|
return fmt.Sprintf("Header(%s)", bytes)
|
||||||
|
}
|
||||||
|
func (h headerOption) Type() OptionType { return HeaderOpt }
|
||||||
|
func (h headerOption) Value() interface{} { return [2]string{h[0], h[1]} }
|
||||||
|
|
||||||
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
|
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
|
||||||
//
|
//
|
||||||
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
|
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
|
||||||
@@ -237,6 +262,7 @@ type option struct {
|
|||||||
processAt time.Time
|
processAt time.Time
|
||||||
retention time.Duration
|
retention time.Duration
|
||||||
group string
|
group string
|
||||||
|
headers map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// composeOptions merges user provided options into the default options
|
// composeOptions merges user provided options into the default options
|
||||||
@@ -251,6 +277,7 @@ func composeOptions(opts ...Option) (option, error) {
|
|||||||
timeout: 0, // do not set to defaultTimeout here
|
timeout: 0, // do not set to defaultTimeout here
|
||||||
deadline: time.Time{},
|
deadline: time.Time{},
|
||||||
processAt: time.Now(),
|
processAt: time.Now(),
|
||||||
|
headers: make(map[string]string),
|
||||||
}
|
}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
switch opt := opt.(type) {
|
switch opt := opt.(type) {
|
||||||
@@ -290,6 +317,9 @@ func composeOptions(opts ...Option) (option, error) {
|
|||||||
return option{}, errors.New("group key cannot be empty")
|
return option{}, errors.New("group key cannot be empty")
|
||||||
}
|
}
|
||||||
res.group = key
|
res.group = key
|
||||||
|
case headerOption:
|
||||||
|
key, value := opt[0], opt[1]
|
||||||
|
res.headers[key] = value
|
||||||
default:
|
default:
|
||||||
// ignore unexpected option
|
// ignore unexpected option
|
||||||
}
|
}
|
||||||
@@ -385,7 +415,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
|||||||
ID: opt.taskID,
|
ID: opt.taskID,
|
||||||
Type: task.Type(),
|
Type: task.Type(),
|
||||||
Payload: task.Payload(),
|
Payload: task.Payload(),
|
||||||
Headers: task.Headers(),
|
Headers: maps.Clone(task.Headers()),
|
||||||
Queue: opt.queue,
|
Queue: opt.queue,
|
||||||
Retry: opt.retry,
|
Retry: opt.retry,
|
||||||
Deadline: deadline.Unix(),
|
Deadline: deadline.Unix(),
|
||||||
@@ -394,6 +424,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
|||||||
GroupKey: opt.group,
|
GroupKey: opt.group,
|
||||||
Retention: int64(opt.retention.Seconds()),
|
Retention: int64(opt.retention.Seconds()),
|
||||||
}
|
}
|
||||||
|
if len(opt.headers) > 0 {
|
||||||
|
if msg.Headers == nil {
|
||||||
|
msg.Headers = make(map[string]string)
|
||||||
|
}
|
||||||
|
maps.Copy(msg.Headers, opt.headers)
|
||||||
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
var state base.TaskState
|
var state base.TaskState
|
||||||
if opt.processAt.After(now) {
|
if opt.processAt.After(now) {
|
||||||
@@ -420,6 +456,142 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
|||||||
return newTaskInfo(msg, state, opt.processAt, nil), nil
|
return newTaskInfo(msg, state, opt.processAt, nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchEnqueueResult holds the result of enqueuing a single task within a batch.
|
||||||
|
type BatchEnqueueResult struct {
|
||||||
|
TaskInfo *TaskInfo
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip,
|
||||||
|
// returning a per-task result slice aligned with the input tasks slice.
|
||||||
|
//
|
||||||
|
// # Atomicity Guarantees
|
||||||
|
//
|
||||||
|
// There is no all-or-nothing guarantee across the batch. Each task is executed as
|
||||||
|
// an independent Lua script inside a Redis pipeline. Individual scripts are atomic
|
||||||
|
// (the existence check, hash write, and list/sorted-set push for one task cannot
|
||||||
|
// be partially applied), but the pipeline as a whole is not wrapped in a
|
||||||
|
// MULTI/EXEC transaction. This means:
|
||||||
|
//
|
||||||
|
// - Partial success is possible: some tasks may be enqueued while others are not.
|
||||||
|
// - A task whose ID already exists in Redis is silently skipped (treated as a
|
||||||
|
// no-op by the Lua script), and its result will still show success.
|
||||||
|
// - If the Redis pipeline call itself fails (e.g. connection lost, context
|
||||||
|
// cancelled), every task that passed client-side validation receives that
|
||||||
|
// error — none of them can be assumed to have been enqueued.
|
||||||
|
//
|
||||||
|
// # Validation Errors (pre-pipeline)
|
||||||
|
//
|
||||||
|
// The following are caught before any Redis call and rejected in the
|
||||||
|
// corresponding BatchEnqueueResult.Err without affecting other tasks:
|
||||||
|
//
|
||||||
|
// - nil task
|
||||||
|
// - empty task type name
|
||||||
|
// - invalid options
|
||||||
|
// - group tasks (not supported in batch mode)
|
||||||
|
// - unique tasks (not supported in batch mode)
|
||||||
|
//
|
||||||
|
// # Supported Task Types
|
||||||
|
//
|
||||||
|
// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported.
|
||||||
|
// Group and unique tasks are rejected as described above.
|
||||||
|
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
|
||||||
|
results := make([]BatchEnqueueResult, len(tasks))
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
type itemMeta struct {
|
||||||
|
state base.TaskState
|
||||||
|
processAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
items := make([]base.BatchEnqueueItem, 0, len(tasks))
|
||||||
|
itemIndexes := make([]int, 0, len(tasks))
|
||||||
|
itemMetas := make([]itemMeta, 0, len(tasks))
|
||||||
|
|
||||||
|
for i, task := range tasks {
|
||||||
|
if task == nil {
|
||||||
|
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task cannot be nil")}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(task.Type()) == "" {
|
||||||
|
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task typename cannot be empty")}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
merged := append(task.opts, opts...)
|
||||||
|
opt, err := composeOptions(merged...)
|
||||||
|
if err != nil {
|
||||||
|
results[i] = BatchEnqueueResult{Err: err}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if opt.group != "" {
|
||||||
|
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support group tasks")}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if opt.uniqueTTL > 0 {
|
||||||
|
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support unique tasks")}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
deadline := noDeadline
|
||||||
|
if !opt.deadline.IsZero() {
|
||||||
|
deadline = opt.deadline
|
||||||
|
}
|
||||||
|
timeout := noTimeout
|
||||||
|
if opt.timeout != 0 {
|
||||||
|
timeout = opt.timeout
|
||||||
|
}
|
||||||
|
if deadline.Equal(noDeadline) && timeout == noTimeout {
|
||||||
|
timeout = defaultTimeout
|
||||||
|
}
|
||||||
|
msg := &base.TaskMessage{
|
||||||
|
ID: opt.taskID,
|
||||||
|
Type: task.Type(),
|
||||||
|
Payload: task.Payload(),
|
||||||
|
Headers: task.Headers(),
|
||||||
|
Queue: opt.queue,
|
||||||
|
Retry: opt.retry,
|
||||||
|
Deadline: deadline.Unix(),
|
||||||
|
Timeout: int64(timeout.Seconds()),
|
||||||
|
Retention: int64(opt.retention.Seconds()),
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
scheduled := opt.processAt.After(now)
|
||||||
|
|
||||||
|
item := base.BatchEnqueueItem{Msg: msg}
|
||||||
|
var meta itemMeta
|
||||||
|
if scheduled {
|
||||||
|
item.ProcessAt = opt.processAt
|
||||||
|
meta = itemMeta{state: base.TaskStateScheduled, processAt: opt.processAt}
|
||||||
|
} else {
|
||||||
|
meta = itemMeta{state: base.TaskStatePending, processAt: now}
|
||||||
|
}
|
||||||
|
|
||||||
|
items = append(items, item)
|
||||||
|
itemIndexes = append(itemIndexes, i)
|
||||||
|
itemMetas = append(itemMetas, meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(items) == 0 {
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := c.broker.BatchEnqueue(ctx, items)
|
||||||
|
if err != nil {
|
||||||
|
for _, idx := range itemIndexes {
|
||||||
|
results[idx] = BatchEnqueueResult{Err: err}
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
|
for j, idx := range itemIndexes {
|
||||||
|
info := newTaskInfo(items[j].Msg, itemMetas[j].state, itemMetas[j].processAt, nil)
|
||||||
|
results[idx] = BatchEnqueueResult{TaskInfo: info}
|
||||||
|
}
|
||||||
|
return results
|
||||||
|
}
|
||||||
|
|
||||||
// Ping performs a ping against the redis connection.
|
// Ping performs a ping against the redis connection.
|
||||||
func (c *Client) Ping() error {
|
func (c *Client) Ping() error {
|
||||||
return c.broker.Ping()
|
return c.broker.Ping()
|
||||||
|
|||||||
272
client_test.go
272
client_test.go
@@ -13,6 +13,8 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/hibiken/asynq/internal/testbroker"
|
||||||
h "github.com/hibiken/asynq/internal/testutil"
|
h "github.com/hibiken/asynq/internal/testutil"
|
||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
@@ -1339,6 +1341,70 @@ func TestClientEnqueueWithHeaders(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "Task with header option",
|
||||||
|
task: NewTask("store_data", []byte("data"), Header("channel", "email"), Header("user-id", "bob1234")),
|
||||||
|
opts: []Option{},
|
||||||
|
wantInfo: &TaskInfo{
|
||||||
|
Queue: "default",
|
||||||
|
Type: "store_data",
|
||||||
|
Payload: []byte("data"),
|
||||||
|
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
|
||||||
|
State: TaskStatePending,
|
||||||
|
MaxRetry: 25,
|
||||||
|
Retried: 0,
|
||||||
|
LastErr: "",
|
||||||
|
LastFailedAt: time.Time{},
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: time.Time{},
|
||||||
|
NextProcessAt: now,
|
||||||
|
},
|
||||||
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
|
"default": {
|
||||||
|
{
|
||||||
|
Type: "store_data",
|
||||||
|
Payload: []byte("data"),
|
||||||
|
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
|
||||||
|
Retry: 25,
|
||||||
|
Queue: "default",
|
||||||
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
|
Deadline: noDeadline.Unix(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Enqueue task with header option",
|
||||||
|
task: NewTask("store_data", []byte("data")),
|
||||||
|
opts: []Option{Header("channel", "email"), Header("user-id", "bob1234"), MaxRetry(5)},
|
||||||
|
wantInfo: &TaskInfo{
|
||||||
|
Queue: "default",
|
||||||
|
Type: "store_data",
|
||||||
|
Payload: []byte("data"),
|
||||||
|
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
|
||||||
|
State: TaskStatePending,
|
||||||
|
MaxRetry: 5,
|
||||||
|
Retried: 0,
|
||||||
|
LastErr: "",
|
||||||
|
LastFailedAt: time.Time{},
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: time.Time{},
|
||||||
|
NextProcessAt: now,
|
||||||
|
},
|
||||||
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
|
"default": {
|
||||||
|
{
|
||||||
|
Type: "store_data",
|
||||||
|
Payload: []byte("data"),
|
||||||
|
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
|
||||||
|
Retry: 5,
|
||||||
|
Queue: "default",
|
||||||
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
|
Deadline: noDeadline.Unix(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
@@ -1661,3 +1727,209 @@ func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBatchEnqueueContext_ImmediateTasks(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
client := NewClient(getRedisConnOpt(t))
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
tasks := []*Task{
|
||||||
|
NewTask("task1", []byte("payload1")),
|
||||||
|
NewTask("task2", []byte("payload2")),
|
||||||
|
NewTask("task3", []byte("payload3")),
|
||||||
|
}
|
||||||
|
|
||||||
|
results := client.BatchEnqueueContext(context.Background(), tasks)
|
||||||
|
if len(results) != 3 {
|
||||||
|
t.Fatalf("BatchEnqueueContext returned %d results, want 3", len(results))
|
||||||
|
}
|
||||||
|
for i, res := range results {
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Errorf("results[%d].Err = %v, want nil", i, res.Err)
|
||||||
|
}
|
||||||
|
if res.TaskInfo == nil {
|
||||||
|
t.Errorf("results[%d].TaskInfo is nil, want non-nil", i)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if res.TaskInfo.Queue != "default" {
|
||||||
|
t.Errorf("results[%d].TaskInfo.Queue = %q, want %q", i, res.TaskInfo.Queue, "default")
|
||||||
|
}
|
||||||
|
if res.TaskInfo.State != TaskStatePending {
|
||||||
|
t.Errorf("results[%d].TaskInfo.State = %v, want %v", i, res.TaskInfo.State, TaskStatePending)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gotPending := h.GetPendingMessages(t, r, "default")
|
||||||
|
if len(gotPending) != 3 {
|
||||||
|
t.Errorf("len(pending) = %d, want 3", len(gotPending))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchEnqueueContext_ScheduledTask(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
client := NewClient(getRedisConnOpt(t))
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
future := time.Now().Add(1 * time.Hour)
|
||||||
|
tasks := []*Task{
|
||||||
|
NewTask("scheduled_task", []byte("payload"), ProcessAt(future)),
|
||||||
|
}
|
||||||
|
|
||||||
|
results := client.BatchEnqueueContext(context.Background(), tasks)
|
||||||
|
if len(results) != 1 {
|
||||||
|
t.Fatalf("BatchEnqueueContext returned %d results, want 1", len(results))
|
||||||
|
}
|
||||||
|
if results[0].Err != nil {
|
||||||
|
t.Fatalf("results[0].Err = %v, want nil", results[0].Err)
|
||||||
|
}
|
||||||
|
if results[0].TaskInfo == nil {
|
||||||
|
t.Fatal("results[0].TaskInfo is nil, want non-nil")
|
||||||
|
}
|
||||||
|
if results[0].TaskInfo.State != TaskStateScheduled {
|
||||||
|
t.Errorf("results[0].TaskInfo.State = %v, want %v", results[0].TaskInfo.State, TaskStateScheduled)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotScheduled := h.GetScheduledMessages(t, r, "default")
|
||||||
|
if len(gotScheduled) != 1 {
|
||||||
|
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchEnqueueContext_MixedBatch(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
client := NewClient(getRedisConnOpt(t))
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
future := time.Now().Add(1 * time.Hour)
|
||||||
|
tasks := []*Task{
|
||||||
|
NewTask("immediate1", []byte("p1")),
|
||||||
|
NewTask("scheduled1", []byte("p2"), ProcessAt(future)),
|
||||||
|
NewTask("immediate2", []byte("p3")),
|
||||||
|
NewTask("grouped1", []byte("p4"), Group("mygroup")),
|
||||||
|
NewTask("immediate3", []byte("p5")),
|
||||||
|
}
|
||||||
|
|
||||||
|
results := client.BatchEnqueueContext(context.Background(), tasks)
|
||||||
|
if len(results) != 5 {
|
||||||
|
t.Fatalf("BatchEnqueueContext returned %d results, want 5", len(results))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Immediate tasks (indices 0, 2, 4) should succeed with Pending state.
|
||||||
|
for _, idx := range []int{0, 2, 4} {
|
||||||
|
if results[idx].Err != nil {
|
||||||
|
t.Errorf("results[%d].Err = %v, want nil (immediate task)", idx, results[idx].Err)
|
||||||
|
}
|
||||||
|
if results[idx].TaskInfo == nil {
|
||||||
|
t.Errorf("results[%d].TaskInfo is nil, want non-nil", idx)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if results[idx].TaskInfo.State != TaskStatePending {
|
||||||
|
t.Errorf("results[%d].TaskInfo.State = %v, want %v", idx, results[idx].TaskInfo.State, TaskStatePending)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scheduled task (index 1) should succeed with Scheduled state.
|
||||||
|
if results[1].Err != nil {
|
||||||
|
t.Errorf("results[1].Err = %v, want nil (scheduled task)", results[1].Err)
|
||||||
|
}
|
||||||
|
if results[1].TaskInfo != nil && results[1].TaskInfo.State != TaskStateScheduled {
|
||||||
|
t.Errorf("results[1].TaskInfo.State = %v, want %v", results[1].TaskInfo.State, TaskStateScheduled)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Grouped task (index 3) should be rejected.
|
||||||
|
if results[3].Err == nil {
|
||||||
|
t.Error("results[3].Err is nil, want error for group task")
|
||||||
|
}
|
||||||
|
|
||||||
|
gotPending := h.GetPendingMessages(t, r, "default")
|
||||||
|
if len(gotPending) != 3 {
|
||||||
|
t.Errorf("len(pending) = %d, want 3", len(gotPending))
|
||||||
|
}
|
||||||
|
|
||||||
|
gotScheduled := h.GetScheduledMessages(t, r, "default")
|
||||||
|
if len(gotScheduled) != 1 {
|
||||||
|
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchEnqueueContext_ValidationErrors(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
client := NewClient(getRedisConnOpt(t))
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
tasks []*Task
|
||||||
|
opts []Option
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "nil task",
|
||||||
|
tasks: []*Task{nil},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "empty task typename",
|
||||||
|
tasks: []*Task{NewTask("", []byte("payload"))},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "blank task typename",
|
||||||
|
tasks: []*Task{NewTask(" ", []byte("payload"))},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "invalid option: unique TTL less than 1s",
|
||||||
|
tasks: []*Task{NewTask("foo", nil)},
|
||||||
|
opts: []Option{Unique(300 * time.Millisecond)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "group task rejected",
|
||||||
|
tasks: []*Task{NewTask("foo", nil, Group("mygroup"))},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "unique task rejected",
|
||||||
|
tasks: []*Task{NewTask("foo", nil, Unique(time.Hour))},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
results := client.BatchEnqueueContext(context.Background(), tc.tasks, tc.opts...)
|
||||||
|
if len(results) != len(tc.tasks) {
|
||||||
|
t.Errorf("%s: got %d results, want %d", tc.desc, len(results), len(tc.tasks))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for i, res := range results {
|
||||||
|
if res.Err == nil {
|
||||||
|
t.Errorf("%s: results[%d].Err = nil, want non-nil error", tc.desc, i)
|
||||||
|
}
|
||||||
|
if res.TaskInfo != nil {
|
||||||
|
t.Errorf("%s: results[%d].TaskInfo = %v, want nil", tc.desc, i, res.TaskInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchEnqueueContext_BrokerError(t *testing.T) {
|
||||||
|
r := rdb.NewRDB(setup(t))
|
||||||
|
defer r.Close()
|
||||||
|
testBroker := testbroker.NewTestBroker(r)
|
||||||
|
client := &Client{broker: testBroker, sharedConnection: true}
|
||||||
|
|
||||||
|
tasks := []*Task{
|
||||||
|
NewTask("task1", []byte("p1")),
|
||||||
|
NewTask("task2", []byte("p2")),
|
||||||
|
}
|
||||||
|
|
||||||
|
testBroker.Sleep()
|
||||||
|
results := client.BatchEnqueueContext(context.Background(), tasks)
|
||||||
|
testBroker.Wakeup()
|
||||||
|
|
||||||
|
if len(results) != 2 {
|
||||||
|
t.Fatalf("BatchEnqueueContext returned %d results, want 2", len(results))
|
||||||
|
}
|
||||||
|
for i, res := range results {
|
||||||
|
if res.Err == nil {
|
||||||
|
t.Errorf("results[%d].Err = nil, want non-nil error when broker is down", i)
|
||||||
|
}
|
||||||
|
if res.TaskInfo != nil {
|
||||||
|
t.Errorf("results[%d].TaskInfo = %v, want nil on broker error", i, res.TaskInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -1002,6 +1003,13 @@ func parseOption(s string) (Option, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return Retention(d), nil
|
return Retention(d), nil
|
||||||
|
case "Header":
|
||||||
|
var h [2]string
|
||||||
|
err := json.Unmarshal([]byte(arg), &h)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return Header(h[0], h[1]), nil
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("cannot not parse option string %q", s)
|
return nil, fmt.Errorf("cannot not parse option string %q", s)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3526,6 +3526,7 @@ func TestParseOption(t *testing.T) {
|
|||||||
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
|
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
|
||||||
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
|
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
|
||||||
{`Retention(24h)`, RetentionOpt, 24 * time.Hour},
|
{`Retention(24h)`, RetentionOpt, 24 * time.Hour},
|
||||||
|
{`Header(["email", "hello@example.com"])`, HeaderOpt, [2]string{"email", "hello@example.com"}},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
@@ -3573,6 +3574,14 @@ func TestParseOption(t *testing.T) {
|
|||||||
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
|
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
|
||||||
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
||||||
}
|
}
|
||||||
|
case HeaderOpt:
|
||||||
|
gotVal, ok := got.Value().([2]string)
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("returned Option with non array value")
|
||||||
|
}
|
||||||
|
if gotVal != tc.wantVal.([2]string) {
|
||||||
|
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
t.Fatalf("returned Option with unexpected type: %v", got.Type())
|
t.Fatalf("returned Option with unexpected type: %v", got.Type())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -684,6 +684,14 @@ func (l *Lease) IsValid() bool {
|
|||||||
return l.expireAt.After(now) || l.expireAt.Equal(now)
|
return l.expireAt.After(now) || l.expireAt.Equal(now)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchEnqueueItem pairs a task message with optional scheduling metadata for
|
||||||
|
// batch enqueue operations. If ProcessAt is zero, the task is enqueued for
|
||||||
|
// immediate processing; otherwise it is added to the scheduled set.
|
||||||
|
type BatchEnqueueItem struct {
|
||||||
|
Msg *TaskMessage
|
||||||
|
ProcessAt time.Time // zero value → immediate
|
||||||
|
}
|
||||||
|
|
||||||
// Broker is a message broker that supports operations to manage task queues.
|
// Broker is a message broker that supports operations to manage task queues.
|
||||||
//
|
//
|
||||||
// See rdb.RDB as a reference implementation.
|
// See rdb.RDB as a reference implementation.
|
||||||
@@ -692,6 +700,12 @@ type Broker interface {
|
|||||||
Close() error
|
Close() error
|
||||||
Enqueue(ctx context.Context, msg *TaskMessage) error
|
Enqueue(ctx context.Context, msg *TaskMessage) error
|
||||||
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
||||||
|
// BatchEnqueue enqueues multiple tasks in a single round-trip. It returns the
|
||||||
|
// count of newly enqueued tasks; duplicate IDs are silently skipped. The error
|
||||||
|
// is non-nil only on infrastructure failure (e.g. lost connection), in which
|
||||||
|
// case the count is meaningless. Individual task scripts are atomic but the
|
||||||
|
// batch as a whole is not transactional — partial success is possible.
|
||||||
|
BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error)
|
||||||
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
||||||
Done(ctx context.Context, msg *TaskMessage) error
|
Done(ctx context.Context, msg *TaskMessage) error
|
||||||
MarkAsComplete(ctx context.Context, msg *TaskMessage) error
|
MarkAsComplete(ctx context.Context, msg *TaskMessage) error
|
||||||
|
|||||||
@@ -139,6 +139,118 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
|
||||||
|
// Each item is either enqueued immediately (ProcessAt is zero) or added to the
|
||||||
|
// scheduled sorted set.
|
||||||
|
//
|
||||||
|
// WARNING: tasks whose IDs already exist in Redis are silently skipped.
|
||||||
|
//
|
||||||
|
// The pipeline executes independent Lua scripts per task — there is no
|
||||||
|
// MULTI/EXEC wrapping the batch, so individual tasks may succeed or fail
|
||||||
|
// independently. The returned int is the number of tasks actually written;
|
||||||
|
// skipped duplicates do not count. The returned error is non-nil only when the
|
||||||
|
// pipeline call itself fails (network error, context cancellation, etc.), in
|
||||||
|
// which case no individual result should be trusted.
|
||||||
|
//
|
||||||
|
// Message encoding errors cause an immediate return before any Redis I/O.
|
||||||
|
func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
|
||||||
|
var op errors.Op = "rdb.BatchEnqueue"
|
||||||
|
if len(items) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preload Lua scripts so that EVALSHA inside the pipeline does not fail with
|
||||||
|
// NOSCRIPT. Script.Run on a pipeline only sends EVALSHA (unlike non-pipeline
|
||||||
|
// Run which retries with EVAL on NOSCRIPT).
|
||||||
|
needsEnqueue, needsSchedule := false, false
|
||||||
|
for _, item := range items {
|
||||||
|
if item.ProcessAt.IsZero() {
|
||||||
|
needsEnqueue = true
|
||||||
|
} else {
|
||||||
|
needsSchedule = true
|
||||||
|
}
|
||||||
|
if needsEnqueue && needsSchedule {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if needsEnqueue {
|
||||||
|
if err := enqueueCmd.Load(ctx, r.client).Err(); err != nil {
|
||||||
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load enqueue script: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if needsSchedule {
|
||||||
|
if err := scheduleCmd.Load(ctx, r.client).Err(); err != nil {
|
||||||
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load schedule script: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pipe := r.client.Pipeline()
|
||||||
|
|
||||||
|
// Track which pipeline slot holds each item's script result.
|
||||||
|
scriptIdxs := make([]int, 0, len(items))
|
||||||
|
pipeLen := 0
|
||||||
|
|
||||||
|
// Track queues we add to AllQueues in this pipeline so we can roll back the
|
||||||
|
// in-memory cache on failure.
|
||||||
|
var newQueues []string
|
||||||
|
|
||||||
|
now := r.clock.Now().UnixNano()
|
||||||
|
|
||||||
|
for _, item := range items {
|
||||||
|
encoded, err := base.EncodeMessage(item.Msg)
|
||||||
|
if err != nil {
|
||||||
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
|
}
|
||||||
|
if _, found := r.queuesPublished.Load(item.Msg.Queue); !found {
|
||||||
|
pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue)
|
||||||
|
r.queuesPublished.Store(item.Msg.Queue, true)
|
||||||
|
newQueues = append(newQueues, item.Msg.Queue)
|
||||||
|
pipeLen++
|
||||||
|
}
|
||||||
|
|
||||||
|
if item.ProcessAt.IsZero() {
|
||||||
|
keys := []string{
|
||||||
|
base.TaskKey(item.Msg.Queue, item.Msg.ID),
|
||||||
|
base.PendingKey(item.Msg.Queue),
|
||||||
|
}
|
||||||
|
argv := []interface{}{encoded, item.Msg.ID, now}
|
||||||
|
enqueueCmd.Run(ctx, pipe, keys, argv...)
|
||||||
|
} else {
|
||||||
|
keys := []string{
|
||||||
|
base.TaskKey(item.Msg.Queue, item.Msg.ID),
|
||||||
|
base.ScheduledKey(item.Msg.Queue),
|
||||||
|
}
|
||||||
|
argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID}
|
||||||
|
scheduleCmd.Run(ctx, pipe, keys, argv...)
|
||||||
|
}
|
||||||
|
scriptIdxs = append(scriptIdxs, pipeLen)
|
||||||
|
pipeLen++
|
||||||
|
}
|
||||||
|
|
||||||
|
cmds, err := pipe.Exec(ctx)
|
||||||
|
if err != nil && err != redis.Nil {
|
||||||
|
for _, q := range newQueues {
|
||||||
|
r.queuesPublished.Delete(q)
|
||||||
|
}
|
||||||
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueued := 0
|
||||||
|
for _, idx := range scriptIdxs {
|
||||||
|
if idx >= len(cmds) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
res, err := cmds[idx].(*redis.Cmd).Result()
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if n, ok := res.(int64); ok && n == 1 {
|
||||||
|
enqueued++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return enqueued, nil
|
||||||
|
}
|
||||||
|
|
||||||
// enqueueUniqueCmd enqueues the task message if the task is unique.
|
// enqueueUniqueCmd enqueues the task message if the task is unique.
|
||||||
//
|
//
|
||||||
// KEYS[1] -> unique key
|
// KEYS[1] -> unique key
|
||||||
@@ -1488,6 +1600,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
|||||||
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
||||||
_, err := pubsub.Receive(ctx)
|
_, err := pubsub.Receive(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
pubsub.Close()
|
||||||
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
||||||
}
|
}
|
||||||
return pubsub, nil
|
return pubsub, nil
|
||||||
|
|||||||
@@ -160,6 +160,156 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBatchEnqueue(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}))
|
||||||
|
t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv")
|
||||||
|
t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
|
||||||
|
|
||||||
|
enqueueTime := time.Now()
|
||||||
|
r.SetClock(timeutil.NewSimulatedClock(enqueueTime))
|
||||||
|
|
||||||
|
t.Run("enqueue multiple tasks", func(t *testing.T) {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
items := []base.BatchEnqueueItem{
|
||||||
|
{Msg: t1},
|
||||||
|
{Msg: t2},
|
||||||
|
{Msg: t3},
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := r.BatchEnqueue(context.Background(), items)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BatchEnqueue returned error: %v", err)
|
||||||
|
}
|
||||||
|
if n != 3 {
|
||||||
|
t.Errorf("BatchEnqueue returned %d, want 3", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range items {
|
||||||
|
msg := item.Msg
|
||||||
|
pendingKey := base.PendingKey(msg.Queue)
|
||||||
|
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
|
||||||
|
found := false
|
||||||
|
for _, id := range pendingIDs {
|
||||||
|
if id == msg.ID {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
t.Errorf("task %s not found in pending list %s", msg.ID, pendingKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
taskKey := base.TaskKey(msg.Queue, msg.ID)
|
||||||
|
state := r.client.HGet(context.Background(), taskKey, "state").Val()
|
||||||
|
if state != "pending" {
|
||||||
|
t.Errorf("state for task %s = %q, want %q", msg.ID, state, "pending")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("empty batch", func(t *testing.T) {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
|
n, err := r.BatchEnqueue(context.Background(), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BatchEnqueue(nil) returned error: %v", err)
|
||||||
|
}
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("BatchEnqueue(nil) returned %d, want 0", n)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("duplicate IDs skipped", func(t *testing.T) {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
|
if err := r.Enqueue(context.Background(), t1); err != nil {
|
||||||
|
t.Fatalf("pre-enqueue failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dup := *t1
|
||||||
|
newMsg := h.NewTaskMessage("new_task", nil)
|
||||||
|
|
||||||
|
items := []base.BatchEnqueueItem{
|
||||||
|
{Msg: &dup},
|
||||||
|
{Msg: newMsg},
|
||||||
|
}
|
||||||
|
n, err := r.BatchEnqueue(context.Background(), items)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BatchEnqueue returned error: %v", err)
|
||||||
|
}
|
||||||
|
if n != 1 {
|
||||||
|
t.Errorf("BatchEnqueue returned %d, want 1 (duplicate should be skipped)", n)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("scheduled tasks", func(t *testing.T) {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
|
future := time.Now().Add(1 * time.Hour)
|
||||||
|
s1 := h.NewTaskMessage("deferred_email", nil)
|
||||||
|
items := []base.BatchEnqueueItem{
|
||||||
|
{Msg: t1},
|
||||||
|
{Msg: s1, ProcessAt: future},
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := r.BatchEnqueue(context.Background(), items)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("BatchEnqueue returned error: %v", err)
|
||||||
|
}
|
||||||
|
if n != 2 {
|
||||||
|
t.Errorf("BatchEnqueue returned %d, want 2", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Immediate task should be in pending.
|
||||||
|
pendingIDs := r.client.LRange(context.Background(), base.PendingKey(t1.Queue), 0, -1).Val()
|
||||||
|
foundPending := false
|
||||||
|
for _, id := range pendingIDs {
|
||||||
|
if id == t1.ID {
|
||||||
|
foundPending = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundPending {
|
||||||
|
t.Errorf("immediate task %s not found in pending list", t1.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scheduled task should be in scheduled set.
|
||||||
|
scheduledIDs := r.client.ZRange(context.Background(), base.ScheduledKey(s1.Queue), 0, -1).Val()
|
||||||
|
foundScheduled := false
|
||||||
|
for _, id := range scheduledIDs {
|
||||||
|
if id == s1.ID {
|
||||||
|
foundScheduled = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundScheduled {
|
||||||
|
t.Errorf("scheduled task %s not found in scheduled set", s1.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
taskKey := base.TaskKey(s1.Queue, s1.ID)
|
||||||
|
state := r.client.HGet(context.Background(), taskKey, "state").Val()
|
||||||
|
if state != "scheduled" {
|
||||||
|
t.Errorf("state for scheduled task %s = %q, want %q", s1.ID, state, "scheduled")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("pipeline error from cancelled context", func(t *testing.T) {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
|
msg := h.NewTaskMessage("pipeline_error_task", nil)
|
||||||
|
items := []base.BatchEnqueueItem{{Msg: msg}}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
_, err := r.BatchEnqueue(ctx, items)
|
||||||
|
if err == nil {
|
||||||
|
t.Error("BatchEnqueue with cancelled context returned nil error, want non-nil")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestEnqueueQueueCache(t *testing.T) {
|
func TestEnqueueQueueCache(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
@@ -3274,6 +3424,29 @@ func TestCancelationPubSub(t *testing.T) {
|
|||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCancelationPubSubReceiveError(t *testing.T) {
|
||||||
|
// Use a client connected to a non-existent Redis server to trigger
|
||||||
|
// a Receive() error. This verifies that the pubsub connection is
|
||||||
|
// closed on error, preventing connection leaks.
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:0", // invalid port — connection will fail
|
||||||
|
})
|
||||||
|
r := NewRDB(client)
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
pubsub, err := r.CancelationPubSub()
|
||||||
|
if err == nil {
|
||||||
|
// If no error, we must clean up the pubsub.
|
||||||
|
if pubsub != nil {
|
||||||
|
pubsub.Close()
|
||||||
|
}
|
||||||
|
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
|
||||||
|
}
|
||||||
|
if pubsub != nil {
|
||||||
|
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteResult(t *testing.T) {
|
func TestWriteResult(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|||||||
@@ -64,6 +64,15 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage,
|
|||||||
return tb.real.EnqueueUnique(ctx, msg, ttl)
|
return tb.real.EnqueueUnique(ctx, msg, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tb *TestBroker) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
|
||||||
|
tb.mu.Lock()
|
||||||
|
defer tb.mu.Unlock()
|
||||||
|
if tb.sleeping {
|
||||||
|
return 0, errRedisDown
|
||||||
|
}
|
||||||
|
return tb.real.BatchEnqueue(ctx, items)
|
||||||
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
|
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -36,14 +35,14 @@ var cronListCmd = &cobra.Command{
|
|||||||
Use: "list",
|
Use: "list",
|
||||||
Aliases: []string{"ls"},
|
Aliases: []string{"ls"},
|
||||||
Short: "List cron entries",
|
Short: "List cron entries",
|
||||||
Run: cronList,
|
RunE: cronList,
|
||||||
}
|
}
|
||||||
|
|
||||||
var cronHistoryCmd = &cobra.Command{
|
var cronHistoryCmd = &cobra.Command{
|
||||||
Use: "history <entry_id> [<entry_id>...]",
|
Use: "history <entry_id> [<entry_id>...]",
|
||||||
Short: "Show history of each cron tasks",
|
Short: "Show history of each cron tasks",
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
Run: cronHistory,
|
RunE: cronHistory,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1
|
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1
|
||||||
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 bf6a8594-cd03-4968-b36a-8572c5e160dd
|
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 bf6a8594-cd03-4968-b36a-8572c5e160dd
|
||||||
@@ -51,17 +50,16 @@ var cronHistoryCmd = &cobra.Command{
|
|||||||
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 --page=2`),
|
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 --page=2`),
|
||||||
}
|
}
|
||||||
|
|
||||||
func cronList(cmd *cobra.Command, args []string) {
|
func cronList(cmd *cobra.Command, args []string) error {
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
|
|
||||||
entries, err := inspector.SchedulerEntries()
|
entries, err := inspector.SchedulerEntries()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("could not fetch scheduler entries: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(entries) == 0 {
|
if len(entries) == 0 {
|
||||||
fmt.Println("No scheduler entries")
|
fmt.Println("No scheduler entries")
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort entries by spec.
|
// Sort entries by spec.
|
||||||
@@ -78,6 +76,7 @@ func cronList(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a string describing when the next enqueue will happen.
|
// Returns a string describing when the next enqueue will happen.
|
||||||
@@ -97,16 +96,14 @@ func prevEnqueue(prevEnqueuedAt time.Time) string {
|
|||||||
return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second))
|
return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
func cronHistory(cmd *cobra.Command, args []string) {
|
func cronHistory(cmd *cobra.Command, args []string) error {
|
||||||
pageNum, err := cmd.Flags().GetInt("page")
|
pageNum, err := cmd.Flags().GetInt("page")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
pageSize, err := cmd.Flags().GetInt("size")
|
pageSize, err := cmd.Flags().GetInt("size")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
for i, entryID := range args {
|
for i, entryID := range args {
|
||||||
@@ -136,4 +133,5 @@ func cronHistory(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/MakeNowJust/heredoc/v2"
|
"github.com/MakeNowJust/heredoc/v2"
|
||||||
@@ -32,14 +31,14 @@ var dashCmd = &cobra.Command{
|
|||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq dash
|
$ asynq dash
|
||||||
$ asynq dash --refresh=3s`),
|
$ asynq dash --refresh=3s`),
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
if flagPollInterval < 1*time.Second {
|
if flagPollInterval < 1*time.Second {
|
||||||
fmt.Println("error: --refresh cannot be less than 1s")
|
return fmt.Errorf("--refresh cannot be less than 1s")
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
dash.Run(dash.Options{
|
dash.Run(dash.Options{
|
||||||
PollInterval: flagPollInterval,
|
PollInterval: flagPollInterval,
|
||||||
RedisConnOpt: getRedisConnOpt(),
|
RedisConnOpt: getRedisConnOpt(),
|
||||||
})
|
})
|
||||||
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/MakeNowJust/heredoc/v2"
|
"github.com/MakeNowJust/heredoc/v2"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@@ -31,22 +30,25 @@ var groupListCmd = &cobra.Command{
|
|||||||
Aliases: []string{"ls"},
|
Aliases: []string{"ls"},
|
||||||
Short: "List groups",
|
Short: "List groups",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: groupLists,
|
RunE: groupLists,
|
||||||
}
|
}
|
||||||
|
|
||||||
func groupLists(cmd *cobra.Command, args []string) {
|
func groupLists(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
groups, err := inspector.Groups(qname)
|
groups, err := inspector.Groups(qname)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not fetch groups: %v", err)
|
||||||
|
}
|
||||||
if len(groups) == 0 {
|
if len(groups) == 0 {
|
||||||
fmt.Printf("No groups found in queue %q\n", qname)
|
fmt.Printf("No groups found in queue %q\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
for _, g := range groups {
|
for _, g := range groups {
|
||||||
fmt.Println(g.Group)
|
fmt.Println(g.Group)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/MakeNowJust/heredoc/v2"
|
"github.com/MakeNowJust/heredoc/v2"
|
||||||
"github.com/fatih/color"
|
"github.com/fatih/color"
|
||||||
@@ -44,16 +43,14 @@ var queueListCmd = &cobra.Command{
|
|||||||
Use: "list",
|
Use: "list",
|
||||||
Short: "List queues",
|
Short: "List queues",
|
||||||
Aliases: []string{"ls"},
|
Aliases: []string{"ls"},
|
||||||
// TODO: Use RunE instead?
|
RunE: queueList,
|
||||||
Run: queueList,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var queueInspectCmd = &cobra.Command{
|
var queueInspectCmd = &cobra.Command{
|
||||||
Use: "inspect <queue> [<queue>...]",
|
Use: "inspect <queue> [<queue>...]",
|
||||||
Short: "Display detailed information on one or more queues",
|
Short: "Display detailed information on one or more queues",
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
// TODO: Use RunE instead?
|
RunE: queueInspect,
|
||||||
Run: queueInspect,
|
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq queue inspect myqueue
|
$ asynq queue inspect myqueue
|
||||||
$ asynq queue inspect queue1 queue2 queue3`),
|
$ asynq queue inspect queue1 queue2 queue3`),
|
||||||
@@ -63,7 +60,7 @@ var queueHistoryCmd = &cobra.Command{
|
|||||||
Use: "history <queue> [<queue>...]",
|
Use: "history <queue> [<queue>...]",
|
||||||
Short: "Display historical aggregate data from one or more queues",
|
Short: "Display historical aggregate data from one or more queues",
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
Run: queueHistory,
|
RunE: queueHistory,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq queue history myqueue
|
$ asynq queue history myqueue
|
||||||
$ asynq queue history queue1 queue2 queue3
|
$ asynq queue history queue1 queue2 queue3
|
||||||
@@ -74,7 +71,7 @@ var queuePauseCmd = &cobra.Command{
|
|||||||
Use: "pause <queue> [<queue>...]",
|
Use: "pause <queue> [<queue>...]",
|
||||||
Short: "Pause one or more queues",
|
Short: "Pause one or more queues",
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
Run: queuePause,
|
RunE: queuePause,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq queue pause myqueue
|
$ asynq queue pause myqueue
|
||||||
$ asynq queue pause queue1 queue2 queue3`),
|
$ asynq queue pause queue1 queue2 queue3`),
|
||||||
@@ -85,7 +82,7 @@ var queueUnpauseCmd = &cobra.Command{
|
|||||||
Short: "Resume (unpause) one or more queues",
|
Short: "Resume (unpause) one or more queues",
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
Aliases: []string{"unpause"},
|
Aliases: []string{"unpause"},
|
||||||
Run: queueUnpause,
|
RunE: queueUnpause,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq queue resume myqueue
|
$ asynq queue resume myqueue
|
||||||
$ asynq queue resume queue1 queue2 queue3`),
|
$ asynq queue resume queue1 queue2 queue3`),
|
||||||
@@ -96,14 +93,14 @@ var queueRemoveCmd = &cobra.Command{
|
|||||||
Short: "Remove one or more queues",
|
Short: "Remove one or more queues",
|
||||||
Aliases: []string{"rm", "delete"},
|
Aliases: []string{"rm", "delete"},
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
Run: queueRemove,
|
RunE: queueRemove,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq queue rm myqueue
|
$ asynq queue rm myqueue
|
||||||
$ asynq queue rm queue1 queue2 queue3
|
$ asynq queue rm queue1 queue2 queue3
|
||||||
$ asynq queue rm myqueue --force`),
|
$ asynq queue rm myqueue --force`),
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueList(cmd *cobra.Command, args []string) {
|
func queueList(cmd *cobra.Command, args []string) error {
|
||||||
type queueInfo struct {
|
type queueInfo struct {
|
||||||
name string
|
name string
|
||||||
keyslot int64
|
keyslot int64
|
||||||
@@ -112,8 +109,7 @@ func queueList(cmd *cobra.Command, args []string) {
|
|||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
queues, err := inspector.Queues()
|
queues, err := inspector.Queues()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: Could not fetch list of queues: %v\n", err)
|
return fmt.Errorf("could not fetch list of queues: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
var qs []*queueInfo
|
var qs []*queueInfo
|
||||||
for _, qname := range queues {
|
for _, qname := range queues {
|
||||||
@@ -121,13 +117,13 @@ func queueList(cmd *cobra.Command, args []string) {
|
|||||||
if useRedisCluster {
|
if useRedisCluster {
|
||||||
keyslot, err := inspector.ClusterKeySlot(qname)
|
keyslot, err := inspector.ClusterKeySlot(qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Errorf("error: Could not get cluster keyslot for %q\n", qname)
|
fmt.Printf("error: could not get cluster keyslot for %q\n", qname)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
q.keyslot = keyslot
|
q.keyslot = keyslot
|
||||||
nodes, err := inspector.ClusterNodes(qname)
|
nodes, err := inspector.ClusterNodes(qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Errorf("error: Could not get cluster nodes for %q\n", qname)
|
fmt.Printf("error: could not get cluster nodes for %q\n", qname)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
q.nodes = nodes
|
q.nodes = nodes
|
||||||
@@ -148,9 +144,10 @@ func queueList(cmd *cobra.Command, args []string) {
|
|||||||
fmt.Println(q.name)
|
fmt.Println(q.name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueInspect(cmd *cobra.Command, args []string) {
|
func queueInspect(cmd *cobra.Command, args []string) error {
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
for i, qname := range args {
|
for i, qname := range args {
|
||||||
if i > 0 {
|
if i > 0 {
|
||||||
@@ -163,6 +160,7 @@ func queueInspect(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
printQueueInfo(info)
|
printQueueInfo(info)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func printQueueInfo(info *asynq.QueueInfo) {
|
func printQueueInfo(info *asynq.QueueInfo) {
|
||||||
@@ -195,11 +193,10 @@ func printQueueInfo(info *asynq.QueueInfo) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueHistory(cmd *cobra.Command, args []string) {
|
func queueHistory(cmd *cobra.Command, args []string) error {
|
||||||
days, err := cmd.Flags().GetInt("days")
|
days, err := cmd.Flags().GetInt("days")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: Internal error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
for i, qname := range args {
|
for i, qname := range args {
|
||||||
@@ -214,6 +211,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
printDailyStats(stats)
|
printDailyStats(stats)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func printDailyStats(stats []*asynq.DailyStats) {
|
func printDailyStats(stats []*asynq.DailyStats) {
|
||||||
@@ -233,49 +231,63 @@ func printDailyStats(stats []*asynq.DailyStats) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func queuePause(cmd *cobra.Command, args []string) {
|
func queuePause(cmd *cobra.Command, args []string) error {
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
|
var firstErr error
|
||||||
for _, qname := range args {
|
for _, qname := range args {
|
||||||
err := inspector.PauseQueue(qname)
|
err := inspector.PauseQueue(qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Printf("Successfully paused queue %q\n", qname)
|
fmt.Printf("Successfully paused queue %q\n", qname)
|
||||||
}
|
}
|
||||||
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueUnpause(cmd *cobra.Command, args []string) {
|
func queueUnpause(cmd *cobra.Command, args []string) error {
|
||||||
inspector := createInspector()
|
inspector := createInspector()
|
||||||
|
var firstErr error
|
||||||
for _, qname := range args {
|
for _, qname := range args {
|
||||||
err := inspector.UnpauseQueue(qname)
|
err := inspector.UnpauseQueue(qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Printf("Successfully unpaused queue %q\n", qname)
|
fmt.Printf("Successfully unpaused queue %q\n", qname)
|
||||||
}
|
}
|
||||||
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func queueRemove(cmd *cobra.Command, args []string) {
|
func queueRemove(cmd *cobra.Command, args []string) error {
|
||||||
// TODO: Use inspector once RemoveQueue become public API.
|
// TODO: Use inspector once RemoveQueue become public API.
|
||||||
force, err := cmd.Flags().GetBool("force")
|
force, err := cmd.Flags().GetBool("force")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: Internal error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r := createRDB()
|
r := createRDB()
|
||||||
|
var firstErr error
|
||||||
for _, qname := range args {
|
for _, qname := range args {
|
||||||
err = r.RemoveQueue(qname, force)
|
err = r.RemoveQueue(qname, force)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsQueueNotEmpty(err) {
|
if errors.IsQueueNotEmpty(err) {
|
||||||
fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname)
|
fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname)
|
||||||
continue
|
} else {
|
||||||
}
|
|
||||||
fmt.Printf("error: %v\n", err)
|
fmt.Printf("error: %v\n", err)
|
||||||
|
}
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Printf("Successfully removed queue %q\n", qname)
|
fmt.Printf("Successfully removed queue %q\n", qname)
|
||||||
}
|
}
|
||||||
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -487,35 +487,31 @@ func isPrintable(data []byte) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Helper to turn a command line flag into a duration
|
// Helper to turn a command line flag into a duration
|
||||||
func getDuration(cmd *cobra.Command, arg string) time.Duration {
|
func getDuration(cmd *cobra.Command, arg string) (time.Duration, error) {
|
||||||
durationStr, err := cmd.Flags().GetString(arg)
|
durationStr, err := cmd.Flags().GetString(arg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return 0, err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
duration, err := time.ParseDuration(durationStr)
|
duration, err := time.ParseDuration(durationStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return 0, err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return duration
|
return duration, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper to turn a command line flag into a time
|
// Helper to turn a command line flag into a time
|
||||||
func getTime(cmd *cobra.Command, arg string) time.Time {
|
func getTime(cmd *cobra.Command, arg string) (time.Time, error) {
|
||||||
timeStr, err := cmd.Flags().GetString(arg)
|
timeStr, err := cmd.Flags().GetString(arg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return time.Time{}, err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timeVal, err := time.Parse(time.RFC3339, timeStr)
|
timeVal, err := time.Parse(time.RFC3339, timeStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return time.Time{}, err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return timeVal
|
return timeVal, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -44,20 +43,19 @@ The command shows the following for each server:
|
|||||||
|
|
||||||
A "active" server is pulling tasks from queues and processing them.
|
A "active" server is pulling tasks from queues and processing them.
|
||||||
A "stopped" server is no longer pulling new tasks from queues`,
|
A "stopped" server is no longer pulling new tasks from queues`,
|
||||||
Run: serverList,
|
RunE: serverList,
|
||||||
}
|
}
|
||||||
|
|
||||||
func serverList(cmd *cobra.Command, args []string) {
|
func serverList(cmd *cobra.Command, args []string) error {
|
||||||
r := createRDB()
|
r := createRDB()
|
||||||
|
|
||||||
servers, err := r.ListServers()
|
servers, err := r.ListServers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("could not fetch list of servers: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(servers) == 0 {
|
if len(servers) == 0 {
|
||||||
fmt.Println("No running servers")
|
fmt.Println("No running servers")
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sort by hostname and pid
|
// sort by hostname and pid
|
||||||
@@ -80,6 +78,7 @@ func serverList(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func formatQueues(qmap map[string]int) string {
|
func formatQueues(qmap map[string]int) string {
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ var statsCmd = &cobra.Command{
|
|||||||
* Aggregate data for the current day
|
* Aggregate data for the current day
|
||||||
* Basic information about the running redis instance`),
|
* Basic information about the running redis instance`),
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: stats,
|
RunE: stats,
|
||||||
}
|
}
|
||||||
|
|
||||||
var jsonFlag bool
|
var jsonFlag bool
|
||||||
@@ -74,13 +74,12 @@ type FullStats struct {
|
|||||||
RedisInfo map[string]string `json:"redis"`
|
RedisInfo map[string]string `json:"redis"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func stats(cmd *cobra.Command, args []string) {
|
func stats(cmd *cobra.Command, args []string) error {
|
||||||
r := createRDB()
|
r := createRDB()
|
||||||
|
|
||||||
queues, err := r.AllQueues()
|
queues, err := r.AllQueues()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("could not fetch queues: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var aggStats AggregateStats
|
var aggStats AggregateStats
|
||||||
@@ -88,8 +87,7 @@ func stats(cmd *cobra.Command, args []string) {
|
|||||||
for _, qname := range queues {
|
for _, qname := range queues {
|
||||||
s, err := r.CurrentStats(qname)
|
s, err := r.CurrentStats(qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("could not fetch stats for queue %q: %v", qname, err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
aggStats.Active += s.Active
|
aggStats.Active += s.Active
|
||||||
aggStats.Pending += s.Pending
|
aggStats.Pending += s.Pending
|
||||||
@@ -110,8 +108,7 @@ func stats(cmd *cobra.Command, args []string) {
|
|||||||
info, err = r.RedisInfo()
|
info, err = r.RedisInfo()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("could not fetch redis info: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if jsonFlag {
|
if jsonFlag {
|
||||||
@@ -122,12 +119,11 @@ func stats(cmd *cobra.Command, args []string) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return fmt.Errorf("could not marshal stats to JSON: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(string(statsJSON))
|
fmt.Println(string(statsJSON))
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
bold := color.New(color.Bold)
|
bold := color.New(color.Bold)
|
||||||
@@ -151,6 +147,7 @@ func stats(cmd *cobra.Command, args []string) {
|
|||||||
printInfo(info)
|
printInfo(info)
|
||||||
}
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func printStatsByState(s *AggregateStats) {
|
func printStatsByState(s *AggregateStats) {
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/MakeNowJust/heredoc/v2"
|
"github.com/MakeNowJust/heredoc/v2"
|
||||||
@@ -120,14 +119,14 @@ var taskListCmd = &cobra.Command{
|
|||||||
$ asynq task list --queue=myqueue --state=pending
|
$ asynq task list --queue=myqueue --state=pending
|
||||||
$ asynq task list --queue=myqueue --state=aggregating --group=mygroup
|
$ asynq task list --queue=myqueue --state=aggregating --group=mygroup
|
||||||
$ asynq task list --queue=myqueue --state=scheduled --page=2`),
|
$ asynq task list --queue=myqueue --state=scheduled --page=2`),
|
||||||
Run: taskList,
|
RunE: taskList,
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskInspectCmd = &cobra.Command{
|
var taskInspectCmd = &cobra.Command{
|
||||||
Use: "inspect --queue=<queue> --id=<task_id>",
|
Use: "inspect --queue=<queue> --id=<task_id>",
|
||||||
Short: "Display detailed information on the specified task",
|
Short: "Display detailed information on the specified task",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskInspect,
|
RunE: taskInspect,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task inspect --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
$ asynq task inspect --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
||||||
}
|
}
|
||||||
@@ -136,7 +135,7 @@ var taskCancelCmd = &cobra.Command{
|
|||||||
Use: "cancel <task_id> [<task_id>...]",
|
Use: "cancel <task_id> [<task_id>...]",
|
||||||
Short: "Cancel one or more active tasks",
|
Short: "Cancel one or more active tasks",
|
||||||
Args: cobra.MinimumNArgs(1),
|
Args: cobra.MinimumNArgs(1),
|
||||||
Run: taskCancel,
|
RunE: taskCancel,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task cancel f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
$ asynq task cancel f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
||||||
}
|
}
|
||||||
@@ -145,7 +144,7 @@ var taskArchiveCmd = &cobra.Command{
|
|||||||
Use: "archive --queue=<queue> --id=<task_id>",
|
Use: "archive --queue=<queue> --id=<task_id>",
|
||||||
Short: "Archive a task with the given id",
|
Short: "Archive a task with the given id",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskArchive,
|
RunE: taskArchive,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task archive --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
$ asynq task archive --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
||||||
}
|
}
|
||||||
@@ -155,7 +154,7 @@ var taskDeleteCmd = &cobra.Command{
|
|||||||
Aliases: []string{"remove", "rm"},
|
Aliases: []string{"remove", "rm"},
|
||||||
Short: "Delete a task with the given id",
|
Short: "Delete a task with the given id",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskDelete,
|
RunE: taskDelete,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task delete --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
$ asynq task delete --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
||||||
}
|
}
|
||||||
@@ -164,7 +163,7 @@ var taskRunCmd = &cobra.Command{
|
|||||||
Use: "run --queue=<queue> --id=<task_id>",
|
Use: "run --queue=<queue> --id=<task_id>",
|
||||||
Short: "Run a task with the given id",
|
Short: "Run a task with the given id",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskRun,
|
RunE: taskRun,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
||||||
}
|
}
|
||||||
@@ -173,7 +172,7 @@ var taskEnqueueCmd = &cobra.Command{
|
|||||||
Use: "enqueue --type_name=footype --payload=barpayload",
|
Use: "enqueue --type_name=footype --payload=barpayload",
|
||||||
Short: "Enqueue a task",
|
Short: "Enqueue a task",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskEnqueue,
|
RunE: taskEnqueue,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task enqueue -t footype -l barpayload
|
$ asynq task enqueue -t footype -l barpayload
|
||||||
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
|
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
|
||||||
@@ -183,7 +182,7 @@ var taskArchiveAllCmd = &cobra.Command{
|
|||||||
Use: "archiveall --queue=<queue> --state=<state>",
|
Use: "archiveall --queue=<queue> --state=<state>",
|
||||||
Short: "Archive all tasks in the given state",
|
Short: "Archive all tasks in the given state",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskArchiveAll,
|
RunE: taskArchiveAll,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task archiveall --queue=myqueue --state=retry
|
$ asynq task archiveall --queue=myqueue --state=retry
|
||||||
$ asynq task archiveall --queue=myqueue --state=aggregating --group=mygroup`),
|
$ asynq task archiveall --queue=myqueue --state=aggregating --group=mygroup`),
|
||||||
@@ -193,7 +192,7 @@ var taskDeleteAllCmd = &cobra.Command{
|
|||||||
Use: "deleteall --queue=<queue> --state=<state>",
|
Use: "deleteall --queue=<queue> --state=<state>",
|
||||||
Short: "Delete all tasks in the given state",
|
Short: "Delete all tasks in the given state",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskDeleteAll,
|
RunE: taskDeleteAll,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task deleteall --queue=myqueue --state=archived
|
$ asynq task deleteall --queue=myqueue --state=archived
|
||||||
$ asynq task deleteall --queue=myqueue --state=aggregating --group=mygroup`),
|
$ asynq task deleteall --queue=myqueue --state=aggregating --group=mygroup`),
|
||||||
@@ -203,74 +202,66 @@ var taskRunAllCmd = &cobra.Command{
|
|||||||
Use: "runall --queue=<queue> --state=<state>",
|
Use: "runall --queue=<queue> --state=<state>",
|
||||||
Short: "Run all tasks in the given state",
|
Short: "Run all tasks in the given state",
|
||||||
Args: cobra.NoArgs,
|
Args: cobra.NoArgs,
|
||||||
Run: taskRunAll,
|
RunE: taskRunAll,
|
||||||
Example: heredoc.Doc(`
|
Example: heredoc.Doc(`
|
||||||
$ asynq task runall --queue=myqueue --state=retry
|
$ asynq task runall --queue=myqueue --state=retry
|
||||||
$ asynq task runall --queue=myqueue --state=aggregating --group=mygroup`),
|
$ asynq task runall --queue=myqueue --state=aggregating --group=mygroup`),
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskList(cmd *cobra.Command, args []string) {
|
func taskList(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
state, err := cmd.Flags().GetString("state")
|
state, err := cmd.Flags().GetString("state")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
pageNum, err := cmd.Flags().GetInt("page")
|
pageNum, err := cmd.Flags().GetInt("page")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
pageSize, err := cmd.Flags().GetInt("size")
|
pageSize, err := cmd.Flags().GetInt("size")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch state {
|
switch state {
|
||||||
case "active":
|
case "active":
|
||||||
listActiveTasks(qname, pageNum, pageSize)
|
return listActiveTasks(qname, pageNum, pageSize)
|
||||||
case "pending":
|
case "pending":
|
||||||
listPendingTasks(qname, pageNum, pageSize)
|
return listPendingTasks(qname, pageNum, pageSize)
|
||||||
case "scheduled":
|
case "scheduled":
|
||||||
listScheduledTasks(qname, pageNum, pageSize)
|
return listScheduledTasks(qname, pageNum, pageSize)
|
||||||
case "retry":
|
case "retry":
|
||||||
listRetryTasks(qname, pageNum, pageSize)
|
return listRetryTasks(qname, pageNum, pageSize)
|
||||||
case "archived":
|
case "archived":
|
||||||
listArchivedTasks(qname, pageNum, pageSize)
|
return listArchivedTasks(qname, pageNum, pageSize)
|
||||||
case "completed":
|
case "completed":
|
||||||
listCompletedTasks(qname, pageNum, pageSize)
|
return listCompletedTasks(qname, pageNum, pageSize)
|
||||||
case "aggregating":
|
case "aggregating":
|
||||||
group, err := cmd.Flags().GetString("group")
|
group, err := cmd.Flags().GetString("group")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if group == "" {
|
if group == "" {
|
||||||
fmt.Println("Flag --group is required for listing aggregating tasks")
|
return fmt.Errorf("flag --group is required for listing aggregating tasks")
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
listAggregatingTasks(qname, group, pageNum, pageSize)
|
return listAggregatingTasks(qname, group, pageNum, pageSize)
|
||||||
default:
|
default:
|
||||||
fmt.Printf("error: state=%q is not supported\n", state)
|
return fmt.Errorf("state=%q is not supported", state)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func listActiveTasks(qname string, pageNum, pageSize int) {
|
func listActiveTasks(qname string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No active tasks in %q queue\n", qname)
|
fmt.Printf("No active tasks in %q queue\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload"},
|
[]string{"ID", "Type", "Payload"},
|
||||||
@@ -280,18 +271,18 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func listPendingTasks(qname string, pageNum, pageSize int) {
|
func listPendingTasks(qname string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No pending tasks in %q queue\n", qname)
|
fmt.Printf("No pending tasks in %q queue\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload"},
|
[]string{"ID", "Type", "Payload"},
|
||||||
@@ -301,18 +292,18 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func listScheduledTasks(qname string, pageNum, pageSize int) {
|
func listScheduledTasks(qname string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No scheduled tasks in %q queue\n", qname)
|
fmt.Printf("No scheduled tasks in %q queue\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload", "Process In"},
|
[]string{"ID", "Type", "Payload", "Process In"},
|
||||||
@@ -322,6 +313,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// formatProcessAt formats next process at time to human friendly string.
|
// formatProcessAt formats next process at time to human friendly string.
|
||||||
@@ -335,16 +327,15 @@ func formatProcessAt(processAt time.Time) string {
|
|||||||
return fmt.Sprintf("in %v", d.Round(time.Second))
|
return fmt.Sprintf("in %v", d.Round(time.Second))
|
||||||
}
|
}
|
||||||
|
|
||||||
func listRetryTasks(qname string, pageNum, pageSize int) {
|
func listRetryTasks(qname string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No retry tasks in %q queue\n", qname)
|
fmt.Printf("No retry tasks in %q queue\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
|
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
|
||||||
@@ -355,18 +346,18 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func listArchivedTasks(qname string, pageNum, pageSize int) {
|
func listArchivedTasks(qname string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No archived tasks in %q queue\n", qname)
|
fmt.Printf("No archived tasks in %q queue\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
|
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
|
||||||
@@ -375,18 +366,18 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
|
|||||||
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr)
|
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func listCompletedTasks(qname string, pageNum, pageSize int) {
|
func listCompletedTasks(qname string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No completed tasks in %q queue\n", qname)
|
fmt.Printf("No completed tasks in %q queue\n", qname)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload", "CompletedAt", "Result"},
|
[]string{"ID", "Type", "Payload", "CompletedAt", "Result"},
|
||||||
@@ -395,18 +386,18 @@ func listCompletedTasks(qname string, pageNum, pageSize int) {
|
|||||||
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result))
|
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
|
func listAggregatingTasks(qname, group string, pageNum, pageSize int) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
fmt.Printf("No aggregating tasks in group %q \n", group)
|
fmt.Printf("No aggregating tasks in group %q \n", group)
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"ID", "Type", "Payload", "Group"},
|
[]string{"ID", "Type", "Payload", "Group"},
|
||||||
@@ -416,38 +407,42 @@ func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskCancel(cmd *cobra.Command, args []string) {
|
func taskCancel(cmd *cobra.Command, args []string) error {
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
|
var firstErr error
|
||||||
for _, id := range args {
|
for _, id := range args {
|
||||||
if err := i.CancelProcessing(id); err != nil {
|
if err := i.CancelProcessing(id); err != nil {
|
||||||
fmt.Printf("error: could not send cancelation signal: %v\n", err)
|
fmt.Printf("error: could not send cancelation signal: %v\n", err)
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Printf("Sent cancelation signal for task %s\n", id)
|
fmt.Printf("Sent cancelation signal for task %s\n", id)
|
||||||
}
|
}
|
||||||
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskInspect(cmd *cobra.Command, args []string) {
|
func taskInspect(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
id, err := cmd.Flags().GetString("id")
|
id, err := cmd.Flags().GetString("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
info, err := i.GetTaskInfo(qname, id)
|
info, err := i.GetTaskInfo(qname, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return fmt.Errorf("could not get task info: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
printTaskInfo(info)
|
printTaskInfo(info)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func printTaskInfo(info *asynq.TaskInfo) {
|
func printTaskInfo(info *asynq.TaskInfo) {
|
||||||
@@ -486,80 +481,72 @@ func formatPastTime(t time.Time) string {
|
|||||||
return t.Format(time.UnixDate)
|
return t.Format(time.UnixDate)
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskArchive(cmd *cobra.Command, args []string) {
|
func taskArchive(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
id, err := cmd.Flags().GetString("id")
|
id, err := cmd.Flags().GetString("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
err = i.ArchiveTask(qname, id)
|
err = i.ArchiveTask(qname, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return fmt.Errorf("could not archive task: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
fmt.Println("task archived")
|
fmt.Println("task archived")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskDelete(cmd *cobra.Command, args []string) {
|
func taskDelete(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
id, err := cmd.Flags().GetString("id")
|
id, err := cmd.Flags().GetString("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
err = i.DeleteTask(qname, id)
|
err = i.DeleteTask(qname, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return fmt.Errorf("could not delete task: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
fmt.Println("task deleted")
|
fmt.Println("task deleted")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskRun(cmd *cobra.Command, args []string) {
|
func taskRun(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
id, err := cmd.Flags().GetString("id")
|
id, err := cmd.Flags().GetString("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
err = i.RunTask(qname, id)
|
err = i.RunTask(qname, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return fmt.Errorf("could not run task: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
fmt.Println("task is now pending")
|
fmt.Println("task is now pending")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskEnqueue(cmd *cobra.Command, args []string) {
|
func taskEnqueue(cmd *cobra.Command, args []string) error {
|
||||||
typeName, err := cmd.Flags().GetString("type_name")
|
typeName, err := cmd.Flags().GetString("type_name")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
payload, err := cmd.Flags().GetString("payload")
|
payload, err := cmd.Flags().GetString("payload")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// For all of the optional flags, we need to explicitly check whether they were set or
|
// For all of the optional flags, we need to explicitly check whether they were set or
|
||||||
@@ -569,8 +556,7 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
|
|||||||
if cmd.Flags().Changed("retry") {
|
if cmd.Flags().Changed("retry") {
|
||||||
retry, err := cmd.Flags().GetInt("retry")
|
retry, err := cmd.Flags().GetInt("retry")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
opts = append(opts, asynq.MaxRetry(retry))
|
opts = append(opts, asynq.MaxRetry(retry))
|
||||||
}
|
}
|
||||||
@@ -578,8 +564,7 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
|
|||||||
if cmd.Flags().Changed("queue") {
|
if cmd.Flags().Changed("queue") {
|
||||||
queue, err := cmd.Flags().GetString("queue")
|
queue, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
opts = append(opts, asynq.Queue(queue))
|
opts = append(opts, asynq.Queue(queue))
|
||||||
}
|
}
|
||||||
@@ -587,41 +572,63 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
|
|||||||
if cmd.Flags().Changed("id") {
|
if cmd.Flags().Changed("id") {
|
||||||
id, err := cmd.Flags().GetString("id")
|
id, err := cmd.Flags().GetString("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
opts = append(opts, asynq.TaskID(id))
|
opts = append(opts, asynq.TaskID(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("timeout") {
|
if cmd.Flags().Changed("timeout") {
|
||||||
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
|
d, err := getDuration(cmd, "timeout")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.Timeout(d))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("deadline") {
|
if cmd.Flags().Changed("deadline") {
|
||||||
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
|
t, err := getTime(cmd, "deadline")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.Deadline(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("unique") {
|
if cmd.Flags().Changed("unique") {
|
||||||
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
|
d, err := getDuration(cmd, "unique")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.Unique(d))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("process_at") {
|
if cmd.Flags().Changed("process_at") {
|
||||||
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
|
t, err := getTime(cmd, "process_at")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.ProcessAt(t))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("process_in") {
|
if cmd.Flags().Changed("process_in") {
|
||||||
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
|
d, err := getDuration(cmd, "process_in")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.ProcessIn(d))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("retention") {
|
if cmd.Flags().Changed("retention") {
|
||||||
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
|
d, err := getDuration(cmd, "retention")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.Retention(d))
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flags().Changed("group") {
|
if cmd.Flags().Changed("group") {
|
||||||
group, err := cmd.Flags().GetString("group")
|
group, err := cmd.Flags().GetString("group")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
opts = append(opts, asynq.Group(group))
|
opts = append(opts, asynq.Group(group))
|
||||||
}
|
}
|
||||||
@@ -631,23 +638,21 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
taskInfo, err := c.Enqueue(task)
|
taskInfo, err := c.Enqueue(task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return fmt.Errorf("could not enqueue task: %v", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
|
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskArchiveAll(cmd *cobra.Command, args []string) {
|
func taskArchiveAll(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
state, err := cmd.Flags().GetString("state")
|
state, err := cmd.Flags().GetString("state")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
@@ -662,35 +667,35 @@ func taskArchiveAll(cmd *cobra.Command, args []string) {
|
|||||||
case "aggregating":
|
case "aggregating":
|
||||||
group, err := cmd.Flags().GetString("group")
|
group, err := cmd.Flags().GetString("group")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if group == "" {
|
if group == "" {
|
||||||
fmt.Println("error: Flag --group is required for aggregating tasks")
|
return fmt.Errorf("flag --group is required for aggregating tasks")
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
n, err = i.ArchiveAllAggregatingTasks(qname, group)
|
n, err = i.ArchiveAllAggregatingTasks(qname, group)
|
||||||
default:
|
|
||||||
fmt.Printf("error: unsupported state %q\n", state)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
fmt.Printf("%d tasks archived\n", n)
|
fmt.Printf("%d tasks archived\n", n)
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported state %q", state)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("%d tasks archived\n", n)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskDeleteAll(cmd *cobra.Command, args []string) {
|
func taskDeleteAll(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
state, err := cmd.Flags().GetString("state")
|
state, err := cmd.Flags().GetString("state")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
@@ -709,35 +714,35 @@ func taskDeleteAll(cmd *cobra.Command, args []string) {
|
|||||||
case "aggregating":
|
case "aggregating":
|
||||||
group, err := cmd.Flags().GetString("group")
|
group, err := cmd.Flags().GetString("group")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if group == "" {
|
if group == "" {
|
||||||
fmt.Println("error: Flag --group is required for aggregating tasks")
|
return fmt.Errorf("flag --group is required for aggregating tasks")
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
n, err = i.DeleteAllAggregatingTasks(qname, group)
|
n, err = i.DeleteAllAggregatingTasks(qname, group)
|
||||||
default:
|
|
||||||
fmt.Printf("error: unsupported state %q\n", state)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
fmt.Printf("%d tasks deleted\n", n)
|
fmt.Printf("%d tasks deleted\n", n)
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported state %q", state)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("%d tasks deleted\n", n)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func taskRunAll(cmd *cobra.Command, args []string) {
|
func taskRunAll(cmd *cobra.Command, args []string) error {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
state, err := cmd.Flags().GetString("state")
|
state, err := cmd.Flags().GetString("state")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i := createInspector()
|
i := createInspector()
|
||||||
@@ -752,21 +757,23 @@ func taskRunAll(cmd *cobra.Command, args []string) {
|
|||||||
case "aggregating":
|
case "aggregating":
|
||||||
group, err := cmd.Flags().GetString("group")
|
group, err := cmd.Flags().GetString("group")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
if group == "" {
|
if group == "" {
|
||||||
fmt.Println("error: Flag --group is required for aggregating tasks")
|
return fmt.Errorf("flag --group is required for aggregating tasks")
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
n, err = i.RunAllAggregatingTasks(qname, group)
|
n, err = i.RunAllAggregatingTasks(qname, group)
|
||||||
default:
|
|
||||||
fmt.Printf("error: unsupported state %q\n", state)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error: %v\n", err)
|
return err
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
fmt.Printf("%d tasks are now pending\n", n)
|
fmt.Printf("%d tasks are now pending\n", n)
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported state %q", state)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("%d tasks are now pending\n", n)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user