2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-05 18:25:57 +08:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Mohammed Sohail
f1e7dc4056 fix: closing the Client also closes the broker
* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
2024-12-03 10:27:50 +03:00
Mohammed Sohail
ee17997650 fix: NewScheduler wrongly creates a client whose sharedConnection value is always true
* This is affecting the PeriodicManager as well as the Scheduler
2024-12-03 10:08:47 +03:00
33 changed files with 380 additions and 1105 deletions

View File

@@ -9,7 +9,6 @@ on: [pull_request]
jobs:
incoming:
runs-on: ubuntu-latest
if: false
services:
redis:
image: redis:7
@@ -32,7 +31,6 @@ jobs:
current:
runs-on: ubuntu-latest
if: false
services:
redis:
image: redis:7
@@ -57,7 +55,6 @@ jobs:
benchstat:
needs: [incoming, current]
if: false
runs-on: ubuntu-latest
steps:
- name: Checkout

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.24.x, 1.25.x]
go-version: [1.22.x, 1.23.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -45,7 +45,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.24.x, 1.25.x]
go-version: [1.22.x, 1.23.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -70,7 +70,6 @@ jobs:
golangci:
name: lint
runs-on: ubuntu-latest
if: false
steps:
- uses: actions/checkout@v4

View File

@@ -7,45 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.26.0] - 2026-02-03
### Upgrades
- Prepare CI for Go 1.24.x and 1.25.x (commit: e9037f0)
### Added
- Add Headers support to tasks (PR: https://github.com/hibiken/asynq/pull/1070)
- Add `--tls` option to dash command (PR: https://github.com/hibiken/asynq/pull/1073)
- Add `--username` CLI flag for Redis ACL authentication (PR: https://github.com/hibiken/asynq/pull/1083)
- Add `UpdateTaskPayload` method for inspector (PR: https://github.com/hibiken/asynq/pull/1042)
### Fixes
- Fix: Correct error message text in ResultWriter.Write (PR: https://github.com/hibiken/asynq/pull/1054)
- Fix: Wrap all fmt.Errorf errors with %w (PR: https://github.com/hibiken/asynq/pull/1047)
- Fix: ServeMux.NotFoundHandler returns ErrHandlerNotFound error (PR: https://github.com/hibiken/asynq/pull/1031)
### Changed
- Docs: Update server.go documentation (PR: https://github.com/hibiken/asynq/pull/1010)
- Chore: Fix godoc comment (PR: https://github.com/hibiken/asynq/pull/1009)
## [0.25.1] - 2024-12-11
### Upgrades
* Some packages
### Added
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
### Fixes
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
## [0.25.0] - 2024-10-29
### Upgrades

View File

@@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
tasks := make([]*Task, len(msgs))
for i, m := range msgs {
tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers)
tasks[i] = NewTask(m.Type, m.Payload)
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)

View File

@@ -8,15 +8,14 @@ import (
"context"
"crypto/tls"
"fmt"
"maps"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
)
// Task represents a unit of work to be performed.
@@ -27,9 +26,6 @@ type Task struct {
// payload holds data needed to perform the task.
payload []byte
// headers holds additional metadata for the task.
headers map[string]string
// opts holds options for the task.
opts []Option
@@ -37,9 +33,8 @@ type Task struct {
w *ResultWriter
}
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers }
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
@@ -53,21 +48,6 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: nil,
opts: opts,
}
}
// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers.
// Options can be passed to configure task processing behavior.
// TODO: In the next major (breaking) release, fold this functionality into NewTask
//
// so that headers are supported directly. After that, remove this method.
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: maps.Clone(headers),
opts: opts,
}
}
@@ -77,7 +57,6 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task {
return &Task{
typename: typename,
payload: payload,
headers: make(map[string]string),
w: w,
}
}
@@ -96,9 +75,6 @@ type TaskInfo struct {
// Payload is the payload data of the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// State indicates the task state.
State TaskState
@@ -169,7 +145,6 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
Queue: msg.Queue,
Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy?
Headers: msg.Headers,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastErr: msg.ErrorMsg,
@@ -467,15 +442,14 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
//
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are:
//
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: %w", err)
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
}
switch u.Scheme {
case "redis", "rediss":
@@ -565,7 +539,7 @@ type ResultWriter struct {
func (w *ResultWriter) Write(data []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, fmt.Errorf("failed to write task result: %w", w.ctx.Err())
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
default:
}
return w.broker.WriteResult(w.qname, w.id, data)

View File

@@ -11,11 +11,11 @@ import (
"strings"
"testing"
"github.com/redis/go-redis/v9"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/log"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)
//============================================================================

View File

@@ -385,7 +385,6 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),

View File

@@ -1191,473 +1191,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
}
func TestClientEnqueueWithHeaders(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"user-id": "123",
"request-id": "abc-def-ghi",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantPending map[string][]*base.TaskMessage
}{
{
desc: "Task with headers",
task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with empty headers",
task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "process_data",
Payload: []byte("data"),
Headers: map[string]string{},
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "process_data",
Payload: []byte("data"),
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with nil headers",
task: NewTaskWithHeaders("cleanup", nil, nil),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "cleanup",
Payload: nil,
Headers: nil,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "cleanup",
Payload: nil,
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with headers and custom options",
task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}),
opts: []Option{MaxRetry(5), Queue("notifications")},
wantInfo: &TaskInfo{
Queue: "notifications",
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"notifications": {
{
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
Retry: 5,
Queue: "notifications",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantPending {
got := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
}
}
func TestClientEnqueueWithHeadersScheduled(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
oneHourLater := now.Add(time.Hour)
headers := map[string]string{
"correlation-id": "xyz-123",
"source": "api",
}
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantInfo *TaskInfo
wantScheduled map[string][]base.Z
}{
{
desc: "Schedule task with headers",
task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers),
processAt: oneHourLater,
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
State: TaskStateScheduled,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: oneHourLater,
},
wantScheduled: map[string][]base.Z{
"default": {
{
Message: &base.TaskMessage{
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: oneHourLater.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
opts := append(tc.opts, ProcessAt(tc.processAt))
gotInfo, err := client.Enqueue(tc.task, opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
}
}
func TestNewTaskWithHeaders(t *testing.T) {
tests := []struct {
desc string
typename string
payload []byte
headers map[string]string
opts []Option
want *Task
}{
{
desc: "Task with headers",
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
want: &Task{
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
},
},
{
desc: "Task with empty headers",
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
want: &Task{
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
},
},
{
desc: "Task with nil headers",
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
want: &Task{
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
},
},
}
for _, tc := range tests {
got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...)
if got.Type() != tc.want.typename {
t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename)
}
if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" {
t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if tc.headers != nil && got.Headers() != nil {
tc.headers["modified"] = "test"
if _, exists := got.Headers()["modified"]; exists {
t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc)
}
}
}
}
func TestTaskHeadersMethod(t *testing.T) {
tests := []struct {
desc string
task *Task
want map[string]string
wantNil bool
}{
{
desc: "Task created with NewTask has nil headers",
task: NewTask("test", []byte("data")),
want: nil,
wantNil: true,
},
{
desc: "Task created with NewTaskWithHeaders has headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}),
want: map[string]string{"key": "value"},
},
{
desc: "Task created with empty headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}),
want: map[string]string{},
},
{
desc: "Task created with nil headers",
task: NewTaskWithHeaders("test", []byte("data"), nil),
want: nil,
wantNil: true,
},
}
for _, tc := range tests {
got := tc.task.Headers()
if tc.wantNil {
if got != nil {
t.Errorf("%s: Headers() = %v, want nil", tc.desc, got)
}
} else {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
}
}
}
func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"batch-id": "batch-123",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantGroups map[string]map[string][]base.Z
}{
{
desc: "Task with headers and group",
task: NewTaskWithHeaders("batch_process", []byte("item1"), headers),
opts: []Option{Group("batch-123")},
wantInfo: &TaskInfo{
Queue: "default",
Group: "batch-123",
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
State: TaskStateAggregating,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: time.Time{},
},
wantGroups: map[string]map[string][]base.Z{
"default": {
"batch-123": {
{
Message: &base.TaskMessage{
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
GroupKey: "batch-123",
},
Score: now.Unix(),
},
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, groups := range tc.wantGroups {
for groupKey, want := range groups {
got := h.GetGroupEntries(t, r, qname, groupKey)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff)
}
}
}
}
}

81
doc.go
View File

@@ -8,41 +8,41 @@ Package asynq provides a framework for Redis based distrubted task queue.
Asynq uses Redis as a message broker. To connect to redis,
specify the connection using one of RedisConnOpt types.
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
redisConnOpt = asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
Password: "xxxxx",
DB: 2,
}
The Client is used to enqueue a task.
client := asynq.NewClient(redisConnOpt)
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
client := asynq.NewClient(redisConnOpt)
task := asynq.NewTask("example", b)
// Task is created with two parameters: its type and payload.
// Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc.
b, err := json.Marshal(ExamplePayload{UserID: 42})
if err != nil {
log.Fatal(err)
}
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
task := asynq.NewTask("example", b)
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
// Enqueue the task to be processed immediately.
info, err := client.Enqueue(task)
// Schedule the task to be processed after one minute.
info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute))
The Server is used to run the task processing workers with a given
handler.
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
srv := asynq.NewServer(redisConnOpt, asynq.Config{
Concurrency: 10,
})
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
if err := srv.Run(handler); err != nil {
log.Fatal(err)
}
Handler is an interface type with a method which
takes a task and returns an error. Handler should return nil if
@@ -50,24 +50,23 @@ the processing is successful, otherwise return a non-nil error.
If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface.
type TaskHandler struct {
// ...
}
type TaskHandler struct {
// ...
}
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error {
switch task.Type {
case "example":
var data ExamplePayload
if err := json.Unmarshal(task.Payload(), &data); err != nil {
return err
}
// perform task with the data
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
default:
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}
*/
package asynq

View File

@@ -123,7 +123,7 @@ func ExampleResultWriter() {
res := []byte("task result data")
n, err := task.ResultWriter().Write(res) // implements io.Writer
if err != nil {
return fmt.Errorf("failed to write task result: %w", err)
return fmt.Errorf("failed to write task result: %v", err)
}
log.Printf(" %d bytes written", n)
return nil

16
go.mod
View File

@@ -1,20 +1,20 @@
module github.com/hibiken/asynq
go 1.24.0
go 1.22
require (
github.com/google/go-cmp v0.7.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.14.1
github.com/redis/go-redis/v9 v9.7.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.10.0
github.com/spf13/cast v1.7.0
go.uber.org/goleak v1.3.0
golang.org/x/sys v0.37.0
golang.org/x/time v0.14.0
google.golang.org/protobuf v1.36.10
golang.org/x/sys v0.27.0
golang.org/x/time v0.8.0
google.golang.org/protobuf v1.35.2
)
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)

28
go.sum
View File

@@ -2,16 +2,16 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -20,23 +20,23 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -245,7 +245,7 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
case errors.IsTaskNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
}
@@ -316,7 +316,7 @@ func Page(n int) ListOption {
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -325,7 +325,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -344,7 +344,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -353,11 +353,11 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
expiredSet := make(map[string]struct{}) // set of expired message IDs
for _, msg := range expired {
@@ -384,7 +384,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -393,7 +393,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -413,7 +413,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -422,7 +422,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -442,7 +442,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -451,7 +451,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -471,7 +471,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -480,7 +480,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -500,7 +500,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -509,7 +509,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %w", err)
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -583,30 +583,6 @@ func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error)
return int(n), err
}
// UpdateTaskPayload updates a task with the given id from the given queue with given payload.
// The task needs to be in scheduled state,
// otherwise UpdateTaskPayload will return an error.
//
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is not in scheduled state, it returns a non-nil error.
func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.UpdateTaskPayload(queue, id, payload)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
}
return nil
}
// DeleteTask deletes a task with the given id from the given queue.
// The task needs to be in pending, scheduled, retry, or archived state,
// otherwise DeleteTask will return an error.
@@ -616,7 +592,7 @@ func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
// If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.DeleteTask(queue, id)
switch {
@@ -625,7 +601,7 @@ func (i *Inspector) DeleteTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
return nil
@@ -680,7 +656,7 @@ func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
// If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.RunTask(queue, id)
switch {
@@ -689,7 +665,7 @@ func (i *Inspector) RunTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
return nil
}
@@ -752,7 +728,7 @@ func (i *Inspector) ArchiveTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
return nil
}

View File

@@ -2369,148 +2369,6 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
}
}
func TestInspectorUpdateTaskPayloadUpdatesScheduledTaskPayload(t *testing.T) {
r := setup(t)
defer r.Close()
m1_old := h.NewTaskMessage("task1", []byte("m1_old"))
m1_new := h.NewTaskMessage("task1", nil)
m1_new.ID = m1_old.ID
m2_old := h.NewTaskMessage("task2", nil)
m2_new := h.NewTaskMessage("task2", []byte("m2_new"))
m2_new.ID = m2_old.ID
m3_old := h.NewTaskMessageWithQueue("task3", []byte("m3_old"), "custom")
m3_new := h.NewTaskMessageWithQueue("task3", []byte("m3_new"), "custom")
m3_new.ID = m3_old.ID
now := time.Now()
z1_old := base.Z{Message: m1_old, Score: now.Add(5 * time.Minute).Unix()}
z1_new := base.Z{Message: m1_new, Score: now.Add(5 * time.Minute).Unix()}
z2_old := base.Z{Message: m2_old, Score: now.Add(15 * time.Minute).Unix()}
z2_new := base.Z{Message: m2_new, Score: now.Add(15 * time.Minute).Unix()}
z3_old := base.Z{Message: m3_old, Score: now.Add(2 * time.Minute).Unix()}
z3_new := base.Z{Message: m3_new, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
qname string
id string
newPayload []byte
wantScheduled map[string][]base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z2_old).ID,
newPayload: m2_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_new},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "default",
id: createScheduledTask(z1_old).ID,
newPayload: m1_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_new, z2_old},
"custom": {z3_old},
},
},
{
scheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_old},
},
qname: "custom",
id: createScheduledTask(z3_old).ID,
newPayload: m3_new.Payload,
wantScheduled: map[string][]base.Z{
"default": {z1_old, z2_old},
"custom": {z3_new},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); err != nil {
t.Errorf("UpdateTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
}
}
}
func TestInspectorUpdateTaskPayloadError(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
tasks map[string][]base.Z
qname string
id string
newPayload []byte
wantErr error
}{
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "nonexistent",
id: createScheduledTask(z2).ID,
newPayload: nil,
wantErr: ErrQueueNotFound,
},
{
tasks: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
id: uuid.NewString(),
newPayload: nil,
wantErr: ErrTaskNotFound,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllScheduledQueues(t, r, tc.tasks)
if err := inspector.UpdateTaskPayload(tc.qname, tc.id, tc.newPayload); !errors.Is(err, tc.wantErr) {
t.Errorf("UpdateTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr)
continue
}
}
}
func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
r := setup(t)
defer r.Close()

