mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-02 04:45:50 +08:00
Compare commits
19 Commits
sohail/uni
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d704b68a42 | ||
|
|
a8db5b5571 | ||
|
|
e4248e2749 | ||
|
|
c4876e7247 | ||
|
|
dd2c3de356 | ||
|
|
ff887e1f89 | ||
|
|
5de9b1faf0 | ||
|
|
8261a03f0d | ||
|
|
74c47eb8bb | ||
|
|
e9037f003d | ||
|
|
604175e6ca | ||
|
|
1831a07efe | ||
|
|
d64f0b7ed0 | ||
|
|
a889ef0b08 | ||
|
|
093ba04266 | ||
|
|
c327bc40a2 | ||
|
|
ea0c6e93f0 | ||
|
|
489e21920b | ||
|
|
043dcfbf56 |
3
.github/workflows/benchstat.yml
vendored
3
.github/workflows/benchstat.yml
vendored
@@ -9,6 +9,7 @@ on: [pull_request]
|
||||
jobs:
|
||||
incoming:
|
||||
runs-on: ubuntu-latest
|
||||
if: false
|
||||
services:
|
||||
redis:
|
||||
image: redis:7
|
||||
@@ -31,6 +32,7 @@ jobs:
|
||||
|
||||
current:
|
||||
runs-on: ubuntu-latest
|
||||
if: false
|
||||
services:
|
||||
redis:
|
||||
image: redis:7
|
||||
@@ -55,6 +57,7 @@ jobs:
|
||||
|
||||
benchstat:
|
||||
needs: [incoming, current]
|
||||
if: false
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
5
.github/workflows/build.yml
vendored
5
.github/workflows/build.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest]
|
||||
go-version: [1.22.x, 1.23.x]
|
||||
go-version: [1.24.x, 1.25.x]
|
||||
runs-on: ${{ matrix.os }}
|
||||
services:
|
||||
redis:
|
||||
@@ -45,7 +45,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest]
|
||||
go-version: [1.22.x, 1.23.x]
|
||||
go-version: [1.24.x, 1.25.x]
|
||||
runs-on: ${{ matrix.os }}
|
||||
services:
|
||||
redis:
|
||||
@@ -70,6 +70,7 @@ jobs:
|
||||
golangci:
|
||||
name: lint
|
||||
runs-on: ubuntu-latest
|
||||
if: false
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
|
||||
39
CHANGELOG.md
39
CHANGELOG.md
@@ -7,6 +7,45 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.26.0] - 2026-02-03
|
||||
|
||||
### Upgrades
|
||||
- Prepare CI for Go 1.24.x and 1.25.x (commit: e9037f0)
|
||||
|
||||
### Added
|
||||
- Add Headers support to tasks (PR: https://github.com/hibiken/asynq/pull/1070)
|
||||
- Add `--tls` option to dash command (PR: https://github.com/hibiken/asynq/pull/1073)
|
||||
- Add `--username` CLI flag for Redis ACL authentication (PR: https://github.com/hibiken/asynq/pull/1083)
|
||||
- Add `UpdateTaskPayload` method for inspector (PR: https://github.com/hibiken/asynq/pull/1042)
|
||||
|
||||
### Fixes
|
||||
- Fix: Correct error message text in ResultWriter.Write (PR: https://github.com/hibiken/asynq/pull/1054)
|
||||
- Fix: Wrap all fmt.Errorf errors with %w (PR: https://github.com/hibiken/asynq/pull/1047)
|
||||
- Fix: ServeMux.NotFoundHandler returns ErrHandlerNotFound error (PR: https://github.com/hibiken/asynq/pull/1031)
|
||||
|
||||
### Changed
|
||||
- Docs: Update server.go documentation (PR: https://github.com/hibiken/asynq/pull/1010)
|
||||
- Chore: Fix godoc comment (PR: https://github.com/hibiken/asynq/pull/1009)
|
||||
|
||||
## [0.25.1] - 2024-12-11
|
||||
|
||||
### Upgrades
|
||||
|
||||
* Some packages
|
||||
|
||||
### Added
|
||||
|
||||
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
|
||||
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
|
||||
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
|
||||
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
|
||||
|
||||
### Fixes
|
||||
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
|
||||
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
|
||||
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
|
||||
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
|
||||
|
||||
## [0.25.0] - 2024-10-29
|
||||
|
||||
### Upgrades
|
||||
|
||||
@@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
|
||||
}
|
||||
tasks := make([]*Task, len(msgs))
|
||||
for i, m := range msgs {
|
||||
tasks[i] = NewTask(m.Type, m.Payload)
|
||||
tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers)
|
||||
}
|
||||
aggregatedTask := a.ga.Aggregate(gname, tasks)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), deadline)
|
||||
|
||||
44
asynq.go
44
asynq.go
@@ -8,14 +8,15 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"maps"
|
||||
"net"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// Task represents a unit of work to be performed.
|
||||
@@ -26,6 +27,9 @@ type Task struct {
|
||||
// payload holds data needed to perform the task.
|
||||
payload []byte
|
||||
|
||||
// headers holds additional metadata for the task.
|
||||
headers map[string]string
|
||||
|
||||
// opts holds options for the task.
|
||||
opts []Option
|
||||
|
||||
@@ -33,8 +37,9 @@ type Task struct {
|
||||
w *ResultWriter
|
||||
}
|
||||
|
||||
func (t *Task) Type() string { return t.typename }
|
||||
func (t *Task) Payload() []byte { return t.payload }
|
||||
func (t *Task) Type() string { return t.typename }
|
||||
func (t *Task) Payload() []byte { return t.payload }
|
||||
func (t *Task) Headers() map[string]string { return t.headers }
|
||||
|
||||
// ResultWriter returns a pointer to the ResultWriter associated with the task.
|
||||
//
|
||||
@@ -48,6 +53,21 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task {
|
||||
return &Task{
|
||||
typename: typename,
|
||||
payload: payload,
|
||||
headers: nil,
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
|
||||
// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers.
|
||||
// Options can be passed to configure task processing behavior.
|
||||
// TODO: In the next major (breaking) release, fold this functionality into NewTask
|
||||
//
|
||||
// so that headers are supported directly. After that, remove this method.
|
||||
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task {
|
||||
return &Task{
|
||||
typename: typename,
|
||||
payload: payload,
|
||||
headers: maps.Clone(headers),
|
||||
opts: opts,
|
||||
}
|
||||
}
|
||||
@@ -57,6 +77,7 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task {
|
||||
return &Task{
|
||||
typename: typename,
|
||||
payload: payload,
|
||||
headers: make(map[string]string),
|
||||
w: w,
|
||||
}
|
||||
}
|
||||
@@ -75,6 +96,9 @@ type TaskInfo struct {
|
||||
// Payload is the payload data of the task.
|
||||
Payload []byte
|
||||
|
||||
// Headers holds additional metadata for the task.
|
||||
Headers map[string]string
|
||||
|
||||
// State indicates the task state.
|
||||
State TaskState
|
||||
|
||||
@@ -145,6 +169,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
|
||||
Queue: msg.Queue,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload, // Do we need to make a copy?
|
||||
Headers: msg.Headers,
|
||||
MaxRetry: msg.Retry,
|
||||
Retried: msg.Retried,
|
||||
LastErr: msg.ErrorMsg,
|
||||
@@ -442,14 +467,15 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
|
||||
//
|
||||
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
|
||||
// Supported formats are:
|
||||
// redis://[:password@]host[:port][/dbnumber]
|
||||
// rediss://[:password@]host[:port][/dbnumber]
|
||||
// redis-socket://[:password@]path[?db=dbnumber]
|
||||
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
|
||||
//
|
||||
// redis://[:password@]host[:port][/dbnumber]
|
||||
// rediss://[:password@]host[:port][/dbnumber]
|
||||
// redis-socket://[:password@]path[?db=dbnumber]
|
||||
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
|
||||
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
||||
u, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
|
||||
return nil, fmt.Errorf("asynq: could not parse redis uri: %w", err)
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "redis", "rediss":
|
||||
@@ -539,7 +565,7 @@ type ResultWriter struct {
|
||||
func (w *ResultWriter) Write(data []byte) (n int, err error) {
|
||||
select {
|
||||
case <-w.ctx.Done():
|
||||
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
|
||||
return 0, fmt.Errorf("failed to write task result: %w", w.ctx.Err())
|
||||
default:
|
||||
}
|
||||
return w.broker.WriteResult(w.qname, w.id, data)
|
||||
|
||||
@@ -11,11 +11,11 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/hibiken/asynq/internal/log"
|
||||
h "github.com/hibiken/asynq/internal/testutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
//============================================================================
|
||||
|
||||
@@ -385,6 +385,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
||||
ID: opt.taskID,
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Headers: task.Headers(),
|
||||
Queue: opt.queue,
|
||||
Retry: opt.retry,
|
||||
Deadline: deadline.Unix(),
|
||||
|
||||
470
client_test.go
470
client_test.go
@@ -1191,3 +1191,473 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEnqueueWithHeaders(t *testing.T) {
|
||||
r := setup(t)
|
||||
client := NewClient(getRedisConnOpt(t))
|
||||
defer client.Close()
|
||||
|
||||
now := time.Now()
|
||||
headers := map[string]string{
|
||||
"user-id": "123",
|
||||
"request-id": "abc-def-ghi",
|
||||
"priority": "high",
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
task *Task
|
||||
opts []Option
|
||||
wantInfo *TaskInfo
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
desc: "Task with headers",
|
||||
task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers),
|
||||
opts: []Option{},
|
||||
wantInfo: &TaskInfo{
|
||||
Queue: "default",
|
||||
Type: "send_email",
|
||||
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
|
||||
Headers: headers,
|
||||
State: TaskStatePending,
|
||||
MaxRetry: defaultMaxRetry,
|
||||
Retried: 0,
|
||||
LastErr: "",
|
||||
LastFailedAt: time.Time{},
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: time.Time{},
|
||||
NextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: "send_email",
|
||||
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
|
||||
Headers: headers,
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Task with empty headers",
|
||||
task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}),
|
||||
opts: []Option{},
|
||||
wantInfo: &TaskInfo{
|
||||
Queue: "default",
|
||||
Type: "process_data",
|
||||
Payload: []byte("data"),
|
||||
Headers: map[string]string{},
|
||||
State: TaskStatePending,
|
||||
MaxRetry: defaultMaxRetry,
|
||||
Retried: 0,
|
||||
LastErr: "",
|
||||
LastFailedAt: time.Time{},
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: time.Time{},
|
||||
NextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: "process_data",
|
||||
Payload: []byte("data"),
|
||||
Headers: nil,
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Task with nil headers",
|
||||
task: NewTaskWithHeaders("cleanup", nil, nil),
|
||||
opts: []Option{},
|
||||
wantInfo: &TaskInfo{
|
||||
Queue: "default",
|
||||
Type: "cleanup",
|
||||
Payload: nil,
|
||||
Headers: nil,
|
||||
State: TaskStatePending,
|
||||
MaxRetry: defaultMaxRetry,
|
||||
Retried: 0,
|
||||
LastErr: "",
|
||||
LastFailedAt: time.Time{},
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: time.Time{},
|
||||
NextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: "cleanup",
|
||||
Payload: nil,
|
||||
Headers: nil,
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Task with headers and custom options",
|
||||
task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}),
|
||||
opts: []Option{MaxRetry(5), Queue("notifications")},
|
||||
wantInfo: &TaskInfo{
|
||||
Queue: "notifications",
|
||||
Type: "notify",
|
||||
Payload: []byte("notification"),
|
||||
Headers: map[string]string{"channel": "email"},
|
||||
State: TaskStatePending,
|
||||
MaxRetry: 5,
|
||||
Retried: 0,
|
||||
LastErr: "",
|
||||
LastFailedAt: time.Time{},
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: time.Time{},
|
||||
NextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"notifications": {
|
||||
{
|
||||
Type: "notify",
|
||||
Payload: []byte("notification"),
|
||||
Headers: map[string]string{"channel": "email"},
|
||||
Retry: 5,
|
||||
Queue: "notifications",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
|
||||
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
|
||||
if err != nil {
|
||||
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
|
||||
continue
|
||||
}
|
||||
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantPending {
|
||||
got := h.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEnqueueWithHeadersScheduled(t *testing.T) {
|
||||
r := setup(t)
|
||||
client := NewClient(getRedisConnOpt(t))
|
||||
defer client.Close()
|
||||
|
||||
now := time.Now()
|
||||
oneHourLater := now.Add(time.Hour)
|
||||
headers := map[string]string{
|
||||
"correlation-id": "xyz-123",
|
||||
"source": "api",
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
task *Task
|
||||
processAt time.Time
|
||||
opts []Option
|
||||
wantInfo *TaskInfo
|
||||
wantScheduled map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
desc: "Schedule task with headers",
|
||||
task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers),
|
||||
processAt: oneHourLater,
|
||||
opts: []Option{},
|
||||
wantInfo: &TaskInfo{
|
||||
Queue: "default",
|
||||
Type: "scheduled_task",
|
||||
Payload: []byte("payload"),
|
||||
Headers: headers,
|
||||
State: TaskStateScheduled,
|
||||
MaxRetry: defaultMaxRetry,
|
||||
Retried: 0,
|
||||
LastErr: "",
|
||||
LastFailedAt: time.Time{},
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: time.Time{},
|
||||
NextProcessAt: oneHourLater,
|
||||
},
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {
|
||||
{
|
||||
Message: &base.TaskMessage{
|
||||
Type: "scheduled_task",
|
||||
Payload: []byte("payload"),
|
||||
Headers: headers,
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
Score: oneHourLater.Unix(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
|
||||
opts := append(tc.opts, ProcessAt(tc.processAt))
|
||||
gotInfo, err := client.Enqueue(tc.task, opts...)
|
||||
if err != nil {
|
||||
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
|
||||
continue
|
||||
}
|
||||
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantScheduled {
|
||||
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewTaskWithHeaders(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
typename string
|
||||
payload []byte
|
||||
headers map[string]string
|
||||
opts []Option
|
||||
want *Task
|
||||
}{
|
||||
{
|
||||
desc: "Task with headers",
|
||||
typename: "test_task",
|
||||
payload: []byte("test payload"),
|
||||
headers: map[string]string{"key1": "value1", "key2": "value2"},
|
||||
opts: []Option{MaxRetry(3)},
|
||||
want: &Task{
|
||||
typename: "test_task",
|
||||
payload: []byte("test payload"),
|
||||
headers: map[string]string{"key1": "value1", "key2": "value2"},
|
||||
opts: []Option{MaxRetry(3)},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Task with empty headers",
|
||||
typename: "empty_headers",
|
||||
payload: nil,
|
||||
headers: map[string]string{},
|
||||
opts: nil,
|
||||
want: &Task{
|
||||
typename: "empty_headers",
|
||||
payload: nil,
|
||||
headers: map[string]string{},
|
||||
opts: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Task with nil headers",
|
||||
typename: "nil_headers",
|
||||
payload: []byte("data"),
|
||||
headers: nil,
|
||||
opts: []Option{Queue("test")},
|
||||
want: &Task{
|
||||
typename: "nil_headers",
|
||||
payload: []byte("data"),
|
||||
headers: nil,
|
||||
opts: []Option{Queue("test")},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...)
|
||||
|
||||
if got.Type() != tc.want.typename {
|
||||
t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" {
|
||||
t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" {
|
||||
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
|
||||
}
|
||||
|
||||
if tc.headers != nil && got.Headers() != nil {
|
||||
tc.headers["modified"] = "test"
|
||||
if _, exists := got.Headers()["modified"]; exists {
|
||||
t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskHeadersMethod(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
task *Task
|
||||
want map[string]string
|
||||
wantNil bool
|
||||
}{
|
||||
{
|
||||
desc: "Task created with NewTask has nil headers",
|
||||
task: NewTask("test", []byte("data")),
|
||||
want: nil,
|
||||
wantNil: true,
|
||||
},
|
||||
{
|
||||
desc: "Task created with NewTaskWithHeaders has headers",
|
||||
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}),
|
||||
want: map[string]string{"key": "value"},
|
||||
},
|
||||
{
|
||||
desc: "Task created with empty headers",
|
||||
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}),
|
||||
want: map[string]string{},
|
||||
},
|
||||
{
|
||||
desc: "Task created with nil headers",
|
||||
task: NewTaskWithHeaders("test", []byte("data"), nil),
|
||||
want: nil,
|
||||
wantNil: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got := tc.task.Headers()
|
||||
|
||||
if tc.wantNil {
|
||||
if got != nil {
|
||||
t.Errorf("%s: Headers() = %v, want nil", tc.desc, got)
|
||||
}
|
||||
} else {
|
||||
if diff := cmp.Diff(tc.want, got); diff != "" {
|
||||
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
|
||||
r := setup(t)
|
||||
client := NewClient(getRedisConnOpt(t))
|
||||
defer client.Close()
|
||||
|
||||
now := time.Now()
|
||||
headers := map[string]string{
|
||||
"batch-id": "batch-123",
|
||||
"priority": "high",
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
task *Task
|
||||
opts []Option
|
||||
wantInfo *TaskInfo
|
||||
wantGroups map[string]map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
desc: "Task with headers and group",
|
||||
task: NewTaskWithHeaders("batch_process", []byte("item1"), headers),
|
||||
opts: []Option{Group("batch-123")},
|
||||
wantInfo: &TaskInfo{
|
||||
Queue: "default",
|
||||
Group: "batch-123",
|
||||
Type: "batch_process",
|
||||
Payload: []byte("item1"),
|
||||
Headers: headers,
|
||||
State: TaskStateAggregating,
|
||||
MaxRetry: defaultMaxRetry,
|
||||
Retried: 0,
|
||||
LastErr: "",
|
||||
LastFailedAt: time.Time{},
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: time.Time{},
|
||||
NextProcessAt: time.Time{},
|
||||
},
|
||||
wantGroups: map[string]map[string][]base.Z{
|
||||
"default": {
|
||||
"batch-123": {
|
||||
{
|
||||
Message: &base.TaskMessage{
|
||||
Type: "batch_process",
|
||||
Payload: []byte("item1"),
|
||||
Headers: headers,
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
GroupKey: "batch-123",
|
||||
},
|
||||
Score: now.Unix(),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
|
||||
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
|
||||
if err != nil {
|
||||
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
|
||||
continue
|
||||
}
|
||||
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
|
||||
for qname, groups := range tc.wantGroups {
|
||||
for groupKey, want := range groups {
|
||||
got := h.GetGroupEntries(t, r, qname, groupKey)
|
||||
if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
81
doc.go
81
doc.go
@@ -8,41 +8,41 @@ Package asynq provides a framework for Redis based distrubted task queue.
|
||||
Asynq uses Redis as a message broker. To connect to redis,
|
||||
specify the connection using one of RedisConnOpt types.
|
||||
|
||||
redisConnOpt = asynq.RedisClientOpt{
|
||||
Addr: "127.0.0.1:6379",
|
||||
Password: "xxxxx",
|
||||
DB: 2,
|
||||
}
|
||||
redisConnOpt = asynq.RedisClientOpt{
|
||||
Addr: "127.0.0.1:6379",
|
||||
Password: "xxxxx",
|
||||
DB: 2,
|
||||
}
|
||||
|
||||
The Client is used to enqueue a task.
|
||||
|
||||
client := asynq.NewClient(redisConnOpt)
|
||||
|
||||
client := asynq.NewClient(redisConnOpt)
|
||||
// Task is created with two parameters: its type and payload.
|
||||
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
|
||||
b, err := json.Marshal(ExamplePayload{UserID: 42})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Task is created with two parameters: its type and payload.
|
||||
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
|
||||
b, err := json.Marshal(ExamplePayload{UserID: 42})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
task := asynq.NewTask("example", b)
|
||||
|
||||
task := asynq.NewTask("example", b)
|
||||
// Enqueue the task to be processed immediately.
|
||||
info, err := client.Enqueue(task)
|
||||
|
||||
// Enqueue the task to be processed immediately.
|
||||
info, err := client.Enqueue(task)
|
||||
|
||||
// Schedule the task to be processed after one minute.
|
||||
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
|
||||
// Schedule the task to be processed after one minute.
|
||||
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
|
||||
|
||||
The Server is used to run the task processing workers with a given
|
||||
handler.
|
||||
srv := asynq.NewServer(redisConnOpt, asynq.Config{
|
||||
Concurrency: 10,
|
||||
})
|
||||
|
||||
if err := srv.Run(handler); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
srv := asynq.NewServer(redisConnOpt, asynq.Config{
|
||||
Concurrency: 10,
|
||||
})
|
||||
|
||||
if err := srv.Run(handler); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
Handler is an interface type with a method which
|
||||
takes a task and returns an error. Handler should return nil if
|
||||
@@ -50,23 +50,24 @@ the processing is successful, otherwise return a non-nil error.
|
||||
If handler panics or returns a non-nil error, the task will be retried in the future.
|
||||
|
||||
Example of a type that implements the Handler interface.
|
||||
type TaskHandler struct {
|
||||
// ...
|
||||
}
|
||||
|
||||
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
||||
switch task.Type {
|
||||
case "example":
|
||||
var data ExamplePayload
|
||||
if err := json.Unmarshal(task.Payload(), &data); err != nil {
|
||||
return err
|
||||
}
|
||||
// perform task with the data
|
||||
type TaskHandler struct {
|
||||
// ...
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unexpected task type %q", task.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
|
||||
switch task.Type {
|
||||
case "example":
|
||||
var data ExamplePayload
|
||||
if err := json.Unmarshal(task.Payload(), &data); err != nil {
|
||||
return err
|
||||
}
|
||||
// perform task with the data
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unexpected task type %q", task.Type)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
package asynq
|
||||
|
||||
@@ -123,7 +123,7 @@ func ExampleResultWriter() {
|
||||
res := []byte("task result data")
|
||||
n, err := task.ResultWriter().Write(res) // implements io.Writer
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to write task result: %v", err)
|
||||
return fmt.Errorf("failed to write task result: %w", err)
|
||||
}
|
||||
log.Printf(" %d bytes written", n)
|
||||
return nil
|
||||
|
||||
16
go.mod
16
go.mod
@@ -1,20 +1,20 @@
|
||||
module github.com/hibiken/asynq
|
||||
|
||||
go 1.22
|
||||
go 1.24.0
|
||||
|
||||
require (
|
||||
github.com/google/go-cmp v0.6.0
|
||||
github.com/google/go-cmp v0.7.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/redis/go-redis/v9 v9.7.0
|
||||
github.com/redis/go-redis/v9 v9.14.1
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/spf13/cast v1.7.0
|
||||
github.com/spf13/cast v1.10.0
|
||||
go.uber.org/goleak v1.3.0
|
||||
golang.org/x/sys v0.27.0
|
||||
golang.org/x/time v0.8.0
|
||||
google.golang.org/protobuf v1.35.2
|
||||
golang.org/x/sys v0.37.0
|
||||
golang.org/x/time v0.14.0
|
||||
google.golang.org/protobuf v1.36.10
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
)
|
||||
|
||||
28
go.sum
28
go.sum
@@ -2,16 +2,16 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
|
||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
@@ -20,23 +20,23 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
||||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
||||
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
|
||||
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
|
||||
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
|
||||
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
|
||||
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
|
||||
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
|
||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
|
||||
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
66
inspector.go
66
inspector.go
@@ -245,7 +245,7 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
|
||||
case errors.IsTaskNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
|
||||
}
|
||||
@@ -316,7 +316,7 @@ func Page(n int) ListOption {
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -325,7 +325,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
@@ -344,7 +344,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -353,11 +353,11 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
expiredSet := make(map[string]struct{}) // set of expired message IDs
|
||||
for _, msg := range expired {
|
||||
@@ -384,7 +384,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -393,7 +393,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
@@ -413,7 +413,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -422,7 +422,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
@@ -442,7 +442,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -451,7 +451,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
@@ -471,7 +471,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -480,7 +480,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
@@ -500,7 +500,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
@@ -509,7 +509,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
|
||||
case errors.IsQueueNotFound(err):
|
||||
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
return nil, fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
@@ -583,6 +583,30 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// UpdateTaskPayload updates a task with the given id from the given queue with given payload.
|
||||
// The task needs to be in scheduled state,
|
||||
// otherwise UpdateTaskPayload will return an error.
|
||||
//
|
||||
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
|
||||
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
|
||||
// If the task is not in scheduled state, it returns a non-nil error.
|
||||
func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
err := i.rdb.UpdateTaskPayload(queue, id, payload)
|
||||
switch {
|
||||
case errors.IsQueueNotFound(err):
|
||||
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
|
||||
case errors.IsTaskNotFound(err):
|
||||
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||
case err != nil:
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// DeleteTask deletes a task with the given id from the given queue.
|
||||
// The task needs to be in pending, scheduled, retry, or archived state,
|
||||
// otherwise DeleteTask will return an error.
|
||||
@@ -592,7 +616,7 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
|
||||
// If the task is in active state, it returns a non-nil error.
|
||||
func (i *Inspector) DeleteTask(queue, id string) error {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
err := i.rdb.DeleteTask(queue, id)
|
||||
switch {
|
||||
@@ -601,7 +625,7 @@ func (i *Inspector) DeleteTask(queue, id string) error {
|
||||
case errors.IsTaskNotFound(err):
|
||||
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||
case err != nil:
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -656,7 +680,7 @@ func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
|
||||
// If the task is in pending or active state, it returns a non-nil error.
|
||||
func (i *Inspector) RunTask(queue, id string) error {
|
||||
if err := base.ValidateQueueName(queue); err != nil {
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
err := i.rdb.RunTask(queue, id)
|
||||
switch {
|
||||
@@ -665,7 +689,7 @@ func (i *Inspector) RunTask(queue, id string) error {
|
||||
case errors.IsTaskNotFound(err):
|
||||
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||
case err != nil:
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -728,7 +752,7 @@ func (i *Inspector) ArchiveTask(queue, id string) error {
|
||||
case errors.IsTaskNotFound(err):
|
||||
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
|
||||
case err != nil:
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2369,6 +2369,148 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestInspectorUpdateTaskPayloadUpdatesScheduledTaskPayload(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
m1_old := h.NewTaskMessage("task1", []byte("m1_old"))
|
||||
m1_new := h.NewTaskMessage("task1", nil)
|
||||
m1_new.ID = m1_old.ID
|
||||
m2_old := h.NewTaskMessage("task2", nil)
|
||||
m2_new := h.NewTaskMessage("task2", []byte("m2_new"))
|
||||
m2_new.ID = m2_old.ID
|
||||
m3_old := h.NewTaskMessageWithQueue("task3", []byte("m3_old"), "custom")
|
||||
m3_new := h.NewTaskMessageWithQueue("task3", []byte("m3_new"), "custom")
|
||||
m3_new.ID = m3_old.ID
|
||||
|
||||
now := time.Now()
|
||||
z1_old := base.Z{Message: m1_old, Score: now.Add(5 * time.Minute).Unix()}
|
||||
z1_new := base.Z{Message: m1_new, Score: now.Add(5 * time.Minute).Unix()}
|
||||
z2_old := base.Z{Message: m2_old, Score: now.Add(15 * time.Minute).Unix()}
|
||||
z2_new := base.Z{Message: m2_new, Score: now.Add(15 * time.Minute).Unix()}
|
||||
z3_old := base.Z{Message: m3_old, Score: now.Add(2 * time.Minute).Unix()}
|
||||
z3_new := base.Z{Message: m3_new, Score: now.Add(2 * time.Minute).Unix()}
|
||||
|
||||
inspector := NewInspector(getRedisConnOpt(t))
|
||||
|
||||
tests := []struct {
|
||||
scheduled map[string][]base.Z
|
||||
qname string
|
||||
id string
|
||||
newPayload []byte
|
||||
wantScheduled map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {z1_old, z2_old},
|
||||
"custom": {z3_old},
|
||||
},
|
||||
qname: "default",
|
||||
id: createScheduledTask(z2_old).ID,
|
||||
newPayload: m2_new.Payload,
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {z1_old, z2_new},
|
||||
"custom": {z3_old},
|
||||
},
|
||||
},
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {z1_old, z2_old},
|
||||
"custom": {z3_old},
|
||||
},
|
||||
qname: "default",
|
||||
id: createScheduledTask(z1_old).ID,
|
||||
newPayload: m1_new.Payload,
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {z1_new, z2_old},
|
||||
"custom": {z3_old},
|
||||
},
|
||||
},
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {z1_old, z2_old},
|
||||
"custom": {z3_old},
|
||||
},
|
||||
qname: "custom",
|
||||
id: createScheduledTask(z3_old).ID,
|
||||
newPayload: m3_new.Payload,
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {z1_old, z2_old},
|
||||
"custom": {z3_new},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
h.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||
|
||||
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); err != nil {
|
||||
t.Errorf("UpdateTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
|
||||
}
|
||||
for qname, want := range tc.wantScheduled {
|
||||
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInspectorUpdateTaskPayloadError(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
m1 := h.NewTaskMessage("task1", nil)
|
||||
m2 := h.NewTaskMessage("task2", nil)
|
||||
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
|
||||
|
||||
now := time.Now()
|
||||
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
|
||||
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
|
||||
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
|
||||
|
||||
inspector := NewInspector(getRedisConnOpt(t))
|
||||
|
||||
tests := []struct {
|
||||
tasks map[string][]base.Z
|
||||
qname string
|
||||
id string
|
||||
newPayload []byte
|
||||
wantErr error
|
||||
}{
|
||||
{
|
||||
tasks: map[string][]base.Z{
|
||||
"default": {z1, z2},
|
||||
"custom": {z3},
|
||||
},
|
||||
qname: "nonexistent",
|
||||
id: createScheduledTask(z2).ID,
|
||||
newPayload: nil,
|
||||
wantErr: ErrQueueNotFound,
|
||||
},
|
||||
{
|
||||
tasks: map[string][]base.Z{
|
||||
"default": {z1, z2},
|
||||
"custom": {z3},
|
||||
},
|
||||
qname: "default",
|
||||
id: uuid.NewString(),
|
||||
newPayload: nil,
|
||||
wantErr: ErrTaskNotFound,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
h.SeedAllScheduledQueues(t, r, tc.tasks)
|
||||
|
||||
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); !errors.Is(err, tc.wantErr) {
|
||||
t.Errorf("UpdateTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
// Version of asynq library and CLI.
|
||||
const Version = "0.25.0"
|
||||
const Version = "0.26.0"
|
||||
|
||||
// DefaultQueueName is the queue name used if none are specified by user.
|
||||
const DefaultQueueName = "default"
|
||||
@@ -240,6 +240,9 @@ type TaskMessage struct {
|
||||
// Payload holds data needed to process the task.
|
||||
Payload []byte
|
||||
|
||||
// Headers holds additional metadata for the task.
|
||||
Headers map[string]string
|
||||
|
||||
// ID is a unique identifier for each task.
|
||||
ID string
|
||||
|
||||
@@ -304,6 +307,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
|
||||
return proto.Marshal(&pb.TaskMessage{
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
Headers: msg.Headers,
|
||||
Id: msg.ID,
|
||||
Queue: msg.Queue,
|
||||
Retry: int32(msg.Retry),
|
||||
@@ -328,6 +332,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
|
||||
return &TaskMessage{
|
||||
Type: pbmsg.GetType(),
|
||||
Payload: pbmsg.GetPayload(),
|
||||
Headers: pbmsg.GetHeaders(),
|
||||
ID: pbmsg.GetId(),
|
||||
Queue: pbmsg.GetQueue(),
|
||||
Retry: int(pbmsg.GetRetry()),
|
||||
|
||||
@@ -107,6 +107,7 @@ type Op string
|
||||
// only the last one is recorded.
|
||||
//
|
||||
// The types are:
|
||||
//
|
||||
// errors.Op
|
||||
// The operation being performed, usually the method
|
||||
// being invoked (Get, Put, etc.).
|
||||
|
||||
@@ -4,8 +4,8 @@
|
||||
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.34.2
|
||||
// protoc v3.19.6
|
||||
// protoc-gen-go v1.36.6
|
||||
// protoc v5.29.3
|
||||
// source: asynq.proto
|
||||
|
||||
package proto
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
unsafe "unsafe"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -27,16 +28,15 @@ const (
|
||||
|
||||
// TaskMessage is the internal representation of a task with additional
|
||||
// metadata fields.
|
||||
// Next ID: 15
|
||||
// Next ID: 16
|
||||
type TaskMessage struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
// Type indicates the kind of the task to be performed.
|
||||
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
|
||||
// Payload holds data needed to process the task.
|
||||
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
|
||||
// Headers holds additional metadata for the task.
|
||||
Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
|
||||
// Unique identifier for the task.
|
||||
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
|
||||
// Name of the queue to which this task belongs.
|
||||
@@ -71,16 +71,16 @@ type TaskMessage struct {
|
||||
// Time when the task completed in success in Unix time,
|
||||
// the number of seconds elapsed since January 1, 1970 UTC.
|
||||
// This field is populated if result_ttl > 0 upon completion.
|
||||
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
|
||||
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *TaskMessage) Reset() {
|
||||
*x = TaskMessage{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_asynq_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
mi := &file_asynq_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
func (x *TaskMessage) String() string {
|
||||
@@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {}
|
||||
|
||||
func (x *TaskMessage) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_asynq_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
@@ -120,6 +120,13 @@ func (x *TaskMessage) GetPayload() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *TaskMessage) GetHeaders() map[string]string {
|
||||
if x != nil {
|
||||
return x.Headers
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *TaskMessage) GetId() string {
|
||||
if x != nil {
|
||||
return x.Id
|
||||
@@ -206,10 +213,7 @@ func (x *TaskMessage) GetCompletedAt() int64 {
|
||||
|
||||
// ServerInfo holds information about a running server.
|
||||
type ServerInfo struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
// Host machine the server is running on.
|
||||
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
|
||||
// PID of the server process.
|
||||
@@ -221,7 +225,7 @@ type ServerInfo struct {
|
||||
// List of queue names with their priorities.
|
||||
// The server will consume tasks from the queues and prioritize
|
||||
// queues with higher priority numbers.
|
||||
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
|
||||
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
|
||||
// If set, the server will always consume tasks from a queue with higher
|
||||
// priority.
|
||||
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
|
||||
@@ -231,15 +235,15 @@ type ServerInfo struct {
|
||||
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
|
||||
// Number of workers currently processing tasks.
|
||||
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *ServerInfo) Reset() {
|
||||
*x = ServerInfo{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_asynq_proto_msgTypes[1]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
mi := &file_asynq_proto_msgTypes[1]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
func (x *ServerInfo) String() string {
|
||||
@@ -250,7 +254,7 @@ func (*ServerInfo) ProtoMessage() {}
|
||||
|
||||
func (x *ServerInfo) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_asynq_proto_msgTypes[1]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
@@ -330,10 +334,7 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 {
|
||||
|
||||
// WorkerInfo holds information about a running worker.
|
||||
type WorkerInfo struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
// Host matchine this worker is running on.
|
||||
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
|
||||
// PID of the process in which this worker is running.
|
||||
@@ -352,16 +353,16 @@ type WorkerInfo struct {
|
||||
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
|
||||
// Deadline by which the worker needs to complete processing
|
||||
// the task. If worker exceeds the deadline, the task will fail.
|
||||
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
|
||||
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *WorkerInfo) Reset() {
|
||||
*x = WorkerInfo{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_asynq_proto_msgTypes[2]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
mi := &file_asynq_proto_msgTypes[2]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
func (x *WorkerInfo) String() string {
|
||||
@@ -372,7 +373,7 @@ func (*WorkerInfo) ProtoMessage() {}
|
||||
|
||||
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_asynq_proto_msgTypes[2]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
@@ -453,10 +454,7 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
|
||||
// SchedulerEntry holds information about a periodic task registered
|
||||
// with a scheduler.
|
||||
type SchedulerEntry struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
// Identifier of the scheduler entry.
|
||||
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
|
||||
// Periodic schedule spec of the entry.
|
||||
@@ -472,15 +470,15 @@ type SchedulerEntry struct {
|
||||
// Last time the task was enqueued.
|
||||
// Zero time if task was never enqueued.
|
||||
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *SchedulerEntry) Reset() {
|
||||
*x = SchedulerEntry{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_asynq_proto_msgTypes[3]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
mi := &file_asynq_proto_msgTypes[3]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
func (x *SchedulerEntry) String() string {
|
||||
@@ -491,7 +489,7 @@ func (*SchedulerEntry) ProtoMessage() {}
|
||||
|
||||
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_asynq_proto_msgTypes[3]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
@@ -558,23 +556,20 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
|
||||
// SchedulerEnqueueEvent holds information about an enqueue event
|
||||
// by a scheduler.
|
||||
type SchedulerEnqueueEvent struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
// ID of the task that was enqueued.
|
||||
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
|
||||
// Time the task was enqueued.
|
||||
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
|
||||
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
func (x *SchedulerEnqueueEvent) Reset() {
|
||||
*x = SchedulerEnqueueEvent{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_asynq_proto_msgTypes[4]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
mi := &file_asynq_proto_msgTypes[4]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
func (x *SchedulerEnqueueEvent) String() string {
|
||||
@@ -585,7 +580,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {}
|
||||
|
||||
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_asynq_proto_msgTypes[4]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
@@ -616,146 +611,106 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
|
||||
|
||||
var File_asynq_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_asynq_proto_rawDesc = []byte{
|
||||
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
|
||||
0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
|
||||
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
|
||||
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79,
|
||||
0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c,
|
||||
0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74,
|
||||
0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12,
|
||||
0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
|
||||
0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72,
|
||||
0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72,
|
||||
0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66,
|
||||
0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
|
||||
0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07,
|
||||
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74,
|
||||
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
|
||||
0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
|
||||
0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79,
|
||||
0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65,
|
||||
0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e,
|
||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c,
|
||||
0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28,
|
||||
0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
|
||||
0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01,
|
||||
0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22,
|
||||
0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12,
|
||||
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
|
||||
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
|
||||
0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49,
|
||||
0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79,
|
||||
0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65,
|
||||
0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20,
|
||||
0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76,
|
||||
0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74,
|
||||
0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74,
|
||||
0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20,
|
||||
0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72,
|
||||
0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73,
|
||||
0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32,
|
||||
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
|
||||
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61,
|
||||
0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65,
|
||||
0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20,
|
||||
0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65,
|
||||
0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73,
|
||||
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
|
||||
0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
|
||||
0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
|
||||
0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
|
||||
0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
|
||||
0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65,
|
||||
0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04,
|
||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09,
|
||||
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
|
||||
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
|
||||
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05,
|
||||
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65,
|
||||
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65,
|
||||
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
|
||||
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a,
|
||||
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32,
|
||||
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
|
||||
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61,
|
||||
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75,
|
||||
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
|
||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09,
|
||||
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
|
||||
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52,
|
||||
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f,
|
||||
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
|
||||
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70,
|
||||
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e,
|
||||
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
|
||||
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65,
|
||||
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a,
|
||||
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
|
||||
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
|
||||
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
|
||||
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75,
|
||||
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
|
||||
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17,
|
||||
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65,
|
||||
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
|
||||
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
|
||||
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
|
||||
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
|
||||
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
|
||||
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
const file_asynq_proto_rawDesc = "" +
|
||||
"\n" +
|
||||
"\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" +
|
||||
"\vTaskMessage\x12\x12\n" +
|
||||
"\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" +
|
||||
"\apayload\x18\x02 \x01(\fR\apayload\x129\n" +
|
||||
"\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" +
|
||||
"\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" +
|
||||
"\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" +
|
||||
"\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" +
|
||||
"\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" +
|
||||
"\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" +
|
||||
"\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" +
|
||||
"\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" +
|
||||
"\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" +
|
||||
"\n" +
|
||||
"unique_key\x18\n" +
|
||||
" \x01(\tR\tuniqueKey\x12\x1b\n" +
|
||||
"\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" +
|
||||
"\tretention\x18\f \x01(\x03R\tretention\x12!\n" +
|
||||
"\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" +
|
||||
"\fHeadersEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
|
||||
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" +
|
||||
"\n" +
|
||||
"ServerInfo\x12\x12\n" +
|
||||
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
|
||||
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
|
||||
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" +
|
||||
"\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" +
|
||||
"\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" +
|
||||
"\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" +
|
||||
"\x06status\x18\a \x01(\tR\x06status\x129\n" +
|
||||
"\n" +
|
||||
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" +
|
||||
"\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" +
|
||||
"\vQueuesEntry\x12\x10\n" +
|
||||
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
|
||||
"\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" +
|
||||
"\n" +
|
||||
"WorkerInfo\x12\x12\n" +
|
||||
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
|
||||
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
|
||||
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" +
|
||||
"\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" +
|
||||
"\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" +
|
||||
"\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" +
|
||||
"\x05queue\x18\a \x01(\tR\x05queue\x129\n" +
|
||||
"\n" +
|
||||
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" +
|
||||
"\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" +
|
||||
"\x0eSchedulerEntry\x12\x0e\n" +
|
||||
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" +
|
||||
"\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" +
|
||||
"\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" +
|
||||
"\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" +
|
||||
"\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" +
|
||||
"\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" +
|
||||
"\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" +
|
||||
"\x15SchedulerEnqueueEvent\x12\x17\n" +
|
||||
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" +
|
||||
"\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3"
|
||||
|
||||
var (
|
||||
file_asynq_proto_rawDescOnce sync.Once
|
||||
file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
|
||||
file_asynq_proto_rawDescData []byte
|
||||
)
|
||||
|
||||
func file_asynq_proto_rawDescGZIP() []byte {
|
||||
file_asynq_proto_rawDescOnce.Do(func() {
|
||||
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
|
||||
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)))
|
||||
})
|
||||
return file_asynq_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
|
||||
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
|
||||
var file_asynq_proto_goTypes = []any{
|
||||
(*TaskMessage)(nil), // 0: asynq.TaskMessage
|
||||
(*ServerInfo)(nil), // 1: asynq.ServerInfo
|
||||
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo
|
||||
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
|
||||
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
|
||||
nil, // 5: asynq.ServerInfo.QueuesEntry
|
||||
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
|
||||
nil, // 5: asynq.TaskMessage.HeadersEntry
|
||||
nil, // 6: asynq.ServerInfo.QueuesEntry
|
||||
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
|
||||
}
|
||||
var file_asynq_proto_depIdxs = []int32{
|
||||
5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
|
||||
6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
|
||||
6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
|
||||
6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
|
||||
6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
|
||||
6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
|
||||
6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
|
||||
7, // [7:7] is the sub-list for method output_type
|
||||
7, // [7:7] is the sub-list for method input_type
|
||||
7, // [7:7] is the sub-list for extension type_name
|
||||
7, // [7:7] is the sub-list for extension extendee
|
||||
0, // [0:7] is the sub-list for field type_name
|
||||
5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry
|
||||
6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
|
||||
7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
|
||||
7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
|
||||
7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
|
||||
7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
|
||||
7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
|
||||
7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
|
||||
8, // [8:8] is the sub-list for method output_type
|
||||
8, // [8:8] is the sub-list for method input_type
|
||||
8, // [8:8] is the sub-list for extension type_name
|
||||
8, // [8:8] is the sub-list for extension extendee
|
||||
0, // [0:8] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_asynq_proto_init() }
|
||||
@@ -763,75 +718,13 @@ func file_asynq_proto_init() {
|
||||
if File_asynq_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any {
|
||||
switch v := v.(*TaskMessage); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any {
|
||||
switch v := v.(*ServerInfo); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any {
|
||||
switch v := v.(*WorkerInfo); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any {
|
||||
switch v := v.(*SchedulerEntry); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any {
|
||||
switch v := v.(*SchedulerEnqueueEvent); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_asynq_proto_rawDesc,
|
||||
RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)),
|
||||
NumEnums: 0,
|
||||
NumMessages: 6,
|
||||
NumMessages: 7,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
@@ -840,7 +733,6 @@ func file_asynq_proto_init() {
|
||||
MessageInfos: file_asynq_proto_msgTypes,
|
||||
}.Build()
|
||||
File_asynq_proto = out.File
|
||||
file_asynq_proto_rawDesc = nil
|
||||
file_asynq_proto_goTypes = nil
|
||||
file_asynq_proto_depIdxs = nil
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
|
||||
|
||||
// TaskMessage is the internal representation of a task with additional
|
||||
// metadata fields.
|
||||
// Next ID: 15
|
||||
// Next ID: 16
|
||||
message TaskMessage {
|
||||
// Type indicates the kind of the task to be performed.
|
||||
string type = 1;
|
||||
@@ -19,6 +19,9 @@ message TaskMessage {
|
||||
// Payload holds data needed to process the task.
|
||||
bytes payload = 2;
|
||||
|
||||
// Headers holds additional metadata for the task.
|
||||
map<string, string> headers = 15;
|
||||
|
||||
// Unique identifier for the task.
|
||||
string id = 3;
|
||||
|
||||
|
||||
@@ -1412,6 +1412,93 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Input:
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// --
|
||||
// ARGV[1] -> task message data
|
||||
//
|
||||
// Output:
|
||||
// Numeric code indicating the status:
|
||||
// Returns 1 if task is successfully updated.
|
||||
// Returns 0 if task is not found.
|
||||
// Returns -1 if task is not in scheduled state.
|
||||
var updateTaskPayloadCmd = redis.NewScript(`
|
||||
-- Check if given taks exists
|
||||
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||
return 0
|
||||
end
|
||||
local state, pending_since, group, unique_key = unpack(redis.call("HMGET", KEYS[1], "state", "pending_since", "group", "unique_key"))
|
||||
if state ~= "scheduled" then
|
||||
return -1
|
||||
end
|
||||
local redis_call_args = {"state", state}
|
||||
|
||||
if pending_since then
|
||||
table.insert(redis_call_args, "pending_since")
|
||||
table.insert(redis_call_args, pending_since)
|
||||
end
|
||||
if group then
|
||||
table.insert(redis_call_args, "group")
|
||||
table.insert(redis_call_args, group)
|
||||
end
|
||||
if unique_key then
|
||||
table.insert(redis_call_args, "unique_key")
|
||||
table.insert(redis_call_args, unique_key)
|
||||
end
|
||||
redis.call("HSET", KEYS[1], "msg", ARGV[1], unpack(redis_call_args))
|
||||
return 1
|
||||
`)
|
||||
|
||||
// UpdateTaskPayload finds a task that matches the id from the given queue and updates it's payload.
|
||||
// It returns nil if it successfully updated the task payload.
|
||||
//
|
||||
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
|
||||
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
|
||||
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
|
||||
func (r *RDB) UpdateTaskPayload(qname, id string, payload []byte) error {
|
||||
var op errors.Op = "rdb.UpdateTask"
|
||||
if err := r.checkQueueExists(qname); err != nil {
|
||||
return errors.E(op, errors.CanonicalCode(err), err)
|
||||
}
|
||||
|
||||
taskInfo, err := r.GetTaskInfo(qname, id)
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
|
||||
taskInfo.Message.Payload = payload
|
||||
|
||||
encoded, err := base.EncodeMessage(taskInfo.Message)
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(qname, id),
|
||||
}
|
||||
argv := []interface{}{
|
||||
encoded,
|
||||
}
|
||||
|
||||
res, err := updateTaskPayloadCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
n, ok := res.(int64)
|
||||
if !ok {
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: updateTaskCmd script returned unexported value %v", res))
|
||||
}
|
||||
switch n {
|
||||
case 1:
|
||||
return nil
|
||||
case 0:
|
||||
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
|
||||
case -1:
|
||||
return errors.E(op, errors.FailedPrecondition, "cannot update task that is not in scheduled state.")
|
||||
default:
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from updateTaskCmd script: %d", n))
|
||||
}
|
||||
}
|
||||
|
||||
// Input:
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// KEYS[2] -> asynq:{<qname>}:groups
|
||||
|
||||
@@ -122,10 +122,10 @@ func (mgr *PeriodicTaskManager) Start() error {
|
||||
panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
|
||||
}
|
||||
if err := mgr.initialSync(); err != nil {
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
if err := mgr.s.Start(); err != nil {
|
||||
return fmt.Errorf("asynq: %v", err)
|
||||
return fmt.Errorf("asynq: %w", err)
|
||||
}
|
||||
mgr.wg.Add(1)
|
||||
go func() {
|
||||
@@ -168,11 +168,11 @@ func (mgr *PeriodicTaskManager) Run() error {
|
||||
func (mgr *PeriodicTaskManager) initialSync() error {
|
||||
configs, err := mgr.p.GetConfigs()
|
||||
if err != nil {
|
||||
return fmt.Errorf("initial call to GetConfigs failed: %v", err)
|
||||
return fmt.Errorf("initial call to GetConfigs failed: %w", err)
|
||||
}
|
||||
for _, c := range configs {
|
||||
if err := validatePeriodicTaskConfig(c); err != nil {
|
||||
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err)
|
||||
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %w", err)
|
||||
}
|
||||
}
|
||||
mgr.add(configs)
|
||||
|
||||
@@ -230,6 +230,7 @@ func (p *processor) exec() {
|
||||
ctx: ctx,
|
||||
},
|
||||
)
|
||||
task.headers = msg.Headers
|
||||
resCh <- p.perform(ctx, task)
|
||||
}()
|
||||
|
||||
@@ -333,7 +334,7 @@ var RevokeTask = errors.New("revoke task")
|
||||
|
||||
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
|
||||
p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err)
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, RevokeTask):
|
||||
@@ -354,7 +355,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
|
||||
}
|
||||
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
|
||||
defer cancel()
|
||||
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
|
||||
d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
|
||||
retryAt := time.Now().Add(d)
|
||||
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
|
||||
if err != nil {
|
||||
|
||||
@@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
|
||||
}
|
||||
|
||||
func (r *recoverer) retry(msg *base.TaskMessage, err error) {
|
||||
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
|
||||
delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
|
||||
retryAt := time.Now().Add(delay)
|
||||
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
|
||||
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)
|
||||
|
||||
@@ -6,12 +6,16 @@ package asynq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// ErrHandlerNotFound indicates that no task handler was found for a given pattern.
|
||||
var ErrHandlerNotFound = errors.New("handler not found for task")
|
||||
|
||||
// ServeMux is a multiplexer for asynchronous tasks.
|
||||
// It matches the type of each task against a list of registered patterns
|
||||
// and calls the handler for the pattern that most closely matches the
|
||||
@@ -149,8 +153,8 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
|
||||
|
||||
// NotFound returns an error indicating that the handler was not found for the given task.
|
||||
func NotFound(ctx context.Context, task *Task) error {
|
||||
return fmt.Errorf("handler not found for task %q", task.Type())
|
||||
return fmt.Errorf("%w %q", ErrHandlerNotFound, task.Type())
|
||||
}
|
||||
|
||||
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
|
||||
// NotFoundHandler returns a simple task handler that returns a “not found“ error.
|
||||
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }
|
||||
|
||||
@@ -24,7 +24,7 @@ func makeFakeHandler(identity string) Handler {
|
||||
}
|
||||
|
||||
// makeFakeMiddleware returns a middleware function that appends the given identity
|
||||
//to the global invoked slice.
|
||||
// to the global invoked slice.
|
||||
func makeFakeMiddleware(identity string) MiddlewareFunc {
|
||||
return func(next Handler) Handler {
|
||||
return HandlerFunc(func(ctx context.Context, t *Task) error {
|
||||
|
||||
@@ -174,16 +174,15 @@ type Config struct {
|
||||
// })
|
||||
//
|
||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||
|
||||
//
|
||||
// we can also handle panic error like:
|
||||
// func reportError(ctx context, task *asynq.Task, err error) {
|
||||
// if asynq.IsPanic(err) {
|
||||
// if asynq.IsPanicError(err) {
|
||||
// errorReportingService.Notify(err)
|
||||
// }
|
||||
// })
|
||||
//
|
||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||
|
||||
ErrorHandler ErrorHandler
|
||||
|
||||
// Logger specifies the logger used by the server instance.
|
||||
|
||||
@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
|
||||
if sig == unix.SIGTSTP {
|
||||
srv.Stop()
|
||||
continue
|
||||
} else {
|
||||
srv.Stop()
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,9 +8,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/log"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type subscriber struct {
|
||||
|
||||
@@ -39,6 +39,7 @@ By default, CLI will try to connect to a redis server running at `localhost:6379
|
||||
    --config string      config file to set flag default values (default is $HOME/.asynq.yaml)
|
||||
-n, --db int redis database number (default is 0)
|
||||
-h, --help help for asynq
|
||||
-U, --username string username to use when connecting to redis server
|
||||
-p, --password string password to use when connecting to redis server
|
||||
-u, --uri string redis server URI (default "127.0.0.1:6379")
|
||||
|
||||
|
||||
@@ -16,11 +16,12 @@ import (
|
||||
// ScreenDrawer is used to draw contents on screen.
|
||||
//
|
||||
// Usage example:
|
||||
// d := NewScreenDrawer(s)
|
||||
// d.Println("Hello world", mystyle)
|
||||
// d.NL() // adds newline
|
||||
// d.Print("foo", mystyle.Bold(true))
|
||||
// d.Print("bar", mystyle.Italic(true))
|
||||
//
|
||||
// d := NewScreenDrawer(s)
|
||||
// d.Println("Hello world", mystyle)
|
||||
// d.NL() // adds newline
|
||||
// d.Print("foo", mystyle.Bold(true))
|
||||
// d.Print("bar", mystyle.Italic(true))
|
||||
type ScreenDrawer struct {
|
||||
l *LineDrawer
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
//
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
@@ -36,11 +37,13 @@ var (
|
||||
uri string
|
||||
db int
|
||||
password string
|
||||
username string
|
||||
|
||||
useRedisCluster bool
|
||||
clusterAddrs string
|
||||
tlsServerName string
|
||||
insecure bool
|
||||
useTLS bool
|
||||
)
|
||||
|
||||
// rootCmd represents the base command when called without any subcommands
|
||||
@@ -258,7 +261,6 @@ func adjustPadding(lines ...*displayLine) {
|
||||
func rpad(s string, padding int) string {
|
||||
tmpl := fmt.Sprintf("%%-%ds ", padding)
|
||||
return fmt.Sprintf(tmpl, s)
|
||||
|
||||
}
|
||||
|
||||
// lpad adds padding to the left of a string.
|
||||
@@ -309,10 +311,12 @@ func init() {
|
||||
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI")
|
||||
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)")
|
||||
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password to use when connecting to redis server")
|
||||
rootCmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Username to use when connecting to Redis (ACL username)")
|
||||
rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "Connect to redis cluster")
|
||||
rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs",
|
||||
"127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005",
|
||||
"List of comma-separated redis server addresses")
|
||||
rootCmd.PersistentFlags().BoolVar(&useTLS, "tls", false, "Enable TLS connection")
|
||||
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
|
||||
"", "Server name for TLS validation")
|
||||
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
|
||||
@@ -321,8 +325,10 @@ func init() {
|
||||
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
|
||||
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
|
||||
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
|
||||
viper.BindPFlag("username", rootCmd.PersistentFlags().Lookup("username"))
|
||||
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
|
||||
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
|
||||
viper.BindPFlag("tls", rootCmd.PersistentFlags().Lookup("tls"))
|
||||
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
|
||||
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
|
||||
}
|
||||
@@ -361,6 +367,7 @@ func createRDB() *rdb.RDB {
|
||||
c = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: addrs,
|
||||
Password: viper.GetString("password"),
|
||||
Username: viper.GetString("username"),
|
||||
TLSConfig: getTLSConfig(),
|
||||
})
|
||||
} else {
|
||||
@@ -368,6 +375,7 @@ func createRDB() *rdb.RDB {
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
Username: viper.GetString("username"),
|
||||
TLSConfig: getTLSConfig(),
|
||||
})
|
||||
}
|
||||
@@ -390,6 +398,7 @@ func getRedisConnOpt() asynq.RedisConnOpt {
|
||||
return asynq.RedisClusterClientOpt{
|
||||
Addrs: addrs,
|
||||
Password: viper.GetString("password"),
|
||||
Username: viper.GetString("username"),
|
||||
TLSConfig: getTLSConfig(),
|
||||
}
|
||||
}
|
||||
@@ -397,16 +406,22 @@ func getRedisConnOpt() asynq.RedisConnOpt {
|
||||
Addr: viper.GetString("uri"),
|
||||
DB: viper.GetInt("db"),
|
||||
Password: viper.GetString("password"),
|
||||
Username: viper.GetString("username"),
|
||||
TLSConfig: getTLSConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
func getTLSConfig() *tls.Config {
|
||||
tlsServer := viper.GetString("tls_server")
|
||||
if tlsServer == "" {
|
||||
return nil
|
||||
if tlsServer != "" {
|
||||
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
|
||||
}
|
||||
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
|
||||
|
||||
if viper.GetBool("tls") {
|
||||
return &tls.Config{InsecureSkipVerify: viper.GetBool("insecure")}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// printTable is a helper function to print data in table format.
|
||||
@@ -414,18 +429,22 @@ func getTLSConfig() *tls.Config {
|
||||
// cols is a list of headers and printRow specifies how to print rows.
|
||||
//
|
||||
// Example:
|
||||
// type User struct {
|
||||
// Name string
|
||||
// Addr string
|
||||
// Age int
|
||||
// }
|
||||
//
|
||||
// type User struct {
|
||||
// Name string
|
||||
// Addr string
|
||||
// Age int
|
||||
// }
|
||||
//
|
||||
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
|
||||
// cols := []string{"Name", "Addr", "Age"}
|
||||
// printRows := func(w io.Writer, tmpl string) {
|
||||
// for _, u := range data {
|
||||
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// printRows := func(w io.Writer, tmpl string) {
|
||||
// for _, u := range data {
|
||||
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// printTable(cols, printRows)
|
||||
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
|
||||
format := strings.Repeat("%v\t", len(cols)) + "\n"
|
||||
|
||||
@@ -770,4 +770,3 @@ func taskRunAll(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
fmt.Printf("%d tasks are now pending\n", n)
|
||||
}
|
||||
|
||||
|
||||
@@ -25,13 +25,13 @@ type QueueMetricsCollector struct {
|
||||
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
|
||||
qnames, err := qmc.inspector.Queues()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get queue names: %v", err)
|
||||
return nil, fmt.Errorf("failed to get queue names: %w", err)
|
||||
}
|
||||
infos := make([]*asynq.QueueInfo, len(qnames))
|
||||
for i, qname := range qnames {
|
||||
qinfo, err := qmc.inspector.GetQueueInfo(qname)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get queue info: %v", err)
|
||||
return nil, fmt.Errorf("failed to get queue info: %w", err)
|
||||
}
|
||||
infos[i] = qinfo
|
||||
}
|
||||
|
||||
@@ -94,7 +94,7 @@ func (s *Semaphore) Release(ctx context.Context) error {
|
||||
|
||||
n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("redis command failed: %v", err)
|
||||
return fmt.Errorf("redis command failed: %w", err)
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
|
||||
Reference in New Issue
Block a user