2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-11 21:25:53 +08:00

Feature: Add Headers Support to Tasks (#1070)

* feat(task): Add headers support to tasks

* fix: cleanup copy map code

* fix: Add tests
This commit is contained in:
Joe
2025-11-04 09:07:59 -08:00
committed by GitHub
parent 8261a03f0d
commit 5de9b1faf0
9 changed files with 647 additions and 250 deletions

View File

@@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
tasks := make([]*Task, len(msgs))
for i, m := range msgs {
tasks[i] = NewTask(m.Type, m.Payload)
tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers)
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)

View File

@@ -8,6 +8,7 @@ import (
"context"
"crypto/tls"
"fmt"
"maps"
"net"
"net/url"
"strconv"
@@ -26,6 +27,9 @@ type Task struct {
// payload holds data needed to perform the task.
payload []byte
// headers holds additional metadata for the task.
headers map[string]string
// opts holds options for the task.
opts []Option
@@ -33,8 +37,9 @@ type Task struct {
w *ResultWriter
}
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers }
// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
@@ -48,6 +53,21 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: nil,
opts: opts,
}
}
// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers.
// Options can be passed to configure task processing behavior.
// TODO: In the next major (breaking) release, fold this functionality into NewTask
//
// so that headers are supported directly. After that, remove this method.
func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task {
return &Task{
typename: typename,
payload: payload,
headers: maps.Clone(headers),
opts: opts,
}
}
@@ -57,6 +77,7 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task {
return &Task{
typename: typename,
payload: payload,
headers: make(map[string]string),
w: w,
}
}
@@ -75,6 +96,9 @@ type TaskInfo struct {
// Payload is the payload data of the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// State indicates the task state.
State TaskState
@@ -145,6 +169,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
Queue: msg.Queue,
Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy?
Headers: msg.Headers,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastErr: msg.ErrorMsg,

View File

@@ -385,6 +385,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),

View File

@@ -1191,3 +1191,473 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
}
func TestClientEnqueueWithHeaders(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"user-id": "123",
"request-id": "abc-def-ghi",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantPending map[string][]*base.TaskMessage
}{
{
desc: "Task with headers",
task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "send_email",
Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with empty headers",
task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "process_data",
Payload: []byte("data"),
Headers: map[string]string{},
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "process_data",
Payload: []byte("data"),
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with nil headers",
task: NewTaskWithHeaders("cleanup", nil, nil),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "cleanup",
Payload: nil,
Headers: nil,
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "cleanup",
Payload: nil,
Headers: nil,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Task with headers and custom options",
task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}),
opts: []Option{MaxRetry(5), Queue("notifications")},
wantInfo: &TaskInfo{
Queue: "notifications",
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"notifications": {
{
Type: "notify",
Payload: []byte("notification"),
Headers: map[string]string{"channel": "email"},
Retry: 5,
Queue: "notifications",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantPending {
got := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
}
}
func TestClientEnqueueWithHeadersScheduled(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
oneHourLater := now.Add(time.Hour)
headers := map[string]string{
"correlation-id": "xyz-123",
"source": "api",
}
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantInfo *TaskInfo
wantScheduled map[string][]base.Z
}{
{
desc: "Schedule task with headers",
task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers),
processAt: oneHourLater,
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
State: TaskStateScheduled,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: oneHourLater,
},
wantScheduled: map[string][]base.Z{
"default": {
{
Message: &base.TaskMessage{
Type: "scheduled_task",
Payload: []byte("payload"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: oneHourLater.Unix(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
opts := append(tc.opts, ProcessAt(tc.processAt))
gotInfo, err := client.Enqueue(tc.task, opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
}
}
func TestNewTaskWithHeaders(t *testing.T) {
tests := []struct {
desc string
typename string
payload []byte
headers map[string]string
opts []Option
want *Task
}{
{
desc: "Task with headers",
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
want: &Task{
typename: "test_task",
payload: []byte("test payload"),
headers: map[string]string{"key1": "value1", "key2": "value2"},
opts: []Option{MaxRetry(3)},
},
},
{
desc: "Task with empty headers",
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
want: &Task{
typename: "empty_headers",
payload: nil,
headers: map[string]string{},
opts: nil,
},
},
{
desc: "Task with nil headers",
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
want: &Task{
typename: "nil_headers",
payload: []byte("data"),
headers: nil,
opts: []Option{Queue("test")},
},
},
}
for _, tc := range tests {
got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...)
if got.Type() != tc.want.typename {
t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename)
}
if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" {
t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
if tc.headers != nil && got.Headers() != nil {
tc.headers["modified"] = "test"
if _, exists := got.Headers()["modified"]; exists {
t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc)
}
}
}
}
func TestTaskHeadersMethod(t *testing.T) {
tests := []struct {
desc string
task *Task
want map[string]string
wantNil bool
}{
{
desc: "Task created with NewTask has nil headers",
task: NewTask("test", []byte("data")),
want: nil,
wantNil: true,
},
{
desc: "Task created with NewTaskWithHeaders has headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}),
want: map[string]string{"key": "value"},
},
{
desc: "Task created with empty headers",
task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}),
want: map[string]string{},
},
{
desc: "Task created with nil headers",
task: NewTaskWithHeaders("test", []byte("data"), nil),
want: nil,
wantNil: true,
},
}
for _, tc := range tests {
got := tc.task.Headers()
if tc.wantNil {
if got != nil {
t.Errorf("%s: Headers() = %v, want nil", tc.desc, got)
}
} else {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff)
}
}
}
}
func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
now := time.Now()
headers := map[string]string{
"batch-id": "batch-123",
"priority": "high",
}
tests := []struct {
desc string
task *Task
opts []Option
wantInfo *TaskInfo
wantGroups map[string]map[string][]base.Z
}{
{
desc: "Task with headers and group",
task: NewTaskWithHeaders("batch_process", []byte("item1"), headers),
opts: []Option{Group("batch-123")},
wantInfo: &TaskInfo{
Queue: "default",
Group: "batch-123",
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
State: TaskStateAggregating,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: time.Time{},
},
wantGroups: map[string]map[string][]base.Z{
"default": {
"batch-123": {
{
Message: &base.TaskMessage{
Type: "batch_process",
Payload: []byte("item1"),
Headers: headers,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
GroupKey: "batch-123",
},
Score: now.Unix(),
},
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Errorf("%s: Enqueue failed: %v", tc.desc, err)
continue
}
cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond),
}
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotInfo, tc.wantInfo, diff)
}
for qname, groups := range tc.wantGroups {
for groupKey, want := range groups {
got := h.GetGroupEntries(t, r, qname, groupKey)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff)
}
}
}
}
}

