2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-06 06:45:52 +08:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Mohammed Sohail
f1e7dc4056 fix: closing the Client also closes the broker
* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
2024-12-03 10:27:50 +03:00
Mohammed Sohail
ee17997650 fix: NewScheduler wrongly creates a client whose sharedConnection value is always true
* This is affecting the PeriodicManager as well as the Scheduler
2024-12-03 10:08:47 +03:00
33 changed files with 380 additions and 1105 deletions

View File

@@ -9,7 +9,6 @@ on: [pull_request]
jobs: jobs:
incoming: incoming:
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: false
services: services:
redis: redis:
image: redis:7 image: redis:7
@@ -32,7 +31,6 @@ jobs:
current: current:
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: false
services: services:
redis: redis:
image: redis:7 image: redis:7
@@ -57,7 +55,6 @@ jobs:
benchstat: benchstat:
needs: [incoming, current] needs: [incoming, current]
if: false
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout - name: Checkout

View File

@@ -7,7 +7,7 @@ jobs:
strategy: strategy:
matrix: matrix:
os: [ubuntu-latest] os: [ubuntu-latest]
go-version: [1.24.x, 1.25.x] go-version: [1.22.x, 1.23.x]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
services: services:
redis: redis:
@@ -45,7 +45,7 @@ jobs:
strategy: strategy:
matrix: matrix:
os: [ubuntu-latest] os: [ubuntu-latest]
go-version: [1.24.x, 1.25.x] go-version: [1.22.x, 1.23.x]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
services: services:
redis: redis:
@@ -70,7 +70,6 @@ jobs:
golangci: golangci:
name: lint name: lint
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: false
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4

View File

