2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-02 04:45:50 +08:00

Compare commits

...

26 Commits

Author SHA1 Message Date
Mohamed Sohail
d704b68a42 Prepare release (docs): v0.26.0 (#1084)
* pre-release: v0.26.0

* deps upgrades
* min go version set to 1.24.0

* feat: done add-username-cli (#1083)

* Feature: Add Headers Support to Tasks (#1070)

* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests

* Add --tls option to dash command (#1073)

* Add --tls option to dash command

* Switch order so it works better when both --tls and --tls_server are set

* docs: update CHANGELOG

---------

Co-authored-by: Artemii Kulikov <91570054+vlle@users.noreply.github.com>
Co-authored-by: Joe <85931983+joejoe-am@users.noreply.github.com>
Co-authored-by: Thomas Hansen <th4019@gmail.com>
2026-02-03 09:36:26 +03:00
Mohammed Sohail
a8db5b5571 docs: update CHANGELOG 2026-02-03 09:05:40 +03:00
Thomas Hansen
e4248e2749 Add --tls option to dash command (#1073)
* Add --tls option to dash command

* Switch order so it works better when both --tls and --tls_server are set
2026-02-03 09:05:40 +03:00
Joe
c4876e7247 Feature: Add Headers Support to Tasks (#1070)
* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests
2026-02-03 09:05:40 +03:00
Artemii Kulikov
dd2c3de356 feat: done add-username-cli (#1083) 2026-02-03 09:05:40 +03:00
Thomas Hansen
ff887e1f89 Add --tls option to dash command (#1073)
* Add --tls option to dash command

* Switch order so it works better when both --tls and --tls_server are set
2025-11-10 11:08:53 +03:00
Joe
5de9b1faf0 Feature: Add Headers Support to Tasks (#1070)
* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests
2025-11-04 20:07:59 +03:00
Artemii Kulikov
8261a03f0d feat: done add-username-cli (#1083) 2025-11-04 20:02:14 +03:00
Mohammed Sohail
74c47eb8bb pre-release: v0.26.0
* deps upgrades
* min go version set to 1.24.0
2025-11-04 19:42:48 +03:00
Mohammed Sohail
e9037f003d ci: prepare github ci for go 1.24.x and 1.25.x, turn off noisy linter
* linter is producing uncessary noise
* benchstat is slow on PRs, we will turn it off untill it can be upgraded
2025-11-04 19:29:49 +03:00
Mohammed Sohail
604175e6ca ci: format code with golangci-lint 2025-11-04 19:06:18 +03:00
AmirReza Fahimi
1831a07efe fix: correct error message text in ResultWriter.Write (#1054)
Co-authored-by: amirreza.fahimidero <amirreza.fahimidero@snapp.cab>
Co-authored-by: Mohamed Sohail <sohailsameja@gmail.com>
2025-11-04 18:28:08 +03:00
Benjamin Grosse
d64f0b7ed0 wrap all fmt.Errorf errors (#1047)
Users need to be able to match with `errors.Is()` also on external
errors, for example `context.Canceled`.
2025-11-04 18:25:47 +03:00
aziz-the-dev
a889ef0b08 Implement UpdateTaskPayload method for inspector (#1042)
Co-authored-by: Aziz Aliyev <aziz.aliyev@idda.az>
2025-11-04 18:25:10 +03:00
Marin Atanasov Nikolov
093ba04266 servemux: NotFoundHandler returns ErrHandlerNotFound error (#1031)
This allows asynq middlewares to be used to inspect returned errors and decide
to silence errors about `handlers not found for task'.
2025-11-04 18:17:50 +03:00
Khash Sajadi
c327bc40a2 docs: Update server.go (#1010)
Typo in the docs
2025-04-01 09:06:12 +03:00
Broderick Westrope
ea0c6e93f0 chore: fix godoc comment (#1009) 2025-04-01 09:05:18 +03:00
Mohammed Sohail
489e21920b release: v0.25.1 2024-12-11 09:19:37 +03:00
Mohamed Sohail
043dcfbf56 fix: call Stop on all other signals to correctly set the server state for the shutdown procedure to complete successfully (#982)
* fixes: #979
2024-12-11 09:05:00 +03:00
Robin Joseph
02907551b4 feat(dash): Add --insecure option (#980) 2024-12-09 09:09:12 +03:00
Mohamed Sohail
127fac2e90 fix: NewScheduler incorrectly creates underlying Client, closing broker properly (#977)
* fix: NewScheduler wrongly creates a client whose sharedConnection value is always true

* This is affecting the PeriodicManager as well as the Scheduler

* fix: closing the Client also closes the broker

* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
2024-12-06 08:40:04 +03:00
dependabot[bot]
106c07adaa build(deps): bump codecov/codecov-action from 4 to 5 (#970)
Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 4 to 5.
- [Release notes](https://github.com/codecov/codecov-action/releases)
- [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/codecov/codecov-action/compare/v4...v5)

---
updated-dependencies:
- dependency-name: codecov/codecov-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-11-19 07:37:44 +03:00
dependabot[bot]
1c7195ff1a build(deps): bump google.golang.org/protobuf from 1.35.1 to 1.35.2 (#971)
Bumps google.golang.org/protobuf from 1.35.1 to 1.35.2.

---
updated-dependencies:
- dependency-name: google.golang.org/protobuf
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-11-19 07:37:13 +03:00
Xijun Dai
12cbba4926 feat(periodic_task_manager): Add RedisUniversalClient support (#958)
Signed-off-by: Xijun Dai <daixijun1990@gmail.com>
2024-11-13 14:48:56 +03:00
Khash Sajadi
80479b528d Include registration error in the log (#657)
* Include registration error in the log

* remove chatty debug log

this will show in the logs every 5 seconds as debug (not even trace) which leads to a lot of noise
2024-11-13 14:09:59 +03:00
dependabot[bot]
e14c312fe3 build(deps): bump golang.org/x/sys from 0.26.0 to 0.27.0 (#963)
Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.26.0 to 0.27.0.
- [Commits](https://github.com/golang/sys/compare/v0.26.0...v0.27.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2024-11-12 08:38:50 +03:00
34 changed files with 1140 additions and 398 deletions

View File

@@ -9,6 +9,7 @@ on: [pull_request]
jobs:
incoming:
runs-on: ubuntu-latest
if: false
services:
redis:
image: redis:7
@@ -31,6 +32,7 @@ jobs:
current:
runs-on: ubuntu-latest
if: false
services:
redis:
image: redis:7
@@ -55,6 +57,7 @@ jobs:
benchstat:
needs: [incoming, current]
if: false
runs-on: ubuntu-latest
steps:
- name: Checkout

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x, 1.23.x]
go-version: [1.24.x, 1.25.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -39,13 +39,13 @@ jobs:
run: go test -run=^$ -bench=. -loglevel=debug ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
uses: codecov/codecov-action@v5
build-tool:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x, 1.23.x]
go-version: [1.24.x, 1.25.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -70,6 +70,7 @@ jobs:
golangci:
name: lint
runs-on: ubuntu-latest
if: false
steps:
- uses: actions/checkout@v4

View File

@@ -7,6 +7,45 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.26.0] - 2026-02-03
### Upgrades
- Prepare CI for Go 1.24.x and 1.25.x (commit: e9037f0)
### Added
- Add Headers support to tasks (PR: https://github.com/hibiken/asynq/pull/1070)
- Add `--tls` option to dash command (PR: https://github.com/hibiken/asynq/pull/1073)
- Add `--username` CLI flag for Redis ACL authentication (PR: https://github.com/hibiken/asynq/pull/1083)
- Add `UpdateTaskPayload` method for inspector (PR: https://github.com/hibiken/asynq/pull/1042)
### Fixes
- Fix: Correct error message text in ResultWriter.Write (PR: https://github.com/hibiken/asynq/pull/1054)
- Fix: Wrap all fmt.Errorf errors with %w (PR: https://github.com/hibiken/asynq/pull/1047)
- Fix: ServeMux.NotFoundHandler returns ErrHandlerNotFound error (PR: https://github.com/hibiken/asynq/pull/1031)
### Changed
- Docs: Update server.go documentation (PR: https://github.com/hibiken/asynq/pull/1010)
- Chore: Fix godoc comment (PR: https://github.com/hibiken/asynq/pull/1009)
## [0.25.1] - 2024-12-11
### Upgrades
* Some packages
### Added
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
### Fixes
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
## [0.25.0] - 2024-10-29
### Upgrades

View File

@@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
tasks := make([]*Task, len(msgs))
for i, m := range msgs {
tasks[i] = NewTask(m.Type, m.Payload)
tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers)
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)

View File

@@ -8,14 +8,15 @@ import (
"context"
"crypto/tls"
"fmt"
"maps"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)
// Task represents a unit of work to be performed.
@@ -26,6 +27,9 @@ type Task struct {
// payload holds data needed to perform the task.
payload []byte
// headers holds additional metadata for the task.
headers map[string]string
// opts holds options for the task.
opts []Option
@@ -33,8 +37,9 @@ type Task struct {
w *ResultWriter
}
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers }
// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
@@ -48,6 +53,21 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: nil,
opts: opts,
}
}
// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers.
// Options can be passed to configure task processing behavior.
// TODO: In the next major (breaking) release, fold this functionality into NewTask
//
// so that headers are supported directly. After that, remove this method.
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: maps.Clone(headers),
opts: opts,
}
}
@@ -57,6 +77,7 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task {
return &Task{
typename: typename,
payload: payload,
headers: make(map[string]string),
w: w,
}
}
@@ -75,6 +96,9 @@ type TaskInfo struct {
// Payload is the payload data of the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// State indicates the task state.
State TaskState
@@ -145,6 +169,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
Queue: msg.Queue,
Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy?
Headers: msg.Headers,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastErr: msg.ErrorMsg,
@@ -442,14 +467,15 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
//
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are:
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
//
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
return nil, fmt.Errorf("asynq: could not parse redis uri: %w", err)
}
switch u.Scheme {
case "redis", "rediss":
@@ -539,7 +565,7 @@ type ResultWriter struct {
func (w *ResultWriter) Write(data []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
return 0, fmt.Errorf("failed to write task result: %w", w.ctx.Err())
default:
}
return w.broker.WriteResult(w.qname, w.id, data)

View File

@@ -11,11 +11,11 @@ import (
"strings"
"testing"
"github.com/redis/go-redis/v9"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/log"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)
//============================================================================

View File

@@ -385,6 +385,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),

View File

@@ -1191,3 +1191,473 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
}
func TestClientEnqueueWithHeaders(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"user-id": "123",
"request-id": "abc-def-ghi",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantPending map[string][]*base.TaskMessage
}{
{
desc: "Task with headers",
task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with empty headers",
task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "process_data",
Payload: []byte("data"),
Headers: map[string]string{},
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "process_data",
Payload: []byte("data"),
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with nil headers",
task: NewTaskWithHeaders("cleanup", nil, nil),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "cleanup",
Payload: nil,
Headers: nil,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "cleanup",
Payload: nil,
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with headers and custom options",
task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}),
opts: []Option{MaxRetry(5), Queue("notifications")},
wantInfo: &TaskInfo{
Queue: "notifications",
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"notifications": {
{
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
Retry: 5,
Queue: "notifications",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantPending {
got := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
}
}
func TestClientEnqueueWithHeadersScheduled(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
oneHourLater := now.Add(time.Hour)
headers := map[string]string{
"correlation-id": "xyz-123",
"source": "api",
}
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantInfo *TaskInfo
wantScheduled map[string][]base.Z
}{
{
desc: "Schedule task with headers",
task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers),
processAt: oneHourLater,
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
State: TaskStateScheduled,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: oneHourLater,
},
wantScheduled: map[string][]base.Z{
"default": {
{
Message: &base.TaskMessage{
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: oneHourLater.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
opts := append(tc.opts, ProcessAt(tc.processAt))
gotInfo, err := client.Enqueue(tc.task, opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
}
}
func TestNewTaskWithHeaders(t *testing.T) {
tests := []struct {
desc string
typename string
payload []byte
headers map[string]string
opts []Option
want *Task
}{
{
desc: "Task with headers",
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
want: &Task{
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
},
},
{
desc: "Task with empty headers",
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
want: &Task{
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
},
},
{
desc: "Task with nil headers",
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
want: &Task{
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
},
},
}
for _, tc := range tests {
got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...)
if got.Type() != tc.want.typename {
t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename)
}
if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" {
t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if tc.headers != nil && got.Headers() != nil {
tc.headers["modified"] = "test"
if _, exists := got.Headers()["modified"]; exists {
t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc)
}
}
}
}
func TestTaskHeadersMethod(t *testing.T) {
tests := []struct {
desc string
task *Task
want map[string]string
wantNil bool
}{
{
desc: "Task created with NewTask has nil headers",
task: NewTask("test", []byte("data")),
want: nil,
wantNil: true,
},
{
desc: "Task created with NewTaskWithHeaders has headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}),
want: map[string]string{"key": "value"},
},
{
desc: "Task created with empty headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}),
want: map[string]string{},
},
{
desc: "Task created with nil headers",
task: NewTaskWithHeaders("test", []byte("data"), nil),
want: nil,
wantNil: true,
},
}
for _, tc := range tests {
got := tc.task.Headers()
if tc.wantNil {
if got != nil {
t.Errorf("%s: Headers() = %v, want nil", tc.desc, got)
}
} else {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
}
}
}
func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"batch-id": "batch-123",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantGroups map[string]map[string][]base.Z
}{
{
desc: "Task with headers and group",
task: NewTaskWithHeaders("batch_process", []byte("item1"), headers),
opts: []Option{Group("batch-123")},
wantInfo: &TaskInfo{
Queue: "default",
Group: "batch-123",
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
State: TaskStateAggregating,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: time.Time{},
},
wantGroups: map[string]map[string][]base.Z{
"default": {
"batch-123": {
{
Message: &base.TaskMessage{
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
GroupKey: "batch-123",
},
Score: now.Unix(),
},
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, groups := range tc.wantGroups {
for groupKey, want := range groups {
got := h.GetGroupEntries(t, r, qname, groupKey)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff)
}
}
}
}
}

81
doc.go
View File

@@ -8,41 +8,41 @@ Package asynq provides a framework for Redis based distrubted task queue.
Asynq uses Redis as a message broker. To connect to redis,
specify the connection using one of RedisConnOpt types.
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
The Client is used to enqueue a task.
client := asynq.NewClient(redisConnOpt)
client := asynq.NewClient(redisConnOpt)
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
task := asynq.NewTask("example", b)
task := asynq.NewTask("example", b)
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
The Server is used to run the task processing workers with a given
handler.
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
Handler is an interface type with a method which
takes a task and returns an error. Handler should return nil if
@@ -50,23 +50,24 @@ the processing is successful, otherwise return a non-nil error.
If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface.
type TaskHandler struct {
// ...
}
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
type TaskHandler struct {
// ...
}
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
*/
package asynq

View File

@@ -123,7 +123,7 @@ func ExampleResultWriter() {
res := []byte("task result data")
n, err := task.ResultWriter().Write(res) // implements io.Writer
if err != nil {
return fmt.Errorf("failed to write task result: %v", err)
return fmt.Errorf("failed to write task result: %w", err)
}
log.Printf(" %d bytes written", n)
return nil

16
go.mod
View File

@@ -1,20 +1,20 @@
module github.com/hibiken/asynq
go 1.22
go 1.24.0
require (
github.com/google/go-cmp v0.6.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.7.0
github.com/redis/go-redis/v9 v9.14.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.7.0
github.com/spf13/cast v1.10.0
go.uber.org/goleak v1.3.0
golang.org/x/sys v0.26.0
golang.org/x/time v0.8.0
google.golang.org/protobuf v1.35.1
golang.org/x/sys v0.37.0
golang.org/x/time v0.14.0
google.golang.org/protobuf v1.36.10
)
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)

28
go.sum
View File

@@ -2,16 +2,16 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -20,23 +20,23 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -245,7 +245,7 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
case errors.IsTaskNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
}
@@ -316,7 +316,7 @@ func Page(n int) ListOption {
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -325,7 +325,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -344,7 +344,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -353,11 +353,11 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
expiredSet := make(map[string]struct{}) // set of expired message IDs
for _, msg := range expired {
@@ -384,7 +384,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -393,7 +393,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -413,7 +413,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -422,7 +422,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -442,7 +442,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -451,7 +451,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -471,7 +471,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -480,7 +480,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -500,7 +500,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -509,7 +509,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -583,6 +583,30 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
return int(n), err
}
// UpdateTaskPayload updates a task with the given id from the given queue with given payload.
// The task needs to be in scheduled state,
// otherwise UpdateTaskPayload will return an error.
//
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is not in scheduled state, it returns a non-nil error.
func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.UpdateTaskPayload(queue, id, payload)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
}
return nil
}
// DeleteTask deletes a task with the given id from the given queue.
// The task needs to be in pending, scheduled, retry, or archived state,
// otherwise DeleteTask will return an error.
@@ -592,7 +616,7 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
// If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
err := i.rdb.DeleteTask(queue, id)
switch {
@@ -601,7 +625,7 @@ func (i *Inspector) DeleteTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
@@ -656,7 +680,7 @@ func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
// If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
err := i.rdb.RunTask(queue, id)
switch {
@@ -665,7 +689,7 @@ func (i *Inspector) RunTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
}
@@ -728,7 +752,7 @@ func (i *Inspector) ArchiveTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
}

View File

@@ -2369,6 +2369,148 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
}
}
func TestInspectorUpdateTaskPayloadUpdatesScheduledTaskPayload(t *testing.T) {
r := setup(t)
defer r.Close()
m1_old := h.NewTaskMessage("task1", []byte("m1_old"))
m1_new := h.NewTaskMessage("task1", nil)
m1_new.ID = m1_old.ID
m2_old := h.NewTaskMessage("task2", nil)
m2_new := h.NewTaskMessage("task2", []byte("m2_new"))
m2_new.ID = m2_old.ID
m3_old := h.NewTaskMessageWithQueue("task3", []byte("m3_old"), "custom")
m3_new := h.NewTaskMessageWithQueue("task3", []byte("m3_new"), "custom")
m3_new.ID = m3_old.ID
now := time.Now()
z1_old := base.Z{Message: m1_old, Score: now.Add(5 * time.Minute).Unix()}
z1_new := base.Z{Message: m1_new, Score: now.Add(5 * time.Minute).Unix()}
z2_old := base.Z{Message: m2_old, Score: now.Add(15 * time.Minute).Unix()}
z2_new := base.Z{Message: m2_new, Score: now.Add(15 * time.Minute).Unix()}
z3_old := base.Z{Message: m3_old, Score: now.Add(2 * time.Minute).Unix()}
z3_new := base.Z{Message: m3_new, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
qname string
id string
newPayload []byte
wantScheduled map[string][]base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z2_old).ID,
newPayload: m2_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_new},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z1_old).ID,
newPayload: m1_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_new, z2_old},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "custom",
id: createScheduledTask(z3_old).ID,
newPayload: m3_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_new},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); err != nil {
t.Errorf("UpdateTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
}
}
}
func TestInspectorUpdateTaskPayloadError(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
tasks map[string][]base.Z
qname string
id string
newPayload []byte
wantErr error
}{
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "nonexistent",
id: createScheduledTask(z2).ID,
newPayload: nil,
wantErr: ErrQueueNotFound,
},
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
id: uuid.NewString(),
newPayload: nil,
wantErr: ErrTaskNotFound,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.tasks)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); !errors.Is(err, tc.wantErr) {
t.Errorf("UpdateTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
continue
}
}
}
func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
r := setup(t)
defer r.Close()

View File

@@ -23,7 +23,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.25.0"
const Version = "0.26.0"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
@@ -240,6 +240,9 @@ type TaskMessage struct {
// Payload holds data needed to process the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// ID is a unique identifier for each task.
ID string
@@ -304,6 +307,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: msg.Payload,
Headers: msg.Headers,
Id: msg.ID,
Queue: msg.Queue,
Retry: int32(msg.Retry),
@@ -328,6 +332,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: pbmsg.GetPayload(),
Headers: pbmsg.GetHeaders(),
ID: pbmsg.GetId(),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),

View File

@@ -107,6 +107,7 @@ type Op string
// only the last one is recorded.
//
// The types are:
//
// errors.Op
// The operation being performed, usually the method
// being invoked (Get, Put, etc.).

View File

@@ -4,8 +4,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.34.2
// protoc v3.19.6
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: asynq.proto
package proto
@@ -16,6 +16,7 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -27,16 +28,15 @@ const (
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 15
// Next ID: 16
type TaskMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Type indicates the kind of the task to be performed.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// Payload holds data needed to process the task.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Headers holds additional metadata for the task.
Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
// Unique identifier for the task.
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
// Name of the queue to which this task belongs.
@@ -71,16 +71,16 @@ type TaskMessage struct {
// Time when the task completed in success in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// This field is populated if result_ttl > 0 upon completion.
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *TaskMessage) Reset() {
*x = TaskMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *TaskMessage) String() string {
@@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {}
func (x *TaskMessage) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -120,6 +120,13 @@ func (x *TaskMessage) GetPayload() []byte {
return nil
}
func (x *TaskMessage) GetHeaders() map[string]string {
if x != nil {
return x.Headers
}
return nil
}
func (x *TaskMessage) GetId() string {
if x != nil {
return x.Id
@@ -206,10 +213,7 @@ func (x *TaskMessage) GetCompletedAt() int64 {
// ServerInfo holds information about a running server.
type ServerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Host machine the server is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the server process.
@@ -221,7 +225,7 @@ type ServerInfo struct {
// List of queue names with their priorities.
// The server will consume tasks from the queues and prioritize
// queues with higher priority numbers.
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
// If set, the server will always consume tasks from a queue with higher
// priority.
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
@@ -231,15 +235,15 @@ type ServerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Number of workers currently processing tasks.
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ServerInfo) Reset() {
*x = ServerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ServerInfo) String() string {
@@ -250,7 +254,7 @@ func (*ServerInfo) ProtoMessage() {}
func (x *ServerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -330,10 +334,7 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 {
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Host matchine this worker is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the process in which this worker is running.
@@ -352,16 +353,16 @@ type WorkerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Deadline by which the worker needs to complete processing
// the task. If worker exceeds the deadline, the task will fail.
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *WorkerInfo) Reset() {
*x = WorkerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *WorkerInfo) String() string {
@@ -372,7 +373,7 @@ func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -453,10 +454,7 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
// SchedulerEntry holds information about a periodic task registered
// with a scheduler.
type SchedulerEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Identifier of the scheduler entry.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// Periodic schedule spec of the entry.
@@ -472,15 +470,15 @@ type SchedulerEntry struct {
// Last time the task was enqueued.
// Zero time if task was never enqueued.
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEntry) Reset() {
*x = SchedulerEntry{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SchedulerEntry) String() string {
@@ -491,7 +489,7 @@ func (*SchedulerEntry) ProtoMessage() {}
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -558,23 +556,20 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
// SchedulerEnqueueEvent holds information about an enqueue event
// by a scheduler.
type SchedulerEnqueueEvent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// ID of the task that was enqueued.
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
// Time the task was enqueued.
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEnqueueEvent) Reset() {
*x = SchedulerEnqueueEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SchedulerEnqueueEvent) String() string {
@@ -585,7 +580,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {}
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -616,146 +611,106 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
var File_asynq_proto protoreflect.FileDescriptor
var file_asynq_proto_rawDesc = []byte{
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79,
0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c,
0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74,
0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12,
0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66,
0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07,
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74,
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65,
0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c,
0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28,
0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22,
0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69,
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49,
0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65,
0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74,
0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20,
0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72,
0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73,
0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61,
0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65,
0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20,
0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65,
0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65,
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65,
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a,
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61,
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75,
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e,
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65,
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a,
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75,
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17,
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
const file_asynq_proto_rawDesc = "" +
"\n" +
"\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" +
"\vTaskMessage\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" +
"\apayload\x18\x02 \x01(\fR\apayload\x129\n" +
"\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" +
"\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" +
"\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" +
"\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" +
"\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" +
"\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" +
"\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" +
"\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" +
"\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" +
"\n" +
"unique_key\x18\n" +
" \x01(\tR\tuniqueKey\x12\x1b\n" +
"\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" +
"\tretention\x18\f \x01(\x03R\tretention\x12!\n" +
"\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" +
"\fHeadersEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" +
"\n" +
"ServerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" +
"\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" +
"\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" +
"\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" +
"\x06status\x18\a \x01(\tR\x06status\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" +
"\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" +
"\vQueuesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" +
"\n" +
"WorkerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" +
"\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" +
"\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" +
"\x05queue\x18\a \x01(\tR\x05queue\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" +
"\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" +
"\x0eSchedulerEntry\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" +
"\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" +
"\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" +
"\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" +
"\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" +
"\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" +
"\x15SchedulerEnqueueEvent\x12\x17\n" +
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" +
"\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3"
var (
file_asynq_proto_rawDescOnce sync.Once
file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
file_asynq_proto_rawDescData []byte
)
func file_asynq_proto_rawDescGZIP() []byte {
file_asynq_proto_rawDescOnce.Do(func() {
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)))
})
return file_asynq_proto_rawDescData
}
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_asynq_proto_goTypes = []any{
(*TaskMessage)(nil), // 0: asynq.TaskMessage
(*ServerInfo)(nil), // 1: asynq.ServerInfo
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
nil, // 5: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
nil, // 5: asynq.TaskMessage.HeadersEntry
nil, // 6: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
}
var file_asynq_proto_depIdxs = []int32{
5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry
6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
8, // [8:8] is the sub-list for method output_type
8, // [8:8] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
}
func init() { file_asynq_proto_init() }
@@ -763,75 +718,13 @@ func file_asynq_proto_init() {
if File_asynq_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*TaskMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*ServerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEnqueueEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_asynq_proto_rawDesc,
RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)),
NumEnums: 0,
NumMessages: 6,
NumMessages: 7,
NumExtensions: 0,
NumServices: 0,
},
@@ -840,7 +733,6 @@ func file_asynq_proto_init() {
MessageInfos: file_asynq_proto_msgTypes,
}.Build()
File_asynq_proto = out.File
file_asynq_proto_rawDesc = nil
file_asynq_proto_goTypes = nil
file_asynq_proto_depIdxs = nil
}

View File

@@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 15
// Next ID: 16
message TaskMessage {
// Type indicates the kind of the task to be performed.
string type = 1;
@@ -19,6 +19,9 @@ message TaskMessage {
// Payload holds data needed to process the task.
bytes payload = 2;
// Headers holds additional metadata for the task.
map<string, string> headers = 15;
// Unique identifier for the task.
string id = 3;

View File

@@ -1412,6 +1412,93 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
return n, nil
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// --
// ARGV[1] -> task message data
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully updated.
// Returns 0 if task is not found.
// Returns -1 if task is not in scheduled state.
var updateTaskPayloadCmd = redis.NewScript(`
-- Check if given taks exists
if redis.call("EXISTS", KEYS[1]) == 0 then
return 0
end
local state, pending_since, group, unique_key = unpack(redis.call("HMGET", KEYS[1], "state", "pending_since", "group", "unique_key"))
if state ~= "scheduled" then
return -1
end
local redis_call_args = {"state", state}
if pending_since then
table.insert(redis_call_args, "pending_since")
table.insert(redis_call_args, pending_since)
end
if group then
table.insert(redis_call_args, "group")
table.insert(redis_call_args, group)
end
if unique_key then
table.insert(redis_call_args, "unique_key")
table.insert(redis_call_args, unique_key)
end
redis.call("HSET", KEYS[1], "msg", ARGV[1], unpack(redis_call_args))
return 1
`)
// UpdateTaskPayload finds a task that matches the id from the given queue and updates it's payload.
// It returns nil if it successfully updated the task payload.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
func (r *RDB) UpdateTaskPayload(qname, id string, payload []byte) error {
var op errors.Op = "rdb.UpdateTask"
if err := r.checkQueueExists(qname); err != nil {
return errors.E(op, errors.CanonicalCode(err), err)
}
taskInfo, err := r.GetTaskInfo(qname, id)
if err != nil {
return errors.E(op, errors.Unknown, err)
}
taskInfo.Message.Payload = payload
encoded, err := base.EncodeMessage(taskInfo.Message)
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
keys := []string{
base.TaskKey(qname, id),
}
argv := []interface{}{
encoded,
}
res, err := updateTaskPayloadCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
n, ok := res.(int64)
if !ok {
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: updateTaskCmd script returned unexported value %v", res))
}
switch n {
case 1:
return nil
case 0:
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
case -1:
return errors.E(op, errors.FailedPrecondition, "cannot update task that is not in scheduled state.")
default:
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from updateTaskCmd script: %d", n))
}
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:groups

View File

@@ -10,6 +10,8 @@ import (
"sort"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
// PeriodicTaskManager manages scheduling of periodic tasks.
@@ -27,9 +29,12 @@ type PeriodicTaskManagerOpts struct {
// Required: must be non nil
PeriodicTaskConfigProvider PeriodicTaskConfigProvider
// Required: must be non nil
// Optional: if RedisUniversalClient is nil must be non nil
RedisConnOpt RedisConnOpt
// Optional: if RedisUniversalClient is non nil, RedisConnOpt is ignored.
RedisUniversalClient redis.UniversalClient
// Optional: scheduler options
*SchedulerOpts
@@ -45,10 +50,16 @@ func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager,
if opts.PeriodicTaskConfigProvider == nil {
return nil, fmt.Errorf("PeriodicTaskConfigProvider cannot be nil")
}
if opts.RedisConnOpt == nil {
return nil, fmt.Errorf("RedisConnOpt cannot be nil")
if opts.RedisConnOpt == nil && opts.RedisUniversalClient == nil {
return nil, fmt.Errorf("RedisConnOpt/RedisUniversalClient cannot be nil")
}
scheduler := NewScheduler(opts.RedisConnOpt, opts.SchedulerOpts)
var scheduler *Scheduler
if opts.RedisUniversalClient != nil {
scheduler = NewSchedulerFromRedisClient(opts.RedisUniversalClient, opts.SchedulerOpts)
} else {
scheduler = NewScheduler(opts.RedisConnOpt, opts.SchedulerOpts)
}
syncInterval := opts.SyncInterval
if syncInterval == 0 {
syncInterval = defaultSyncInterval
@@ -111,10 +122,10 @@ func (mgr *PeriodicTaskManager) Start() error {
panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
}
if err := mgr.initialSync(); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
if err := mgr.s.Start(); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
mgr.wg.Add(1)
go func() {
@@ -157,11 +168,11 @@ func (mgr *PeriodicTaskManager) Run() error {
func (mgr *PeriodicTaskManager) initialSync() error {
configs, err := mgr.p.GetConfigs()
if err != nil {
return fmt.Errorf("initial call to GetConfigs failed: %v", err)
return fmt.Errorf("initial call to GetConfigs failed: %w", err)
}
for _, c := range configs {
if err := validatePeriodicTaskConfig(c); err != nil {
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err)
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %w", err)
}
}
mgr.add(configs)
@@ -172,8 +183,8 @@ func (mgr *PeriodicTaskManager) add(configs []*PeriodicTaskConfig) {
for _, c := range configs {
entryID, err := mgr.s.Register(c.Cronspec, c.Task, c.Opts...)
if err != nil {
mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q",
c.Cronspec, c.Task.Type())
mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q err=%v",
c.Cronspec, c.Task.Type(), err)
continue
}
mgr.m[c.hash()] = entryID

View File

@@ -230,6 +230,7 @@ func (p *processor) exec() {
ctx: ctx,
},
)
task.headers = msg.Headers
resCh <- p.perform(ctx, task)
}()
@@ -333,7 +334,7 @@ var RevokeTask = errors.New("revoke task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err)
}
switch {
case errors.Is(err, RevokeTask):
@@ -354,7 +355,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil {

View File

@@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
}
func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
retryAt := time.Now().Add(delay)
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)

View File

@@ -44,9 +44,6 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
const defaultHeartbeatInterval = 10 * time.Second
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
rdb := rdb.NewRDB(redisClient)
scheduler.rdb = rdb
scheduler.client = &Client{broker: rdb, sharedConnection: false}
return scheduler
}
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)
scheduler.rdb = rdb.NewRDB(c)
scheduler.client = NewClientFromRedisClient(c)
return scheduler
}
func newScheduler(opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
state: &serverState{value: srvStateNew},
heartbeatInterval: heartbeatInterval,
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
if err := s.client.Close(); err != nil {
s.logger.Errorf("Failed to close redis client connection: %v", err)
}
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped")
}
@@ -334,7 +341,6 @@ func (s *Scheduler) beat() {
}
entries = append(entries, e)
}
s.logger.Debugf("Writing entries %v", entries)
if err := s.rdb.WriteSchedulerEntries(s.id, entries, s.heartbeatInterval*2); err != nil {
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
}

View File

@@ -6,12 +6,16 @@ package asynq
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
)
// ErrHandlerNotFound indicates that no task handler was found for a given pattern.
var ErrHandlerNotFound = errors.New("handler not found for task")
// ServeMux is a multiplexer for asynchronous tasks.
// It matches the type of each task against a list of registered patterns
// and calls the handler for the pattern that most closely matches the
@@ -149,8 +153,8 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
// NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("handler not found for task %q", task.Type())
return fmt.Errorf("%w %q", ErrHandlerNotFound, task.Type())
}
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
// NotFoundHandler returns a simple task handler that returns a not found error.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }

View File

@@ -24,7 +24,7 @@ func makeFakeHandler(identity string) Handler {
}
// makeFakeMiddleware returns a middleware function that appends the given identity
//to the global invoked slice.
// to the global invoked slice.
func makeFakeMiddleware(identity string) MiddlewareFunc {
return func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, t *Task) error {

View File

@@ -174,16 +174,15 @@ type Config struct {
// })
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
//
// we can also handle panic error like:
// func reportError(ctx context, task *asynq.Task, err error) {
// if asynq.IsPanic(err) {
// if asynq.IsPanicError(err) {
// errorReportingService.Notify(err)
// }
// })
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler
// Logger specifies the logger used by the server instance.

View File

@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
if sig == unix.SIGTSTP {
srv.Stop()
continue
} else {
srv.Stop()
break
}
break
}
}

View File

@@ -8,9 +8,9 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/redis/go-redis/v9"
)
type subscriber struct {

View File

@@ -39,6 +39,7 @@ By default, CLI will try to connect to a redis server running at `localhost:6379
--config string config file to set flag defaut values (default is $HOME/.asynq.yaml)
-n, --db int redis database number (default is 0)
-h, --help help for asynq
-U, --username string username to use when connecting to redis server
-p, --password string password to use when connecting to redis server
-u, --uri string redis server URI (default "127.0.0.1:6379")

View File

@@ -16,11 +16,12 @@ import (
// ScreenDrawer is used to draw contents on screen.
//
// Usage example:
// d := NewScreenDrawer(s)
// d.Println("Hello world", mystyle)
// d.NL() // adds newline
// d.Print("foo", mystyle.Bold(true))
// d.Print("bar", mystyle.Italic(true))
//
// d := NewScreenDrawer(s)
// d.Println("Hello world", mystyle)
// d.NL() // adds newline
// d.Print("foo", mystyle.Bold(true))
// d.Print("bar", mystyle.Italic(true))
type ScreenDrawer struct {
l *LineDrawer
}

View File

@@ -1,3 +1,4 @@
//
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
@@ -36,10 +37,13 @@ var (
uri string
db int
password string
username string
useRedisCluster bool
clusterAddrs string
tlsServerName string
insecure bool
useTLS bool
)
// rootCmd represents the base command when called without any subcommands
@@ -257,7 +261,6 @@ func adjustPadding(lines ...*displayLine) {
func rpad(s string, padding int) string {
tmpl := fmt.Sprintf("%%-%ds ", padding)
return fmt.Sprintf(tmpl, s)
}
// lpad adds padding to the left of a string.
@@ -308,19 +311,26 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password to use when connecting to redis server")
rootCmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Username to use when connecting to Redis (ACL username)")
rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "Connect to redis cluster")
rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs",
"127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005",
"List of comma-separated redis server addresses")
rootCmd.PersistentFlags().BoolVar(&useTLS, "tls", false, "Enable TLS connection")
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
"", "Server name for TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
false, "Allow insecure TLS connection by skipping cert validation")
// Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
viper.BindPFlag("username", rootCmd.PersistentFlags().Lookup("username"))
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
viper.BindPFlag("tls", rootCmd.PersistentFlags().Lookup("tls"))
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
}
// initConfig reads in config file and ENV variables if set.
@@ -357,6 +367,7 @@ func createRDB() *rdb.RDB {
c = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
})
} else {
@@ -364,6 +375,7 @@ func createRDB() *rdb.RDB {
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
})
}
@@ -386,6 +398,7 @@ func getRedisConnOpt() asynq.RedisConnOpt {
return asynq.RedisClusterClientOpt{
Addrs: addrs,
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
}
}
@@ -393,16 +406,22 @@ func getRedisConnOpt() asynq.RedisConnOpt {
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
}
}
func getTLSConfig() *tls.Config {
tlsServer := viper.GetString("tls_server")
if tlsServer == "" {
return nil
if tlsServer != "" {
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
}
return &tls.Config{ServerName: tlsServer}
if viper.GetBool("tls") {
return &tls.Config{InsecureSkipVerify: viper.GetBool("insecure")}
}
return nil
}
// printTable is a helper function to print data in table format.
@@ -410,18 +429,22 @@ func getTLSConfig() *tls.Config {
// cols is a list of headers and printRow specifies how to print rows.
//
// Example:
// type User struct {
// Name string
// Addr string
// Age int
// }
//
// type User struct {
// Name string
// Addr string
// Age int
// }
//
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
// cols := []string{"Name", "Addr", "Age"}
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
//
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
//
// printTable(cols, printRows)
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"

View File

@@ -770,4 +770,3 @@ func taskRunAll(cmd *cobra.Command, args []string) {
}
fmt.Printf("%d tasks are now pending\n", n)
}

View File

@@ -25,13 +25,13 @@ type QueueMetricsCollector struct {
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
qnames, err := qmc.inspector.Queues()
if err != nil {
return nil, fmt.Errorf("failed to get queue names: %v", err)
return nil, fmt.Errorf("failed to get queue names: %w", err)
}
infos := make([]*asynq.QueueInfo, len(qnames))
for i, qname := range qnames {
qinfo, err := qmc.inspector.GetQueueInfo(qname)
if err != nil {
return nil, fmt.Errorf("failed to get queue info: %v", err)
return nil, fmt.Errorf("failed to get queue info: %w", err)
}
infos[i] = qinfo
}

View File

@@ -94,7 +94,7 @@ func (s *Semaphore) Release(ctx context.Context) error {
n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
if err != nil {
return fmt.Errorf("redis command failed: %v", err)
return fmt.Errorf("redis command failed: %w", err)
}
if n == 0 {