2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-06-10 19:50:46 +08:00

Compare commits

..

44 Commits

Author SHA1 Message Date
dependabot[bot]
1fad76c014 build(deps): bump github.com/redis/go-redis/v9 from 9.14.1 to 9.20.0
Bumps [github.com/redis/go-redis/v9](https://github.com/redis/go-redis) from 9.14.1 to 9.20.0.
- [Release notes](https://github.com/redis/go-redis/releases)
- [Changelog](https://github.com/redis/go-redis/blob/master/RELEASE-NOTES.md)
- [Commits](https://github.com/redis/go-redis/compare/v9.14.1...v9.20.0)

---
updated-dependencies:
- dependency-name: github.com/redis/go-redis/v9
  dependency-version: 9.20.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-03 16:01:15 +00:00
Mohamed Sohail
785bb7208c Merge pull request #1094 from fanatics-live/batch-enqueue
Add BatchEnqueue for pipelined multi-task enqueue
2026-05-14 15:34:54 +03:00
Mohamed Sohail
3a7ec97482 Merge pull request #1096 from kayoch1n/feat/header-option
feat: Add an option to specify headers
2026-05-14 15:33:28 +03:00
Mohamed Sohail
d671a4c427 Merge pull request #1100 from ljluestc/fix/669-sentinel-db-parsing
fix: parse DB number from redis-sentinel URI path
2026-05-14 15:32:15 +03:00
Mohamed Sohail
f81d452160 Merge pull request #1101 from sevico/refactor
refactor(cli): migrate commands from Run to RunE and fix group error …
2026-05-14 15:31:49 +03:00
kayoch1n
1af926b147 chore: Add docs string to asynq.Header and remove asynq.Headers 2026-04-15 20:02:30 +08:00
Mohamed Sohail
23905a286f Merge pull request #1104 from Bahtya/fix/pubsub-connection-leak
fix: close pubsub connection on Subscribe error in CancelationPubSub
2026-04-13 13:58:23 +03:00
Erik Nilsen
a32ac05d09 test: add coverage for BatchEnqueue error paths
Add tests for all uncovered BatchEnqueueContext error paths: nil task,
empty/blank typename, invalid options, group rejection, unique rejection,
and broker-down (sleeping broker). Add pipeline error test for rdb.BatchEnqueue
via cancelled context, and TestTaskOptions/TestTaskOptionsNil for the
new Options() accessor on Task.
2026-04-10 10:14:01 -07:00
Erik Nilsen
68f03688e3 docs: add WARNING line for silent duplicate skipping in BatchEnqueue 2026-04-10 09:56:35 -07:00
Erik Nilsen
06a06970d6 Document atomicity guarantees and fix NOSCRIPT bug in BatchEnqueue
Add comprehensive doc comments to BatchEnqueueContext, the Broker
interface method, and the RDB implementation explaining that the batch
uses a Redis pipeline (not MULTI/EXEC), so partial success is possible
and individual Lua scripts are atomic but the batch is not.

Fix a bug where Script.Run inside a pipeline only sends EVALSHA without
the automatic EVAL fallback that non-pipeline calls get. On a fresh
Redis (or after SCRIPT FLUSH), this caused NOSCRIPT errors for every
pipeline-batched script invocation. The fix preloads the required Lua
scripts before building the pipeline.

Also roll back the in-memory queuesPublished cache when the pipeline
fails, preventing stale entries from suppressing future SADD calls.
2026-04-09 11:35:11 -07:00
Bahtya
5586efeae7 test: add test for CancelationPubSub error path
Add TestCancelationPubSubReceiveError to verify that when
Receive() fails in CancelationPubSub(), an error is returned
and the pubsub connection is not leaked.

This provides test coverage for the pubsub.Close() fix that
was missing in the previous commit.

Bahtya
2026-04-09 20:11:53 +08:00
bahtya
dd3c923f44 fix: close pubsub connection on Subscribe error in CancelationPubSub
When redis.Subscribe succeeds but Receive() fails, the pubsub
connection was not being closed before returning the error. This caused
the subscriber goroutine to leak a Redis connection on each retry
iteration, eventually exhausting the connection pool.

Fixes #1095
2026-04-09 18:15:48 +08:00
Mohamed Sohail
f81c78e68d Merge pull request #1092 from NilPuig/fix/memory-usage-nil-guard
Fix nil panic in memoryUsageCmd Lua script
2026-04-09 10:08:50 +03:00
shiweikang
f8d6677814 refactor(cli): migrate commands from Run to RunE and fix group error handling
Migrate all CLI command handlers from cobra's Run to RunE, replacing
fmt.Println(err) + os.Exit(1) patterns with idiomatic error returns.
This improves testability and lets cobra handle errors consistently
via the existing SilenceUsage/SilenceErrors configuration.

Also refactor getDuration() and getTime() helpers to return errors
instead of calling os.Exit(1).

Fix a bug in group.go where the error from inspector.Groups() was
never checked, causing silent failures (e.g. on Redis connection
errors) to be misreported as "No groups found".
2026-04-05 01:13:41 +08:00
Jiale Lin
07898eade0 fix: parse DB number from redis-sentinel URI path (#669)
Previously, parseRedisSentinelURI ignored the /dbnumber path segment,
making it impossible to connect to any DB other than 0 via sentinel URIs.

This adds DB extraction from the URI path, consistent with how
parseRedisURI already handles it for redis:// and rediss:// schemes.

Closes #669
2026-03-21 11:33:56 -07:00
kayoch1n
5216f1c3be feat: Add an option to create headers from a map 2026-03-17 14:47:34 +08:00
kayoch1n
dbfdfbac5a chore: Update test cases 2026-03-17 12:56:12 +08:00
kayoch1n
5c391f3ffb feat: Add an option to specify headers 2026-03-17 12:29:58 +08:00
Erik Nilsen
7ae0b3fe22 Add Options() accessor on Task for external option merging
Exposes the opts field so callers can read a task's options to merge
with additional per-task options at batch time.
2026-02-25 09:42:30 -08:00
Erik Nilsen
71ebcfa129 Fix BatchEnqueueContext time comparison and add scheduled task support
BatchEnqueueContext had a time comparison bug where `now` was captured
before the loop but `processAt` was set to time.Now() inside
composeOptions during each iteration, causing all immediate tasks to be
incorrectly classified as scheduled and rejected.

Fix: move `now` capture inside the loop, after composeOptions.

Additionally, extend BatchEnqueueContext to support scheduled tasks in
the same pipeline. Tasks with a future ProcessAt are now routed to
scheduleCmd (ZADD to scheduled set) instead of being rejected. Only
unique and group tasks remain unsupported.

Changes:
- Add BatchEnqueueItem type pairing TaskMessage with optional ProcessAt
- Update Broker interface, RDB, and testbroker to use BatchEnqueueItem
- Route immediate tasks to enqueueCmd, scheduled tasks to scheduleCmd
- Return correct TaskState (Pending vs Scheduled) in results
- Add tests for immediate, scheduled, and mixed batch scenarios
2026-02-25 09:42:30 -08:00
Erik Nilsen
4e62d7e29d Add tests for BatchEnqueue: multiple tasks, empty batch, duplicate IDs 2026-02-23 16:30:35 -08:00
Erik Nilsen
f919a605d5 Add BatchEnqueue for pipelined multi-task enqueue
Adds BatchEnqueue to the Broker interface and RDB implementation that
sends multiple enqueueCmd Lua script invocations in a single Redis
pipeline round-trip. Also adds BatchEnqueueContext to the Client as
the public API, returning per-task results for partial-success handling.

Ref: hibiken/asynq#1069
2026-02-23 16:25:49 -08:00
Nil
2fd155e31d Fix nil guard for MEMORY USAGE in memoryUsageCmd Lua script
MEMORY USAGE returns nil for keys that no longer exist (e.g., expired
or deleted task keys). In Lua, nil is converted to false (a boolean).
The script then attempts arithmetic on this boolean value, causing:

  ERR user_script:30: attempt to perform arithmetic on local 'bytes'
  (a boolean value)

This breaks the /api/queues endpoint in asynqmon, showing "Could not
retrieve queues live data" in the UI.

The fix adds nil guards around all three MEMORY USAGE calls on task
keys, and a divide-by-zero guard on agg_task_sample_size.

Tested in production with Redis 7.2 and asynq v0.25.1 worker.

Fixes #728
Related to #901
2026-02-07 15:13:58 +08:00
Mohamed Sohail
d704b68a42 Prepare release (docs): v0.26.0 (#1084)
* pre-release: v0.26.0

* deps upgrades
* min go version set to 1.24.0

* feat: done add-username-cli (#1083)

* Feature: Add Headers Support to Tasks (#1070)

* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests

* Add --tls option to dash command (#1073)

* Add --tls option to dash command

* Switch order so it works better when both --tls and --tls_server are set

* docs: update CHANGELOG

---------

Co-authored-by: Artemii Kulikov <91570054+vlle@users.noreply.github.com>
Co-authored-by: Joe <85931983+joejoe-am@users.noreply.github.com>
Co-authored-by: Thomas Hansen <th4019@gmail.com>
2026-02-03 09:36:26 +03:00
Mohammed Sohail
a8db5b5571 docs: update CHANGELOG 2026-02-03 09:05:40 +03:00
Thomas Hansen
e4248e2749 Add --tls option to dash command (#1073)
* Add --tls option to dash command

* Switch order so it works better when both --tls and --tls_server are set
2026-02-03 09:05:40 +03:00
Joe
c4876e7247 Feature: Add Headers Support to Tasks (#1070)
* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests
2026-02-03 09:05:40 +03:00
Artemii Kulikov
dd2c3de356 feat: done add-username-cli (#1083) 2026-02-03 09:05:40 +03:00
Thomas Hansen
ff887e1f89 Add --tls option to dash command (#1073)
* Add --tls option to dash command

* Switch order so it works better when both --tls and --tls_server are set
2025-11-10 11:08:53 +03:00
Joe
5de9b1faf0 Feature: Add Headers Support to Tasks (#1070)
* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests
2025-11-04 20:07:59 +03:00
Artemii Kulikov
8261a03f0d feat: done add-username-cli (#1083) 2025-11-04 20:02:14 +03:00
Mohammed Sohail
74c47eb8bb pre-release: v0.26.0
* deps upgrades
* min go version set to 1.24.0
2025-11-04 19:42:48 +03:00
Mohammed Sohail
e9037f003d ci: prepare github ci for go 1.24.x and 1.25.x, turn off noisy linter
* linter is producing uncessary noise
* benchstat is slow on PRs, we will turn it off untill it can be upgraded
2025-11-04 19:29:49 +03:00
Mohammed Sohail
604175e6ca ci: format code with golangci-lint 2025-11-04 19:06:18 +03:00
AmirReza Fahimi
1831a07efe fix: correct error message text in ResultWriter.Write (#1054)
Co-authored-by: amirreza.fahimidero <amirreza.fahimidero@snapp.cab>
Co-authored-by: Mohamed Sohail <sohailsameja@gmail.com>
2025-11-04 18:28:08 +03:00
Benjamin Grosse
d64f0b7ed0 wrap all fmt.Errorf errors (#1047)
Users need to be able to match with `errors.Is()` also on external
errors, for example `context.Canceled`.
2025-11-04 18:25:47 +03:00
aziz-the-dev
a889ef0b08 Implement UpdateTaskPayload method for inspector (#1042)
Co-authored-by: Aziz Aliyev <aziz.aliyev@idda.az>
2025-11-04 18:25:10 +03:00
Marin Atanasov Nikolov
093ba04266 servemux: NotFoundHandler returns ErrHandlerNotFound error (#1031)
This allows asynq middlewares to be used to inspect returned errors and decide
to silence errors about `handlers not found for task'.
2025-11-04 18:17:50 +03:00
Khash Sajadi
c327bc40a2 docs: Update server.go (#1010)
Typo in the docs
2025-04-01 09:06:12 +03:00
Broderick Westrope
ea0c6e93f0 chore: fix godoc comment (#1009) 2025-04-01 09:05:18 +03:00
Mohammed Sohail
489e21920b release: v0.25.1 2024-12-11 09:19:37 +03:00
Mohamed Sohail
043dcfbf56 fix: call Stop on all other signals to correctly set the server state for the shutdown procedure to complete successfully (#982)
* fixes: #979
2024-12-11 09:05:00 +03:00
Robin Joseph
02907551b4 feat(dash): Add --insecure option (#980) 2024-12-09 09:09:12 +03:00
Mohamed Sohail
127fac2e90 fix: NewScheduler incorrectly creates underlying Client, closing broker properly (#977)
* fix: NewScheduler wrongly creates a client whose sharedConnection value is always true

* This is affecting the PeriodicManager as well as the Scheduler

* fix: closing the Client also closes the broker

* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
2024-12-06 08:40:04 +03:00
42 changed files with 2194 additions and 618 deletions

View File

@@ -9,6 +9,7 @@ on: [pull_request]
jobs:
incoming:
runs-on: ubuntu-latest
if: false
services:
redis:
image: redis:7
@@ -31,6 +32,7 @@ jobs:
current:
runs-on: ubuntu-latest
if: false
services:
redis:
image: redis:7
@@ -55,6 +57,7 @@ jobs:
benchstat:
needs: [incoming, current]
if: false
runs-on: ubuntu-latest
steps:
- name: Checkout

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x, 1.23.x]
go-version: [1.24.x, 1.25.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -45,7 +45,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x, 1.23.x]
go-version: [1.24.x, 1.25.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -70,6 +70,7 @@ jobs:
golangci:
name: lint
runs-on: ubuntu-latest
if: false
steps:
- uses: actions/checkout@v4

View File

@@ -7,6 +7,45 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.26.0] - 2026-02-03
### Upgrades
- Prepare CI for Go 1.24.x and 1.25.x (commit: e9037f0)
### Added
- Add Headers support to tasks (PR: https://github.com/hibiken/asynq/pull/1070)
- Add `--tls` option to dash command (PR: https://github.com/hibiken/asynq/pull/1073)
- Add `--username` CLI flag for Redis ACL authentication (PR: https://github.com/hibiken/asynq/pull/1083)
- Add `UpdateTaskPayload` method for inspector (PR: https://github.com/hibiken/asynq/pull/1042)
### Fixes
- Fix: Correct error message text in ResultWriter.Write (PR: https://github.com/hibiken/asynq/pull/1054)
- Fix: Wrap all fmt.Errorf errors with %w (PR: https://github.com/hibiken/asynq/pull/1047)
- Fix: ServeMux.NotFoundHandler returns ErrHandlerNotFound error (PR: https://github.com/hibiken/asynq/pull/1031)
### Changed
- Docs: Update server.go documentation (PR: https://github.com/hibiken/asynq/pull/1010)
- Chore: Fix godoc comment (PR: https://github.com/hibiken/asynq/pull/1009)
## [0.25.1] - 2024-12-11
### Upgrades
* Some packages
### Added
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
### Fixes
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
## [0.25.0] - 2024-10-29
### Upgrades

View File

@@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
tasks := make([]*Task, len(msgs))
for i, m := range msgs {
tasks[i] = NewTask(m.Type, m.Payload)
tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers)
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)

View File

@@ -8,14 +8,15 @@ import (
"context"
"crypto/tls"
"fmt"
"maps"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)
// Task represents a unit of work to be performed.
@@ -26,6 +27,9 @@ type Task struct {
// payload holds data needed to perform the task.
payload []byte
// headers holds additional metadata for the task.
headers map[string]string
// opts holds options for the task.
opts []Option
@@ -33,8 +37,10 @@ type Task struct {
w *ResultWriter
}
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers }
func (t *Task) Options() []Option { return t.opts }
// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
@@ -48,6 +54,21 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: nil,
opts: opts,
}
}
// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers.
// Options can be passed to configure task processing behavior.
// TODO: In the next major (breaking) release, fold this functionality into NewTask
//
// so that headers are supported directly. After that, remove this method.
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: maps.Clone(headers),
opts: opts,
}
}
@@ -57,6 +78,7 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task {
return &Task{
typename: typename,
payload: payload,
headers: make(map[string]string),
w: w,
}
}
@@ -75,6 +97,9 @@ type TaskInfo struct {
// Payload is the payload data of the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// State indicates the task state.
State TaskState
@@ -145,6 +170,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
Queue: msg.Queue,
Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy?
Headers: msg.Headers,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastErr: msg.ErrorMsg,
@@ -442,14 +468,15 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
//
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are:
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
//
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][/dbnumber][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
return nil, fmt.Errorf("asynq: could not parse redis uri: %w", err)
}
switch u.Scheme {
case "redis", "rediss":
@@ -519,11 +546,20 @@ func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
addrs := strings.Split(u.Host, ",")
master := u.Query().Get("master")
var db int
var err error
if len(u.Path) > 0 {
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
db, err = strconv.Atoi(xs[0])
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis sentinel uri: database number should be the first segment of the path")
}
}
var password string
if v, ok := u.User.Password(); ok {
password = v
}
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password, DB: db}, nil
}
// ResultWriter is a client interface to write result data for a task.
@@ -539,7 +575,7 @@ type ResultWriter struct {
func (w *ResultWriter) Write(data []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
return 0, fmt.Errorf("failed to write task result: %w", w.ctx.Err())
default:
}
return w.broker.WriteResult(w.qname, w.id, data)

View File

@@ -10,12 +10,13 @@ import (
"sort"
"strings"
"testing"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/log"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)
//============================================================================
@@ -148,6 +149,23 @@ func TestParseRedisURI(t *testing.T) {
SentinelPassword: "mypassword",
},
},
{
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002/3?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
DB: 3,
},
},
{
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002/7?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
SentinelPassword: "mypassword",
DB: 7,
},
},
}
for _, tc := range tests {
@@ -188,6 +206,10 @@ func TestParseRedisURIErrors(t *testing.T) {
"non integer for db numbers for socket",
"redis-socket:///some/path/to/redis?db=one",
},
{
"non integer for db number for sentinel",
"redis-sentinel://localhost:5000/abc?master=mymaster",
},
}
for _, tc := range tests {
@@ -198,3 +220,30 @@ func TestParseRedisURIErrors(t *testing.T) {
}
}
}
func TestTaskOptions(t *testing.T) {
opts := []Option{
MaxRetry(3),
Queue("critical"),
Timeout(5 * time.Minute),
}
task := NewTask("mytask", []byte("payload"), opts...)
got := task.Options()
if len(got) != len(opts) {
t.Fatalf("task.Options() returned %d options, want %d", len(got), len(opts))
}
for i, o := range opts {
if got[i].String() != o.String() {
t.Errorf("task.Options()[%d] = %v, want %v", i, got[i], o)
}
}
}
func TestTaskOptionsNil(t *testing.T) {
task := NewTask("mytask", []byte("payload"))
got := task.Options()
if got != nil {
t.Errorf("task.Options() = %v, want nil for task with no options", got)
}
}

173
client.go
View File

@@ -6,7 +6,9 @@ package asynq
import (
"context"
"encoding/json"
"fmt"
"maps"
"strings"
"time"
@@ -60,6 +62,7 @@ const (
TaskIDOpt
RetentionOpt
GroupOpt
HeaderOpt
)
// Option specifies the task processing behavior.
@@ -86,6 +89,7 @@ type (
processInOption time.Duration
retentionOption time.Duration
groupOption string
headerOption [2]string
)
// MaxRetry returns an option to specify the max number of times
@@ -217,6 +221,27 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) }
// Header returns an option to associate the key-value header to the task.
//
// This option is composable with other Client options and can be used together
// with other options like MaxRetry, Queue, etc. For use cases where headers
// need to be combined with other options, using Header option is recommended.
//
// Alternatively, NewTaskWithHeaders can be used to create a task with headers
// directly, which may be preferable when headers are an intrinsic part of the
// task definition rather than enqueue-time configuration.
func Header(key, value string) Option {
return headerOption{key, value}
}
func (h headerOption) String() string {
var bytes []byte
bytes, _ = json.Marshal(h)
return fmt.Sprintf("Header(%s)", bytes)
}
func (h headerOption) Type() OptionType { return HeaderOpt }
func (h headerOption) Value() interface{} { return [2]string{h[0], h[1]} }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
@@ -237,6 +262,7 @@ type option struct {
processAt time.Time
retention time.Duration
group string
headers map[string]string
}
// composeOptions merges user provided options into the default options
@@ -251,6 +277,7 @@ func composeOptions(opts ...Option) (option, error) {
timeout: 0, // do not set to defaultTimeout here
deadline: time.Time{},
processAt: time.Now(),
headers: make(map[string]string),
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -290,6 +317,9 @@ func composeOptions(opts ...Option) (option, error) {
return option{}, errors.New("group key cannot be empty")
}
res.group = key
case headerOption:
key, value := opt[0], opt[1]
res.headers[key] = value
default:
// ignore unexpected option
}
@@ -385,6 +415,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: maps.Clone(task.Headers()),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
@@ -393,6 +424,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()),
}
if len(opt.headers) > 0 {
if msg.Headers == nil {
msg.Headers = make(map[string]string)
}
maps.Copy(msg.Headers, opt.headers)
}
now := time.Now()
var state base.TaskState
if opt.processAt.After(now) {
@@ -419,6 +456,142 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
return newTaskInfo(msg, state, opt.processAt, nil), nil
}
// BatchEnqueueResult holds the result of enqueuing a single task within a batch.
type BatchEnqueueResult struct {
TaskInfo *TaskInfo
Err error
}
// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip,
// returning a per-task result slice aligned with the input tasks slice.
//
// # Atomicity Guarantees
//
// There is no all-or-nothing guarantee across the batch. Each task is executed as
// an independent Lua script inside a Redis pipeline. Individual scripts are atomic
// (the existence check, hash write, and list/sorted-set push for one task cannot
// be partially applied), but the pipeline as a whole is not wrapped in a
// MULTI/EXEC transaction. This means:
//
// - Partial success is possible: some tasks may be enqueued while others are not.
// - A task whose ID already exists in Redis is silently skipped (treated as a
// no-op by the Lua script), and its result will still show success.
// - If the Redis pipeline call itself fails (e.g. connection lost, context
// cancelled), every task that passed client-side validation receives that
// error — none of them can be assumed to have been enqueued.
//
// # Validation Errors (pre-pipeline)
//
// The following are caught before any Redis call and rejected in the
// corresponding BatchEnqueueResult.Err without affecting other tasks:
//
// - nil task
// - empty task type name
// - invalid options
// - group tasks (not supported in batch mode)
// - unique tasks (not supported in batch mode)
//
// # Supported Task Types
//
// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported.
// Group and unique tasks are rejected as described above.
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
results := make([]BatchEnqueueResult, len(tasks))
if len(tasks) == 0 {
return results
}
type itemMeta struct {
state base.TaskState
processAt time.Time
}
items := make([]base.BatchEnqueueItem, 0, len(tasks))
itemIndexes := make([]int, 0, len(tasks))
itemMetas := make([]itemMeta, 0, len(tasks))
for i, task := range tasks {
if task == nil {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task cannot be nil")}
continue
}
if strings.TrimSpace(task.Type()) == "" {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task typename cannot be empty")}
continue
}
merged := append(task.opts, opts...)
opt, err := composeOptions(merged...)
if err != nil {
results[i] = BatchEnqueueResult{Err: err}
continue
}
if opt.group != "" {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support group tasks")}
continue
}
if opt.uniqueTTL > 0 {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support unique tasks")}
continue
}
deadline := noDeadline
if !opt.deadline.IsZero() {
deadline = opt.deadline
}
timeout := noTimeout
if opt.timeout != 0 {
timeout = opt.timeout
}
if deadline.Equal(noDeadline) && timeout == noTimeout {
timeout = defaultTimeout
}
msg := &base.TaskMessage{
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()),
Retention: int64(opt.retention.Seconds()),
}
now := time.Now()
scheduled := opt.processAt.After(now)
item := base.BatchEnqueueItem{Msg: msg}
var meta itemMeta
if scheduled {
item.ProcessAt = opt.processAt
meta = itemMeta{state: base.TaskStateScheduled, processAt: opt.processAt}
} else {
meta = itemMeta{state: base.TaskStatePending, processAt: now}
}
items = append(items, item)
itemIndexes = append(itemIndexes, i)
itemMetas = append(itemMetas, meta)
}
if len(items) == 0 {
return results
}
_, err := c.broker.BatchEnqueue(ctx, items)
if err != nil {
for _, idx := range itemIndexes {
results[idx] = BatchEnqueueResult{Err: err}
}
return results
}
for j, idx := range itemIndexes {
info := newTaskInfo(items[j].Msg, itemMetas[j].state, itemMetas[j].processAt, nil)
results[idx] = BatchEnqueueResult{TaskInfo: info}
}
return results
}
// Ping performs a ping against the redis connection.
func (c *Client) Ping() error {
return c.broker.Ping()

View File

@@ -13,6 +13,8 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)
@@ -1191,3 +1193,743 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
}
func TestClientEnqueueWithHeaders(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"user-id": "123",
"request-id": "abc-def-ghi",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantPending map[string][]*base.TaskMessage
}{
{
desc: "Task with headers",
task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with empty headers",
task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "process_data",
Payload: []byte("data"),
Headers: map[string]string{},
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "process_data",
Payload: []byte("data"),
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with nil headers",
task: NewTaskWithHeaders("cleanup", nil, nil),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "cleanup",
Payload: nil,
Headers: nil,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "cleanup",
Payload: nil,
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with headers and custom options",
task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}),
opts: []Option{MaxRetry(5), Queue("notifications")},
wantInfo: &TaskInfo{
Queue: "notifications",
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"notifications": {
{
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
Retry: 5,
Queue: "notifications",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with header option",
task: NewTask("store_data", []byte("data"), Header("channel", "email"), Header("user-id", "bob1234")),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
State: TaskStatePending,
MaxRetry: 25,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
Retry: 25,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Enqueue task with header option",
task: NewTask("store_data", []byte("data")),
opts: []Option{Header("channel", "email"), Header("user-id", "bob1234"), MaxRetry(5)},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
Retry: 5,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantPending {
got := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
}
}
func TestClientEnqueueWithHeadersScheduled(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
oneHourLater := now.Add(time.Hour)
headers := map[string]string{
"correlation-id": "xyz-123",
"source": "api",
}
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantInfo *TaskInfo
wantScheduled map[string][]base.Z
}{
{
desc: "Schedule task with headers",
task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers),
processAt: oneHourLater,
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
State: TaskStateScheduled,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: oneHourLater,
},
wantScheduled: map[string][]base.Z{
"default": {
{
Message: &base.TaskMessage{
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: oneHourLater.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
opts := append(tc.opts, ProcessAt(tc.processAt))
gotInfo, err := client.Enqueue(tc.task, opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
}
}
func TestNewTaskWithHeaders(t *testing.T) {
tests := []struct {
desc string
typename string
payload []byte
headers map[string]string
opts []Option
want *Task
}{
{
desc: "Task with headers",
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
want: &Task{
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
},
},
{
desc: "Task with empty headers",
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
want: &Task{
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
},
},
{
desc: "Task with nil headers",
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
want: &Task{
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
},
},
}
for _, tc := range tests {
got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...)
if got.Type() != tc.want.typename {
t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename)
}
if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" {
t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if tc.headers != nil && got.Headers() != nil {
tc.headers["modified"] = "test"
if _, exists := got.Headers()["modified"]; exists {
t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc)
}
}
}
}
func TestTaskHeadersMethod(t *testing.T) {
tests := []struct {
desc string
task *Task
want map[string]string
wantNil bool
}{
{
desc: "Task created with NewTask has nil headers",
task: NewTask("test", []byte("data")),
want: nil,
wantNil: true,
},
{
desc: "Task created with NewTaskWithHeaders has headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}),
want: map[string]string{"key": "value"},
},
{
desc: "Task created with empty headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}),
want: map[string]string{},
},
{
desc: "Task created with nil headers",
task: NewTaskWithHeaders("test", []byte("data"), nil),
want: nil,
wantNil: true,
},
}
for _, tc := range tests {
got := tc.task.Headers()
if tc.wantNil {
if got != nil {
t.Errorf("%s: Headers() = %v, want nil", tc.desc, got)
}
} else {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
}
}
}
func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"batch-id": "batch-123",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantGroups map[string]map[string][]base.Z
}{
{
desc: "Task with headers and group",
task: NewTaskWithHeaders("batch_process", []byte("item1"), headers),
opts: []Option{Group("batch-123")},
wantInfo: &TaskInfo{
Queue: "default",
Group: "batch-123",
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
State: TaskStateAggregating,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: time.Time{},
},
wantGroups: map[string]map[string][]base.Z{
"default": {
"batch-123": {
{
Message: &base.TaskMessage{
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
GroupKey: "batch-123",
},
Score: now.Unix(),
},
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, groups := range tc.wantGroups {
for groupKey, want := range groups {
got := h.GetGroupEntries(t, r, qname, groupKey)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff)
}
}
}
}
}
func TestBatchEnqueueContext_ImmediateTasks(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
tasks := []*Task{
NewTask("task1", []byte("payload1")),
NewTask("task2", []byte("payload2")),
NewTask("task3", []byte("payload3")),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 3 {
t.Fatalf("BatchEnqueueContext returned %d results, want 3", len(results))
}
for i, res := range results {
if res.Err != nil {
t.Errorf("results[%d].Err = %v, want nil", i, res.Err)
}
if res.TaskInfo == nil {
t.Errorf("results[%d].TaskInfo is nil, want non-nil", i)
continue
}
if res.TaskInfo.Queue != "default" {
t.Errorf("results[%d].TaskInfo.Queue = %q, want %q", i, res.TaskInfo.Queue, "default")
}
if res.TaskInfo.State != TaskStatePending {
t.Errorf("results[%d].TaskInfo.State = %v, want %v", i, res.TaskInfo.State, TaskStatePending)
}
}
gotPending := h.GetPendingMessages(t, r, "default")
if len(gotPending) != 3 {
t.Errorf("len(pending) = %d, want 3", len(gotPending))
}
}
func TestBatchEnqueueContext_ScheduledTask(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
future := time.Now().Add(1 * time.Hour)
tasks := []*Task{
NewTask("scheduled_task", []byte("payload"), ProcessAt(future)),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 1 {
t.Fatalf("BatchEnqueueContext returned %d results, want 1", len(results))
}
if results[0].Err != nil {
t.Fatalf("results[0].Err = %v, want nil", results[0].Err)
}
if results[0].TaskInfo == nil {
t.Fatal("results[0].TaskInfo is nil, want non-nil")
}
if results[0].TaskInfo.State != TaskStateScheduled {
t.Errorf("results[0].TaskInfo.State = %v, want %v", results[0].TaskInfo.State, TaskStateScheduled)
}
gotScheduled := h.GetScheduledMessages(t, r, "default")
if len(gotScheduled) != 1 {
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
}
}
func TestBatchEnqueueContext_MixedBatch(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
future := time.Now().Add(1 * time.Hour)
tasks := []*Task{
NewTask("immediate1", []byte("p1")),
NewTask("scheduled1", []byte("p2"), ProcessAt(future)),
NewTask("immediate2", []byte("p3")),
NewTask("grouped1", []byte("p4"), Group("mygroup")),
NewTask("immediate3", []byte("p5")),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 5 {
t.Fatalf("BatchEnqueueContext returned %d results, want 5", len(results))
}
// Immediate tasks (indices 0, 2, 4) should succeed with Pending state.
for _, idx := range []int{0, 2, 4} {
if results[idx].Err != nil {
t.Errorf("results[%d].Err = %v, want nil (immediate task)", idx, results[idx].Err)
}
if results[idx].TaskInfo == nil {
t.Errorf("results[%d].TaskInfo is nil, want non-nil", idx)
continue
}
if results[idx].TaskInfo.State != TaskStatePending {
t.Errorf("results[%d].TaskInfo.State = %v, want %v", idx, results[idx].TaskInfo.State, TaskStatePending)
}
}
// Scheduled task (index 1) should succeed with Scheduled state.
if results[1].Err != nil {
t.Errorf("results[1].Err = %v, want nil (scheduled task)", results[1].Err)
}
if results[1].TaskInfo != nil && results[1].TaskInfo.State != TaskStateScheduled {
t.Errorf("results[1].TaskInfo.State = %v, want %v", results[1].TaskInfo.State, TaskStateScheduled)
}
// Grouped task (index 3) should be rejected.
if results[3].Err == nil {
t.Error("results[3].Err is nil, want error for group task")
}
gotPending := h.GetPendingMessages(t, r, "default")
if len(gotPending) != 3 {
t.Errorf("len(pending) = %d, want 3", len(gotPending))
}
gotScheduled := h.GetScheduledMessages(t, r, "default")
if len(gotScheduled) != 1 {
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
}
}
func TestBatchEnqueueContext_ValidationErrors(t *testing.T) {
setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
tests := []struct {
desc string
tasks []*Task
opts []Option
}{
{
desc: "nil task",
tasks: []*Task{nil},
},
{
desc: "empty task typename",
tasks: []*Task{NewTask("", []byte("payload"))},
},
{
desc: "blank task typename",
tasks: []*Task{NewTask(" ", []byte("payload"))},
},
{
desc: "invalid option: unique TTL less than 1s",
tasks: []*Task{NewTask("foo", nil)},
opts: []Option{Unique(300 * time.Millisecond)},
},
{
desc: "group task rejected",
tasks: []*Task{NewTask("foo", nil, Group("mygroup"))},
},
{
desc: "unique task rejected",
tasks: []*Task{NewTask("foo", nil, Unique(time.Hour))},
},
}
for _, tc := range tests {
results := client.BatchEnqueueContext(context.Background(), tc.tasks, tc.opts...)
if len(results) != len(tc.tasks) {
t.Errorf("%s: got %d results, want %d", tc.desc, len(results), len(tc.tasks))
continue
}
for i, res := range results {
if res.Err == nil {
t.Errorf("%s: results[%d].Err = nil, want non-nil error", tc.desc, i)
}
if res.TaskInfo != nil {
t.Errorf("%s: results[%d].TaskInfo = %v, want nil", tc.desc, i, res.TaskInfo)
}
}
}
}
func TestBatchEnqueueContext_BrokerError(t *testing.T) {
r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r)
client := &Client{broker: testBroker, sharedConnection: true}
tasks := []*Task{
NewTask("task1", []byte("p1")),
NewTask("task2", []byte("p2")),
}
testBroker.Sleep()
results := client.BatchEnqueueContext(context.Background(), tasks)
testBroker.Wakeup()
if len(results) != 2 {
t.Fatalf("BatchEnqueueContext returned %d results, want 2", len(results))
}
for i, res := range results {
if res.Err == nil {
t.Errorf("results[%d].Err = nil, want non-nil error when broker is down", i)
}
if res.TaskInfo != nil {
t.Errorf("results[%d].TaskInfo = %v, want nil on broker error", i, res.TaskInfo)
}
}
}

81
doc.go
View File

@@ -8,41 +8,41 @@ Package asynq provides a framework for Redis based distrubted task queue.
Asynq uses Redis as a message broker. To connect to redis,
specify the connection using one of RedisConnOpt types.
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
The Client is used to enqueue a task.
client := asynq.NewClient(redisConnOpt)
client := asynq.NewClient(redisConnOpt)
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
task := asynq.NewTask("example", b)
task := asynq.NewTask("example", b)
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
The Server is used to run the task processing workers with a given
handler.
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
Handler is an interface type with a method which
takes a task and returns an error. Handler should return nil if
@@ -50,23 +50,24 @@ the processing is successful, otherwise return a non-nil error.
If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface.
type TaskHandler struct {
// ...
}
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
type TaskHandler struct {
// ...
}
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
*/
package asynq

View File

@@ -123,7 +123,7 @@ func ExampleResultWriter() {
res := []byte("task result data")
n, err := task.ResultWriter().Write(res) // implements io.Writer
if err != nil {
return fmt.Errorf("failed to write task result: %v", err)
return fmt.Errorf("failed to write task result: %w", err)
}
log.Printf(" %d bytes written", n)
return nil

18
go.mod
View File

@@ -1,20 +1,20 @@
module github.com/hibiken/asynq
go 1.22
go 1.24.0
require (
github.com/google/go-cmp v0.6.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.7.0
github.com/redis/go-redis/v9 v9.20.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.7.0
github.com/spf13/cast v1.10.0
go.uber.org/goleak v1.3.0
golang.org/x/sys v0.27.0
golang.org/x/time v0.8.0
google.golang.org/protobuf v1.35.2
golang.org/x/sys v0.37.0
golang.org/x/time v0.14.0
google.golang.org/protobuf v1.36.10
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
)

36
go.sum
View File

@@ -2,41 +2,45 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/go-redis/v9 v9.20.0 h1:WnQYxLkgO2xiXTCJY0ldIiI8dNqCDlQAG+AtaH7a2a0=
github.com/redis/go-redis/v9 v9.20.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -5,6 +5,7 @@
package asynq
import (
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -245,7 +246,7 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
case errors.IsTaskNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
}
@@ -316,7 +317,7 @@ func Page(n int) ListOption {
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -325,7 +326,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -344,7 +345,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -353,11 +354,11 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
expiredSet := make(map[string]struct{}) // set of expired message IDs
for _, msg := range expired {
@@ -384,7 +385,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -393,7 +394,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -413,7 +414,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -422,7 +423,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -442,7 +443,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -451,7 +452,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -471,7 +472,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -480,7 +481,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -500,7 +501,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -509,7 +510,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -583,6 +584,30 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
return int(n), err
}
// UpdateTaskPayload updates a task with the given id from the given queue with given payload.
// The task needs to be in scheduled state,
// otherwise UpdateTaskPayload will return an error.
//
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is not in scheduled state, it returns a non-nil error.
func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.UpdateTaskPayload(queue, id, payload)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
}
return nil
}
// DeleteTask deletes a task with the given id from the given queue.
// The task needs to be in pending, scheduled, retry, or archived state,
// otherwise DeleteTask will return an error.
@@ -592,7 +617,7 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
// If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
err := i.rdb.DeleteTask(queue, id)
switch {
@@ -601,7 +626,7 @@ func (i *Inspector) DeleteTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
@@ -656,7 +681,7 @@ func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
// If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
err := i.rdb.RunTask(queue, id)
switch {
@@ -665,7 +690,7 @@ func (i *Inspector) RunTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
}
@@ -728,7 +753,7 @@ func (i *Inspector) ArchiveTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
}
@@ -978,6 +1003,13 @@ func parseOption(s string) (Option, error) {
return nil, err
}
return Retention(d), nil
case "Header":
var h [2]string
err := json.Unmarshal([]byte(arg), &h)
if err != nil {
return nil, err
}
return Header(h[0], h[1]), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}

View File

@@ -2369,6 +2369,148 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
}
}
func TestInspectorUpdateTaskPayloadUpdatesScheduledTaskPayload(t *testing.T) {
r := setup(t)
defer r.Close()
m1_old := h.NewTaskMessage("task1", []byte("m1_old"))
m1_new := h.NewTaskMessage("task1", nil)
m1_new.ID = m1_old.ID
m2_old := h.NewTaskMessage("task2", nil)
m2_new := h.NewTaskMessage("task2", []byte("m2_new"))
m2_new.ID = m2_old.ID
m3_old := h.NewTaskMessageWithQueue("task3", []byte("m3_old"), "custom")
m3_new := h.NewTaskMessageWithQueue("task3", []byte("m3_new"), "custom")
m3_new.ID = m3_old.ID
now := time.Now()
z1_old := base.Z{Message: m1_old, Score: now.Add(5 * time.Minute).Unix()}
z1_new := base.Z{Message: m1_new, Score: now.Add(5 * time.Minute).Unix()}
z2_old := base.Z{Message: m2_old, Score: now.Add(15 * time.Minute).Unix()}
z2_new := base.Z{Message: m2_new, Score: now.Add(15 * time.Minute).Unix()}
z3_old := base.Z{Message: m3_old, Score: now.Add(2 * time.Minute).Unix()}
z3_new := base.Z{Message: m3_new, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
qname string
id string
newPayload []byte
wantScheduled map[string][]base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z2_old).ID,
newPayload: m2_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_new},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z1_old).ID,
newPayload: m1_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_new, z2_old},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "custom",
id: createScheduledTask(z3_old).ID,
newPayload: m3_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_new},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); err != nil {
t.Errorf("UpdateTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
}
}
}
func TestInspectorUpdateTaskPayloadError(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
tasks map[string][]base.Z
qname string
id string
newPayload []byte
wantErr error
}{
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "nonexistent",
id: createScheduledTask(z2).ID,
newPayload: nil,
wantErr: ErrQueueNotFound,
},
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
id: uuid.NewString(),
newPayload: nil,
wantErr: ErrTaskNotFound,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.tasks)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); !errors.Is(err, tc.wantErr) {
t.Errorf("UpdateTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
continue
}
}
}
func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
@@ -3384,6 +3526,7 @@ func TestParseOption(t *testing.T) {
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
{`Retention(24h)`, RetentionOpt, 24 * time.Hour},
{`Header(["email", "hello@example.com"])`, HeaderOpt, [2]string{"email", "hello@example.com"}},
}
for _, tc := range tests {
@@ -3431,6 +3574,14 @@ func TestParseOption(t *testing.T) {
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case HeaderOpt:
gotVal, ok := got.Value().([2]string)
if !ok {
t.Fatal("returned Option with non array value")
}
if gotVal != tc.wantVal.([2]string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
default:
t.Fatalf("returned Option with unexpected type: %v", got.Type())
}

View File

@@ -23,7 +23,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.25.0"
const Version = "0.26.0"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
@@ -240,6 +240,9 @@ type TaskMessage struct {
// Payload holds data needed to process the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// ID is a unique identifier for each task.
ID string
@@ -304,6 +307,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: msg.Payload,
Headers: msg.Headers,
Id: msg.ID,
Queue: msg.Queue,
Retry: int32(msg.Retry),
@@ -328,6 +332,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: pbmsg.GetPayload(),
Headers: pbmsg.GetHeaders(),
ID: pbmsg.GetId(),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),
@@ -679,6 +684,14 @@ func (l *Lease) IsValid() bool {
return l.expireAt.After(now) || l.expireAt.Equal(now)
}
// BatchEnqueueItem pairs a task message with optional scheduling metadata for
// batch enqueue operations. If ProcessAt is zero, the task is enqueued for
// immediate processing; otherwise it is added to the scheduled set.
type BatchEnqueueItem struct {
Msg *TaskMessage
ProcessAt time.Time // zero value → immediate
}
// Broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
@@ -687,6 +700,12 @@ type Broker interface {
Close() error
Enqueue(ctx context.Context, msg *TaskMessage) error
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
// BatchEnqueue enqueues multiple tasks in a single round-trip. It returns the
// count of newly enqueued tasks; duplicate IDs are silently skipped. The error
// is non-nil only on infrastructure failure (e.g. lost connection), in which
// case the count is meaningless. Individual task scripts are atomic but the
// batch as a whole is not transactional — partial success is possible.
BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error)
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
Done(ctx context.Context, msg *TaskMessage) error
MarkAsComplete(ctx context.Context, msg *TaskMessage) error

View File

@@ -107,6 +107,7 @@ type Op string
// only the last one is recorded.
//
// The types are:
//
// errors.Op
// The operation being performed, usually the method
// being invoked (Get, Put, etc.).

View File

@@ -4,8 +4,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.34.2
// protoc v3.19.6
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: asynq.proto
package proto
@@ -16,6 +16,7 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -27,16 +28,15 @@ const (
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 15
// Next ID: 16
type TaskMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Type indicates the kind of the task to be performed.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// Payload holds data needed to process the task.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Headers holds additional metadata for the task.
Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
// Unique identifier for the task.
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
// Name of the queue to which this task belongs.
@@ -71,16 +71,16 @@ type TaskMessage struct {
// Time when the task completed in success in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// This field is populated if result_ttl > 0 upon completion.
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *TaskMessage) Reset() {
*x = TaskMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *TaskMessage) String() string {
@@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {}
func (x *TaskMessage) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -120,6 +120,13 @@ func (x *TaskMessage) GetPayload() []byte {
return nil
}
func (x *TaskMessage) GetHeaders() map[string]string {
if x != nil {
return x.Headers
}
return nil
}
func (x *TaskMessage) GetId() string {
if x != nil {
return x.Id
@@ -206,10 +213,7 @@ func (x *TaskMessage) GetCompletedAt() int64 {
// ServerInfo holds information about a running server.
type ServerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Host machine the server is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the server process.
@@ -221,7 +225,7 @@ type ServerInfo struct {
// List of queue names with their priorities.
// The server will consume tasks from the queues and prioritize
// queues with higher priority numbers.
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
// If set, the server will always consume tasks from a queue with higher
// priority.
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
@@ -231,15 +235,15 @@ type ServerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Number of workers currently processing tasks.
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ServerInfo) Reset() {
*x = ServerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ServerInfo) String() string {
@@ -250,7 +254,7 @@ func (*ServerInfo) ProtoMessage() {}
func (x *ServerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -330,10 +334,7 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 {
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Host matchine this worker is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the process in which this worker is running.
@@ -352,16 +353,16 @@ type WorkerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Deadline by which the worker needs to complete processing
// the task. If worker exceeds the deadline, the task will fail.
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *WorkerInfo) Reset() {
*x = WorkerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *WorkerInfo) String() string {
@@ -372,7 +373,7 @@ func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -453,10 +454,7 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
// SchedulerEntry holds information about a periodic task registered
// with a scheduler.
type SchedulerEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Identifier of the scheduler entry.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// Periodic schedule spec of the entry.
@@ -472,15 +470,15 @@ type SchedulerEntry struct {
// Last time the task was enqueued.
// Zero time if task was never enqueued.
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEntry) Reset() {
*x = SchedulerEntry{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SchedulerEntry) String() string {
@@ -491,7 +489,7 @@ func (*SchedulerEntry) ProtoMessage() {}
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -558,23 +556,20 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
// SchedulerEnqueueEvent holds information about an enqueue event
// by a scheduler.
type SchedulerEnqueueEvent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// ID of the task that was enqueued.
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
// Time the task was enqueued.
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEnqueueEvent) Reset() {
*x = SchedulerEnqueueEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SchedulerEnqueueEvent) String() string {
@@ -585,7 +580,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {}
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -616,146 +611,106 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
var File_asynq_proto protoreflect.FileDescriptor
var file_asynq_proto_rawDesc = []byte{
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79,
0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c,
0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74,
0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12,
0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66,
0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07,
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74,
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65,
0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c,
0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28,
0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22,
0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69,
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49,
0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65,
0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74,
0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20,
0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72,
0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73,
0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61,
0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65,
0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20,
0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65,
0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65,
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65,
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a,
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61,
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75,
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e,
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65,
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a,
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75,
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17,
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
const file_asynq_proto_rawDesc = "" +
"\n" +
"\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" +
"\vTaskMessage\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" +
"\apayload\x18\x02 \x01(\fR\apayload\x129\n" +
"\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" +
"\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" +
"\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" +
"\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" +
"\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" +
"\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" +
"\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" +
"\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" +
"\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" +
"\n" +
"unique_key\x18\n" +
" \x01(\tR\tuniqueKey\x12\x1b\n" +
"\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" +
"\tretention\x18\f \x01(\x03R\tretention\x12!\n" +
"\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" +
"\fHeadersEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" +
"\n" +
"ServerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" +
"\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" +
"\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" +
"\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" +
"\x06status\x18\a \x01(\tR\x06status\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" +
"\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" +
"\vQueuesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" +
"\n" +
"WorkerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" +
"\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" +
"\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" +
"\x05queue\x18\a \x01(\tR\x05queue\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" +
"\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" +
"\x0eSchedulerEntry\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" +
"\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" +
"\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" +
"\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" +
"\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" +
"\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" +
"\x15SchedulerEnqueueEvent\x12\x17\n" +
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" +
"\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3"
var (
file_asynq_proto_rawDescOnce sync.Once
file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
file_asynq_proto_rawDescData []byte
)
func file_asynq_proto_rawDescGZIP() []byte {
file_asynq_proto_rawDescOnce.Do(func() {
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)))
})
return file_asynq_proto_rawDescData
}
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_asynq_proto_goTypes = []any{
(*TaskMessage)(nil), // 0: asynq.TaskMessage
(*ServerInfo)(nil), // 1: asynq.ServerInfo
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
nil, // 5: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
nil, // 5: asynq.TaskMessage.HeadersEntry
nil, // 6: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
}
var file_asynq_proto_depIdxs = []int32{
5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry
6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
8, // [8:8] is the sub-list for method output_type
8, // [8:8] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
}
func init() { file_asynq_proto_init() }
@@ -763,75 +718,13 @@ func file_asynq_proto_init() {
if File_asynq_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*TaskMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*ServerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEnqueueEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_asynq_proto_rawDesc,
RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)),
NumEnums: 0,
NumMessages: 6,
NumMessages: 7,
NumExtensions: 0,
NumServices: 0,
},
@@ -840,7 +733,6 @@ func file_asynq_proto_init() {
MessageInfos: file_asynq_proto_msgTypes,
}.Build()
File_asynq_proto = out.File
file_asynq_proto_rawDesc = nil
file_asynq_proto_goTypes = nil
file_asynq_proto_depIdxs = nil
}

View File

@@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 15
// Next ID: 16
message TaskMessage {
// Type indicates the kind of the task to be performed.
string type = 1;
@@ -19,6 +19,9 @@ message TaskMessage {
// Payload holds data needed to process the task.
bytes payload = 2;
// Headers holds additional metadata for the task.
map<string, string> headers = 15;
// Unique identifier for the task.
string id = 3;

View File

@@ -264,7 +264,9 @@ for i=1,2 do
if (table.getn(ids) > 0) then
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
sample_total = sample_total + bytes
if bytes then
sample_total = sample_total + bytes
end
end
local n = redis.call("LLEN", KEYS[i])
local avg = sample_total / table.getn(ids)
@@ -281,7 +283,9 @@ for i=3,6 do
if (table.getn(ids) > 0) then
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
sample_total = sample_total + bytes
if bytes then
sample_total = sample_total + bytes
end
end
local n = redis.call("ZCARD", KEYS[i])
local avg = sample_total / table.getn(ids)
@@ -304,13 +308,17 @@ if table.getn(groups) > 0 then
local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
agg_task_sample_total = agg_task_sample_total + bytes
agg_task_sample_size = agg_task_sample_size + 1
if bytes then
agg_task_sample_total = agg_task_sample_total + bytes
agg_task_sample_size = agg_task_sample_size + 1
end
end
end
end
local avg = agg_task_sample_total / agg_task_sample_size
memusg = memusg + (avg * agg_task_count)
if agg_task_sample_size > 0 then
local avg = agg_task_sample_total / agg_task_sample_size
memusg = memusg + (avg * agg_task_count)
end
end
return memusg
`)
@@ -1412,6 +1420,93 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
return n, nil
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// --
// ARGV[1] -> task message data
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully updated.
// Returns 0 if task is not found.
// Returns -1 if task is not in scheduled state.
var updateTaskPayloadCmd = redis.NewScript(`
-- Check if given taks exists
if redis.call("EXISTS", KEYS[1]) == 0 then
return 0
end
local state, pending_since, group, unique_key = unpack(redis.call("HMGET", KEYS[1], "state", "pending_since", "group", "unique_key"))
if state ~= "scheduled" then
return -1
end
local redis_call_args = {"state", state}
if pending_since then
table.insert(redis_call_args, "pending_since")
table.insert(redis_call_args, pending_since)
end
if group then
table.insert(redis_call_args, "group")
table.insert(redis_call_args, group)
end
if unique_key then
table.insert(redis_call_args, "unique_key")
table.insert(redis_call_args, unique_key)
end
redis.call("HSET", KEYS[1], "msg", ARGV[1], unpack(redis_call_args))
return 1
`)
// UpdateTaskPayload finds a task that matches the id from the given queue and updates it's payload.
// It returns nil if it successfully updated the task payload.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
func (r *RDB) UpdateTaskPayload(qname, id string, payload []byte) error {
var op errors.Op = "rdb.UpdateTask"
if err := r.checkQueueExists(qname); err != nil {
return errors.E(op, errors.CanonicalCode(err), err)
}
taskInfo, err := r.GetTaskInfo(qname, id)
if err != nil {
return errors.E(op, errors.Unknown, err)
}
taskInfo.Message.Payload = payload
encoded, err := base.EncodeMessage(taskInfo.Message)
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
keys := []string{
base.TaskKey(qname, id),
}
argv := []interface{}{
encoded,
}
res, err := updateTaskPayloadCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
n, ok := res.(int64)
if !ok {
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: updateTaskCmd script returned unexported value %v", res))
}
switch n {
case 1:
return nil
case 0:
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
case -1:
return errors.E(op, errors.FailedPrecondition, "cannot update task that is not in scheduled state.")
default:
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from updateTaskCmd script: %d", n))
}
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:groups

View File

@@ -139,6 +139,118 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
return nil
}
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
// Each item is either enqueued immediately (ProcessAt is zero) or added to the
// scheduled sorted set.
//
// WARNING: tasks whose IDs already exist in Redis are silently skipped.
//
// The pipeline executes independent Lua scripts per task — there is no
// MULTI/EXEC wrapping the batch, so individual tasks may succeed or fail
// independently. The returned int is the number of tasks actually written;
// skipped duplicates do not count. The returned error is non-nil only when the
// pipeline call itself fails (network error, context cancellation, etc.), in
// which case no individual result should be trusted.
//
// Message encoding errors cause an immediate return before any Redis I/O.
func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
var op errors.Op = "rdb.BatchEnqueue"
if len(items) == 0 {
return 0, nil
}
// Preload Lua scripts so that EVALSHA inside the pipeline does not fail with
// NOSCRIPT. Script.Run on a pipeline only sends EVALSHA (unlike non-pipeline
// Run which retries with EVAL on NOSCRIPT).
needsEnqueue, needsSchedule := false, false
for _, item := range items {
if item.ProcessAt.IsZero() {
needsEnqueue = true
} else {
needsSchedule = true
}
if needsEnqueue && needsSchedule {
break
}
}
if needsEnqueue {
if err := enqueueCmd.Load(ctx, r.client).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load enqueue script: %v", err))
}
}
if needsSchedule {
if err := scheduleCmd.Load(ctx, r.client).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load schedule script: %v", err))
}
}
pipe := r.client.Pipeline()
// Track which pipeline slot holds each item's script result.
scriptIdxs := make([]int, 0, len(items))
pipeLen := 0
// Track queues we add to AllQueues in this pipeline so we can roll back the
// in-memory cache on failure.
var newQueues []string
now := r.clock.Now().UnixNano()
for _, item := range items {
encoded, err := base.EncodeMessage(item.Msg)
if err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if _, found := r.queuesPublished.Load(item.Msg.Queue); !found {
pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue)
r.queuesPublished.Store(item.Msg.Queue, true)
newQueues = append(newQueues, item.Msg.Queue)
pipeLen++
}
if item.ProcessAt.IsZero() {
keys := []string{
base.TaskKey(item.Msg.Queue, item.Msg.ID),
base.PendingKey(item.Msg.Queue),
}
argv := []interface{}{encoded, item.Msg.ID, now}
enqueueCmd.Run(ctx, pipe, keys, argv...)
} else {
keys := []string{
base.TaskKey(item.Msg.Queue, item.Msg.ID),
base.ScheduledKey(item.Msg.Queue),
}
argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID}
scheduleCmd.Run(ctx, pipe, keys, argv...)
}
scriptIdxs = append(scriptIdxs, pipeLen)
pipeLen++
}
cmds, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
for _, q := range newQueues {
r.queuesPublished.Delete(q)
}
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err))
}
enqueued := 0
for _, idx := range scriptIdxs {
if idx >= len(cmds) {
continue
}
res, err := cmds[idx].(*redis.Cmd).Result()
if err != nil {
continue
}
if n, ok := res.(int64); ok && n == 1 {
enqueued++
}
}
return enqueued, nil
}
// enqueueUniqueCmd enqueues the task message if the task is unique.
//
// KEYS[1] -> unique key
@@ -1488,6 +1600,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil

View File

@@ -160,6 +160,156 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
}
}
func TestBatchEnqueue(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}))
t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv")
t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
enqueueTime := time.Now()
r.SetClock(timeutil.NewSimulatedClock(enqueueTime))
t.Run("enqueue multiple tasks", func(t *testing.T) {
h.FlushDB(t, r.client)
items := []base.BatchEnqueueItem{
{Msg: t1},
{Msg: t2},
{Msg: t3},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 3 {
t.Errorf("BatchEnqueue returned %d, want 3", n)
}
for _, item := range items {
msg := item.Msg
pendingKey := base.PendingKey(msg.Queue)
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
found := false
for _, id := range pendingIDs {
if id == msg.ID {
found = true
break
}
}
if !found {
t.Errorf("task %s not found in pending list %s", msg.ID, pendingKey)
}
taskKey := base.TaskKey(msg.Queue, msg.ID)
state := r.client.HGet(context.Background(), taskKey, "state").Val()
if state != "pending" {
t.Errorf("state for task %s = %q, want %q", msg.ID, state, "pending")
}
}
})
t.Run("empty batch", func(t *testing.T) {
h.FlushDB(t, r.client)
n, err := r.BatchEnqueue(context.Background(), nil)
if err != nil {
t.Fatalf("BatchEnqueue(nil) returned error: %v", err)
}
if n != 0 {
t.Errorf("BatchEnqueue(nil) returned %d, want 0", n)
}
})
t.Run("duplicate IDs skipped", func(t *testing.T) {
h.FlushDB(t, r.client)
if err := r.Enqueue(context.Background(), t1); err != nil {
t.Fatalf("pre-enqueue failed: %v", err)
}
dup := *t1
newMsg := h.NewTaskMessage("new_task", nil)
items := []base.BatchEnqueueItem{
{Msg: &dup},
{Msg: newMsg},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 1 {
t.Errorf("BatchEnqueue returned %d, want 1 (duplicate should be skipped)", n)
}
})
t.Run("scheduled tasks", func(t *testing.T) {
h.FlushDB(t, r.client)
future := time.Now().Add(1 * time.Hour)
s1 := h.NewTaskMessage("deferred_email", nil)
items := []base.BatchEnqueueItem{
{Msg: t1},
{Msg: s1, ProcessAt: future},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 2 {
t.Errorf("BatchEnqueue returned %d, want 2", n)
}
// Immediate task should be in pending.
pendingIDs := r.client.LRange(context.Background(), base.PendingKey(t1.Queue), 0, -1).Val()
foundPending := false
for _, id := range pendingIDs {
if id == t1.ID {
foundPending = true
}
}
if !foundPending {
t.Errorf("immediate task %s not found in pending list", t1.ID)
}
// Scheduled task should be in scheduled set.
scheduledIDs := r.client.ZRange(context.Background(), base.ScheduledKey(s1.Queue), 0, -1).Val()
foundScheduled := false
for _, id := range scheduledIDs {
if id == s1.ID {
foundScheduled = true
}
}
if !foundScheduled {
t.Errorf("scheduled task %s not found in scheduled set", s1.ID)
}
taskKey := base.TaskKey(s1.Queue, s1.ID)
state := r.client.HGet(context.Background(), taskKey, "state").Val()
if state != "scheduled" {
t.Errorf("state for scheduled task %s = %q, want %q", s1.ID, state, "scheduled")
}
})
t.Run("pipeline error from cancelled context", func(t *testing.T) {
h.FlushDB(t, r.client)
msg := h.NewTaskMessage("pipeline_error_task", nil)
items := []base.BatchEnqueueItem{{Msg: msg}}
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := r.BatchEnqueue(ctx, items)
if err == nil {
t.Error("BatchEnqueue with cancelled context returned nil error, want non-nil")
}
})
}
func TestEnqueueQueueCache(t *testing.T) {
r := setup(t)
defer r.Close()
@@ -3274,6 +3424,29 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock()
}
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) {
r := setup(t)
defer r.Close()

View File

@@ -64,6 +64,15 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage,
return tb.real.EnqueueUnique(ctx, msg, ttl)
}
func (tb *TestBroker) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return 0, errRedisDown
}
return tb.real.BatchEnqueue(ctx, items)
}
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
tb.mu.Lock()
defer tb.mu.Unlock()

View File

@@ -122,10 +122,10 @@ func (mgr *PeriodicTaskManager) Start() error {
panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
}
if err := mgr.initialSync(); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
if err := mgr.s.Start(); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
mgr.wg.Add(1)
go func() {
@@ -168,11 +168,11 @@ func (mgr *PeriodicTaskManager) Run() error {
func (mgr *PeriodicTaskManager) initialSync() error {
configs, err := mgr.p.GetConfigs()
if err != nil {
return fmt.Errorf("initial call to GetConfigs failed: %v", err)
return fmt.Errorf("initial call to GetConfigs failed: %w", err)
}
for _, c := range configs {
if err := validatePeriodicTaskConfig(c); err != nil {
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err)
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %w", err)
}
}
mgr.add(configs)

View File

@@ -230,6 +230,7 @@ func (p *processor) exec() {
ctx: ctx,
},
)
task.headers = msg.Headers
resCh <- p.perform(ctx, task)
}()
@@ -333,7 +334,7 @@ var RevokeTask = errors.New("revoke task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err)
}
switch {
case errors.Is(err, RevokeTask):
@@ -354,7 +355,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil {

View File

@@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
}
func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
retryAt := time.Now().Add(delay)
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)

View File

@@ -6,12 +6,16 @@ package asynq
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
)
// ErrHandlerNotFound indicates that no task handler was found for a given pattern.
var ErrHandlerNotFound = errors.New("handler not found for task")
// ServeMux is a multiplexer for asynchronous tasks.
// It matches the type of each task against a list of registered patterns
// and calls the handler for the pattern that most closely matches the
@@ -149,8 +153,8 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
// NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("handler not found for task %q", task.Type())
return fmt.Errorf("%w %q", ErrHandlerNotFound, task.Type())
}
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
// NotFoundHandler returns a simple task handler that returns a not found error.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }

View File

@@ -24,7 +24,7 @@ func makeFakeHandler(identity string) Handler {
}
// makeFakeMiddleware returns a middleware function that appends the given identity
//to the global invoked slice.
// to the global invoked slice.
func makeFakeMiddleware(identity string) MiddlewareFunc {
return func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, t *Task) error {

View File

@@ -174,16 +174,15 @@ type Config struct {
// })
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
//
// we can also handle panic error like:
// func reportError(ctx context, task *asynq.Task, err error) {
// if asynq.IsPanic(err) {
// if asynq.IsPanicError(err) {
// errorReportingService.Notify(err)
// }
// })
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler
// Logger specifies the logger used by the server instance.

View File

@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
if sig == unix.SIGTSTP {
srv.Stop()
continue
} else {
srv.Stop()
break
}
break
}
}

View File

@@ -8,9 +8,9 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/redis/go-redis/v9"
)
type subscriber struct {

View File

@@ -39,6 +39,7 @@ By default, CLI will try to connect to a redis server running at `localhost:6379
--config string config file to set flag defaut values (default is $HOME/.asynq.yaml)
-n, --db int redis database number (default is 0)
-h, --help help for asynq
-U, --username string username to use when connecting to redis server
-p, --password string password to use when connecting to redis server
-u, --uri string redis server URI (default "127.0.0.1:6379")

View File

@@ -7,7 +7,6 @@ package cmd
import (
"fmt"
"io"
"os"
"sort"
"time"
@@ -36,14 +35,14 @@ var cronListCmd = &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "List cron entries",
Run: cronList,
RunE: cronList,
}
var cronHistoryCmd = &cobra.Command{
Use: "history <entry_id> [<entry_id>...]",
Short: "Show history of each cron tasks",
Args: cobra.MinimumNArgs(1),
Run: cronHistory,
RunE: cronHistory,
Example: heredoc.Doc(`
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 bf6a8594-cd03-4968-b36a-8572c5e160dd
@@ -51,17 +50,16 @@ var cronHistoryCmd = &cobra.Command{
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 --page=2`),
}
func cronList(cmd *cobra.Command, args []string) {
func cronList(cmd *cobra.Command, args []string) error {
inspector := createInspector()
entries, err := inspector.SchedulerEntries()
if err != nil {
fmt.Println(err)
os.Exit(1)
return fmt.Errorf("could not fetch scheduler entries: %v", err)
}
if len(entries) == 0 {
fmt.Println("No scheduler entries")
return
return nil
}
// Sort entries by spec.
@@ -78,6 +76,7 @@ func cronList(cmd *cobra.Command, args []string) {
}
}
printTable(cols, printRows)
return nil
}
// Returns a string describing when the next enqueue will happen.
@@ -97,16 +96,14 @@ func prevEnqueue(prevEnqueuedAt time.Time) string {
return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second))
}
func cronHistory(cmd *cobra.Command, args []string) {
func cronHistory(cmd *cobra.Command, args []string) error {
pageNum, err := cmd.Flags().GetInt("page")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
pageSize, err := cmd.Flags().GetInt("size")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
inspector := createInspector()
for i, entryID := range args {
@@ -136,4 +133,5 @@ func cronHistory(cmd *cobra.Command, args []string) {
}
printTable(cols, printRows)
}
return nil
}

View File

@@ -6,7 +6,6 @@ package cmd
import (
"fmt"
"os"
"time"
"github.com/MakeNowJust/heredoc/v2"
@@ -32,14 +31,14 @@ var dashCmd = &cobra.Command{
Example: heredoc.Doc(`
$ asynq dash
$ asynq dash --refresh=3s`),
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
if flagPollInterval < 1*time.Second {
fmt.Println("error: --refresh cannot be less than 1s")
os.Exit(1)
return fmt.Errorf("--refresh cannot be less than 1s")
}
dash.Run(dash.Options{
PollInterval: flagPollInterval,
RedisConnOpt: getRedisConnOpt(),
})
return nil
},
}

View File

@@ -16,11 +16,12 @@ import (
// ScreenDrawer is used to draw contents on screen.
//
// Usage example:
// d := NewScreenDrawer(s)
// d.Println("Hello world", mystyle)
// d.NL() // adds newline
// d.Print("foo", mystyle.Bold(true))
// d.Print("bar", mystyle.Italic(true))
//
// d := NewScreenDrawer(s)
// d.Println("Hello world", mystyle)
// d.NL() // adds newline
// d.Print("foo", mystyle.Bold(true))
// d.Print("bar", mystyle.Italic(true))
type ScreenDrawer struct {
l *LineDrawer
}

View File

@@ -6,7 +6,6 @@ package cmd
import (
"fmt"
"os"
"github.com/MakeNowJust/heredoc/v2"
"github.com/spf13/cobra"
@@ -31,22 +30,25 @@ var groupListCmd = &cobra.Command{
Aliases: []string{"ls"},
Short: "List groups",
Args: cobra.NoArgs,
Run: groupLists,
RunE: groupLists,
}
func groupLists(cmd *cobra.Command, args []string) {
func groupLists(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
inspector := createInspector()
groups, err := inspector.Groups(qname)
if err != nil {
return fmt.Errorf("could not fetch groups: %v", err)
}
if len(groups) == 0 {
fmt.Printf("No groups found in queue %q\n", qname)
return
return nil
}
for _, g := range groups {
fmt.Println(g.Group)
}
return nil
}

View File

@@ -7,7 +7,6 @@ package cmd
import (
"fmt"
"io"
"os"
"github.com/MakeNowJust/heredoc/v2"
"github.com/fatih/color"
@@ -44,16 +43,14 @@ var queueListCmd = &cobra.Command{
Use: "list",
Short: "List queues",
Aliases: []string{"ls"},
// TODO: Use RunE instead?
Run: queueList,
RunE: queueList,
}
var queueInspectCmd = &cobra.Command{
Use: "inspect <queue> [<queue>...]",
Short: "Display detailed information on one or more queues",
Args: cobra.MinimumNArgs(1),
// TODO: Use RunE instead?
Run: queueInspect,
Args: cobra.MinimumNArgs(1),
RunE: queueInspect,
Example: heredoc.Doc(`
$ asynq queue inspect myqueue
$ asynq queue inspect queue1 queue2 queue3`),
@@ -62,8 +59,8 @@ var queueInspectCmd = &cobra.Command{
var queueHistoryCmd = &cobra.Command{
Use: "history <queue> [<queue>...]",
Short: "Display historical aggregate data from one or more queues",
Args: cobra.MinimumNArgs(1),
Run: queueHistory,
Args: cobra.MinimumNArgs(1),
RunE: queueHistory,
Example: heredoc.Doc(`
$ asynq queue history myqueue
$ asynq queue history queue1 queue2 queue3
@@ -74,7 +71,7 @@ var queuePauseCmd = &cobra.Command{
Use: "pause <queue> [<queue>...]",
Short: "Pause one or more queues",
Args: cobra.MinimumNArgs(1),
Run: queuePause,
RunE: queuePause,
Example: heredoc.Doc(`
$ asynq queue pause myqueue
$ asynq queue pause queue1 queue2 queue3`),
@@ -85,7 +82,7 @@ var queueUnpauseCmd = &cobra.Command{
Short: "Resume (unpause) one or more queues",
Args: cobra.MinimumNArgs(1),
Aliases: []string{"unpause"},
Run: queueUnpause,
RunE: queueUnpause,
Example: heredoc.Doc(`
$ asynq queue resume myqueue
$ asynq queue resume queue1 queue2 queue3`),
@@ -96,14 +93,14 @@ var queueRemoveCmd = &cobra.Command{
Short: "Remove one or more queues",
Aliases: []string{"rm", "delete"},
Args: cobra.MinimumNArgs(1),
Run: queueRemove,
RunE: queueRemove,
Example: heredoc.Doc(`
$ asynq queue rm myqueue
$ asynq queue rm queue1 queue2 queue3
$ asynq queue rm myqueue --force`),
}
func queueList(cmd *cobra.Command, args []string) {
func queueList(cmd *cobra.Command, args []string) error {
type queueInfo struct {
name string
keyslot int64
@@ -112,8 +109,7 @@ func queueList(cmd *cobra.Command, args []string) {
inspector := createInspector()
queues, err := inspector.Queues()
if err != nil {
fmt.Printf("error: Could not fetch list of queues: %v\n", err)
os.Exit(1)
return fmt.Errorf("could not fetch list of queues: %v", err)
}
var qs []*queueInfo
for _, qname := range queues {
@@ -121,13 +117,13 @@ func queueList(cmd *cobra.Command, args []string) {
if useRedisCluster {
keyslot, err := inspector.ClusterKeySlot(qname)
if err != nil {
fmt.Errorf("error: Could not get cluster keyslot for %q\n", qname)
fmt.Printf("error: could not get cluster keyslot for %q\n", qname)
continue
}
q.keyslot = keyslot
nodes, err := inspector.ClusterNodes(qname)
if err != nil {
fmt.Errorf("error: Could not get cluster nodes for %q\n", qname)
fmt.Printf("error: could not get cluster nodes for %q\n", qname)
continue
}
q.nodes = nodes
@@ -148,9 +144,10 @@ func queueList(cmd *cobra.Command, args []string) {
fmt.Println(q.name)
}
}
return nil
}
func queueInspect(cmd *cobra.Command, args []string) {
func queueInspect(cmd *cobra.Command, args []string) error {
inspector := createInspector()
for i, qname := range args {
if i > 0 {
@@ -163,6 +160,7 @@ func queueInspect(cmd *cobra.Command, args []string) {
}
printQueueInfo(info)
}
return nil
}
func printQueueInfo(info *asynq.QueueInfo) {
@@ -195,11 +193,10 @@ func printQueueInfo(info *asynq.QueueInfo) {
)
}
func queueHistory(cmd *cobra.Command, args []string) {
func queueHistory(cmd *cobra.Command, args []string) error {
days, err := cmd.Flags().GetInt("days")
if err != nil {
fmt.Printf("error: Internal error: %v\n", err)
os.Exit(1)
return err
}
inspector := createInspector()
for i, qname := range args {
@@ -214,6 +211,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
}
printDailyStats(stats)
}
return nil
}
func printDailyStats(stats []*asynq.DailyStats) {
@@ -233,49 +231,63 @@ func printDailyStats(stats []*asynq.DailyStats) {
)
}
func queuePause(cmd *cobra.Command, args []string) {
func queuePause(cmd *cobra.Command, args []string) error {
inspector := createInspector()
var firstErr error
for _, qname := range args {
err := inspector.PauseQueue(qname)
if err != nil {
fmt.Println(err)
if firstErr == nil {
firstErr = err
}
continue
}
fmt.Printf("Successfully paused queue %q\n", qname)
}
return firstErr
}
func queueUnpause(cmd *cobra.Command, args []string) {
func queueUnpause(cmd *cobra.Command, args []string) error {
inspector := createInspector()
var firstErr error
for _, qname := range args {
err := inspector.UnpauseQueue(qname)
if err != nil {
fmt.Println(err)
if firstErr == nil {
firstErr = err
}
continue
}
fmt.Printf("Successfully unpaused queue %q\n", qname)
}
return firstErr
}
func queueRemove(cmd *cobra.Command, args []string) {
func queueRemove(cmd *cobra.Command, args []string) error {
// TODO: Use inspector once RemoveQueue become public API.
force, err := cmd.Flags().GetBool("force")
if err != nil {
fmt.Printf("error: Internal error: %v\n", err)
os.Exit(1)
return err
}
r := createRDB()
var firstErr error
for _, qname := range args {
err = r.RemoveQueue(qname, force)
if err != nil {
if errors.IsQueueNotEmpty(err) {
fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname)
continue
} else {
fmt.Printf("error: %v\n", err)
}
if firstErr == nil {
firstErr = err
}
fmt.Printf("error: %v\n", err)
continue
}
fmt.Printf("Successfully removed queue %q\n", qname)
}
return firstErr
}

View File

@@ -1,3 +1,4 @@
//
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
@@ -36,10 +37,13 @@ var (
uri string
db int
password string
username string
useRedisCluster bool
clusterAddrs string
tlsServerName string
insecure bool
useTLS bool
)
// rootCmd represents the base command when called without any subcommands
@@ -257,7 +261,6 @@ func adjustPadding(lines ...*displayLine) {
func rpad(s string, padding int) string {
tmpl := fmt.Sprintf("%%-%ds ", padding)
return fmt.Sprintf(tmpl, s)
}
// lpad adds padding to the left of a string.
@@ -308,19 +311,26 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password to use when connecting to redis server")
rootCmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Username to use when connecting to Redis (ACL username)")
rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "Connect to redis cluster")
rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs",
"127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005",
"List of comma-separated redis server addresses")
rootCmd.PersistentFlags().BoolVar(&useTLS, "tls", false, "Enable TLS connection")
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
"", "Server name for TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
false, "Allow insecure TLS connection by skipping cert validation")
// Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
viper.BindPFlag("username", rootCmd.PersistentFlags().Lookup("username"))
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
viper.BindPFlag("tls", rootCmd.PersistentFlags().Lookup("tls"))
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
}
// initConfig reads in config file and ENV variables if set.
@@ -357,6 +367,7 @@ func createRDB() *rdb.RDB {
c = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
})
} else {
@@ -364,6 +375,7 @@ func createRDB() *rdb.RDB {
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
})
}
@@ -386,6 +398,7 @@ func getRedisConnOpt() asynq.RedisConnOpt {
return asynq.RedisClusterClientOpt{
Addrs: addrs,
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
}
}
@@ -393,16 +406,22 @@ func getRedisConnOpt() asynq.RedisConnOpt {
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
}
}
func getTLSConfig() *tls.Config {
tlsServer := viper.GetString("tls_server")
if tlsServer == "" {
return nil
if tlsServer != "" {
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
}
return &tls.Config{ServerName: tlsServer}
if viper.GetBool("tls") {
return &tls.Config{InsecureSkipVerify: viper.GetBool("insecure")}
}
return nil
}
// printTable is a helper function to print data in table format.
@@ -410,18 +429,22 @@ func getTLSConfig() *tls.Config {
// cols is a list of headers and printRow specifies how to print rows.
//
// Example:
// type User struct {
// Name string
// Addr string
// Age int
// }
//
// type User struct {
// Name string
// Addr string
// Age int
// }
//
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
// cols := []string{"Name", "Addr", "Age"}
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
//
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
//
// printTable(cols, printRows)
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"
@@ -464,35 +487,31 @@ func isPrintable(data []byte) bool {
}
// Helper to turn a command line flag into a duration
func getDuration(cmd *cobra.Command, arg string) time.Duration {
func getDuration(cmd *cobra.Command, arg string) (time.Duration, error) {
durationStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return 0, err
}
duration, err := time.ParseDuration(durationStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return 0, err
}
return duration
return duration, nil
}
// Helper to turn a command line flag into a time
func getTime(cmd *cobra.Command, arg string) time.Time {
func getTime(cmd *cobra.Command, arg string) (time.Time, error) {
timeStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return time.Time{}, err
}
timeVal, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return time.Time{}, err
}
return timeVal
return timeVal, nil
}

View File

@@ -7,7 +7,6 @@ package cmd
import (
"fmt"
"io"
"os"
"sort"
"strings"
"time"
@@ -44,20 +43,19 @@ The command shows the following for each server:
A "active" server is pulling tasks from queues and processing them.
A "stopped" server is no longer pulling new tasks from queues`,
Run: serverList,
RunE: serverList,
}
func serverList(cmd *cobra.Command, args []string) {
func serverList(cmd *cobra.Command, args []string) error {
r := createRDB()
servers, err := r.ListServers()
if err != nil {
fmt.Println(err)
os.Exit(1)
return fmt.Errorf("could not fetch list of servers: %v", err)
}
if len(servers) == 0 {
fmt.Println("No running servers")
return
return nil
}
// sort by hostname and pid
@@ -80,6 +78,7 @@ func serverList(cmd *cobra.Command, args []string) {
}
}
printTable(cols, printRows)
return nil
}
func formatQueues(qmap map[string]int) string {

View File

@@ -35,7 +35,7 @@ var statsCmd = &cobra.Command{
* Aggregate data for the current day
* Basic information about the running redis instance`),
Args: cobra.NoArgs,
Run: stats,
RunE: stats,
}
var jsonFlag bool
@@ -74,13 +74,12 @@ type FullStats struct {
RedisInfo map[string]string `json:"redis"`
}
func stats(cmd *cobra.Command, args []string) {
func stats(cmd *cobra.Command, args []string) error {
r := createRDB()
queues, err := r.AllQueues()
if err != nil {
fmt.Println(err)
os.Exit(1)
return fmt.Errorf("could not fetch queues: %v", err)
}
var aggStats AggregateStats
@@ -88,8 +87,7 @@ func stats(cmd *cobra.Command, args []string) {
for _, qname := range queues {
s, err := r.CurrentStats(qname)
if err != nil {
fmt.Println(err)
os.Exit(1)
return fmt.Errorf("could not fetch stats for queue %q: %v", qname, err)
}
aggStats.Active += s.Active
aggStats.Pending += s.Pending
@@ -110,8 +108,7 @@ func stats(cmd *cobra.Command, args []string) {
info, err = r.RedisInfo()
}
if err != nil {
fmt.Println(err)
os.Exit(1)
return fmt.Errorf("could not fetch redis info: %v", err)
}
if jsonFlag {
@@ -122,12 +119,11 @@ func stats(cmd *cobra.Command, args []string) {
})
if err != nil {
fmt.Println(err)
os.Exit(1)
return fmt.Errorf("could not marshal stats to JSON: %v", err)
}
fmt.Println(string(statsJSON))
return
return nil
}
bold := color.New(color.Bold)
@@ -151,6 +147,7 @@ func stats(cmd *cobra.Command, args []string) {
printInfo(info)
}
fmt.Println()
return nil
}
func printStatsByState(s *AggregateStats) {

View File

@@ -7,7 +7,6 @@ package cmd
import (
"fmt"
"io"
"os"
"time"
"github.com/MakeNowJust/heredoc/v2"
@@ -120,14 +119,14 @@ var taskListCmd = &cobra.Command{
$ asynq task list --queue=myqueue --state=pending
$ asynq task list --queue=myqueue --state=aggregating --group=mygroup
$ asynq task list --queue=myqueue --state=scheduled --page=2`),
Run: taskList,
RunE: taskList,
}
var taskInspectCmd = &cobra.Command{
Use: "inspect --queue=<queue> --id=<task_id>",
Short: "Display detailed information on the specified task",
Args: cobra.NoArgs,
Run: taskInspect,
RunE: taskInspect,
Example: heredoc.Doc(`
$ asynq task inspect --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -136,7 +135,7 @@ var taskCancelCmd = &cobra.Command{
Use: "cancel <task_id> [<task_id>...]",
Short: "Cancel one or more active tasks",
Args: cobra.MinimumNArgs(1),
Run: taskCancel,
RunE: taskCancel,
Example: heredoc.Doc(`
$ asynq task cancel f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -145,7 +144,7 @@ var taskArchiveCmd = &cobra.Command{
Use: "archive --queue=<queue> --id=<task_id>",
Short: "Archive a task with the given id",
Args: cobra.NoArgs,
Run: taskArchive,
RunE: taskArchive,
Example: heredoc.Doc(`
$ asynq task archive --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -155,7 +154,7 @@ var taskDeleteCmd = &cobra.Command{
Aliases: []string{"remove", "rm"},
Short: "Delete a task with the given id",
Args: cobra.NoArgs,
Run: taskDelete,
RunE: taskDelete,
Example: heredoc.Doc(`
$ asynq task delete --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -164,7 +163,7 @@ var taskRunCmd = &cobra.Command{
Use: "run --queue=<queue> --id=<task_id>",
Short: "Run a task with the given id",
Args: cobra.NoArgs,
Run: taskRun,
RunE: taskRun,
Example: heredoc.Doc(`
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -173,7 +172,7 @@ var taskEnqueueCmd = &cobra.Command{
Use: "enqueue --type_name=footype --payload=barpayload",
Short: "Enqueue a task",
Args: cobra.NoArgs,
Run: taskEnqueue,
RunE: taskEnqueue,
Example: heredoc.Doc(`
$ asynq task enqueue -t footype -l barpayload
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
@@ -183,7 +182,7 @@ var taskArchiveAllCmd = &cobra.Command{
Use: "archiveall --queue=<queue> --state=<state>",
Short: "Archive all tasks in the given state",
Args: cobra.NoArgs,
Run: taskArchiveAll,
RunE: taskArchiveAll,
Example: heredoc.Doc(`
$ asynq task archiveall --queue=myqueue --state=retry
$ asynq task archiveall --queue=myqueue --state=aggregating --group=mygroup`),
@@ -193,7 +192,7 @@ var taskDeleteAllCmd = &cobra.Command{
Use: "deleteall --queue=<queue> --state=<state>",
Short: "Delete all tasks in the given state",
Args: cobra.NoArgs,
Run: taskDeleteAll,
RunE: taskDeleteAll,
Example: heredoc.Doc(`
$ asynq task deleteall --queue=myqueue --state=archived
$ asynq task deleteall --queue=myqueue --state=aggregating --group=mygroup`),
@@ -203,74 +202,66 @@ var taskRunAllCmd = &cobra.Command{
Use: "runall --queue=<queue> --state=<state>",
Short: "Run all tasks in the given state",
Args: cobra.NoArgs,
Run: taskRunAll,
RunE: taskRunAll,
Example: heredoc.Doc(`
$ asynq task runall --queue=myqueue --state=retry
$ asynq task runall --queue=myqueue --state=aggregating --group=mygroup`),
}
func taskList(cmd *cobra.Command, args []string) {
func taskList(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
state, err := cmd.Flags().GetString("state")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
pageNum, err := cmd.Flags().GetInt("page")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
pageSize, err := cmd.Flags().GetInt("size")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
switch state {
case "active":
listActiveTasks(qname, pageNum, pageSize)
return listActiveTasks(qname, pageNum, pageSize)
case "pending":
listPendingTasks(qname, pageNum, pageSize)
return listPendingTasks(qname, pageNum, pageSize)
case "scheduled":
listScheduledTasks(qname, pageNum, pageSize)
return listScheduledTasks(qname, pageNum, pageSize)
case "retry":
listRetryTasks(qname, pageNum, pageSize)
return listRetryTasks(qname, pageNum, pageSize)
case "archived":
listArchivedTasks(qname, pageNum, pageSize)
return listArchivedTasks(qname, pageNum, pageSize)
case "completed":
listCompletedTasks(qname, pageNum, pageSize)
return listCompletedTasks(qname, pageNum, pageSize)
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if group == "" {
fmt.Println("Flag --group is required for listing aggregating tasks")
os.Exit(1)
return fmt.Errorf("flag --group is required for listing aggregating tasks")
}
listAggregatingTasks(qname, group, pageNum, pageSize)
return listAggregatingTasks(qname, group, pageNum, pageSize)
default:
fmt.Printf("error: state=%q is not supported\n", state)
os.Exit(1)
return fmt.Errorf("state=%q is not supported", state)
}
}
func listActiveTasks(qname string, pageNum, pageSize int) {
func listActiveTasks(qname string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No active tasks in %q queue\n", qname)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload"},
@@ -280,18 +271,18 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
}
},
)
return nil
}
func listPendingTasks(qname string, pageNum, pageSize int) {
func listPendingTasks(qname string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No pending tasks in %q queue\n", qname)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload"},
@@ -301,18 +292,18 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
}
},
)
return nil
}
func listScheduledTasks(qname string, pageNum, pageSize int) {
func listScheduledTasks(qname string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No scheduled tasks in %q queue\n", qname)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload", "Process In"},
@@ -322,6 +313,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
}
},
)
return nil
}
// formatProcessAt formats next process at time to human friendly string.
@@ -335,16 +327,15 @@ func formatProcessAt(processAt time.Time) string {
return fmt.Sprintf("in %v", d.Round(time.Second))
}
func listRetryTasks(qname string, pageNum, pageSize int) {
func listRetryTasks(qname string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No retry tasks in %q queue\n", qname)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
@@ -355,18 +346,18 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
}
},
)
return nil
}
func listArchivedTasks(qname string, pageNum, pageSize int) {
func listArchivedTasks(qname string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No archived tasks in %q queue\n", qname)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
@@ -375,18 +366,18 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr)
}
})
return nil
}
func listCompletedTasks(qname string, pageNum, pageSize int) {
func listCompletedTasks(qname string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No completed tasks in %q queue\n", qname)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload", "CompletedAt", "Result"},
@@ -395,18 +386,18 @@ func listCompletedTasks(qname string, pageNum, pageSize int) {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result))
}
})
return nil
}
func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
func listAggregatingTasks(qname, group string, pageNum, pageSize int) error {
i := createInspector()
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
return err
}
if len(tasks) == 0 {
fmt.Printf("No aggregating tasks in group %q \n", group)
return
return nil
}
printTable(
[]string{"ID", "Type", "Payload", "Group"},
@@ -416,38 +407,42 @@ func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
}
},
)
return nil
}
func taskCancel(cmd *cobra.Command, args []string) {
func taskCancel(cmd *cobra.Command, args []string) error {
i := createInspector()
var firstErr error
for _, id := range args {
if err := i.CancelProcessing(id); err != nil {
fmt.Printf("error: could not send cancelation signal: %v\n", err)
if firstErr == nil {
firstErr = err
}
continue
}
fmt.Printf("Sent cancelation signal for task %s\n", id)
}
return firstErr
}
func taskInspect(cmd *cobra.Command, args []string) {
func taskInspect(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
info, err := i.GetTaskInfo(qname, id)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return fmt.Errorf("could not get task info: %v", err)
}
printTaskInfo(info)
return nil
}
func printTaskInfo(info *asynq.TaskInfo) {
@@ -486,80 +481,72 @@ func formatPastTime(t time.Time) string {
return t.Format(time.UnixDate)
}
func taskArchive(cmd *cobra.Command, args []string) {
func taskArchive(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
err = i.ArchiveTask(qname, id)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return fmt.Errorf("could not archive task: %v", err)
}
fmt.Println("task archived")
return nil
}
func taskDelete(cmd *cobra.Command, args []string) {
func taskDelete(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
err = i.DeleteTask(qname, id)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return fmt.Errorf("could not delete task: %v", err)
}
fmt.Println("task deleted")
return nil
}
func taskRun(cmd *cobra.Command, args []string) {
func taskRun(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
err = i.RunTask(qname, id)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return fmt.Errorf("could not run task: %v", err)
}
fmt.Println("task is now pending")
return nil
}
func taskEnqueue(cmd *cobra.Command, args []string) {
func taskEnqueue(cmd *cobra.Command, args []string) error {
typeName, err := cmd.Flags().GetString("type_name")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
payload, err := cmd.Flags().GetString("payload")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
// For all of the optional flags, we need to explicitly check whether they were set or
@@ -569,8 +556,7 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed("retry") {
retry, err := cmd.Flags().GetInt("retry")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
opts = append(opts, asynq.MaxRetry(retry))
}
@@ -578,8 +564,7 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed("queue") {
queue, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
opts = append(opts, asynq.Queue(queue))
}
@@ -587,41 +572,63 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
if cmd.Flags().Changed("id") {
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
opts = append(opts, asynq.TaskID(id))
}
if cmd.Flags().Changed("timeout") {
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
d, err := getDuration(cmd, "timeout")
if err != nil {
return err
}
opts = append(opts, asynq.Timeout(d))
}
if cmd.Flags().Changed("deadline") {
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
t, err := getTime(cmd, "deadline")
if err != nil {
return err
}
opts = append(opts, asynq.Deadline(t))
}
if cmd.Flags().Changed("unique") {
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
d, err := getDuration(cmd, "unique")
if err != nil {
return err
}
opts = append(opts, asynq.Unique(d))
}
if cmd.Flags().Changed("process_at") {
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
t, err := getTime(cmd, "process_at")
if err != nil {
return err
}
opts = append(opts, asynq.ProcessAt(t))
}
if cmd.Flags().Changed("process_in") {
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
d, err := getDuration(cmd, "process_in")
if err != nil {
return err
}
opts = append(opts, asynq.ProcessIn(d))
}
if cmd.Flags().Changed("retention") {
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
d, err := getDuration(cmd, "retention")
if err != nil {
return err
}
opts = append(opts, asynq.Retention(d))
}
if cmd.Flags().Changed("group") {
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
opts = append(opts, asynq.Group(group))
}
@@ -631,23 +638,21 @@ func taskEnqueue(cmd *cobra.Command, args []string) {
taskInfo, err := c.Enqueue(task)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return fmt.Errorf("could not enqueue task: %v", err)
}
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
return nil
}
func taskArchiveAll(cmd *cobra.Command, args []string) {
func taskArchiveAll(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
state, err := cmd.Flags().GetString("state")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
@@ -662,35 +667,35 @@ func taskArchiveAll(cmd *cobra.Command, args []string) {
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
if group == "" {
fmt.Println("error: Flag --group is required for aggregating tasks")
os.Exit(1)
return fmt.Errorf("flag --group is required for aggregating tasks")
}
n, err = i.ArchiveAllAggregatingTasks(qname, group)
if err != nil {
return err
}
fmt.Printf("%d tasks archived\n", n)
return nil
default:
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
return fmt.Errorf("unsupported state %q", state)
}
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
fmt.Printf("%d tasks archived\n", n)
return nil
}
func taskDeleteAll(cmd *cobra.Command, args []string) {
func taskDeleteAll(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
state, err := cmd.Flags().GetString("state")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
@@ -709,35 +714,35 @@ func taskDeleteAll(cmd *cobra.Command, args []string) {
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
if group == "" {
fmt.Println("error: Flag --group is required for aggregating tasks")
os.Exit(1)
return fmt.Errorf("flag --group is required for aggregating tasks")
}
n, err = i.DeleteAllAggregatingTasks(qname, group)
if err != nil {
return err
}
fmt.Printf("%d tasks deleted\n", n)
return nil
default:
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
return fmt.Errorf("unsupported state %q", state)
}
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
fmt.Printf("%d tasks deleted\n", n)
return nil
}
func taskRunAll(cmd *cobra.Command, args []string) {
func taskRunAll(cmd *cobra.Command, args []string) error {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
state, err := cmd.Flags().GetString("state")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
i := createInspector()
@@ -752,22 +757,23 @@ func taskRunAll(cmd *cobra.Command, args []string) {
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
if group == "" {
fmt.Println("error: Flag --group is required for aggregating tasks")
os.Exit(1)
return fmt.Errorf("flag --group is required for aggregating tasks")
}
n, err = i.RunAllAggregatingTasks(qname, group)
if err != nil {
return err
}
fmt.Printf("%d tasks are now pending\n", n)
return nil
default:
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
return fmt.Errorf("unsupported state %q", state)
}
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
return err
}
fmt.Printf("%d tasks are now pending\n", n)
return nil
}

View File

@@ -25,13 +25,13 @@ type QueueMetricsCollector struct {
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
qnames, err := qmc.inspector.Queues()
if err != nil {
return nil, fmt.Errorf("failed to get queue names: %v", err)
return nil, fmt.Errorf("failed to get queue names: %w", err)
}
infos := make([]*asynq.QueueInfo, len(qnames))
for i, qname := range qnames {
qinfo, err := qmc.inspector.GetQueueInfo(qname)
if err != nil {
return nil, fmt.Errorf("failed to get queue info: %v", err)
return nil, fmt.Errorf("failed to get queue info: %w", err)
}
infos[i] = qinfo
}

View File

@@ -94,7 +94,7 @@ func (s *Semaphore) Release(ctx context.Context) error {
n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
if err != nil {
return fmt.Errorf("redis command failed: %v", err)
return fmt.Errorf("redis command failed: %w", err)
}
if n == 0 {