@@ -7,45 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.26.0] - 2026-02-03
### Upgrades
- Prepare CI for Go 1.24.x and 1.25.x (commit: e9037f0)
### Added
- Add Headers support to tasks (PR: https://github.com/hibiken/asynq/pull/1070)
- Add `--tls` option to dash command (PR: https://github.com/hibiken/asynq/pull/1073)
- Add `--username` CLI flag for Redis ACL authentication (PR: https://github.com/hibiken/asynq/pull/1083)
- Add `UpdateTaskPayload` method for inspector (PR: https://github.com/hibiken/asynq/pull/1042)
### Fixes
- Fix: Correct error message text in ResultWriter.Write (PR: https://github.com/hibiken/asynq/pull/1054)
- Fix: Wrap all fmt.Errorf errors with %w (PR: https://github.com/hibiken/asynq/pull/1047)
- Fix: ServeMux.NotFoundHandler returns ErrHandlerNotFound error (PR: https://github.com/hibiken/asynq/pull/1031)
### Changed
- Docs: Update server.go documentation (PR: https://github.com/hibiken/asynq/pull/1010)
- Chore: Fix godoc comment (PR: https://github.com/hibiken/asynq/pull/1009)
## [0.25.1] - 2024-12-11
### Upgrades
* Some packages
### Added
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
### Fixes
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
## [0.25.0] - 2024-10-29 ## [0.25.0] - 2024-10-29
### Upgrades ### Upgrades

View File

@@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
} }
tasks := make([]*Task, len(msgs)) tasks := make([]*Task, len(msgs))
for i, m := range msgs { for i, m := range msgs {
tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers) tasks[i] = NewTask(m.Type, m.Payload)
} }
aggregatedTask := a.ga.Aggregate(gname, tasks) aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline) ctx, cancel := context.WithDeadline(context.Background(), deadline)

View File

@@ -8,15 +8,14 @@ import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"maps"
"net" "net"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
) )
// Task represents a unit of work to be performed. // Task represents a unit of work to be performed.
@@ -27,9 +26,6 @@ type Task struct {
// payload holds data needed to perform the task. // payload holds data needed to perform the task.
payload []byte payload []byte
// headers holds additional metadata for the task.
headers map[string]string
// opts holds options for the task. // opts holds options for the task.
opts []Option opts []Option
@@ -37,9 +33,8 @@ type Task struct {
w *ResultWriter w *ResultWriter
} }
func (t *Task) Type() string { return t.typename } func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload } func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers }
// ResultWriter returns a pointer to the ResultWriter associated with the task. // ResultWriter returns a pointer to the ResultWriter associated with the task.
// //
@@ -53,21 +48,6 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{ return &Task{
typename: typename, typename: typename,
payload: payload, payload: payload,
headers: nil,
opts: opts,
}
}
// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers.
// Options can be passed to configure task processing behavior.
// TODO: In the next major (breaking) release, fold this functionality into NewTask
//
// so that headers are supported directly. After that, remove this method.
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: maps.Clone(headers),
opts: opts, opts: opts,
} }
} }
@@ -77,7 +57,6 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task {
return &Task{ return &Task{
typename: typename, typename: typename,
payload: payload, payload: payload,
headers: make(map[string]string),
w: w, w: w,
} }
} }
@@ -96,9 +75,6 @@ type TaskInfo struct {
// Payload is the payload data of the task. // Payload is the payload data of the task.
Payload []byte Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// State indicates the task state. // State indicates the task state.
State TaskState State TaskState
@@ -169,7 +145,6 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
Queue: msg.Queue, Queue: msg.Queue,
Type: msg.Type, Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy? Payload: msg.Payload, // Do we need to make a copy?
Headers: msg.Headers,
MaxRetry: msg.Retry, MaxRetry: msg.Retry,
Retried: msg.Retried, Retried: msg.Retried,
LastErr: msg.ErrorMsg, LastErr: msg.ErrorMsg,
@@ -467,15 +442,14 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// //
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are: // Supported formats are:
// // redis://[:password@]host[:port][/dbnumber]
// redis://[:password@]host[:port][/dbnumber] // rediss://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber] // redis-socket://[:password@]path[?db=dbnumber]
// redis-socket://[:password@]path[?db=dbnumber] // redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) { func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri) u, err := url.Parse(uri)
if err != nil { if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: %w", err) return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
} }
switch u.Scheme { switch u.Scheme {
case "redis", "rediss": case "redis", "rediss":
@@ -565,7 +539,7 @@ type ResultWriter struct {
func (w *ResultWriter) Write(data []byte) (n int, err error) { func (w *ResultWriter) Write(data []byte) (n int, err error) {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
return 0, fmt.Errorf("failed to write task result: %w", w.ctx.Err()) return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
default: default:
} }
return w.broker.WriteResult(w.qname, w.id, data) return w.broker.WriteResult(w.qname, w.id, data)

View File

@@ -11,11 +11,11 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/redis/go-redis/v9"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
h "github.com/hibiken/asynq/internal/testutil" h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
) )
//============================================================================ //============================================================================

View File

@@ -385,7 +385,6 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID, ID: opt.taskID,
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue, Queue: opt.queue,
Retry: opt.retry, Retry: opt.retry,
Deadline: deadline.Unix(), Deadline: deadline.Unix(),

View File

@@ -1191,473 +1191,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
} }
} }
} }
func TestClientEnqueueWithHeaders(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"user-id": "123",
"request-id": "abc-def-ghi",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantPending map[string][]*base.TaskMessage
}{
{
desc: "Task with headers",
task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with empty headers",
task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "process_data",
Payload: []byte("data"),
Headers: map[string]string{},
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "process_data",
Payload: []byte("data"),
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with nil headers",
task: NewTaskWithHeaders("cleanup", nil, nil),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "cleanup",
Payload: nil,
Headers: nil,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "cleanup",
Payload: nil,
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with headers and custom options",
task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}),
opts: []Option{MaxRetry(5), Queue("notifications")},
wantInfo: &TaskInfo{
Queue: "notifications",
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"notifications": {
{
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
Retry: 5,
Queue: "notifications",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantPending {
got := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
}
}
func TestClientEnqueueWithHeadersScheduled(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
oneHourLater := now.Add(time.Hour)
headers := map[string]string{
"correlation-id": "xyz-123",
"source": "api",
}
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantInfo *TaskInfo
wantScheduled map[string][]base.Z
}{
{
desc: "Schedule task with headers",
task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers),
processAt: oneHourLater,
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
State: TaskStateScheduled,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: oneHourLater,
},
wantScheduled: map[string][]base.Z{
"default": {
{
Message: &base.TaskMessage{
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: oneHourLater.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
opts := append(tc.opts, ProcessAt(tc.processAt))
gotInfo, err := client.Enqueue(tc.task, opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
}
}
func TestNewTaskWithHeaders(t *testing.T) {
tests := []struct {
desc string
typename string
payload []byte
headers map[string]string
opts []Option
want *Task
}{
{
desc: "Task with headers",
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
want: &Task{
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
},
},
{
desc: "Task with empty headers",
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
want: &Task{
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
},
},
{
desc: "Task with nil headers",
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
want: &Task{
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
},
},
}
for _, tc := range tests {
got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...)
if got.Type() != tc.want.typename {
t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename)
}
if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" {
t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if tc.headers != nil && got.Headers() != nil {
tc.headers["modified"] = "test"
if _, exists := got.Headers()["modified"]; exists {
t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc)
}
}
}
}
func TestTaskHeadersMethod(t *testing.T) {
tests := []struct {
desc string
task *Task
want map[string]string
wantNil bool
}{
{
desc: "Task created with NewTask has nil headers",
task: NewTask("test", []byte("data")),
want: nil,
wantNil: true,
},
{
desc: "Task created with NewTaskWithHeaders has headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}),
want: map[string]string{"key": "value"},
},
{
desc: "Task created with empty headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}),
want: map[string]string{},
},
{
desc: "Task created with nil headers",
task: NewTaskWithHeaders("test", []byte("data"), nil),
want: nil,
wantNil: true,
},
}
for _, tc := range tests {
got := tc.task.Headers()
if tc.wantNil {
if got != nil {
t.Errorf("%s: Headers() = %v, want nil", tc.desc, got)
}
} else {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
}
}
}
func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"batch-id": "batch-123",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantGroups map[string]map[string][]base.Z
}{
{
desc: "Task with headers and group",
task: NewTaskWithHeaders("batch_process", []byte("item1"), headers),
opts: []Option{Group("batch-123")},
wantInfo: &TaskInfo{
Queue: "default",
Group: "batch-123",
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
State: TaskStateAggregating,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: time.Time{},
},
wantGroups: map[string]map[string][]base.Z{
"default": {
"batch-123": {
{
Message: &base.TaskMessage{
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
GroupKey: "batch-123",
},
Score: now.Unix(),
},
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, groups := range tc.wantGroups {
for groupKey, want := range groups {
got := h.GetGroupEntries(t, r, qname, groupKey)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff)
}
}
}
}
}

81
doc.go
View File

@@ -8,41 +8,41 @@ Package asynq provides a framework for Redis based distrubted task queue.
Asynq uses Redis as a message broker. To connect to redis, Asynq uses Redis as a message broker. To connect to redis,
specify the connection using one of RedisConnOpt types. specify the connection using one of RedisConnOpt types.
redisConnOpt = asynq.RedisClientOpt{ redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379", Addr: "127.0.0.1:6379",
Password: "xxxxx", Password: "xxxxx",
DB: 2, DB: 2,
} }
The Client is used to enqueue a task. The Client is used to enqueue a task.
client := asynq.NewClient(redisConnOpt)
// Task is created with two parameters: its type and payload. client := asynq.NewClient(redisConnOpt)
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
task := asynq.NewTask("example", b) // Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
// Enqueue the task to be processed immediately. task := asynq.NewTask("example", b)
info, err := client.Enqueue(task)
// Schedule the task to be processed after one minute. // Enqueue the task to be processed immediately.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) info, err := client.Enqueue(task)
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
The Server is used to run the task processing workers with a given The Server is used to run the task processing workers with a given
handler. handler.
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
srv := asynq.NewServer(redisConnOpt, asynq.Config{ if err := srv.Run(handler); err != nil {
Concurrency: 10, log.Fatal(err)
}) }
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
Handler is an interface type with a method which Handler is an interface type with a method which
takes a task and returns an error. Handler should return nil if takes a task and returns an error. Handler should return nil if
@@ -50,24 +50,23 @@ the processing is successful, otherwise return a non-nil error.
If handler panics or returns a non-nil error, the task will be retried in the future. If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface. Example of a type that implements the Handler interface.
type TaskHandler struct {
// ...
}
type TaskHandler struct { func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
// ... switch task.Type {
} case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { default:
switch task.Type { return fmt.Errorf("unexpected task type %q", task.Type)
case "example": }
var data ExamplePayload return nil
if err := json.Unmarshal(task.Payload(), &data); err != nil { }
return err
}
// perform task with the data
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
*/ */
package asynq package asynq

View File

@@ -123,7 +123,7 @@ func ExampleResultWriter() {
res := []byte("task result data") res := []byte("task result data")
n, err := task.ResultWriter().Write(res) // implements io.Writer n, err := task.ResultWriter().Write(res) // implements io.Writer
if err != nil { if err != nil {
return fmt.Errorf("failed to write task result: %w", err) return fmt.Errorf("failed to write task result: %v", err)
} }
log.Printf(" %d bytes written", n) log.Printf(" %d bytes written", n)
return nil return nil

16
go.mod
View File

@@ -1,20 +1,20 @@
module github.com/hibiken/asynq module github.com/hibiken/asynq
go 1.24.0 go 1.22
require ( require (
github.com/google/go-cmp v0.7.0 github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.14.1 github.com/redis/go-redis/v9 v9.7.0
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.10.0 github.com/spf13/cast v1.7.0
go.uber.org/goleak v1.3.0 go.uber.org/goleak v1.3.0
golang.org/x/sys v0.37.0 golang.org/x/sys v0.27.0
golang.org/x/time v0.14.0 golang.org/x/time v0.8.0
google.golang.org/protobuf v1.36.10 google.golang.org/protobuf v1.35.2
) )
require ( require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
) )

28
go.sum
View File

@@ -2,16 +2,16 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -20,23 +20,23 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -245,7 +245,7 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
case errors.IsTaskNotFound(err): case errors.IsTaskNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound) return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
} }
@@ -316,7 +316,7 @@ func Page(n int) ListOption {
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -325,7 +325,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, i := range infos { for _, i := range infos {
@@ -344,7 +344,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -353,11 +353,11 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue) expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil { if err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
expiredSet := make(map[string]struct{}) // set of expired message IDs expiredSet := make(map[string]struct{}) // set of expired message IDs
for _, msg := range expired { for _, msg := range expired {
@@ -384,7 +384,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -393,7 +393,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, i := range infos { for _, i := range infos {
@@ -413,7 +413,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -422,7 +422,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, i := range infos { for _, i := range infos {
@@ -442,7 +442,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -451,7 +451,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, i := range infos { for _, i := range infos {
@@ -471,7 +471,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -480,7 +480,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, i := range infos { for _, i := range infos {
@@ -500,7 +500,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -509,7 +509,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %w", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, i := range infos { for _, i := range infos {
@@ -583,30 +583,6 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
return int(n), err return int(n), err
} }
// UpdateTaskPayload updates a task with the given id from the given queue with given payload.
// The task needs to be in scheduled state,
// otherwise UpdateTaskPayload will return an error.
//
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is not in scheduled state, it returns a non-nil error.
func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.UpdateTaskPayload(queue, id, payload)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
}
return nil
}
// DeleteTask deletes a task with the given id from the given queue. // DeleteTask deletes a task with the given id from the given queue.
// The task needs to be in pending, scheduled, retry, or archived state, // The task needs to be in pending, scheduled, retry, or archived state,
// otherwise DeleteTask will return an error. // otherwise DeleteTask will return an error.
@@ -616,7 +592,7 @@ func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
// If the task is in active state, it returns a non-nil error. // If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(queue, id string) error { func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
err := i.rdb.DeleteTask(queue, id) err := i.rdb.DeleteTask(queue, id)
switch { switch {
@@ -625,7 +601,7 @@ func (i *Inspector) DeleteTask(queue, id string) error {
case errors.IsTaskNotFound(err): case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound) return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil: case err != nil:
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
return nil return nil
@@ -680,7 +656,7 @@ func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
// If the task is in pending or active state, it returns a non-nil error. // If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(queue, id string) error { func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
err := i.rdb.RunTask(queue, id) err := i.rdb.RunTask(queue, id)
switch { switch {
@@ -689,7 +665,7 @@ func (i *Inspector) RunTask(queue, id string) error {
case errors.IsTaskNotFound(err): case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound) return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil: case err != nil:
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
return nil return nil
} }
@@ -752,7 +728,7 @@ func (i *Inspector) ArchiveTask(queue, id string) error {
case errors.IsTaskNotFound(err): case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound) return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil: case err != nil:
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
return nil return nil
} }

View File

@@ -2369,148 +2369,6 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
} }
} }
func TestInspectorUpdateTaskPayloadUpdatesScheduledTaskPayload(t *testing.T) {
r := setup(t)
defer r.Close()
m1_old := h.NewTaskMessage("task1", []byte("m1_old"))
m1_new := h.NewTaskMessage("task1", nil)
m1_new.ID = m1_old.ID
m2_old := h.NewTaskMessage("task2", nil)
m2_new := h.NewTaskMessage("task2", []byte("m2_new"))
m2_new.ID = m2_old.ID
m3_old := h.NewTaskMessageWithQueue("task3", []byte("m3_old"), "custom")
m3_new := h.NewTaskMessageWithQueue("task3", []byte("m3_new"), "custom")
m3_new.ID = m3_old.ID
now := time.Now()
z1_old := base.Z{Message: m1_old, Score: now.Add(5 * time.Minute).Unix()}
z1_new := base.Z{Message: m1_new, Score: now.Add(5 * time.Minute).Unix()}
z2_old := base.Z{Message: m2_old, Score: now.Add(15 * time.Minute).Unix()}
z2_new := base.Z{Message: m2_new, Score: now.Add(15 * time.Minute).Unix()}
z3_old := base.Z{Message: m3_old, Score: now.Add(2 * time.Minute).Unix()}
z3_new := base.Z{Message: m3_new, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
qname string
id string
newPayload []byte
wantScheduled map[string][]base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z2_old).ID,
newPayload: m2_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_new},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z1_old).ID,
newPayload: m1_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_new, z2_old},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "custom",
id: createScheduledTask(z3_old).ID,
newPayload: m3_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_new},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); err != nil {
t.Errorf("UpdateTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
}
}
}
func TestInspectorUpdateTaskPayloadError(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
tasks map[string][]base.Z
qname string
id string
newPayload []byte
wantErr error
}{
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "nonexistent",
id: createScheduledTask(z2).ID,
newPayload: nil,
wantErr: ErrQueueNotFound,
},
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
id: uuid.NewString(),
newPayload: nil,
wantErr: ErrTaskNotFound,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.tasks)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); !errors.Is(err, tc.wantErr) {
t.Errorf("UpdateTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
continue
}
}
}
func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()

