2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-22 10:37:07 +08:00

Compare commits

...

20 Commits

Author SHA1 Message Date
Mohamed Sohail
785bb7208c Merge pull request #1094 from fanatics-live/batch-enqueue
Add BatchEnqueue for pipelined multi-task enqueue
2026-05-14 15:34:54 +03:00
Mohamed Sohail
3a7ec97482 Merge pull request #1096 from kayoch1n/feat/header-option
feat: Add an option to specify headers
2026-05-14 15:33:28 +03:00
Mohamed Sohail
d671a4c427 Merge pull request #1100 from ljluestc/fix/669-sentinel-db-parsing
fix: parse DB number from redis-sentinel URI path
2026-05-14 15:32:15 +03:00
Mohamed Sohail
f81d452160 Merge pull request #1101 from sevico/refactor
refactor(cli): migrate commands from Run to RunE and fix group error …
2026-05-14 15:31:49 +03:00
kayoch1n
1af926b147 chore: Add docs string to asynq.Header and remove asynq.Headers 2026-04-15 20:02:30 +08:00
Mohamed Sohail
23905a286f Merge pull request #1104 from Bahtya/fix/pubsub-connection-leak
fix: close pubsub connection on Subscribe error in CancelationPubSub
2026-04-13 13:58:23 +03:00
Erik Nilsen
a32ac05d09 test: add coverage for BatchEnqueue error paths
Add tests for all uncovered BatchEnqueueContext error paths: nil task,
empty/blank typename, invalid options, group rejection, unique rejection,
and broker-down (sleeping broker). Add pipeline error test for rdb.BatchEnqueue
via cancelled context, and TestTaskOptions/TestTaskOptionsNil for the
new Options() accessor on Task.
2026-04-10 10:14:01 -07:00
Erik Nilsen
68f03688e3 docs: add WARNING line for silent duplicate skipping in BatchEnqueue 2026-04-10 09:56:35 -07:00
Erik Nilsen
06a06970d6 Document atomicity guarantees and fix NOSCRIPT bug in BatchEnqueue
Add comprehensive doc comments to BatchEnqueueContext, the Broker
interface method, and the RDB implementation explaining that the batch
uses a Redis pipeline (not MULTI/EXEC), so partial success is possible
and individual Lua scripts are atomic but the batch is not.

Fix a bug where Script.Run inside a pipeline only sends EVALSHA without
the automatic EVAL fallback that non-pipeline calls get. On a fresh
Redis (or after SCRIPT FLUSH), this caused NOSCRIPT errors for every
pipeline-batched script invocation. The fix preloads the required Lua
scripts before building the pipeline.

Also roll back the in-memory queuesPublished cache when the pipeline
fails, preventing stale entries from suppressing future SADD calls.
2026-04-09 11:35:11 -07:00
Bahtya
5586efeae7 test: add test for CancelationPubSub error path
Add TestCancelationPubSubReceiveError to verify that when
Receive() fails in CancelationPubSub(), an error is returned
and the pubsub connection is not leaked.

This provides test coverage for the pubsub.Close() fix that
was missing in the previous commit.

Bahtya
2026-04-09 20:11:53 +08:00
bahtya
dd3c923f44 fix: close pubsub connection on Subscribe error in CancelationPubSub
When redis.Subscribe succeeds but Receive() fails, the pubsub
connection was not being closed before returning the error. This caused
the subscriber goroutine to leak a Redis connection on each retry
iteration, eventually exhausting the connection pool.

Fixes #1095
2026-04-09 18:15:48 +08:00
shiweikang
f8d6677814 refactor(cli): migrate commands from Run to RunE and fix group error handling
Migrate all CLI command handlers from cobra's Run to RunE, replacing
fmt.Println(err) + os.Exit(1) patterns with idiomatic error returns.
This improves testability and lets cobra handle errors consistently
via the existing SilenceUsage/SilenceErrors configuration.

Also refactor getDuration() and getTime() helpers to return errors
instead of calling os.Exit(1).

Fix a bug in group.go where the error from inspector.Groups() was
never checked, causing silent failures (e.g. on Redis connection
errors) to be misreported as "No groups found".
2026-04-05 01:13:41 +08:00
Jiale Lin
07898eade0 fix: parse DB number from redis-sentinel URI path (#669)
Previously, parseRedisSentinelURI ignored the /dbnumber path segment,
making it impossible to connect to any DB other than 0 via sentinel URIs.

This adds DB extraction from the URI path, consistent with how
parseRedisURI already handles it for redis:// and rediss:// schemes.

Closes #669
2026-03-21 11:33:56 -07:00
kayoch1n
5216f1c3be feat: Add an option to create headers from a map 2026-03-17 14:47:34 +08:00
kayoch1n
dbfdfbac5a chore: Update test cases 2026-03-17 12:56:12 +08:00
kayoch1n
5c391f3ffb feat: Add an option to specify headers 2026-03-17 12:29:58 +08:00
Erik Nilsen
7ae0b3fe22 Add Options() accessor on Task for external option merging
Exposes the opts field so callers can read a task's options to merge
with additional per-task options at batch time.
2026-02-25 09:42:30 -08:00
Erik Nilsen
71ebcfa129 Fix BatchEnqueueContext time comparison and add scheduled task support
BatchEnqueueContext had a time comparison bug where `now` was captured
before the loop but `processAt` was set to time.Now() inside
composeOptions during each iteration, causing all immediate tasks to be
incorrectly classified as scheduled and rejected.

Fix: move `now` capture inside the loop, after composeOptions.

Additionally, extend BatchEnqueueContext to support scheduled tasks in
the same pipeline. Tasks with a future ProcessAt are now routed to
scheduleCmd (ZADD to scheduled set) instead of being rejected. Only
unique and group tasks remain unsupported.

Changes:
- Add BatchEnqueueItem type pairing TaskMessage with optional ProcessAt
- Update Broker interface, RDB, and testbroker to use BatchEnqueueItem
- Route immediate tasks to enqueueCmd, scheduled tasks to scheduleCmd
- Return correct TaskState (Pending vs Scheduled) in results
- Add tests for immediate, scheduled, and mixed batch scenarios
2026-02-25 09:42:30 -08:00
Erik Nilsen
4e62d7e29d Add tests for BatchEnqueue: multiple tasks, empty batch, duplicate IDs 2026-02-23 16:30:35 -08:00
Erik Nilsen
f919a605d5 Add BatchEnqueue for pipelined multi-task enqueue
Adds BatchEnqueue to the Broker interface and RDB implementation that
sends multiple enqueueCmd Lua script invocations in a single Redis
pipeline round-trip. Also adds BatchEnqueueContext to the Client as
the public API, returning per-task results for partial-success handling.

Ref: hibiken/asynq#1069
2026-02-23 16:25:49 -08:00
18 changed files with 1070 additions and 231 deletions

View File

@@ -40,6 +40,7 @@ type Task struct {
func (t *Task) Type() string { return t.typename } func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload } func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers } func (t *Task) Headers() map[string]string { return t.headers }
func (t *Task) Options() []Option { return t.opts }
// ResultWriter returns a pointer to the ResultWriter associated with the task. // ResultWriter returns a pointer to the ResultWriter associated with the task.
// //
@@ -471,7 +472,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// redis://[:password@]host[:port][/dbnumber] // redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber] // rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber] // redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] // redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][/dbnumber][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) { func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri) u, err := url.Parse(uri)
if err != nil { if err != nil {
@@ -545,11 +546,20 @@ func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) { func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
addrs := strings.Split(u.Host, ",") addrs := strings.Split(u.Host, ",")
master := u.Query().Get("master") master := u.Query().Get("master")
var db int
var err error
if len(u.Path) > 0 {
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
db, err = strconv.Atoi(xs[0])
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis sentinel uri: database number should be the first segment of the path")
}
}
var password string var password string
if v, ok := u.User.Password(); ok { if v, ok := u.User.Password(); ok {
password = v password = v
} }
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password, DB: db}, nil
} }
// ResultWriter is a client interface to write result data for a task. // ResultWriter is a client interface to write result data for a task.

View File

@@ -10,6 +10,7 @@ import (
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
@@ -148,6 +149,23 @@ func TestParseRedisURI(t *testing.T) {
SentinelPassword: "mypassword", SentinelPassword: "mypassword",
}, },
}, },
{
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002/3?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
DB: 3,
},
},
{
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002/7?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
SentinelPassword: "mypassword",
DB: 7,
},
},
} }
for _, tc := range tests { for _, tc := range tests {
@@ -188,6 +206,10 @@ func TestParseRedisURIErrors(t *testing.T) {
"non integer for db numbers for socket", "non integer for db numbers for socket",
"redis-socket:///some/path/to/redis?db=one", "redis-socket:///some/path/to/redis?db=one",
}, },
{
"non integer for db number for sentinel",
"redis-sentinel://localhost:5000/abc?master=mymaster",
},
} }
for _, tc := range tests { for _, tc := range tests {
@@ -198,3 +220,30 @@ func TestParseRedisURIErrors(t *testing.T) {
} }
} }
} }
func TestTaskOptions(t *testing.T) {
opts := []Option{
MaxRetry(3),
Queue("critical"),
Timeout(5 * time.Minute),
}
task := NewTask("mytask", []byte("payload"), opts...)
got := task.Options()
if len(got) != len(opts) {
t.Fatalf("task.Options() returned %d options, want %d", len(got), len(opts))
}
for i, o := range opts {
if got[i].String() != o.String() {
t.Errorf("task.Options()[%d] = %v, want %v", i, got[i], o)
}
}
}
func TestTaskOptionsNil(t *testing.T) {
task := NewTask("mytask", []byte("payload"))
got := task.Options()
if got != nil {
t.Errorf("task.Options() = %v, want nil for task with no options", got)
}
}