View File

@@ -240,6 +240,9 @@ type TaskMessage struct {
// Payload holds data needed to process the task.
Payload []byte
// Headers holds additional metadata for the task.
Headers map[string]string
// ID is a unique identifier for each task.
ID string
@@ -304,6 +307,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: msg.Payload,
Headers: msg.Headers,
Id: msg.ID,
Queue: msg.Queue,
Retry: int32(msg.Retry),
@@ -328,6 +332,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: pbmsg.GetPayload(),
Headers: pbmsg.GetHeaders(),
ID: pbmsg.GetId(),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),

View File

@@ -4,8 +4,8 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.34.2
// protoc v3.19.6
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: asynq.proto
package proto
@@ -16,6 +16,7 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -27,16 +28,15 @@ const (
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 15
// Next ID: 16
type TaskMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Type indicates the kind of the task to be performed.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// Payload holds data needed to process the task.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Headers holds additional metadata for the task.
Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
// Unique identifier for the task.
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
// Name of the queue to which this task belongs.
@@ -71,16 +71,16 @@ type TaskMessage struct {
// Time when the task completed in success in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// This field is populated if result_ttl > 0 upon completion.
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *TaskMessage) Reset() {
*x = TaskMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *TaskMessage) String() string {
@@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {}
func (x *TaskMessage) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -120,6 +120,13 @@ func (x *TaskMessage) GetPayload() []byte {
return nil
}
func (x *TaskMessage) GetHeaders() map[string]string {
if x != nil {
return x.Headers
}
return nil
}
func (x *TaskMessage) GetId() string {
if x != nil {
return x.Id
@@ -206,10 +213,7 @@ func (x *TaskMessage) GetCompletedAt() int64 {
// ServerInfo holds information about a running server.
type ServerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Host machine the server is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the server process.
@@ -221,7 +225,7 @@ type ServerInfo struct {
// List of queue names with their priorities.
// The server will consume tasks from the queues and prioritize
// queues with higher priority numbers.
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
// If set, the server will always consume tasks from a queue with higher
// priority.
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
@@ -231,15 +235,15 @@ type ServerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Number of workers currently processing tasks.
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *ServerInfo) Reset() {
*x = ServerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ServerInfo) String() string {
@@ -250,7 +254,7 @@ func (*ServerInfo) ProtoMessage() {}
func (x *ServerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -330,10 +334,7 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 {
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Host matchine this worker is running on.
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
// PID of the process in which this worker is running.
@@ -352,16 +353,16 @@ type WorkerInfo struct {
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
// Deadline by which the worker needs to complete processing
// the task. If worker exceeds the deadline, the task will fail.
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *WorkerInfo) Reset() {
*x = WorkerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *WorkerInfo) String() string {
@@ -372,7 +373,7 @@ func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -453,10 +454,7 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
// SchedulerEntry holds information about a periodic task registered
// with a scheduler.
type SchedulerEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Identifier of the scheduler entry.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// Periodic schedule spec of the entry.
@@ -472,15 +470,15 @@ type SchedulerEntry struct {
// Last time the task was enqueued.
// Zero time if task was never enqueued.
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEntry) Reset() {
*x = SchedulerEntry{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SchedulerEntry) String() string {
@@ -491,7 +489,7 @@ func (*SchedulerEntry) ProtoMessage() {}
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -558,23 +556,20 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
// SchedulerEnqueueEvent holds information about an enqueue event
// by a scheduler.
type SchedulerEnqueueEvent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// ID of the task that was enqueued.
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
// Time the task was enqueued.
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *SchedulerEnqueueEvent) Reset() {
*x = SchedulerEnqueueEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *SchedulerEnqueueEvent) String() string {
@@ -585,7 +580,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {}
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -616,146 +611,106 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
var File_asynq_proto protoreflect.FileDescriptor
var file_asynq_proto_rawDesc = []byte{
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79,
0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c,
0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74,
0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12,
0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05,
0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66,
0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07,
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74,
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69,
0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65,
0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c,
0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28,
0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22,
0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52,
0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69,
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49,
0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65,
0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20,
0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74,
0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20,
0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72,
0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73,
0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61,
0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65,
0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20,
0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65,
0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38,
0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28,
0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05,
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65,
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65,
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a,
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61,
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75,
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e,
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65,
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a,
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75,
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17,
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
const file_asynq_proto_rawDesc = "" +
"\n" +
"\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" +
"\vTaskMessage\x12\x12\n" +
"\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" +
"\apayload\x18\x02 \x01(\fR\apayload\x129\n" +
"\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" +
"\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" +
"\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" +
"\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" +
"\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" +
"\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" +
"\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" +
"\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" +
"\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" +
"\n" +
"unique_key\x18\n" +
" \x01(\tR\tuniqueKey\x12\x1b\n" +
"\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" +
"\tretention\x18\f \x01(\x03R\tretention\x12!\n" +
"\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" +
"\fHeadersEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" +
"\n" +
"ServerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" +
"\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" +
"\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" +
"\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" +
"\x06status\x18\a \x01(\tR\x06status\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" +
"\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" +
"\vQueuesEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" +
"\n" +
"WorkerInfo\x12\x12\n" +
"\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" +
"\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" +
"\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" +
"\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" +
"\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" +
"\x05queue\x18\a \x01(\tR\x05queue\x129\n" +
"\n" +
"start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" +
"\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" +
"\x0eSchedulerEntry\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" +
"\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" +
"\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" +
"\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" +
"\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" +
"\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" +
"\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" +
"\x15SchedulerEnqueueEvent\x12\x17\n" +
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" +
"\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3"
var (
file_asynq_proto_rawDescOnce sync.Once
file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
file_asynq_proto_rawDescData []byte
)
func file_asynq_proto_rawDescGZIP() []byte {
file_asynq_proto_rawDescOnce.Do(func() {
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)))
})
return file_asynq_proto_rawDescData
}
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_asynq_proto_goTypes = []any{
(*TaskMessage)(nil), // 0: asynq.TaskMessage
(*ServerInfo)(nil), // 1: asynq.ServerInfo
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
nil, // 5: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
nil, // 5: asynq.TaskMessage.HeadersEntry
nil, // 6: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
}
var file_asynq_proto_depIdxs = []int32{
5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry
6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
8, // [8:8] is the sub-list for method output_type
8, // [8:8] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
}
func init() { file_asynq_proto_init() }
@@ -763,75 +718,13 @@ func file_asynq_proto_init() {
if File_asynq_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*TaskMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*ServerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any {
switch v := v.(*SchedulerEnqueueEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_asynq_proto_rawDesc,
RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)),
NumEnums: 0,
NumMessages: 6,
NumMessages: 7,
NumExtensions: 0,
NumServices: 0,
},
@@ -840,7 +733,6 @@ func file_asynq_proto_init() {
MessageInfos: file_asynq_proto_msgTypes,
}.Build()
File_asynq_proto = out.File
file_asynq_proto_rawDesc = nil
file_asynq_proto_goTypes = nil
file_asynq_proto_depIdxs = nil
}

View File

@@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
// TaskMessage is the internal representation of a task with additional
// metadata fields.
// Next ID: 15
// Next ID: 16
message TaskMessage {
// Type indicates the kind of the task to be performed.
string type = 1;
@@ -19,6 +19,9 @@ message TaskMessage {
// Payload holds data needed to process the task.
bytes payload = 2;
// Headers holds additional metadata for the task.
map<string, string> headers = 15;
// Unique identifier for the task.
string id = 3;

View File

@@ -230,6 +230,7 @@ func (p *processor) exec() {
ctx: ctx,
},
)
task.headers = msg.Headers
resCh <- p.perform(ctx, task)
}()
@@ -333,7 +334,7 @@ var RevokeTask = errors.New("revoke task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err)
}
switch {
case errors.Is(err, RevokeTask):
@@ -354,7 +355,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil {

View File

@@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
}
func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers))
retryAt := time.Now().Add(delay)
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)