View File

@@ -23,7 +23,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.26.0"
const Version = "0.25.0"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
@@ -240,9 +240,6 @@ type TaskMessage struct {
// Payload holds data needed to process the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// ID is a unique identifier for each task.
ID string
@@ -307,7 +304,6 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: msg.Payload,
Headers: msg.Headers,
Id: msg.ID,
Queue: msg.Queue,
Retry: int32(msg.Retry),
@@ -332,7 +328,6 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: pbmsg.GetPayload(),
Headers: pbmsg.GetHeaders(),
ID: pbmsg.GetId(),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),

View File

@@ -107,7 +107,6 @@ type Op string
// only the last one is recorded.
//
// The types are:
//
// errors.Op
// The operation being performed, usually the method
// being invoked (Get, Put, etc.).

View File

@@ -4,8 +4,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v5.29.3
// protoc-gen-go v1.34.2
// protoc v3.19.6
// source: asynq.proto
package proto
@@ -16,7 +16,6 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -28,15 +27,16 @@ const (
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 16
// Next ID: 15
type TaskMessage struct {
state protoimpl.MessageState `protogen:"open.v1"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Type indicates the kind of the task to be performed.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// Payload holds data needed to process the task.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Headers holds additional metadata for the task.
Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
// Unique identifier for the task.
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
// Name of the queue to which this task belongs.
@@ -71,16 +71,16 @@ type TaskMessage struct {
// Time when the task completed in success in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// This field is populated if result_ttl > 0 upon completion.
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
}
func (x *TaskMessage) Reset() {
*x = TaskMessage{}
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TaskMessage) String() string {
@@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {}
func (x *TaskMessage) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[0]
if x != nil {
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -120,13 +120,6 @@ func (x *TaskMessage) GetPayload() []byte {
return nil
}
func (x *TaskMessage) GetHeaders() map[string]string {
if x != nil {
return x.Headers
}
return nil
}
func (x *TaskMessage) GetId() string {
if x != nil {
return x.Id
@@ -213,7 +206,10 @@ func (x *TaskMessage) GetCompletedAt() int64 {
// ServerInfo holds information about a running server.
type ServerInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Host machine the server is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the server process.
@@ -225,7 +221,7 @@ type ServerInfo struct {
// List of queue names with their priorities.
// The server will consume tasks from the queues and prioritize
// queues with higher priority numbers.
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
// If set, the server will always consume tasks from a queue with higher
// priority.
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
@@ -235,15 +231,15 @@ type ServerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Number of workers currently processing tasks.
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ServerInfo) Reset() {
*x = ServerInfo{}
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ServerInfo) String() string {
@@ -254,7 +250,7 @@ func (*ServerInfo) ProtoMessage() {}
func (x *ServerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[1]
if x != nil {
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -334,7 +330,10 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 {
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Host matchine this worker is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the process in which this worker is running.
@@ -353,16 +352,16 @@ type WorkerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Deadline by which the worker needs to complete processing
// the task. If worker exceeds the deadline, the task will fail.
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
}
func (x *WorkerInfo) Reset() {
*x = WorkerInfo{}
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *WorkerInfo) String() string {
@@ -373,7 +372,7 @@ func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[2]
if x != nil {
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -454,7 +453,10 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
// SchedulerEntry holds information about a periodic task registered
// with a scheduler.
type SchedulerEntry struct {
state protoimpl.MessageState `protogen:"open.v1"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Identifier of the scheduler entry.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// Periodic schedule spec of the entry.
@@ -470,15 +472,15 @@ type SchedulerEntry struct {
// Last time the task was enqueued.
// Zero time if task was never enqueued.
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEntry) Reset() {
*x = SchedulerEntry{}
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SchedulerEntry) String() string {
@@ -489,7 +491,7 @@ func (*SchedulerEntry) ProtoMessage() {}
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[3]
if x != nil {
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -556,20 +558,23 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
// SchedulerEnqueueEvent holds information about an enqueue event
// by a scheduler.
type SchedulerEnqueueEvent struct {
state protoimpl.MessageState `protogen:"open.v1"`
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// ID of the task that was enqueued.
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
// Time the task was enqueued.
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
}
func (x *SchedulerEnqueueEvent) Reset() {
*x = SchedulerEnqueueEvent{}
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SchedulerEnqueueEvent) String() string {
@@ -580,7 +585,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {}
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[4]
if x != nil {
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -611,106 +616,146 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
var File_asynq_proto protoreflect.FileDescriptor
const file_asynq_proto_rawDesc = "" +
"\n" +
"\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" +
"\vTaskMessage\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" +
"\apayload\x18\x02 \x01(\fR\apayload\x129\n" +
"\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" +
"\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" +
"\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" +
"\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" +
"\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" +
"\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" +
"\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" +
"\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" +
"\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" +
"\n" +
"unique_key\x18\n" +
" \x01(\tR\tuniqueKey\x12\x1b\n" +
"\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" +
"\tretention\x18\f \x01(\x03R\tretention\x12!\n" +
"\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" +
"\fHeadersEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" +
"\n" +
"ServerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" +
"\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" +
"\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" +
"\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" +
"\x06status\x18\a \x01(\tR\x06status\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" +
"\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" +
"\vQueuesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" +
"\n" +
"WorkerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" +
"\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" +
"\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" +
"\x05queue\x18\a \x01(\tR\x05queue\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" +
"\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" +
"\x0eSchedulerEntry\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" +
"\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" +
"\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" +
"\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" +
"\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" +
"\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" +
"\x15SchedulerEnqueueEvent\x12\x17\n" +
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" +
"\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3"
var file_asynq_proto_rawDesc = []byte{
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79,
0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c,
0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74,
0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12,
0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66,
0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07,
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74,
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65,
0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c,
0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28,
0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22,
0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69,
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49,
0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65,
0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74,
0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20,
0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72,
0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73,
0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61,
0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65,
0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20,
0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65,
0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65,
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65,
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a,
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61,
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75,
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e,
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65,
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a,
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75,
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17,
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_asynq_proto_rawDescOnce sync.Once
file_asynq_proto_rawDescData []byte
file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
)
func file_asynq_proto_rawDescGZIP() []byte {
file_asynq_proto_rawDescOnce.Do(func() {
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)))
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
})
return file_asynq_proto_rawDescData
}
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_asynq_proto_goTypes = []any{
(*TaskMessage)(nil), // 0: asynq.TaskMessage
(*ServerInfo)(nil), // 1: asynq.ServerInfo
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
nil, // 5: asynq.TaskMessage.HeadersEntry
nil, // 6: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
nil, // 5: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
}
var file_asynq_proto_depIdxs = []int32{
5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry
6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
8, // [8:8] is the sub-list for method output_type
8, // [8:8] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
}
func init() { file_asynq_proto_init() }
@@ -718,13 +763,75 @@ func file_asynq_proto_init() {
if File_asynq_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*TaskMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*ServerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEnqueueEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)),
RawDescriptor: file_asynq_proto_rawDesc,
NumEnums: 0,
NumMessages: 7,
NumMessages: 6,
NumExtensions: 0,
NumServices: 0,
},
@@ -733,6 +840,7 @@ func file_asynq_proto_init() {
MessageInfos: file_asynq_proto_msgTypes,
}.Build()
File_asynq_proto = out.File
file_asynq_proto_rawDesc = nil
file_asynq_proto_goTypes = nil
file_asynq_proto_depIdxs = nil
}