174
client.go
View File

@@ -6,7 +6,9 @@ package asynq
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"maps"
"strings" "strings"
"time" "time"
@@ -60,6 +62,7 @@ const (
TaskIDOpt TaskIDOpt
RetentionOpt RetentionOpt
GroupOpt GroupOpt
HeaderOpt
) )
// Option specifies the task processing behavior. // Option specifies the task processing behavior.
@@ -86,6 +89,7 @@ type (
processInOption time.Duration processInOption time.Duration
retentionOption time.Duration retentionOption time.Duration
groupOption string groupOption string
headerOption [2]string
) )
// MaxRetry returns an option to specify the max number of times // MaxRetry returns an option to specify the max number of times
@@ -217,6 +221,27 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
func (name groupOption) Type() OptionType { return GroupOpt } func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) } func (name groupOption) Value() interface{} { return string(name) }
// Header returns an option to associate the key-value header to the task.
//
// This option is composable with other Client options and can be used together
// with other options like MaxRetry, Queue, etc. For use cases where headers
// need to be combined with other options, using Header option is recommended.
//
// Alternatively, NewTaskWithHeaders can be used to create a task with headers
// directly, which may be preferable when headers are an intrinsic part of the
// task definition rather than enqueue-time configuration.
func Header(key, value string) Option {
return headerOption{key, value}
}
func (h headerOption) String() string {
var bytes []byte
bytes, _ = json.Marshal(h)
return fmt.Sprintf("Header(%s)", bytes)
}
func (h headerOption) Type() OptionType { return HeaderOpt }
func (h headerOption) Value() interface{} { return [2]string{h[0], h[1]} }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
// //
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option. // ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
@@ -237,6 +262,7 @@ type option struct {
processAt time.Time processAt time.Time
retention time.Duration retention time.Duration
group string group string
headers map[string]string
} }
// composeOptions merges user provided options into the default options // composeOptions merges user provided options into the default options
@@ -251,6 +277,7 @@ func composeOptions(opts ...Option) (option, error) {
timeout: 0, // do not set to defaultTimeout here timeout: 0, // do not set to defaultTimeout here
deadline: time.Time{}, deadline: time.Time{},
processAt: time.Now(), processAt: time.Now(),
headers: make(map[string]string),
} }
for _, opt := range opts { for _, opt := range opts {
switch opt := opt.(type) { switch opt := opt.(type) {
@@ -290,6 +317,9 @@ func composeOptions(opts ...Option) (option, error) {
return option{}, errors.New("group key cannot be empty") return option{}, errors.New("group key cannot be empty")
} }
res.group = key res.group = key
case headerOption:
key, value := opt[0], opt[1]
res.headers[key] = value
default: default:
// ignore unexpected option // ignore unexpected option
} }
@@ -385,7 +415,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID, ID: opt.taskID,
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Headers: task.Headers(), Headers: maps.Clone(task.Headers()),
Queue: opt.queue, Queue: opt.queue,
Retry: opt.retry, Retry: opt.retry,
Deadline: deadline.Unix(), Deadline: deadline.Unix(),
@@ -394,6 +424,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
GroupKey: opt.group, GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()), Retention: int64(opt.retention.Seconds()),
} }
if len(opt.headers) > 0 {
if msg.Headers == nil {
msg.Headers = make(map[string]string)
}
maps.Copy(msg.Headers, opt.headers)
}
now := time.Now() now := time.Now()
var state base.TaskState var state base.TaskState
if opt.processAt.After(now) { if opt.processAt.After(now) {
@@ -420,6 +456,142 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
return newTaskInfo(msg, state, opt.processAt, nil), nil return newTaskInfo(msg, state, opt.processAt, nil), nil
} }
// BatchEnqueueResult holds the result of enqueuing a single task within a batch.
type BatchEnqueueResult struct {
TaskInfo *TaskInfo
Err error
}
// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip,
// returning a per-task result slice aligned with the input tasks slice.
//
// # Atomicity Guarantees
//
// There is no all-or-nothing guarantee across the batch. Each task is executed as
// an independent Lua script inside a Redis pipeline. Individual scripts are atomic
// (the existence check, hash write, and list/sorted-set push for one task cannot
// be partially applied), but the pipeline as a whole is not wrapped in a
// MULTI/EXEC transaction. This means:
//
// - Partial success is possible: some tasks may be enqueued while others are not.
// - A task whose ID already exists in Redis is silently skipped (treated as a
// no-op by the Lua script), and its result will still show success.
// - If the Redis pipeline call itself fails (e.g. connection lost, context
// cancelled), every task that passed client-side validation receives that
// error — none of them can be assumed to have been enqueued.
//
// # Validation Errors (pre-pipeline)
//
// The following are caught before any Redis call and rejected in the
// corresponding BatchEnqueueResult.Err without affecting other tasks:
//
// - nil task
// - empty task type name
// - invalid options
// - group tasks (not supported in batch mode)
// - unique tasks (not supported in batch mode)
//
// # Supported Task Types
//
// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported.
// Group and unique tasks are rejected as described above.
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
results := make([]BatchEnqueueResult, len(tasks))
if len(tasks) == 0 {
return results
}
type itemMeta struct {
state base.TaskState
processAt time.Time
}
items := make([]base.BatchEnqueueItem, 0, len(tasks))
itemIndexes := make([]int, 0, len(tasks))
itemMetas := make([]itemMeta, 0, len(tasks))
for i, task := range tasks {
if task == nil {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task cannot be nil")}
continue
}
if strings.TrimSpace(task.Type()) == "" {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task typename cannot be empty")}
continue
}
merged := append(task.opts, opts...)
opt, err := composeOptions(merged...)
if err != nil {
results[i] = BatchEnqueueResult{Err: err}
continue
}
if opt.group != "" {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support group tasks")}
continue
}
if opt.uniqueTTL > 0 {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support unique tasks")}
continue
}
deadline := noDeadline
if !opt.deadline.IsZero() {
deadline = opt.deadline
}
timeout := noTimeout
if opt.timeout != 0 {
timeout = opt.timeout
}
if deadline.Equal(noDeadline) && timeout == noTimeout {
timeout = defaultTimeout
}
msg := &base.TaskMessage{
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()),
Retention: int64(opt.retention.Seconds()),
}
now := time.Now()
scheduled := opt.processAt.After(now)
item := base.BatchEnqueueItem{Msg: msg}
var meta itemMeta
if scheduled {
item.ProcessAt = opt.processAt
meta = itemMeta{state: base.TaskStateScheduled, processAt: opt.processAt}
} else {
meta = itemMeta{state: base.TaskStatePending, processAt: now}
}
items = append(items, item)
itemIndexes = append(itemIndexes, i)
itemMetas = append(itemMetas, meta)
}
if len(items) == 0 {
return results
}
_, err := c.broker.BatchEnqueue(ctx, items)
if err != nil {
for _, idx := range itemIndexes {
results[idx] = BatchEnqueueResult{Err: err}
}
return results
}
for j, idx := range itemIndexes {
info := newTaskInfo(items[j].Msg, itemMetas[j].state, itemMetas[j].processAt, nil)
results[idx] = BatchEnqueueResult{TaskInfo: info}
}
return results
}
// Ping performs a ping against the redis connection. // Ping performs a ping against the redis connection.
func (c *Client) Ping() error { func (c *Client) Ping() error {
return c.broker.Ping() return c.broker.Ping()

View File

@@ -13,6 +13,8 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
h "github.com/hibiken/asynq/internal/testutil" h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
@@ -1339,6 +1341,70 @@ func TestClientEnqueueWithHeaders(t *testing.T) {
}, },
}, },
}, },
{
desc: "Task with header option",
task: NewTask("store_data", []byte("data"), Header("channel", "email"), Header("user-id", "bob1234")),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
State: TaskStatePending,
MaxRetry: 25,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
Retry: 25,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Enqueue task with header option",
task: NewTask("store_data", []byte("data")),
opts: []Option{Header("channel", "email"), Header("user-id", "bob1234"), MaxRetry(5)},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
Retry: 5,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
} }
for _, tc := range tests { for _, tc := range tests {
@@ -1661,3 +1727,209 @@ func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
} }
} }
} }
func TestBatchEnqueueContext_ImmediateTasks(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
tasks := []*Task{
NewTask("task1", []byte("payload1")),
NewTask("task2", []byte("payload2")),
NewTask("task3", []byte("payload3")),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 3 {
t.Fatalf("BatchEnqueueContext returned %d results, want 3", len(results))
}
for i, res := range results {
if res.Err != nil {
t.Errorf("results[%d].Err = %v, want nil", i, res.Err)
}
if res.TaskInfo == nil {
t.Errorf("results[%d].TaskInfo is nil, want non-nil", i)
continue
}
if res.TaskInfo.Queue != "default" {
t.Errorf("results[%d].TaskInfo.Queue = %q, want %q", i, res.TaskInfo.Queue, "default")
}
if res.TaskInfo.State != TaskStatePending {
t.Errorf("results[%d].TaskInfo.State = %v, want %v", i, res.TaskInfo.State, TaskStatePending)
}
}
gotPending := h.GetPendingMessages(t, r, "default")
if len(gotPending) != 3 {
t.Errorf("len(pending) = %d, want 3", len(gotPending))
}
}
func TestBatchEnqueueContext_ScheduledTask(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
future := time.Now().Add(1 * time.Hour)
tasks := []*Task{
NewTask("scheduled_task", []byte("payload"), ProcessAt(future)),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 1 {
t.Fatalf("BatchEnqueueContext returned %d results, want 1", len(results))
}
if results[0].Err != nil {
t.Fatalf("results[0].Err = %v, want nil", results[0].Err)
}
if results[0].TaskInfo == nil {
t.Fatal("results[0].TaskInfo is nil, want non-nil")
}
if results[0].TaskInfo.State != TaskStateScheduled {
t.Errorf("results[0].TaskInfo.State = %v, want %v", results[0].TaskInfo.State, TaskStateScheduled)
}
gotScheduled := h.GetScheduledMessages(t, r, "default")
if len(gotScheduled) != 1 {
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
}
}
func TestBatchEnqueueContext_MixedBatch(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
future := time.Now().Add(1 * time.Hour)
tasks := []*Task{
NewTask("immediate1", []byte("p1")),
NewTask("scheduled1", []byte("p2"), ProcessAt(future)),
NewTask("immediate2", []byte("p3")),
NewTask("grouped1", []byte("p4"), Group("mygroup")),
NewTask("immediate3", []byte("p5")),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 5 {
t.Fatalf("BatchEnqueueContext returned %d results, want 5", len(results))
}
// Immediate tasks (indices 0, 2, 4) should succeed with Pending state.
for _, idx := range []int{0, 2, 4} {
if results[idx].Err != nil {
t.Errorf("results[%d].Err = %v, want nil (immediate task)", idx, results[idx].Err)
}
if results[idx].TaskInfo == nil {
t.Errorf("results[%d].TaskInfo is nil, want non-nil", idx)
continue
}
if results[idx].TaskInfo.State != TaskStatePending {
t.Errorf("results[%d].TaskInfo.State = %v, want %v", idx, results[idx].TaskInfo.State, TaskStatePending)
}
}
// Scheduled task (index 1) should succeed with Scheduled state.
if results[1].Err != nil {
t.Errorf("results[1].Err = %v, want nil (scheduled task)", results[1].Err)
}
if results[1].TaskInfo != nil && results[1].TaskInfo.State != TaskStateScheduled {
t.Errorf("results[1].TaskInfo.State = %v, want %v", results[1].TaskInfo.State, TaskStateScheduled)
}
// Grouped task (index 3) should be rejected.
if results[3].Err == nil {
t.Error("results[3].Err is nil, want error for group task")
}
gotPending := h.GetPendingMessages(t, r, "default")
if len(gotPending) != 3 {
t.Errorf("len(pending) = %d, want 3", len(gotPending))
}
gotScheduled := h.GetScheduledMessages(t, r, "default")
if len(gotScheduled) != 1 {
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
}
}
func TestBatchEnqueueContext_ValidationErrors(t *testing.T) {
setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
tests := []struct {
desc string
tasks []*Task
opts []Option
}{
{
desc: "nil task",
tasks: []*Task{nil},
},
{
desc: "empty task typename",
tasks: []*Task{NewTask("", []byte("payload"))},
},
{
desc: "blank task typename",
tasks: []*Task{NewTask(" ", []byte("payload"))},
},
{
desc: "invalid option: unique TTL less than 1s",
tasks: []*Task{NewTask("foo", nil)},
opts: []Option{Unique(300 * time.Millisecond)},
},
{
desc: "group task rejected",
tasks: []*Task{NewTask("foo", nil, Group("mygroup"))},
},
{
desc: "unique task rejected",
tasks: []*Task{NewTask("foo", nil, Unique(time.Hour))},
},
}
for _, tc := range tests {
results := client.BatchEnqueueContext(context.Background(), tc.tasks, tc.opts...)
if len(results) != len(tc.tasks) {
t.Errorf("%s: got %d results, want %d", tc.desc, len(results), len(tc.tasks))
continue
}
for i, res := range results {
if res.Err == nil {
t.Errorf("%s: results[%d].Err = nil, want non-nil error", tc.desc, i)
}
if res.TaskInfo != nil {
t.Errorf("%s: results[%d].TaskInfo = %v, want nil", tc.desc, i, res.TaskInfo)
}
}
}
}
func TestBatchEnqueueContext_BrokerError(t *testing.T) {
r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r)
client := &Client{broker: testBroker, sharedConnection: true}
tasks := []*Task{
NewTask("task1", []byte("p1")),
NewTask("task2", []byte("p2")),
}
testBroker.Sleep()
results := client.BatchEnqueueContext(context.Background(), tasks)
testBroker.Wakeup()
if len(results) != 2 {
t.Fatalf("BatchEnqueueContext returned %d results, want 2", len(results))
}
for i, res := range results {
if res.Err == nil {
t.Errorf("results[%d].Err = nil, want non-nil error when broker is down", i)
}
if res.TaskInfo != nil {
t.Errorf("results[%d].TaskInfo = %v, want nil on broker error", i, res.TaskInfo)
}
}
}

View File

@@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"encoding/json"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@@ -1002,6 +1003,13 @@ func parseOption(s string) (Option, error) {
return nil, err return nil, err
} }
return Retention(d), nil return Retention(d), nil
case "Header":
var h [2]string
err := json.Unmarshal([]byte(arg), &h)
if err != nil {
return nil, err
}
return Header(h[0], h[1]), nil
default: default:
return nil, fmt.Errorf("cannot not parse option string %q", s) return nil, fmt.Errorf("cannot not parse option string %q", s)
} }