View File

@@ -23,7 +23,7 @@ import (
) )
// Version of asynq library and CLI. // Version of asynq library and CLI.
const Version = "0.26.0" const Version = "0.25.0"
// DefaultQueueName is the queue name used if none are specified by user. // DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default" const DefaultQueueName = "default"
@@ -240,9 +240,6 @@ type TaskMessage struct {
// Payload holds data needed to process the task. // Payload holds data needed to process the task.
Payload []byte Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// ID is a unique identifier for each task. // ID is a unique identifier for each task.
ID string ID string
@@ -307,7 +304,6 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
return proto.Marshal(&pb.TaskMessage{ return proto.Marshal(&pb.TaskMessage{
Type: msg.Type, Type: msg.Type,
Payload: msg.Payload, Payload: msg.Payload,
Headers: msg.Headers,
Id: msg.ID, Id: msg.ID,
Queue: msg.Queue, Queue: msg.Queue,
Retry: int32(msg.Retry), Retry: int32(msg.Retry),
@@ -332,7 +328,6 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return &TaskMessage{ return &TaskMessage{
Type: pbmsg.GetType(), Type: pbmsg.GetType(),
Payload: pbmsg.GetPayload(), Payload: pbmsg.GetPayload(),
Headers: pbmsg.GetHeaders(),
ID: pbmsg.GetId(), ID: pbmsg.GetId(),
Queue: pbmsg.GetQueue(), Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()), Retry: int(pbmsg.GetRetry()),

View File

@@ -107,7 +107,6 @@ type Op string
// only the last one is recorded. // only the last one is recorded.
// //
// The types are: // The types are:
//
// errors.Op // errors.Op
// The operation being performed, usually the method // The operation being performed, usually the method
// being invoked (Get, Put, etc.). // being invoked (Get, Put, etc.).

View File

@@ -4,8 +4,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.36.6 // protoc-gen-go v1.34.2
// protoc v5.29.3 // protoc v3.19.6
// source: asynq.proto // source: asynq.proto
package proto package proto
@@ -16,7 +16,6 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb" timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect" reflect "reflect"
sync "sync" sync "sync"
unsafe "unsafe"
) )
const ( const (
@@ -28,15 +27,16 @@ const (
// TaskMessage is the internal representation of a task with additional // TaskMessage is the internal representation of a task with additional
// metadata fields. // metadata fields.
// Next ID: 16 // Next ID: 15
type TaskMessage struct { type TaskMessage struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// Payload holds data needed to process the task. // Payload holds data needed to process the task.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Headers holds additional metadata for the task.
Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
// Unique identifier for the task. // Unique identifier for the task.
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
// Name of the queue to which this task belongs. // Name of the queue to which this task belongs.
@@ -71,16 +71,16 @@ type TaskMessage struct {
// Time when the task completed in success in Unix time, // Time when the task completed in success in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC. // the number of seconds elapsed since January 1, 1970 UTC.
// This field is populated if result_ttl > 0 upon completion. // This field is populated if result_ttl > 0 upon completion.
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"` CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
} }
func (x *TaskMessage) Reset() { func (x *TaskMessage) Reset() {
*x = TaskMessage{} *x = TaskMessage{}
mi := &file_asynq_proto_msgTypes[0] if protoimpl.UnsafeEnabled {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) mi := &file_asynq_proto_msgTypes[0]
ms.StoreMessageInfo(mi) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
} }
func (x *TaskMessage) String() string { func (x *TaskMessage) String() string {
@@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {}
func (x *TaskMessage) ProtoReflect() protoreflect.Message { func (x *TaskMessage) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[0] mi := &file_asynq_proto_msgTypes[0]
if x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
@@ -120,13 +120,6 @@ func (x *TaskMessage) GetPayload() []byte {
return nil return nil
} }
func (x *TaskMessage) GetHeaders() map[string]string {
if x != nil {
return x.Headers
}
return nil
}
func (x *TaskMessage) GetId() string { func (x *TaskMessage) GetId() string {
if x != nil { if x != nil {
return x.Id return x.Id
@@ -213,7 +206,10 @@ func (x *TaskMessage) GetCompletedAt() int64 {
// ServerInfo holds information about a running server. // ServerInfo holds information about a running server.
type ServerInfo struct { type ServerInfo struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Host machine the server is running on. // Host machine the server is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the server process. // PID of the server process.
@@ -225,7 +221,7 @@ type ServerInfo struct {
// List of queue names with their priorities. // List of queue names with their priorities.
// The server will consume tasks from the queues and prioritize // The server will consume tasks from the queues and prioritize
// queues with higher priority numbers. // queues with higher priority numbers.
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
// If set, the server will always consume tasks from a queue with higher // If set, the server will always consume tasks from a queue with higher
// priority. // priority.
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"` StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
@@ -235,15 +231,15 @@ type ServerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Number of workers currently processing tasks. // Number of workers currently processing tasks.
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"` ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
} }
func (x *ServerInfo) Reset() { func (x *ServerInfo) Reset() {
*x = ServerInfo{} *x = ServerInfo{}
mi := &file_asynq_proto_msgTypes[1] if protoimpl.UnsafeEnabled {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) mi := &file_asynq_proto_msgTypes[1]
ms.StoreMessageInfo(mi) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
} }
func (x *ServerInfo) String() string { func (x *ServerInfo) String() string {
@@ -254,7 +250,7 @@ func (*ServerInfo) ProtoMessage() {}
func (x *ServerInfo) ProtoReflect() protoreflect.Message { func (x *ServerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[1] mi := &file_asynq_proto_msgTypes[1]
if x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
@@ -334,7 +330,10 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 {
// WorkerInfo holds information about a running worker. // WorkerInfo holds information about a running worker.
type WorkerInfo struct { type WorkerInfo struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Host matchine this worker is running on. // Host matchine this worker is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the process in which this worker is running. // PID of the process in which this worker is running.
@@ -353,16 +352,16 @@ type WorkerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Deadline by which the worker needs to complete processing // Deadline by which the worker needs to complete processing
// the task. If worker exceeds the deadline, the task will fail. // the task. If worker exceeds the deadline, the task will fail.
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"` Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
} }
func (x *WorkerInfo) Reset() { func (x *WorkerInfo) Reset() {
*x = WorkerInfo{} *x = WorkerInfo{}
mi := &file_asynq_proto_msgTypes[2] if protoimpl.UnsafeEnabled {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) mi := &file_asynq_proto_msgTypes[2]
ms.StoreMessageInfo(mi) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
} }
func (x *WorkerInfo) String() string { func (x *WorkerInfo) String() string {
@@ -373,7 +372,7 @@ func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message { func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[2] mi := &file_asynq_proto_msgTypes[2]
if x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
@@ -454,7 +453,10 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
// SchedulerEntry holds information about a periodic task registered // SchedulerEntry holds information about a periodic task registered
// with a scheduler. // with a scheduler.
type SchedulerEntry struct { type SchedulerEntry struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Identifier of the scheduler entry. // Identifier of the scheduler entry.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// Periodic schedule spec of the entry. // Periodic schedule spec of the entry.
@@ -470,15 +472,15 @@ type SchedulerEntry struct {
// Last time the task was enqueued. // Last time the task was enqueued.
// Zero time if task was never enqueued. // Zero time if task was never enqueued.
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"` PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
} }
func (x *SchedulerEntry) Reset() { func (x *SchedulerEntry) Reset() {
*x = SchedulerEntry{} *x = SchedulerEntry{}
mi := &file_asynq_proto_msgTypes[3] if protoimpl.UnsafeEnabled {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) mi := &file_asynq_proto_msgTypes[3]
ms.StoreMessageInfo(mi) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
} }
func (x *SchedulerEntry) String() string { func (x *SchedulerEntry) String() string {
@@ -489,7 +491,7 @@ func (*SchedulerEntry) ProtoMessage() {}
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message { func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[3] mi := &file_asynq_proto_msgTypes[3]
if x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
@@ -556,20 +558,23 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
// SchedulerEnqueueEvent holds information about an enqueue event // SchedulerEnqueueEvent holds information about an enqueue event
// by a scheduler. // by a scheduler.
type SchedulerEnqueueEvent struct { type SchedulerEnqueueEvent struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// ID of the task that was enqueued. // ID of the task that was enqueued.
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
// Time the task was enqueued. // Time the task was enqueued.
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"` EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
} }
func (x *SchedulerEnqueueEvent) Reset() { func (x *SchedulerEnqueueEvent) Reset() {
*x = SchedulerEnqueueEvent{} *x = SchedulerEnqueueEvent{}
mi := &file_asynq_proto_msgTypes[4] if protoimpl.UnsafeEnabled {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) mi := &file_asynq_proto_msgTypes[4]
ms.StoreMessageInfo(mi) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
} }
func (x *SchedulerEnqueueEvent) String() string { func (x *SchedulerEnqueueEvent) String() string {
@@ -580,7 +585,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {}
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message { func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[4] mi := &file_asynq_proto_msgTypes[4]
if x != nil { if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil { if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi) ms.StoreMessageInfo(mi)
@@ -611,106 +616,146 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
var File_asynq_proto protoreflect.FileDescriptor var File_asynq_proto protoreflect.FileDescriptor
const file_asynq_proto_rawDesc = "" + var file_asynq_proto_rawDesc = []byte{
"\n" + 0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
"\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" + 0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
"\vTaskMessage\x12\x12\n" + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
"\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
"\apayload\x18\x02 \x01(\fR\apayload\x129\n" + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
"\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" + 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79,
"\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c,
"\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" + 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
"\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" + 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01,
"\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" + 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74,
"\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" + 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12,
"\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" + 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
"\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" + 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72,
"\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" + 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72,
"\n" + 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66,
"unique_key\x18\n" + 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
" \x01(\tR\tuniqueKey\x12\x1b\n" + 0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07,
"\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" + 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74,
"\tretention\x18\f \x01(\x03R\tretention\x12!\n" + 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
"\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" + 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
"\fHeadersEntry\x12\x10\n" + 0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79,
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65,
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" + 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e,
"\n" + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c,
"ServerInfo\x12\x12\n" + 0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28,
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" + 0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" + 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01,
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" + 0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22,
"\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" + 0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12,
"\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" + 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
"\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" + 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
"\x06status\x18\a \x01(\tR\x06status\x129\n" + 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69,
"\n" + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49,
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" + 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79,
"\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" + 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65,
"\vQueuesEntry\x12\x10\n" + 0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20,
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76,
"\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" + 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74,
"\n" + 0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74,
"WorkerInfo\x12\x12\n" + 0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20,
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" + 0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72,
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" + 0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20,
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" + 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73,
"\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" + 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32,
"\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
"\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61,
"\x05queue\x18\a \x01(\tR\x05queue\x129\n" + 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65,
"\n" + 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20,
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" + 0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65,
"\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" + 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73,
"\x0eSchedulerEntry\x12\x0e\n" + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
"\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" + 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
"\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" + 0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
"\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" + 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
"\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" + 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
"\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" + 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
"\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" + 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65,
"\x15SchedulerEnqueueEvent\x12\x17\n" + 0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04,
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09,
"\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3" 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65,
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65,
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a,
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61,
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75,
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e,
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65,
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a,
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75,
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17,
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var ( var (
file_asynq_proto_rawDescOnce sync.Once file_asynq_proto_rawDescOnce sync.Once
file_asynq_proto_rawDescData []byte file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
) )
func file_asynq_proto_rawDescGZIP() []byte { func file_asynq_proto_rawDescGZIP() []byte {
file_asynq_proto_rawDescOnce.Do(func() { file_asynq_proto_rawDescOnce.Do(func() {
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc))) file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
}) })
return file_asynq_proto_rawDescData return file_asynq_proto_rawDescData
} }
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_asynq_proto_goTypes = []any{ var file_asynq_proto_goTypes = []any{
(*TaskMessage)(nil), // 0: asynq.TaskMessage (*TaskMessage)(nil), // 0: asynq.TaskMessage
(*ServerInfo)(nil), // 1: asynq.ServerInfo (*ServerInfo)(nil), // 1: asynq.ServerInfo
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo (*WorkerInfo)(nil), // 2: asynq.WorkerInfo
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry (*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent (*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
nil, // 5: asynq.TaskMessage.HeadersEntry nil, // 5: asynq.ServerInfo.QueuesEntry
nil, // 6: asynq.ServerInfo.QueuesEntry (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
} }
var file_asynq_proto_depIdxs = []int32{ var file_asynq_proto_depIdxs = []int32{
5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry 5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry 6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp 6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp 6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp 6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp 6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp 6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp 7, // [7:7] is the sub-list for method output_type
8, // [8:8] is the sub-list for method output_type 7, // [7:7] is the sub-list for method input_type
8, // [8:8] is the sub-list for method input_type 7, // [7:7] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension type_name 7, // [7:7] is the sub-list for extension extendee
8, // [8:8] is the sub-list for extension extendee 0, // [0:7] is the sub-list for field type_name
0, // [0:8] is the sub-list for field type_name
} }
func init() { file_asynq_proto_init() } func init() { file_asynq_proto_init() }
@@ -718,13 +763,75 @@ func file_asynq_proto_init() {
if File_asynq_proto != nil { if File_asynq_proto != nil {
return return
} }
if !protoimpl.UnsafeEnabled {
file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*TaskMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*ServerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEnqueueEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{ File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)), RawDescriptor: file_asynq_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 7, NumMessages: 6,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },
@@ -733,6 +840,7 @@ func file_asynq_proto_init() {
MessageInfos: file_asynq_proto_msgTypes, MessageInfos: file_asynq_proto_msgTypes,
}.Build() }.Build()
File_asynq_proto = out.File File_asynq_proto = out.File
file_asynq_proto_rawDesc = nil
file_asynq_proto_goTypes = nil file_asynq_proto_goTypes = nil
file_asynq_proto_depIdxs = nil file_asynq_proto_depIdxs = nil
} }

View File

@@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
// TaskMessage is the internal representation of a task with additional // TaskMessage is the internal representation of a task with additional
// metadata fields. // metadata fields.
// Next ID: 16 // Next ID: 15
message TaskMessage { message TaskMessage {
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.
string type = 1; string type = 1;
@@ -19,9 +19,6 @@ message TaskMessage {
// Payload holds data needed to process the task. // Payload holds data needed to process the task.
bytes payload = 2; bytes payload = 2;
// Headers holds additional metadata for the task.
map<string, string> headers = 15;
// Unique identifier for the task. // Unique identifier for the task.
string id = 3; string id = 3;

View File

@@ -1412,93 +1412,6 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
return n, nil return n, nil
} }
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// --
// ARGV[1] -> task message data
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully updated.
// Returns 0 if task is not found.
// Returns -1 if task is not in scheduled state.
var updateTaskPayloadCmd = redis.NewScript(`
-- Check if given taks exists
if redis.call("EXISTS", KEYS[1]) == 0 then
return 0
end
local state, pending_since, group, unique_key = unpack(redis.call("HMGET", KEYS[1], "state", "pending_since", "group", "unique_key"))
if state ~= "scheduled" then
return -1
end
local redis_call_args = {"state", state}
if pending_since then
table.insert(redis_call_args, "pending_since")
table.insert(redis_call_args, pending_since)
end
if group then
table.insert(redis_call_args, "group")
table.insert(redis_call_args, group)
end
if unique_key then
table.insert(redis_call_args, "unique_key")
table.insert(redis_call_args, unique_key)
end
redis.call("HSET", KEYS[1], "msg", ARGV[1], unpack(redis_call_args))
return 1
`)
// UpdateTaskPayload finds a task that matches the id from the given queue and updates it's payload.
// It returns nil if it successfully updated the task payload.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
func (r *RDB) UpdateTaskPayload(qname, id string, payload []byte) error {
var op errors.Op = "rdb.UpdateTask"
if err := r.checkQueueExists(qname); err != nil {
return errors.E(op, errors.CanonicalCode(err), err)
}
taskInfo, err := r.GetTaskInfo(qname, id)
if err != nil {
return errors.E(op, errors.Unknown, err)
}
taskInfo.Message.Payload = payload
encoded, err := base.EncodeMessage(taskInfo.Message)
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
keys := []string{
base.TaskKey(qname, id),
}
argv := []interface{}{
encoded,
}
res, err := updateTaskPayloadCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
n, ok := res.(int64)
if !ok {
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: updateTaskCmd script returned unexported value %v", res))
}
switch n {
case 1:
return nil
case 0:
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
case -1:
return errors.E(op, errors.FailedPrecondition, "cannot update task that is not in scheduled state.")
default:
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from updateTaskCmd script: %d", n))
}
}
// Input: // Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id> // KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:groups // KEYS[2] -> asynq:{<qname>}:groups

View File

@@ -122,10 +122,10 @@ func (mgr *PeriodicTaskManager) Start() error {
panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize") panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
} }
if err := mgr.initialSync(); err != nil { if err := mgr.initialSync(); err != nil {
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
if err := mgr.s.Start(); err != nil { if err := mgr.s.Start(); err != nil {
return fmt.Errorf("asynq: %w", err) return fmt.Errorf("asynq: %v", err)
} }
mgr.wg.Add(1) mgr.wg.Add(1)
go func() { go func() {
@@ -168,11 +168,11 @@ func (mgr *PeriodicTaskManager) Run() error {
func (mgr *PeriodicTaskManager) initialSync() error { func (mgr *PeriodicTaskManager) initialSync() error {
configs, err := mgr.p.GetConfigs() configs, err := mgr.p.GetConfigs()
if err != nil { if err != nil {
return fmt.Errorf("initial call to GetConfigs failed: %w", err) return fmt.Errorf("initial call to GetConfigs failed: %v", err)
} }
for _, c := range configs { for _, c := range configs {
if err := validatePeriodicTaskConfig(c); err != nil { if err := validatePeriodicTaskConfig(c); err != nil {
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %w", err) return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err)
} }
} }
mgr.add(configs) mgr.add(configs)

View File

@@ -230,7 +230,6 @@ func (p *processor) exec() {
ctx: ctx, ctx: ctx,
}, },
) )
task.headers = msg.Headers
resCh <- p.perform(ctx, task) resCh <- p.perform(ctx, task)
}() }()
@@ -334,7 +333,7 @@ var RevokeTask = errors.New("revoke task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) { func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil { if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err) p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
} }
switch { switch {
case errors.Is(err, RevokeTask): case errors.Is(err, RevokeTask):
@@ -355,7 +354,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
} }
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline()) ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel() defer cancel()
d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers)) d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure) err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil { if err != nil {

View File

@@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
} }
func (r *recoverer) retry(msg *base.TaskMessage, err error) { func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers)) delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(delay) retryAt := time.Now().Add(delay)
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil { if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err) r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)

View File

@@ -6,16 +6,12 @@ package asynq
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
"sync" "sync"
) )
// ErrHandlerNotFound indicates that no task handler was found for a given pattern.
var ErrHandlerNotFound = errors.New("handler not found for task")
// ServeMux is a multiplexer for asynchronous tasks. // ServeMux is a multiplexer for asynchronous tasks.
// It matches the type of each task against a list of registered patterns // It matches the type of each task against a list of registered patterns
// and calls the handler for the pattern that most closely matches the // and calls the handler for the pattern that most closely matches the
@@ -153,8 +149,8 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
// NotFound returns an error indicating that the handler was not found for the given task. // NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error { func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("%w %q", ErrHandlerNotFound, task.Type()) return fmt.Errorf("handler not found for task %q", task.Type())
} }
// NotFoundHandler returns a simple task handler that returns a not found error. // NotFoundHandler returns a simple task handler that returns a ``not found`` error.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) } func NotFoundHandler() Handler { return HandlerFunc(NotFound) }

View File

@@ -24,7 +24,7 @@ func makeFakeHandler(identity string) Handler {
} }
// makeFakeMiddleware returns a middleware function that appends the given identity // makeFakeMiddleware returns a middleware function that appends the given identity
// to the global invoked slice. //to the global invoked slice.
func makeFakeMiddleware(identity string) MiddlewareFunc { func makeFakeMiddleware(identity string) MiddlewareFunc {
return func(next Handler) Handler { return func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, t *Task) error { return HandlerFunc(func(ctx context.Context, t *Task) error {

View File

@@ -174,15 +174,16 @@ type Config struct {
// }) // })
// //
// ErrorHandler: asynq.ErrorHandlerFunc(reportError) // ErrorHandler: asynq.ErrorHandlerFunc(reportError)
//
// we can also handle panic error like: // we can also handle panic error like:
// func reportError(ctx context, task *asynq.Task, err error) { // func reportError(ctx context, task *asynq.Task, err error) {
// if asynq.IsPanicError(err) { // if asynq.IsPanic(err) {
// errorReportingService.Notify(err) // errorReportingService.Notify(err)
// } // }
// }) // })
// //
// ErrorHandler: asynq.ErrorHandlerFunc(reportError) // ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
// Logger specifies the logger used by the server instance. // Logger specifies the logger used by the server instance.

View File

@@ -24,10 +24,8 @@ func (srv *Server) waitForSignals() {
if sig == unix.SIGTSTP { if sig == unix.SIGTSTP {
srv.Stop() srv.Stop()
continue continue
} else {
srv.Stop()
break
} }
break
} }
} }

View File

@@ -8,9 +8,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
"github.com/redis/go-redis/v9"
) )
type subscriber struct { type subscriber struct {

View File

@@ -39,7 +39,6 @@ By default, CLI will try to connect to a redis server running at `localhost:6379
--config string config file to set flag defaut values (default is $HOME/.asynq.yaml) --config string config file to set flag defaut values (default is $HOME/.asynq.yaml)
-n, --db int redis database number (default is 0) -n, --db int redis database number (default is 0)
-h, --help help for asynq -h, --help help for asynq
-U, --username string username to use when connecting to redis server
-p, --password string password to use when connecting to redis server -p, --password string password to use when connecting to redis server
-u, --uri string redis server URI (default "127.0.0.1:6379") -u, --uri string redis server URI (default "127.0.0.1:6379")

View File

@@ -16,12 +16,11 @@ import (
// ScreenDrawer is used to draw contents on screen. // ScreenDrawer is used to draw contents on screen.
// //
// Usage example: // Usage example:
// // d := NewScreenDrawer(s)
// d := NewScreenDrawer(s) // d.Println("Hello world", mystyle)
// d.Println("Hello world", mystyle) // d.NL() // adds newline
// d.NL() // adds newline // d.Print("foo", mystyle.Bold(true))
// d.Print("foo", mystyle.Bold(true)) // d.Print("bar", mystyle.Italic(true))
// d.Print("bar", mystyle.Italic(true))
type ScreenDrawer struct { type ScreenDrawer struct {
l *LineDrawer l *LineDrawer
} }

View File

@@ -1,4 +1,3 @@
//
// Copyright 2020 Kentaro Hibino. All rights reserved. // Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license // Use of this source code is governed by a MIT license
// that can be found in the LICENSE file. // that can be found in the LICENSE file.
@@ -37,13 +36,10 @@ var (
uri string uri string
db int db int
password string password string
username string
useRedisCluster bool useRedisCluster bool
clusterAddrs string clusterAddrs string
tlsServerName string tlsServerName string
insecure bool
useTLS bool
) )
// rootCmd represents the base command when called without any subcommands // rootCmd represents the base command when called without any subcommands
@@ -261,6 +257,7 @@ func adjustPadding(lines ...*displayLine) {
func rpad(s string, padding int) string { func rpad(s string, padding int) string {
tmpl := fmt.Sprintf("%%-%ds ", padding) tmpl := fmt.Sprintf("%%-%ds ", padding)
return fmt.Sprintf(tmpl, s) return fmt.Sprintf(tmpl, s)
} }
// lpad adds padding to the left of a string. // lpad adds padding to the left of a string.
@@ -311,26 +308,19 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI") rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)") rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password to use when connecting to redis server") rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password to use when connecting to redis server")
rootCmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Username to use when connecting to Redis (ACL username)")
rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "Connect to redis cluster") rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "Connect to redis cluster")
rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs", rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs",
"127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005", "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005",
"List of comma-separated redis server addresses") "List of comma-separated redis server addresses")
rootCmd.PersistentFlags().BoolVar(&useTLS, "tls", false, "Enable TLS connection")
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server", rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
"", "Server name for TLS validation") "", "Server name for TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
false, "Allow insecure TLS connection by skipping cert validation")
// Bind flags with config. // Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri")) viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db")) viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password")) viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
viper.BindPFlag("username", rootCmd.PersistentFlags().Lookup("username"))
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster")) viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs")) viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
viper.BindPFlag("tls", rootCmd.PersistentFlags().Lookup("tls"))
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server")) viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
} }
// initConfig reads in config file and ENV variables if set. // initConfig reads in config file and ENV variables if set.
@@ -367,7 +357,6 @@ func createRDB() *rdb.RDB {
c = redis.NewClusterClient(&redis.ClusterOptions{ c = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs, Addrs: addrs,
Password: viper.GetString("password"), Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(), TLSConfig: getTLSConfig(),
}) })
} else { } else {
@@ -375,7 +364,6 @@ func createRDB() *rdb.RDB {
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(), TLSConfig: getTLSConfig(),
}) })
} }
@@ -398,7 +386,6 @@ func getRedisConnOpt() asynq.RedisConnOpt {
return asynq.RedisClusterClientOpt{ return asynq.RedisClusterClientOpt{
Addrs: addrs, Addrs: addrs,
Password: viper.GetString("password"), Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(), TLSConfig: getTLSConfig(),
} }
} }
@@ -406,22 +393,16 @@ func getRedisConnOpt() asynq.RedisConnOpt {
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(), TLSConfig: getTLSConfig(),
} }
} }
func getTLSConfig() *tls.Config { func getTLSConfig() *tls.Config {
tlsServer := viper.GetString("tls_server") tlsServer := viper.GetString("tls_server")
if tlsServer != "" { if tlsServer == "" {
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")} return nil
} }
return &tls.Config{ServerName: tlsServer}
if viper.GetBool("tls") {
return &tls.Config{InsecureSkipVerify: viper.GetBool("insecure")}
}
return nil
} }
// printTable is a helper function to print data in table format. // printTable is a helper function to print data in table format.
@@ -429,22 +410,18 @@ func getTLSConfig() *tls.Config {
// cols is a list of headers and printRow specifies how to print rows. // cols is a list of headers and printRow specifies how to print rows.
// //
// Example: // Example:
// // type User struct {
// type User struct { // Name string
// Name string // Addr string
// Addr string // Age int
// Age int // }
// }
//
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...} // data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
// cols := []string{"Name", "Addr", "Age"} // cols := []string{"Name", "Addr", "Age"}
// // printRows := func(w io.Writer, tmpl string) {
// printRows := func(w io.Writer, tmpl string) { // for _, u := range data {
// for _, u := range data { // fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age) // }
// } // }
// }
//
// printTable(cols, printRows) // printTable(cols, printRows)
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) { func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n" format := strings.Repeat("%v\t", len(cols)) + "\n"

View File

@@ -770,3 +770,4 @@ func taskRunAll(cmd *cobra.Command, args []string) {
} }
fmt.Printf("%d tasks are now pending\n", n) fmt.Printf("%d tasks are now pending\n", n)
} }

View File

@@ -25,13 +25,13 @@ type QueueMetricsCollector struct {
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) { func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
qnames, err := qmc.inspector.Queues() qnames, err := qmc.inspector.Queues()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get queue names: %w", err) return nil, fmt.Errorf("failed to get queue names: %v", err)
} }
infos := make([]*asynq.QueueInfo, len(qnames)) infos := make([]*asynq.QueueInfo, len(qnames))
for i, qname := range qnames { for i, qname := range qnames {
qinfo, err := qmc.inspector.GetQueueInfo(qname) qinfo, err := qmc.inspector.GetQueueInfo(qname)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get queue info: %w", err) return nil, fmt.Errorf("failed to get queue info: %v", err)
} }
infos[i] = qinfo infos[i] = qinfo
} }

View File

@@ -94,7 +94,7 @@ func (s *Semaphore) Release(ctx context.Context) error {
n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result() n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
if err != nil { if err != nil {
return fmt.Errorf("redis command failed: %w", err) return fmt.Errorf("redis command failed: %v", err)
} }
if n == 0 { if n == 0 {