View File

@@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 16
// Next ID: 15
message TaskMessage {
// Type indicates the kind of the task to be performed.
string type = 1;
@@ -19,9 +19,6 @@ message TaskMessage {
// Payload holds data needed to process the task.
bytes payload = 2;
// Headers holds additional metadata for the task.
map<string, string> headers = 15;
// Unique identifier for the task.
string id = 3;

View File

@@ -1412,93 +1412,6 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
return n, nil
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// --
// ARGV[1] -> task message data
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully updated.
// Returns 0 if task is not found.
// Returns -1 if task is not in scheduled state.
var updateTaskPayloadCmd = redis.NewScript(`
-- Check if given taks exists
if redis.call("EXISTS", KEYS[1]) == 0 then
return 0
end
local state, pending_since, group, unique_key = unpack(redis.call("HMGET", KEYS[1], "state", "pending_since", "group", "unique_key"))
if state ~= "scheduled" then
return -1
end
local redis_call_args = {"state", state}
if pending_since then
table.insert(redis_call_args, "pending_since")
table.insert(redis_call_args, pending_since)
end
if group then
table.insert(redis_call_args, "group")
table.insert(redis_call_args, group)
end
if unique_key then
table.insert(redis_call_args, "unique_key")
table.insert(redis_call_args, unique_key)
end
redis.call("HSET", KEYS[1], "msg", ARGV[1], unpack(redis_call_args))
return 1
`)
// UpdateTaskPayload finds a task that matches the id from the given queue and updates it's payload.
// It returns nil if it successfully updated the task payload.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
func (r *RDB) UpdateTaskPayload(qname, id string, payload []byte) error {
var op errors.Op = "rdb.UpdateTask"
if err := r.checkQueueExists(qname); err != nil {
return errors.E(op, errors.CanonicalCode(err), err)
}
taskInfo, err := r.GetTaskInfo(qname, id)
if err != nil {
return errors.E(op, errors.Unknown, err)
}
taskInfo.Message.Payload = payload
encoded, err := base.EncodeMessage(taskInfo.Message)
if err != nil {
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
keys := []string{
base.TaskKey(qname, id),
}
argv := []interface{}{
encoded,
}
res, err := updateTaskPayloadCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return errors.E(op, errors.Unknown, err)
}
n, ok := res.(int64)
if !ok {
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: updateTaskCmd script returned unexported value %v", res))
}
switch n {
case 1:
return nil
case 0:
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
case -1:
return errors.E(op, errors.FailedPrecondition, "cannot update task that is not in scheduled state.")
default:
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from updateTaskCmd script: %d", n))
}
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:groups

View File

@@ -122,10 +122,10 @@ func (mgr *PeriodicTaskManager) Start() error {
panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
}
if err := mgr.initialSync(); err != nil {
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
if err := mgr.s.Start(); err != nil {
return fmt.Errorf("asynq: %w", err)
return fmt.Errorf("asynq: %v", err)
}
mgr.wg.Add(1)
go func() {
@@ -168,11 +168,11 @@ func (mgr *PeriodicTaskManager) Run() error {
func (mgr *PeriodicTaskManager) initialSync() error {
configs, err := mgr.p.GetConfigs()
if err != nil {
return fmt.Errorf("initial call to GetConfigs failed: %w", err)
return fmt.Errorf("initial call to GetConfigs failed: %v", err)
}
for _, c := range configs {
if err := validatePeriodicTaskConfig(c); err != nil {
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %w", err)
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err)
}
}
mgr.add(configs)

View File

@@ -230,7 +230,6 @@ func (p *processor) exec() {
ctx: ctx,
},
)
task.headers = msg.Headers
resCh <- p.perform(ctx, task)
}()
@@ -334,7 +333,7 @@ var RevokeTask = errors.New("revoke task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err)
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
}
switch {
case errors.Is(err, RevokeTask):
@@ -355,7 +354,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil {

View File

@@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
}
func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(delay)
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)

View File

@@ -6,16 +6,12 @@ package asynq
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"sync"
)
// ErrHandlerNotFound indicates that no task handler was found for a given pattern.
var ErrHandlerNotFound = errors.New("handler not found for task")
// ServeMux is a multiplexer for asynchronous tasks.
// It matches the type of each task against a list of registered patterns
// and calls the handler for the pattern that most closely matches the
@@ -153,8 +149,8 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
// NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("%w %q", ErrHandlerNotFound, task.Type())
return fmt.Errorf("handler not found for task %q", task.Type())
}
// NotFoundHandler returns a simple task handler that returns a not found error.
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }

View File

@@ -24,7 +24,7 @@ func makeFakeHandler(identity string) Handler {
}
// makeFakeMiddleware returns a middleware function that appends the given identity
// to the global invoked slice.
//to the global invoked slice.
func makeFakeMiddleware(identity string) MiddlewareFunc {
return func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, t *Task) error {

View File

@@ -174,15 +174,16 @@ type Config struct {
// })
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
//
// we can also handle panic error like:
// func reportError(ctx context, task *asynq.Task, err error) {
// if asynq.IsPanicError(err) {
// if asynq.IsPanic(err) {
// errorReportingService.Notify(err)
// }
// })
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler
// Logger specifies the logger used by the server instance.

View File

@@ -24,10 +24,8 @@ func (srv *Server) waitForSignals() {
if sig == unix.SIGTSTP {
srv.Stop()
continue
} else {
srv.Stop()
break
}
break
}
}

View File

@@ -8,9 +8,9 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/redis/go-redis/v9"
)
type subscriber struct {

View File

@@ -39,7 +39,6 @@ By default, CLI will try to connect to a redis server running at `localhost:6379
--config string config file to set flag defaut values (default is $HOME/.asynq.yaml)
-n, --db int redis database number (default is 0)
-h, --help help for asynq
-U, --username string username to use when connecting to redis server
-p, --password string password to use when connecting to redis server
-u, --uri string redis server URI (default "127.0.0.1:6379")

View File

@@ -16,12 +16,11 @@ import (
// ScreenDrawer is used to draw contents on screen.
//
// Usage example:
//
// d := NewScreenDrawer(s)
// d.Println("Hello world", mystyle)
// d.NL() // adds newline
// d.Print("foo", mystyle.Bold(true))
// d.Print("bar", mystyle.Italic(true))
// d := NewScreenDrawer(s)
// d.Println("Hello world", mystyle)
// d.NL() // adds newline
// d.Print("foo", mystyle.Bold(true))
// d.Print("bar", mystyle.Italic(true))
type ScreenDrawer struct {
l *LineDrawer
}

View File

@@ -1,4 +1,3 @@
//
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
@@ -37,13 +36,10 @@ var (
uri string
db int
password string
username string
useRedisCluster bool
clusterAddrs string
tlsServerName string
insecure bool
useTLS bool
)
// rootCmd represents the base command when called without any subcommands
@@ -261,6 +257,7 @@ func adjustPadding(lines ...*displayLine) {
func rpad(s string, padding int) string {
tmpl := fmt.Sprintf("%%-%ds ", padding)
return fmt.Sprintf(tmpl, s)
}
// lpad adds padding to the left of a string.
@@ -311,26 +308,19 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password to use when connecting to redis server")
rootCmd.PersistentFlags().StringVarP(&username, "username", "U", "", "Username to use when connecting to Redis (ACL username)")
rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "Connect to redis cluster")
rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs",
"127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005",
"List of comma-separated redis server addresses")
rootCmd.PersistentFlags().BoolVar(&useTLS, "tls", false, "Enable TLS connection")
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
"", "Server name for TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
false, "Allow insecure TLS connection by skipping cert validation")
// Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
viper.BindPFlag("username", rootCmd.PersistentFlags().Lookup("username"))
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
viper.BindPFlag("tls", rootCmd.PersistentFlags().Lookup("tls"))
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
}
// initConfig reads in config file and ENV variables if set.
@@ -367,7 +357,6 @@ func createRDB() *rdb.RDB {
c = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
})
} else {
@@ -375,7 +364,6 @@ func createRDB() *rdb.RDB {
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
})
}
@@ -398,7 +386,6 @@ func getRedisConnOpt() asynq.RedisConnOpt {
return asynq.RedisClusterClientOpt{
Addrs: addrs,
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
}
}
@@ -406,22 +393,16 @@ func getRedisConnOpt() asynq.RedisConnOpt {
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
Username: viper.GetString("username"),
TLSConfig: getTLSConfig(),
}
}
func getTLSConfig() *tls.Config {
tlsServer := viper.GetString("tls_server")
if tlsServer != "" {
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
if tlsServer == "" {
return nil
}
if viper.GetBool("tls") {
return &tls.Config{InsecureSkipVerify: viper.GetBool("insecure")}
}
return nil
return &tls.Config{ServerName: tlsServer}
}
// printTable is a helper function to print data in table format.
@@ -429,22 +410,18 @@ func getTLSConfig() *tls.Config {
// cols is a list of headers and printRow specifies how to print rows.
//
// Example:
//
// type User struct {
// Name string
// Addr string
// Age int
// }
//
// type User struct {
// Name string
// Addr string
// Age int
// }
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
// cols := []string{"Name", "Addr", "Age"}
//
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
//
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
// printTable(cols, printRows)
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"

View File

@@ -770,3 +770,4 @@ func taskRunAll(cmd *cobra.Command, args []string) {
}
fmt.Printf("%d tasks are now pending\n", n)
}

View File

@@ -25,13 +25,13 @@ type QueueMetricsCollector struct {
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
qnames, err := qmc.inspector.Queues()
if err != nil {
return nil, fmt.Errorf("failed to get queue names: %w", err)
return nil, fmt.Errorf("failed to get queue names: %v", err)
}
infos := make([]*asynq.QueueInfo, len(qnames))
for i, qname := range qnames {
qinfo, err := qmc.inspector.GetQueueInfo(qname)
if err != nil {
return nil, fmt.Errorf("failed to get queue info: %w", err)
return nil, fmt.Errorf("failed to get queue info: %v", err)
}
infos[i] = qinfo
}

View File

@@ -94,7 +94,7 @@ func (s *Semaphore) Release(ctx context.Context) error {
n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
if err != nil {
return fmt.Errorf("redis command failed: %w", err)
return fmt.Errorf("redis command failed: %v", err)
}
if n == 0 {