View File

@@ -3526,6 +3526,7 @@ func TestParseOption(t *testing.T) {
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow}, {ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute}, {`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
{`Retention(24h)`, RetentionOpt, 24 * time.Hour}, {`Retention(24h)`, RetentionOpt, 24 * time.Hour},
{`Header(["email", "hello@example.com"])`, HeaderOpt, [2]string{"email", "hello@example.com"}},
} }
for _, tc := range tests { for _, tc := range tests {
@@ -3573,6 +3574,14 @@ func TestParseOption(t *testing.T) {
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) { if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
} }
case HeaderOpt:
gotVal, ok := got.Value().([2]string)
if !ok {
t.Fatal("returned Option with non array value")
}
if gotVal != tc.wantVal.([2]string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
default: default:
t.Fatalf("returned Option with unexpected type: %v", got.Type()) t.Fatalf("returned Option with unexpected type: %v", got.Type())
} }

View File

@@ -684,6 +684,14 @@ func (l *Lease) IsValid() bool {
return l.expireAt.After(now) || l.expireAt.Equal(now) return l.expireAt.After(now) || l.expireAt.Equal(now)
} }
// BatchEnqueueItem pairs a task message with optional scheduling metadata for
// batch enqueue operations. If ProcessAt is zero, the task is enqueued for
// immediate processing; otherwise it is added to the scheduled set.
type BatchEnqueueItem struct {
Msg *TaskMessage
ProcessAt time.Time // zero value → immediate
}
// Broker is a message broker that supports operations to manage task queues. // Broker is a message broker that supports operations to manage task queues.
// //
// See rdb.RDB as a reference implementation. // See rdb.RDB as a reference implementation.
@@ -692,6 +700,12 @@ type Broker interface {
Close() error Close() error
Enqueue(ctx context.Context, msg *TaskMessage) error Enqueue(ctx context.Context, msg *TaskMessage) error
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
// BatchEnqueue enqueues multiple tasks in a single round-trip. It returns the
// count of newly enqueued tasks; duplicate IDs are silently skipped. The error
// is non-nil only on infrastructure failure (e.g. lost connection), in which
// case the count is meaningless. Individual task scripts are atomic but the
// batch as a whole is not transactional — partial success is possible.
BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error)
Dequeue(qnames ...string) (*TaskMessage, time.Time, error) Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
Done(ctx context.Context, msg *TaskMessage) error Done(ctx context.Context, msg *TaskMessage) error
MarkAsComplete(ctx context.Context, msg *TaskMessage) error MarkAsComplete(ctx context.Context, msg *TaskMessage) error

View File

@@ -139,6 +139,118 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
return nil return nil
} }
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
// Each item is either enqueued immediately (ProcessAt is zero) or added to the
// scheduled sorted set.
//
// WARNING: tasks whose IDs already exist in Redis are silently skipped.
//
// The pipeline executes independent Lua scripts per task — there is no
// MULTI/EXEC wrapping the batch, so individual tasks may succeed or fail
// independently. The returned int is the number of tasks actually written;
// skipped duplicates do not count. The returned error is non-nil only when the
// pipeline call itself fails (network error, context cancellation, etc.), in
// which case no individual result should be trusted.
//
// Message encoding errors cause an immediate return before any Redis I/O.
func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
var op errors.Op = "rdb.BatchEnqueue"
if len(items) == 0 {
return 0, nil
}
// Preload Lua scripts so that EVALSHA inside the pipeline does not fail with
// NOSCRIPT. Script.Run on a pipeline only sends EVALSHA (unlike non-pipeline
// Run which retries with EVAL on NOSCRIPT).
needsEnqueue, needsSchedule := false, false
for _, item := range items {
if item.ProcessAt.IsZero() {
needsEnqueue = true
} else {
needsSchedule = true
}
if needsEnqueue && needsSchedule {
break
}
}
if needsEnqueue {
if err := enqueueCmd.Load(ctx, r.client).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load enqueue script: %v", err))
}
}
if needsSchedule {
if err := scheduleCmd.Load(ctx, r.client).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load schedule script: %v", err))
}
}
pipe := r.client.Pipeline()
// Track which pipeline slot holds each item's script result.
scriptIdxs := make([]int, 0, len(items))
pipeLen := 0
// Track queues we add to AllQueues in this pipeline so we can roll back the
// in-memory cache on failure.
var newQueues []string
now := r.clock.Now().UnixNano()
for _, item := range items {
encoded, err := base.EncodeMessage(item.Msg)
if err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if _, found := r.queuesPublished.Load(item.Msg.Queue); !found {
pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue)
r.queuesPublished.Store(item.Msg.Queue, true)
newQueues = append(newQueues, item.Msg.Queue)
pipeLen++
}
if item.ProcessAt.IsZero() {
keys := []string{
base.TaskKey(item.Msg.Queue, item.Msg.ID),
base.PendingKey(item.Msg.Queue),
}
argv := []interface{}{encoded, item.Msg.ID, now}
enqueueCmd.Run(ctx, pipe, keys, argv...)
} else {
keys := []string{
base.TaskKey(item.Msg.Queue, item.Msg.ID),
base.ScheduledKey(item.Msg.Queue),
}
argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID}
scheduleCmd.Run(ctx, pipe, keys, argv...)
}
scriptIdxs = append(scriptIdxs, pipeLen)
pipeLen++
}
cmds, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
for _, q := range newQueues {
r.queuesPublished.Delete(q)
}
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err))
}
enqueued := 0
for _, idx := range scriptIdxs {
if idx >= len(cmds) {
continue
}
res, err := cmds[idx].(*redis.Cmd).Result()
if err != nil {
continue
}
if n, ok := res.(int64); ok && n == 1 {
enqueued++
}
}
return enqueued, nil
}
// enqueueUniqueCmd enqueues the task message if the task is unique. // enqueueUniqueCmd enqueues the task message if the task is unique.
// //
// KEYS[1] -> unique key // KEYS[1] -> unique key
@@ -1488,6 +1600,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel) pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx) _, err := pubsub.Receive(ctx)
if err != nil { if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err)) return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
} }
return pubsub, nil return pubsub, nil

View File

@@ -160,6 +160,156 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
} }
} }
func TestBatchEnqueue(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}))
t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv")
t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
enqueueTime := time.Now()
r.SetClock(timeutil.NewSimulatedClock(enqueueTime))
t.Run("enqueue multiple tasks", func(t *testing.T) {
h.FlushDB(t, r.client)
items := []base.BatchEnqueueItem{
{Msg: t1},
{Msg: t2},
{Msg: t3},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 3 {
t.Errorf("BatchEnqueue returned %d, want 3", n)
}
for _, item := range items {
msg := item.Msg
pendingKey := base.PendingKey(msg.Queue)
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
found := false
for _, id := range pendingIDs {
if id == msg.ID {
found = true
break
}
}
if !found {
t.Errorf("task %s not found in pending list %s", msg.ID, pendingKey)
}
taskKey := base.TaskKey(msg.Queue, msg.ID)
state := r.client.HGet(context.Background(), taskKey, "state").Val()
if state != "pending" {
t.Errorf("state for task %s = %q, want %q", msg.ID, state, "pending")
}
}
})
t.Run("empty batch", func(t *testing.T) {
h.FlushDB(t, r.client)
n, err := r.BatchEnqueue(context.Background(), nil)
if err != nil {
t.Fatalf("BatchEnqueue(nil) returned error: %v", err)
}
if n != 0 {
t.Errorf("BatchEnqueue(nil) returned %d, want 0", n)
}
})
t.Run("duplicate IDs skipped", func(t *testing.T) {
h.FlushDB(t, r.client)
if err := r.Enqueue(context.Background(), t1); err != nil {
t.Fatalf("pre-enqueue failed: %v", err)
}
dup := *t1
newMsg := h.NewTaskMessage("new_task", nil)
items := []base.BatchEnqueueItem{
{Msg: &dup},
{Msg: newMsg},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 1 {
t.Errorf("BatchEnqueue returned %d, want 1 (duplicate should be skipped)", n)
}
})
t.Run("scheduled tasks", func(t *testing.T) {
h.FlushDB(t, r.client)
future := time.Now().Add(1 * time.Hour)
s1 := h.NewTaskMessage("deferred_email", nil)
items := []base.BatchEnqueueItem{
{Msg: t1},
{Msg: s1, ProcessAt: future},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 2 {
t.Errorf("BatchEnqueue returned %d, want 2", n)
}
// Immediate task should be in pending.
pendingIDs := r.client.LRange(context.Background(), base.PendingKey(t1.Queue), 0, -1).Val()
foundPending := false
for _, id := range pendingIDs {
if id == t1.ID {
foundPending = true
}
}
if !foundPending {
t.Errorf("immediate task %s not found in pending list", t1.ID)
}
// Scheduled task should be in scheduled set.
scheduledIDs := r.client.ZRange(context.Background(), base.ScheduledKey(s1.Queue), 0, -1).Val()
foundScheduled := false
for _, id := range scheduledIDs {
if id == s1.ID {
foundScheduled = true
}
}
if !foundScheduled {
t.Errorf("scheduled task %s not found in scheduled set", s1.ID)
}
taskKey := base.TaskKey(s1.Queue, s1.ID)
state := r.client.HGet(context.Background(), taskKey, "state").Val()
if state != "scheduled" {
t.Errorf("state for scheduled task %s = %q, want %q", s1.ID, state, "scheduled")
}
})
t.Run("pipeline error from cancelled context", func(t *testing.T) {
h.FlushDB(t, r.client)
msg := h.NewTaskMessage("pipeline_error_task", nil)
items := []base.BatchEnqueueItem{{Msg: msg}}
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := r.BatchEnqueue(ctx, items)
if err == nil {
t.Error("BatchEnqueue with cancelled context returned nil error, want non-nil")
}
})
}
func TestEnqueueQueueCache(t *testing.T) { func TestEnqueueQueueCache(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
@@ -3274,6 +3424,29 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock() mu.Unlock()
} }
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) { func TestWriteResult(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()

View File

@@ -64,6 +64,15 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage,
return tb.real.EnqueueUnique(ctx, msg, ttl) return tb.real.EnqueueUnique(ctx, msg, ttl)
} }
func (tb *TestBroker) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return 0, errRedisDown
}
return tb.real.BatchEnqueue(ctx, items)
}
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) { func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()

View File

@@ -7,7 +7,6 @@ package cmd
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"sort" "sort"
"time" "time"
@@ -36,14 +35,14 @@ var cronListCmd = &cobra.Command{
Use: "list", Use: "list",
Aliases: []string{"ls"}, Aliases: []string{"ls"},
Short: "List cron entries", Short: "List cron entries",
Run: cronList, RunE: cronList,
} }
var cronHistoryCmd = &cobra.Command{ var cronHistoryCmd = &cobra.Command{
Use: "history <entry_id> [<entry_id>...]", Use: "history <entry_id> [<entry_id>...]",
Short: "Show history of each cron tasks", Short: "Show history of each cron tasks",
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Run: cronHistory, RunE: cronHistory,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 $ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 bf6a8594-cd03-4968-b36a-8572c5e160dd $ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 bf6a8594-cd03-4968-b36a-8572c5e160dd
@@ -51,17 +50,16 @@ var cronHistoryCmd = &cobra.Command{
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 --page=2`), $ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 --page=2`),
} }
func cronList(cmd *cobra.Command, args []string) { func cronList(cmd *cobra.Command, args []string) error {
inspector := createInspector() inspector := createInspector()
entries, err := inspector.SchedulerEntries() entries, err := inspector.SchedulerEntries()
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("could not fetch scheduler entries: %v", err)
os.Exit(1)
} }
if len(entries) == 0 { if len(entries) == 0 {
fmt.Println("No scheduler entries") fmt.Println("No scheduler entries")
return return nil
} }
// Sort entries by spec. // Sort entries by spec.
@@ -78,6 +76,7 @@ func cronList(cmd *cobra.Command, args []string) {
} }
} }
printTable(cols, printRows) printTable(cols, printRows)
return nil
} }
// Returns a string describing when the next enqueue will happen. // Returns a string describing when the next enqueue will happen.
@@ -97,16 +96,14 @@ func prevEnqueue(prevEnqueuedAt time.Time) string {
return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second)) return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second))
} }
func cronHistory(cmd *cobra.Command, args []string) { func cronHistory(cmd *cobra.Command, args []string) error {
pageNum, err := cmd.Flags().GetInt("page") pageNum, err := cmd.Flags().GetInt("page")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
pageSize, err := cmd.Flags().GetInt("size") pageSize, err := cmd.Flags().GetInt("size")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
inspector := createInspector() inspector := createInspector()
for i, entryID := range args { for i, entryID := range args {
@@ -136,4 +133,5 @@ func cronHistory(cmd *cobra.Command, args []string) {
} }
printTable(cols, printRows) printTable(cols, printRows)
} }
return nil
} }

View File

@@ -6,7 +6,6 @@ package cmd
import ( import (
"fmt" "fmt"
"os"
"time" "time"
"github.com/MakeNowJust/heredoc/v2" "github.com/MakeNowJust/heredoc/v2"
@@ -32,14 +31,14 @@ var dashCmd = &cobra.Command{
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq dash $ asynq dash
$ asynq dash --refresh=3s`), $ asynq dash --refresh=3s`),
Run: func(cmd *cobra.Command, args []string) { RunE: func(cmd *cobra.Command, args []string) error {
if flagPollInterval < 1*time.Second { if flagPollInterval < 1*time.Second {
fmt.Println("error: --refresh cannot be less than 1s") return fmt.Errorf("--refresh cannot be less than 1s")
os.Exit(1)
} }
dash.Run(dash.Options{ dash.Run(dash.Options{
PollInterval: flagPollInterval, PollInterval: flagPollInterval,
RedisConnOpt: getRedisConnOpt(), RedisConnOpt: getRedisConnOpt(),
}) })
return nil
}, },
} }

View File

@@ -6,7 +6,6 @@ package cmd
import ( import (
"fmt" "fmt"
"os"
"github.com/MakeNowJust/heredoc/v2" "github.com/MakeNowJust/heredoc/v2"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -31,22 +30,25 @@ var groupListCmd = &cobra.Command{
Aliases: []string{"ls"}, Aliases: []string{"ls"},
Short: "List groups", Short: "List groups",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: groupLists, RunE: groupLists,
} }
func groupLists(cmd *cobra.Command, args []string) { func groupLists(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
inspector := createInspector() inspector := createInspector()
groups, err := inspector.Groups(qname) groups, err := inspector.Groups(qname)
if err != nil {
return fmt.Errorf("could not fetch groups: %v", err)
}
if len(groups) == 0 { if len(groups) == 0 {
fmt.Printf("No groups found in queue %q\n", qname) fmt.Printf("No groups found in queue %q\n", qname)
return return nil
} }
for _, g := range groups { for _, g := range groups {
fmt.Println(g.Group) fmt.Println(g.Group)
} }
return nil
} }

View File

@@ -7,7 +7,6 @@ package cmd
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"github.com/MakeNowJust/heredoc/v2" "github.com/MakeNowJust/heredoc/v2"
"github.com/fatih/color" "github.com/fatih/color"
@@ -44,16 +43,14 @@ var queueListCmd = &cobra.Command{
Use: "list", Use: "list",
Short: "List queues", Short: "List queues",
Aliases: []string{"ls"}, Aliases: []string{"ls"},
// TODO: Use RunE instead? RunE: queueList,
Run: queueList,
} }
var queueInspectCmd = &cobra.Command{ var queueInspectCmd = &cobra.Command{
Use: "inspect <queue> [<queue>...]", Use: "inspect <queue> [<queue>...]",
Short: "Display detailed information on one or more queues", Short: "Display detailed information on one or more queues",
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
// TODO: Use RunE instead? RunE: queueInspect,
Run: queueInspect,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq queue inspect myqueue $ asynq queue inspect myqueue
$ asynq queue inspect queue1 queue2 queue3`), $ asynq queue inspect queue1 queue2 queue3`),
@@ -63,7 +60,7 @@ var queueHistoryCmd = &cobra.Command{
Use: "history <queue> [<queue>...]", Use: "history <queue> [<queue>...]",
Short: "Display historical aggregate data from one or more queues", Short: "Display historical aggregate data from one or more queues",
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Run: queueHistory, RunE: queueHistory,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq queue history myqueue $ asynq queue history myqueue
$ asynq queue history queue1 queue2 queue3 $ asynq queue history queue1 queue2 queue3
@@ -74,7 +71,7 @@ var queuePauseCmd = &cobra.Command{
Use: "pause <queue> [<queue>...]", Use: "pause <queue> [<queue>...]",
Short: "Pause one or more queues", Short: "Pause one or more queues",
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Run: queuePause, RunE: queuePause,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq queue pause myqueue $ asynq queue pause myqueue
$ asynq queue pause queue1 queue2 queue3`), $ asynq queue pause queue1 queue2 queue3`),
@@ -85,7 +82,7 @@ var queueUnpauseCmd = &cobra.Command{
Short: "Resume (unpause) one or more queues", Short: "Resume (unpause) one or more queues",
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Aliases: []string{"unpause"}, Aliases: []string{"unpause"},
Run: queueUnpause, RunE: queueUnpause,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq queue resume myqueue $ asynq queue resume myqueue
$ asynq queue resume queue1 queue2 queue3`), $ asynq queue resume queue1 queue2 queue3`),
@@ -96,14 +93,14 @@ var queueRemoveCmd = &cobra.Command{
Short: "Remove one or more queues", Short: "Remove one or more queues",
Aliases: []string{"rm", "delete"}, Aliases: []string{"rm", "delete"},
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Run: queueRemove, RunE: queueRemove,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq queue rm myqueue $ asynq queue rm myqueue
$ asynq queue rm queue1 queue2 queue3 $ asynq queue rm queue1 queue2 queue3
$ asynq queue rm myqueue --force`), $ asynq queue rm myqueue --force`),
} }
func queueList(cmd *cobra.Command, args []string) { func queueList(cmd *cobra.Command, args []string) error {
type queueInfo struct { type queueInfo struct {
name string name string
keyslot int64 keyslot int64
@@ -112,8 +109,7 @@ func queueList(cmd *cobra.Command, args []string) {
inspector := createInspector() inspector := createInspector()
queues, err := inspector.Queues() queues, err := inspector.Queues()
if err != nil { if err != nil {
fmt.Printf("error: Could not fetch list of queues: %v\n", err) return fmt.Errorf("could not fetch list of queues: %v", err)
os.Exit(1)
} }
var qs []*queueInfo var qs []*queueInfo
for _, qname := range queues { for _, qname := range queues {
@@ -121,13 +117,13 @@ func queueList(cmd *cobra.Command, args []string) {
if useRedisCluster { if useRedisCluster {
keyslot, err := inspector.ClusterKeySlot(qname) keyslot, err := inspector.ClusterKeySlot(qname)
if err != nil { if err != nil {
fmt.Errorf("error: Could not get cluster keyslot for %q\n", qname) fmt.Printf("error: could not get cluster keyslot for %q\n", qname)
continue continue
} }
q.keyslot = keyslot q.keyslot = keyslot
nodes, err := inspector.ClusterNodes(qname) nodes, err := inspector.ClusterNodes(qname)
if err != nil { if err != nil {
fmt.Errorf("error: Could not get cluster nodes for %q\n", qname) fmt.Printf("error: could not get cluster nodes for %q\n", qname)
continue continue
} }
q.nodes = nodes q.nodes = nodes
@@ -148,9 +144,10 @@ func queueList(cmd *cobra.Command, args []string) {
fmt.Println(q.name) fmt.Println(q.name)
} }
} }
return nil
} }
func queueInspect(cmd *cobra.Command, args []string) { func queueInspect(cmd *cobra.Command, args []string) error {
inspector := createInspector() inspector := createInspector()
for i, qname := range args { for i, qname := range args {
if i > 0 { if i > 0 {
@@ -163,6 +160,7 @@ func queueInspect(cmd *cobra.Command, args []string) {
} }
printQueueInfo(info) printQueueInfo(info)
} }
return nil
} }
func printQueueInfo(info *asynq.QueueInfo) { func printQueueInfo(info *asynq.QueueInfo) {
@@ -195,11 +193,10 @@ func printQueueInfo(info *asynq.QueueInfo) {
) )
} }
func queueHistory(cmd *cobra.Command, args []string) { func queueHistory(cmd *cobra.Command, args []string) error {
days, err := cmd.Flags().GetInt("days") days, err := cmd.Flags().GetInt("days")
if err != nil { if err != nil {
fmt.Printf("error: Internal error: %v\n", err) return err
os.Exit(1)
} }
inspector := createInspector() inspector := createInspector()
for i, qname := range args { for i, qname := range args {
@@ -214,6 +211,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
} }
printDailyStats(stats) printDailyStats(stats)
} }
return nil
} }
func printDailyStats(stats []*asynq.DailyStats) { func printDailyStats(stats []*asynq.DailyStats) {
@@ -233,49 +231,63 @@ func printDailyStats(stats []*asynq.DailyStats) {
) )
} }
func queuePause(cmd *cobra.Command, args []string) { func queuePause(cmd *cobra.Command, args []string) error {
inspector := createInspector() inspector := createInspector()
var firstErr error
for _, qname := range args { for _, qname := range args {
err := inspector.PauseQueue(qname) err := inspector.PauseQueue(qname)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
if firstErr == nil {
firstErr = err
}
continue continue
} }
fmt.Printf("Successfully paused queue %q\n", qname) fmt.Printf("Successfully paused queue %q\n", qname)
} }
return firstErr
} }
func queueUnpause(cmd *cobra.Command, args []string) { func queueUnpause(cmd *cobra.Command, args []string) error {
inspector := createInspector() inspector := createInspector()
var firstErr error
for _, qname := range args { for _, qname := range args {
err := inspector.UnpauseQueue(qname) err := inspector.UnpauseQueue(qname)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
if firstErr == nil {
firstErr = err
}
continue continue
} }
fmt.Printf("Successfully unpaused queue %q\n", qname) fmt.Printf("Successfully unpaused queue %q\n", qname)
} }
return firstErr
} }
func queueRemove(cmd *cobra.Command, args []string) { func queueRemove(cmd *cobra.Command, args []string) error {
// TODO: Use inspector once RemoveQueue become public API. // TODO: Use inspector once RemoveQueue become public API.
force, err := cmd.Flags().GetBool("force") force, err := cmd.Flags().GetBool("force")
if err != nil { if err != nil {
fmt.Printf("error: Internal error: %v\n", err) return err
os.Exit(1)
} }
r := createRDB() r := createRDB()
var firstErr error
for _, qname := range args { for _, qname := range args {
err = r.RemoveQueue(qname, force) err = r.RemoveQueue(qname, force)
if err != nil { if err != nil {
if errors.IsQueueNotEmpty(err) { if errors.IsQueueNotEmpty(err) {
fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname) fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname)
continue } else {
}
fmt.Printf("error: %v\n", err) fmt.Printf("error: %v\n", err)
}
if firstErr == nil {
firstErr = err
}
continue continue
} }
fmt.Printf("Successfully removed queue %q\n", qname) fmt.Printf("Successfully removed queue %q\n", qname)
} }
return firstErr
} }

View File

@@ -487,35 +487,31 @@ func isPrintable(data []byte) bool {
} }
// Helper to turn a command line flag into a duration // Helper to turn a command line flag into a duration
func getDuration(cmd *cobra.Command, arg string) time.Duration { func getDuration(cmd *cobra.Command, arg string) (time.Duration, error) {
durationStr, err := cmd.Flags().GetString(arg) durationStr, err := cmd.Flags().GetString(arg)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return 0, err
os.Exit(1)
} }
duration, err := time.ParseDuration(durationStr) duration, err := time.ParseDuration(durationStr)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return 0, err
os.Exit(1)
} }
return duration return duration, nil
} }
// Helper to turn a command line flag into a time // Helper to turn a command line flag into a time
func getTime(cmd *cobra.Command, arg string) time.Time { func getTime(cmd *cobra.Command, arg string) (time.Time, error) {
timeStr, err := cmd.Flags().GetString(arg) timeStr, err := cmd.Flags().GetString(arg)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return time.Time{}, err
os.Exit(1)
} }
timeVal, err := time.Parse(time.RFC3339, timeStr) timeVal, err := time.Parse(time.RFC3339, timeStr)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return time.Time{}, err
os.Exit(1)
} }
return timeVal return timeVal, nil
} }

View File

@@ -7,7 +7,6 @@ package cmd
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"sort" "sort"
"strings" "strings"
"time" "time"
@@ -44,20 +43,19 @@ The command shows the following for each server:
A "active" server is pulling tasks from queues and processing them. A "active" server is pulling tasks from queues and processing them.
A "stopped" server is no longer pulling new tasks from queues`, A "stopped" server is no longer pulling new tasks from queues`,
Run: serverList, RunE: serverList,
} }
func serverList(cmd *cobra.Command, args []string) { func serverList(cmd *cobra.Command, args []string) error {
r := createRDB() r := createRDB()
servers, err := r.ListServers() servers, err := r.ListServers()
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("could not fetch list of servers: %v", err)
os.Exit(1)
} }
if len(servers) == 0 { if len(servers) == 0 {
fmt.Println("No running servers") fmt.Println("No running servers")
return return nil
} }
// sort by hostname and pid // sort by hostname and pid
@@ -80,6 +78,7 @@ func serverList(cmd *cobra.Command, args []string) {
} }
} }
printTable(cols, printRows) printTable(cols, printRows)
return nil
} }
func formatQueues(qmap map[string]int) string { func formatQueues(qmap map[string]int) string {

View File

@@ -35,7 +35,7 @@ var statsCmd = &cobra.Command{
* Aggregate data for the current day * Aggregate data for the current day
* Basic information about the running redis instance`), * Basic information about the running redis instance`),
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: stats, RunE: stats,
} }
var jsonFlag bool var jsonFlag bool
@@ -74,13 +74,12 @@ type FullStats struct {
RedisInfo map[string]string `json:"redis"` RedisInfo map[string]string `json:"redis"`
} }
func stats(cmd *cobra.Command, args []string) { func stats(cmd *cobra.Command, args []string) error {
r := createRDB() r := createRDB()
queues, err := r.AllQueues() queues, err := r.AllQueues()
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("could not fetch queues: %v", err)
os.Exit(1)
} }
var aggStats AggregateStats var aggStats AggregateStats
@@ -88,8 +87,7 @@ func stats(cmd *cobra.Command, args []string) {
for _, qname := range queues { for _, qname := range queues {
s, err := r.CurrentStats(qname) s, err := r.CurrentStats(qname)
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("could not fetch stats for queue %q: %v", qname, err)
os.Exit(1)
} }
aggStats.Active += s.Active aggStats.Active += s.Active
aggStats.Pending += s.Pending aggStats.Pending += s.Pending
@@ -110,8 +108,7 @@ func stats(cmd *cobra.Command, args []string) {
info, err = r.RedisInfo() info, err = r.RedisInfo()
} }
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("could not fetch redis info: %v", err)
os.Exit(1)
} }
if jsonFlag { if jsonFlag {
@@ -122,12 +119,11 @@ func stats(cmd *cobra.Command, args []string) {
}) })
if err != nil { if err != nil {
fmt.Println(err) return fmt.Errorf("could not marshal stats to JSON: %v", err)
os.Exit(1)
} }
fmt.Println(string(statsJSON)) fmt.Println(string(statsJSON))
return return nil
} }
bold := color.New(color.Bold) bold := color.New(color.Bold)
@@ -151,6 +147,7 @@ func stats(cmd *cobra.Command, args []string) {
printInfo(info) printInfo(info)
} }
fmt.Println() fmt.Println()
return nil
} }
func printStatsByState(s *AggregateStats) { func printStatsByState(s *AggregateStats) {

View File

@@ -7,7 +7,6 @@ package cmd
import ( import (
"fmt" "fmt"
"io" "io"
"os"
"time" "time"
"github.com/MakeNowJust/heredoc/v2" "github.com/MakeNowJust/heredoc/v2"
@@ -120,14 +119,14 @@ var taskListCmd = &cobra.Command{
$ asynq task list --queue=myqueue --state=pending $ asynq task list --queue=myqueue --state=pending
$ asynq task list --queue=myqueue --state=aggregating --group=mygroup $ asynq task list --queue=myqueue --state=aggregating --group=mygroup
$ asynq task list --queue=myqueue --state=scheduled --page=2`), $ asynq task list --queue=myqueue --state=scheduled --page=2`),
Run: taskList, RunE: taskList,
} }
var taskInspectCmd = &cobra.Command{ var taskInspectCmd = &cobra.Command{
Use: "inspect --queue=<queue> --id=<task_id>", Use: "inspect --queue=<queue> --id=<task_id>",
Short: "Display detailed information on the specified task", Short: "Display detailed information on the specified task",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskInspect, RunE: taskInspect,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task inspect --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`), $ asynq task inspect --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
} }
@@ -136,7 +135,7 @@ var taskCancelCmd = &cobra.Command{
Use: "cancel <task_id> [<task_id>...]", Use: "cancel <task_id> [<task_id>...]",
Short: "Cancel one or more active tasks", Short: "Cancel one or more active tasks",
Args: cobra.MinimumNArgs(1), Args: cobra.MinimumNArgs(1),
Run: taskCancel, RunE: taskCancel,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task cancel f1720682-f5a6-4db1-8953-4f48ae541d0f`), $ asynq task cancel f1720682-f5a6-4db1-8953-4f48ae541d0f`),
} }
@@ -145,7 +144,7 @@ var taskArchiveCmd = &cobra.Command{
Use: "archive --queue=<queue> --id=<task_id>", Use: "archive --queue=<queue> --id=<task_id>",
Short: "Archive a task with the given id", Short: "Archive a task with the given id",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskArchive, RunE: taskArchive,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task archive --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`), $ asynq task archive --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
} }
@@ -155,7 +154,7 @@ var taskDeleteCmd = &cobra.Command{
Aliases: []string{"remove", "rm"}, Aliases: []string{"remove", "rm"},
Short: "Delete a task with the given id", Short: "Delete a task with the given id",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskDelete, RunE: taskDelete,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task delete --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`), $ asynq task delete --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
} }
@@ -164,7 +163,7 @@ var taskRunCmd = &cobra.Command{
Use: "run --queue=<queue> --id=<task_id>", Use: "run --queue=<queue> --id=<task_id>",
Short: "Run a task with the given id", Short: "Run a task with the given id",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskRun, RunE: taskRun,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`), $ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
} }
@@ -173,7 +172,7 @@ var taskEnqueueCmd = &cobra.Command{
Use: "enqueue --type_name=footype --payload=barpayload", Use: "enqueue --type_name=footype --payload=barpayload",
Short: "Enqueue a task", Short: "Enqueue a task",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskEnqueue, RunE: taskEnqueue,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task enqueue -t footype -l barpayload $ asynq task enqueue -t footype -l barpayload
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`), $ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
@@ -183,7 +182,7 @@ var taskArchiveAllCmd = &cobra.Command{
Use: "archiveall --queue=<queue> --state=<state>", Use: "archiveall --queue=<queue> --state=<state>",
Short: "Archive all tasks in the given state", Short: "Archive all tasks in the given state",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskArchiveAll, RunE: taskArchiveAll,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task archiveall --queue=myqueue --state=retry $ asynq task archiveall --queue=myqueue --state=retry
$ asynq task archiveall --queue=myqueue --state=aggregating --group=mygroup`), $ asynq task archiveall --queue=myqueue --state=aggregating --group=mygroup`),
@@ -193,7 +192,7 @@ var taskDeleteAllCmd = &cobra.Command{
Use: "deleteall --queue=<queue> --state=<state>", Use: "deleteall --queue=<queue> --state=<state>",
Short: "Delete all tasks in the given state", Short: "Delete all tasks in the given state",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskDeleteAll, RunE: taskDeleteAll,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task deleteall --queue=myqueue --state=archived $ asynq task deleteall --queue=myqueue --state=archived
$ asynq task deleteall --queue=myqueue --state=aggregating --group=mygroup`), $ asynq task deleteall --queue=myqueue --state=aggregating --group=mygroup`),
@@ -203,74 +202,66 @@ var taskRunAllCmd = &cobra.Command{
Use: "runall --queue=<queue> --state=<state>", Use: "runall --queue=<queue> --state=<state>",
Short: "Run all tasks in the given state", Short: "Run all tasks in the given state",
Args: cobra.NoArgs, Args: cobra.NoArgs,
Run: taskRunAll, RunE: taskRunAll,
Example: heredoc.Doc(` Example: heredoc.Doc(`
$ asynq task runall --queue=myqueue --state=retry $ asynq task runall --queue=myqueue --state=retry
$ asynq task runall --queue=myqueue --state=aggregating --group=mygroup`), $ asynq task runall --queue=myqueue --state=aggregating --group=mygroup`),
} }
func taskList(cmd *cobra.Command, args []string) { func taskList(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
state, err := cmd.Flags().GetString("state") state, err := cmd.Flags().GetString("state")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
pageNum, err := cmd.Flags().GetInt("page") pageNum, err := cmd.Flags().GetInt("page")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
pageSize, err := cmd.Flags().GetInt("size") pageSize, err := cmd.Flags().GetInt("size")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
switch state { switch state {
case "active": case "active":
listActiveTasks(qname, pageNum, pageSize) return listActiveTasks(qname, pageNum, pageSize)
case "pending": case "pending":
listPendingTasks(qname, pageNum, pageSize) return listPendingTasks(qname, pageNum, pageSize)
case "scheduled": case "scheduled":
listScheduledTasks(qname, pageNum, pageSize) return listScheduledTasks(qname, pageNum, pageSize)
case "retry": case "retry":
listRetryTasks(qname, pageNum, pageSize) return listRetryTasks(qname, pageNum, pageSize)
case "archived": case "archived":
listArchivedTasks(qname, pageNum, pageSize) return listArchivedTasks(qname, pageNum, pageSize)
case "completed": case "completed":
listCompletedTasks(qname, pageNum, pageSize) return listCompletedTasks(qname, pageNum, pageSize)
case "aggregating": case "aggregating":
group, err := cmd.Flags().GetString("group") group, err := cmd.Flags().GetString("group")
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if group == "" { if group == "" {
fmt.Println("Flag --group is required for listing aggregating tasks") return fmt.Errorf("flag --group is required for listing aggregating tasks")
os.Exit(1)
} }
listAggregatingTasks(qname, group, pageNum, pageSize) return listAggregatingTasks(qname, group, pageNum, pageSize)
default: default:
fmt.Printf("error: state=%q is not supported\n", state) return fmt.Errorf("state=%q is not supported", state)
os.Exit(1)
} }
} }
func listActiveTasks(qname string, pageNum, pageSize int) { func listActiveTasks(qname string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No active tasks in %q queue\n", qname) fmt.Printf("No active tasks in %q queue\n", qname)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload"}, []string{"ID", "Type", "Payload"},
@@ -280,18 +271,18 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
} }
}, },
) )
return nil
} }
func listPendingTasks(qname string, pageNum, pageSize int) { func listPendingTasks(qname string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No pending tasks in %q queue\n", qname) fmt.Printf("No pending tasks in %q queue\n", qname)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload"}, []string{"ID", "Type", "Payload"},
@@ -301,18 +292,18 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
} }
}, },
) )
return nil
} }
func listScheduledTasks(qname string, pageNum, pageSize int) { func listScheduledTasks(qname string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No scheduled tasks in %q queue\n", qname) fmt.Printf("No scheduled tasks in %q queue\n", qname)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload", "Process In"}, []string{"ID", "Type", "Payload", "Process In"},
@@ -322,6 +313,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
} }
}, },
) )
return nil
} }
// formatProcessAt formats next process at time to human friendly string. // formatProcessAt formats next process at time to human friendly string.
@@ -335,16 +327,15 @@ func formatProcessAt(processAt time.Time) string {
return fmt.Sprintf("in %v", d.Round(time.Second)) return fmt.Sprintf("in %v", d.Round(time.Second))
} }
func listRetryTasks(qname string, pageNum, pageSize int) { func listRetryTasks(qname string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No retry tasks in %q queue\n", qname) fmt.Printf("No retry tasks in %q queue\n", qname)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"}, []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
@@ -355,18 +346,18 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
} }
}, },
) )
return nil
} }
func listArchivedTasks(qname string, pageNum, pageSize int) { func listArchivedTasks(qname string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No archived tasks in %q queue\n", qname) fmt.Printf("No archived tasks in %q queue\n", qname)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"}, []string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
@@ -375,18 +366,18 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr) fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr)
} }
}) })
return nil
} }
func listCompletedTasks(qname string, pageNum, pageSize int) { func listCompletedTasks(qname string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No completed tasks in %q queue\n", qname) fmt.Printf("No completed tasks in %q queue\n", qname)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload", "CompletedAt", "Result"}, []string{"ID", "Type", "Payload", "CompletedAt", "Result"},
@@ -395,18 +386,18 @@ func listCompletedTasks(qname string, pageNum, pageSize int) {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result)) fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result))
} }
}) })
return nil
} }
func listAggregatingTasks(qname, group string, pageNum, pageSize int) { func listAggregatingTasks(qname, group string, pageNum, pageSize int) error {
i := createInspector() i := createInspector()
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) return err
os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Printf("No aggregating tasks in group %q \n", group) fmt.Printf("No aggregating tasks in group %q \n", group)
return return nil
} }
printTable( printTable(
[]string{"ID", "Type", "Payload", "Group"}, []string{"ID", "Type", "Payload", "Group"},
@@ -416,38 +407,42 @@ func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
} }
}, },
) )
return nil
} }
func taskCancel(cmd *cobra.Command, args []string) { func taskCancel(cmd *cobra.Command, args []string) error {
i := createInspector() i := createInspector()
var firstErr error
for _, id := range args { for _, id := range args {
if err := i.CancelProcessing(id); err != nil { if err := i.CancelProcessing(id); err != nil {
fmt.Printf("error: could not send cancelation signal: %v\n", err) fmt.Printf("error: could not send cancelation signal: %v\n", err)
if firstErr == nil {
firstErr = err
}
continue continue
} }
fmt.Printf("Sent cancelation signal for task %s\n", id) fmt.Printf("Sent cancelation signal for task %s\n", id)
} }
return firstErr
} }
func taskInspect(cmd *cobra.Command, args []string) { func taskInspect(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
id, err := cmd.Flags().GetString("id") id, err := cmd.Flags().GetString("id")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
info, err := i.GetTaskInfo(qname, id) info, err := i.GetTaskInfo(qname, id)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return fmt.Errorf("could not get task info: %v", err)
os.Exit(1)
} }
printTaskInfo(info) printTaskInfo(info)
return nil
} }
func printTaskInfo(info *asynq.TaskInfo) { func printTaskInfo(info *asynq.TaskInfo) {
@@ -486,80 +481,72 @@ func formatPastTime(t time.Time) string {
return t.Format(time.UnixDate) return t.Format(time.UnixDate)
} }
func taskArchive(cmd *cobra.Command, args []string) { func taskArchive(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
id, err := cmd.Flags().GetString("id") id, err := cmd.Flags().GetString("id")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
err = i.ArchiveTask(qname, id) err = i.ArchiveTask(qname, id)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return fmt.Errorf("could not archive task: %v", err)
os.Exit(1)
} }
fmt.Println("task archived") fmt.Println("task archived")
return nil
} }
func taskDelete(cmd *cobra.Command, args []string) { func taskDelete(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
id, err := cmd.Flags().GetString("id") id, err := cmd.Flags().GetString("id")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
err = i.DeleteTask(qname, id) err = i.DeleteTask(qname, id)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return fmt.Errorf("could not delete task: %v", err)
os.Exit(1)
} }
fmt.Println("task deleted") fmt.Println("task deleted")
return nil
} }
func taskRun(cmd *cobra.Command, args []string) { func taskRun(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
id, err := cmd.Flags().GetString("id") id, err := cmd.Flags().GetString("id")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
err = i.RunTask(qname, id) err = i.RunTask(qname, id)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return fmt.Errorf("could not run task: %v", err)
os.Exit(1)
} }
fmt.Println("task is now pending") fmt.Println("task is now pending")
return nil
} }
func taskEnqueue(cmd *cobra.Command, args []string) { func taskEnqueue(cmd *cobra.Command, args []string) error {
typeName, err := cmd.Flags().GetString("type_name") typeName, err := cmd.Flags().GetString("type_name")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
payload, err := cmd.Flags().GetString("payload") payload, err := cmd.Flags().GetString("payload")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
// For all of the optional flags, we need to explicitly check whether they were set or // For all of the optional flags, we need to explicitly check whether they were set or
@@ -569,8 +556,7 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed("retry") { if cmd.Flags().Changed("retry") {
retry, err := cmd.Flags().GetInt("retry") retry, err := cmd.Flags().GetInt("retry")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
opts = append(opts, asynq.MaxRetry(retry)) opts = append(opts, asynq.MaxRetry(retry))
} }
@@ -578,8 +564,7 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed("queue") { if cmd.Flags().Changed("queue") {
queue, err := cmd.Flags().GetString("queue") queue, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
opts = append(opts, asynq.Queue(queue)) opts = append(opts, asynq.Queue(queue))
} }
@@ -587,41 +572,63 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed("id") { if cmd.Flags().Changed("id") {
id, err := cmd.Flags().GetString("id") id, err := cmd.Flags().GetString("id")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
opts = append(opts, asynq.TaskID(id)) opts = append(opts, asynq.TaskID(id))
} }
if cmd.Flags().Changed("timeout") { if cmd.Flags().Changed("timeout") {
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout"))) d, err := getDuration(cmd, "timeout")
if err != nil {
return err
}
opts = append(opts, asynq.Timeout(d))
} }
if cmd.Flags().Changed("deadline") { if cmd.Flags().Changed("deadline") {
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline"))) t, err := getTime(cmd, "deadline")
if err != nil {
return err
}
opts = append(opts, asynq.Deadline(t))
} }
if cmd.Flags().Changed("unique") { if cmd.Flags().Changed("unique") {
opts = append(opts, asynq.Unique(getDuration(cmd, "unique"))) d, err := getDuration(cmd, "unique")
if err != nil {
return err
}
opts = append(opts, asynq.Unique(d))
} }
if cmd.Flags().Changed("process_at") { if cmd.Flags().Changed("process_at") {
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at"))) t, err := getTime(cmd, "process_at")
if err != nil {
return err
}
opts = append(opts, asynq.ProcessAt(t))
} }
if cmd.Flags().Changed("process_in") { if cmd.Flags().Changed("process_in") {
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in"))) d, err := getDuration(cmd, "process_in")
if err != nil {
return err
}
opts = append(opts, asynq.ProcessIn(d))
} }
if cmd.Flags().Changed("retention") { if cmd.Flags().Changed("retention") {
opts = append(opts, asynq.Retention(getDuration(cmd, "retention"))) d, err := getDuration(cmd, "retention")
if err != nil {
return err
}
opts = append(opts, asynq.Retention(d))
} }
if cmd.Flags().Changed("group") { if cmd.Flags().Changed("group") {
group, err := cmd.Flags().GetString("group") group, err := cmd.Flags().GetString("group")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
opts = append(opts, asynq.Group(group)) opts = append(opts, asynq.Group(group))
} }
@@ -631,23 +638,21 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
taskInfo, err := c.Enqueue(task) taskInfo, err := c.Enqueue(task)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return fmt.Errorf("could not enqueue task: %v", err)
os.Exit(1)
} }
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue) fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
return nil
} }
func taskArchiveAll(cmd *cobra.Command, args []string) { func taskArchiveAll(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
state, err := cmd.Flags().GetString("state") state, err := cmd.Flags().GetString("state")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
@@ -662,35 +667,35 @@ func taskArchiveAll(cmd *cobra.Command, args []string) {
case "aggregating": case "aggregating":
group, err := cmd.Flags().GetString("group") group, err := cmd.Flags().GetString("group")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
if group == "" { if group == "" {
fmt.Println("error: Flag --group is required for aggregating tasks") return fmt.Errorf("flag --group is required for aggregating tasks")
os.Exit(1)
} }
n, err = i.ArchiveAllAggregatingTasks(qname, group) n, err = i.ArchiveAllAggregatingTasks(qname, group)
default:
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
}
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
fmt.Printf("%d tasks archived\n", n) fmt.Printf("%d tasks archived\n", n)
return nil
default:
return fmt.Errorf("unsupported state %q", state)
}
if err != nil {
return err
}
fmt.Printf("%d tasks archived\n", n)
return nil
} }
func taskDeleteAll(cmd *cobra.Command, args []string) { func taskDeleteAll(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
state, err := cmd.Flags().GetString("state") state, err := cmd.Flags().GetString("state")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
@@ -709,35 +714,35 @@ func taskDeleteAll(cmd *cobra.Command, args []string) {
case "aggregating": case "aggregating":
group, err := cmd.Flags().GetString("group") group, err := cmd.Flags().GetString("group")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
if group == "" { if group == "" {
fmt.Println("error: Flag --group is required for aggregating tasks") return fmt.Errorf("flag --group is required for aggregating tasks")
os.Exit(1)
} }
n, err = i.DeleteAllAggregatingTasks(qname, group) n, err = i.DeleteAllAggregatingTasks(qname, group)
default:
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
}
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
fmt.Printf("%d tasks deleted\n", n) fmt.Printf("%d tasks deleted\n", n)
return nil
default:
return fmt.Errorf("unsupported state %q", state)
}
if err != nil {
return err
}
fmt.Printf("%d tasks deleted\n", n)
return nil
} }
func taskRunAll(cmd *cobra.Command, args []string) { func taskRunAll(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
state, err := cmd.Flags().GetString("state") state, err := cmd.Flags().GetString("state")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
i := createInspector() i := createInspector()
@@ -752,21 +757,23 @@ func taskRunAll(cmd *cobra.Command, args []string) {
case "aggregating": case "aggregating":
group, err := cmd.Flags().GetString("group") group, err := cmd.Flags().GetString("group")
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
if group == "" { if group == "" {
fmt.Println("error: Flag --group is required for aggregating tasks") return fmt.Errorf("flag --group is required for aggregating tasks")
os.Exit(1)
} }
n, err = i.RunAllAggregatingTasks(qname, group) n, err = i.RunAllAggregatingTasks(qname, group)
default:
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
}
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) return err
os.Exit(1)
} }
fmt.Printf("%d tasks are now pending\n", n) fmt.Printf("%d tasks are now pending\n", n)
return nil
default:
return fmt.Errorf("unsupported state %q", state)
}
if err != nil {
return err
}
fmt.Printf("%d tasks are now pending\n", n)
return nil
} }