diff --git a/aggregator.go b/aggregator.go index 9f8da70..5801d87 100644 --- a/aggregator.go +++ b/aggregator.go @@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) { } tasks := make([]*Task, len(msgs)) for i, m := range msgs { - tasks[i] = NewTask(m.Type, m.Payload) + tasks[i] = NewTaskWithHeaders(m.Type, m.Payload, m.Headers) } aggregatedTask := a.ga.Aggregate(gname, tasks) ctx, cancel := context.WithDeadline(context.Background(), deadline) diff --git a/asynq.go b/asynq.go index 084dc63..c27d0f7 100644 --- a/asynq.go +++ b/asynq.go @@ -8,6 +8,7 @@ import ( "context" "crypto/tls" "fmt" + "maps" "net" "net/url" "strconv" @@ -26,6 +27,9 @@ type Task struct { // payload holds data needed to perform the task. payload []byte + // headers holds additional metadata for the task. + headers map[string]string + // opts holds options for the task. opts []Option @@ -33,8 +37,9 @@ type Task struct { w *ResultWriter } -func (t *Task) Type() string { return t.typename } -func (t *Task) Payload() []byte { return t.payload } +func (t *Task) Type() string { return t.typename } +func (t *Task) Payload() []byte { return t.payload } +func (t *Task) Headers() map[string]string { return t.headers } // ResultWriter returns a pointer to the ResultWriter associated with the task. // @@ -48,6 +53,21 @@ func NewTask(typename string, payload []byte, opts ...Option) *Task { return &Task{ typename: typename, payload: payload, + headers: nil, + opts: opts, + } +} + +// NewTaskWithHeaders returns a new Task given a type name, payload data, and headers. +// Options can be passed to configure task processing behavior. +// TODO: In the next major (breaking) release, fold this functionality into NewTask +// +// so that headers are supported directly. After that, remove this method. +func NewTaskWithHeaders(typename string, payload []byte, headers map[string]string, opts ...Option) *Task { + return &Task{ + typename: typename, + payload: payload, + headers: maps.Clone(headers), opts: opts, } } @@ -57,6 +77,7 @@ func newTask(typename string, payload []byte, w *ResultWriter) *Task { return &Task{ typename: typename, payload: payload, + headers: make(map[string]string), w: w, } } @@ -75,6 +96,9 @@ type TaskInfo struct { // Payload is the payload data of the task. Payload []byte + // Headers holds additional metadata for the task. + Headers map[string]string + // State indicates the task state. State TaskState @@ -145,6 +169,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time Queue: msg.Queue, Type: msg.Type, Payload: msg.Payload, // Do we need to make a copy? + Headers: msg.Headers, MaxRetry: msg.Retry, Retried: msg.Retried, LastErr: msg.ErrorMsg, diff --git a/client.go b/client.go index 4a0ba39..ab2bcbb 100644 --- a/client.go +++ b/client.go @@ -385,6 +385,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) ID: opt.taskID, Type: task.Type(), Payload: task.Payload(), + Headers: task.Headers(), Queue: opt.queue, Retry: opt.retry, Deadline: deadline.Unix(), diff --git a/client_test.go b/client_test.go index 62aefc5..e4104f5 100644 --- a/client_test.go +++ b/client_test.go @@ -1191,3 +1191,473 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } } } + +func TestClientEnqueueWithHeaders(t *testing.T) { + r := setup(t) + client := NewClient(getRedisConnOpt(t)) + defer client.Close() + + now := time.Now() + headers := map[string]string{ + "user-id": "123", + "request-id": "abc-def-ghi", + "priority": "high", + } + + tests := []struct { + desc string + task *Task + opts []Option + wantInfo *TaskInfo + wantPending map[string][]*base.TaskMessage + }{ + { + desc: "Task with headers", + task: NewTaskWithHeaders("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}), headers), + opts: []Option{}, + wantInfo: &TaskInfo{ + Queue: "default", + Type: "send_email", + Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}), + Headers: headers, + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": { + { + Type: "send_email", + Payload: h.JSON(map[string]interface{}{"to": "user@example.com"}), + Headers: headers, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + }, + }, + }, + { + desc: "Task with empty headers", + task: NewTaskWithHeaders("process_data", []byte("data"), map[string]string{}), + opts: []Option{}, + wantInfo: &TaskInfo{ + Queue: "default", + Type: "process_data", + Payload: []byte("data"), + Headers: map[string]string{}, + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": { + { + Type: "process_data", + Payload: []byte("data"), + Headers: nil, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + }, + }, + }, + { + desc: "Task with nil headers", + task: NewTaskWithHeaders("cleanup", nil, nil), + opts: []Option{}, + wantInfo: &TaskInfo{ + Queue: "default", + Type: "cleanup", + Payload: nil, + Headers: nil, + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": { + { + Type: "cleanup", + Payload: nil, + Headers: nil, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + }, + }, + }, + { + desc: "Task with headers and custom options", + task: NewTaskWithHeaders("notify", []byte("notification"), map[string]string{"channel": "email"}), + opts: []Option{MaxRetry(5), Queue("notifications")}, + wantInfo: &TaskInfo{ + Queue: "notifications", + Type: "notify", + Payload: []byte("notification"), + Headers: map[string]string{"channel": "email"}, + State: TaskStatePending, + MaxRetry: 5, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, + }, + wantPending: map[string][]*base.TaskMessage{ + "notifications": { + { + Type: "notify", + Payload: []byte("notification"), + Headers: map[string]string{"channel": "email"}, + Retry: 5, + Queue: "notifications", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + + gotInfo, err := client.Enqueue(tc.task, tc.opts...) + if err != nil { + t.Errorf("%s: Enqueue failed: %v", tc.desc, err) + continue + } + + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(TaskInfo{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { + t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s", + tc.desc, gotInfo, tc.wantInfo, diff) + } + + for qname, want := range tc.wantPending { + got := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff) + } + } + } +} + +func TestClientEnqueueWithHeadersScheduled(t *testing.T) { + r := setup(t) + client := NewClient(getRedisConnOpt(t)) + defer client.Close() + + now := time.Now() + oneHourLater := now.Add(time.Hour) + headers := map[string]string{ + "correlation-id": "xyz-123", + "source": "api", + } + + tests := []struct { + desc string + task *Task + processAt time.Time + opts []Option + wantInfo *TaskInfo + wantScheduled map[string][]base.Z + }{ + { + desc: "Schedule task with headers", + task: NewTaskWithHeaders("scheduled_task", []byte("payload"), headers), + processAt: oneHourLater, + opts: []Option{}, + wantInfo: &TaskInfo{ + Queue: "default", + Type: "scheduled_task", + Payload: []byte("payload"), + Headers: headers, + State: TaskStateScheduled, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: oneHourLater, + }, + wantScheduled: map[string][]base.Z{ + "default": { + { + Message: &base.TaskMessage{ + Type: "scheduled_task", + Payload: []byte("payload"), + Headers: headers, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + Score: oneHourLater.Unix(), + }, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + + opts := append(tc.opts, ProcessAt(tc.processAt)) + gotInfo, err := client.Enqueue(tc.task, opts...) + if err != nil { + t.Errorf("%s: Enqueue failed: %v", tc.desc, err) + continue + } + + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(TaskInfo{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { + t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s", + tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff) + } + + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff) + } + } + } +} + +func TestNewTaskWithHeaders(t *testing.T) { + tests := []struct { + desc string + typename string + payload []byte + headers map[string]string + opts []Option + want *Task + }{ + { + desc: "Task with headers", + typename: "test_task", + payload: []byte("test payload"), + headers: map[string]string{"key1": "value1", "key2": "value2"}, + opts: []Option{MaxRetry(3)}, + want: &Task{ + typename: "test_task", + payload: []byte("test payload"), + headers: map[string]string{"key1": "value1", "key2": "value2"}, + opts: []Option{MaxRetry(3)}, + }, + }, + { + desc: "Task with empty headers", + typename: "empty_headers", + payload: nil, + headers: map[string]string{}, + opts: nil, + want: &Task{ + typename: "empty_headers", + payload: nil, + headers: map[string]string{}, + opts: nil, + }, + }, + { + desc: "Task with nil headers", + typename: "nil_headers", + payload: []byte("data"), + headers: nil, + opts: []Option{Queue("test")}, + want: &Task{ + typename: "nil_headers", + payload: []byte("data"), + headers: nil, + opts: []Option{Queue("test")}, + }, + }, + } + + for _, tc := range tests { + got := NewTaskWithHeaders(tc.typename, tc.payload, tc.headers, tc.opts...) + + if got.Type() != tc.want.typename { + t.Errorf("%s: Type() = %q, want %q", tc.desc, got.Type(), tc.want.typename) + } + + if diff := cmp.Diff(tc.want.payload, got.Payload()); diff != "" { + t.Errorf("%s: Payload() mismatch (-want,+got)\n%s", tc.desc, diff) + } + + if diff := cmp.Diff(tc.want.headers, got.Headers()); diff != "" { + t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff) + } + + if tc.headers != nil && got.Headers() != nil { + tc.headers["modified"] = "test" + if _, exists := got.Headers()["modified"]; exists { + t.Errorf("%s: Headers should be cloned, but modification affected task headers", tc.desc) + } + } + } +} + +func TestTaskHeadersMethod(t *testing.T) { + tests := []struct { + desc string + task *Task + want map[string]string + wantNil bool + }{ + { + desc: "Task created with NewTask has nil headers", + task: NewTask("test", []byte("data")), + want: nil, + wantNil: true, + }, + { + desc: "Task created with NewTaskWithHeaders has headers", + task: NewTaskWithHeaders("test", []byte("data"), map[string]string{"key": "value"}), + want: map[string]string{"key": "value"}, + }, + { + desc: "Task created with empty headers", + task: NewTaskWithHeaders("test", []byte("data"), map[string]string{}), + want: map[string]string{}, + }, + { + desc: "Task created with nil headers", + task: NewTaskWithHeaders("test", []byte("data"), nil), + want: nil, + wantNil: true, + }, + } + + for _, tc := range tests { + got := tc.task.Headers() + + if tc.wantNil { + if got != nil { + t.Errorf("%s: Headers() = %v, want nil", tc.desc, got) + } + } else { + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("%s: Headers() mismatch (-want,+got)\n%s", tc.desc, diff) + } + } + } +} + +func TestClientEnqueueWithHeadersAndGroup(t *testing.T) { + r := setup(t) + client := NewClient(getRedisConnOpt(t)) + defer client.Close() + + now := time.Now() + headers := map[string]string{ + "batch-id": "batch-123", + "priority": "high", + } + + tests := []struct { + desc string + task *Task + opts []Option + wantInfo *TaskInfo + wantGroups map[string]map[string][]base.Z + }{ + { + desc: "Task with headers and group", + task: NewTaskWithHeaders("batch_process", []byte("item1"), headers), + opts: []Option{Group("batch-123")}, + wantInfo: &TaskInfo{ + Queue: "default", + Group: "batch-123", + Type: "batch_process", + Payload: []byte("item1"), + Headers: headers, + State: TaskStateAggregating, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: time.Time{}, + }, + wantGroups: map[string]map[string][]base.Z{ + "default": { + "batch-123": { + { + Message: &base.TaskMessage{ + Type: "batch_process", + Payload: []byte("item1"), + Headers: headers, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + GroupKey: "batch-123", + }, + Score: now.Unix(), + }, + }, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + + gotInfo, err := client.Enqueue(tc.task, tc.opts...) + if err != nil { + t.Errorf("%s: Enqueue failed: %v", tc.desc, err) + continue + } + + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(TaskInfo{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { + t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s", + tc.desc, gotInfo, tc.wantInfo, diff) + } + + for qname, groups := range tc.wantGroups { + for groupKey, want := range groups { + got := h.GetGroupEntries(t, r, qname, groupKey) + if diff := cmp.Diff(want, got, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.GroupKey(qname, groupKey), diff) + } + } + } + } +} diff --git a/internal/base/base.go b/internal/base/base.go index 90e635e..c8be8be 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -240,6 +240,9 @@ type TaskMessage struct { // Payload holds data needed to process the task. Payload []byte + // Headers holds additional metadata for the task. + Headers map[string]string + // ID is a unique identifier for each task. ID string @@ -304,6 +307,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, Payload: msg.Payload, + Headers: msg.Headers, Id: msg.ID, Queue: msg.Queue, Retry: int32(msg.Retry), @@ -328,6 +332,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { return &TaskMessage{ Type: pbmsg.GetType(), Payload: pbmsg.GetPayload(), + Headers: pbmsg.GetHeaders(), ID: pbmsg.GetId(), Queue: pbmsg.GetQueue(), Retry: int(pbmsg.GetRetry()), diff --git a/internal/proto/asynq.pb.go b/internal/proto/asynq.pb.go index a7d77be..548c82e 100644 --- a/internal/proto/asynq.pb.go +++ b/internal/proto/asynq.pb.go @@ -4,8 +4,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2 -// protoc v3.19.6 +// protoc-gen-go v1.36.6 +// protoc v5.29.3 // source: asynq.proto package proto @@ -16,6 +16,7 @@ import ( timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" + unsafe "unsafe" ) const ( @@ -27,16 +28,15 @@ const ( // TaskMessage is the internal representation of a task with additional // metadata fields. -// Next ID: 15 +// Next ID: 16 type TaskMessage struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Type indicates the kind of the task to be performed. Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` // Payload holds data needed to process the task. Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + // Headers holds additional metadata for the task. + Headers map[string]string `protobuf:"bytes,15,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` // Unique identifier for the task. Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` // Name of the queue to which this task belongs. @@ -71,16 +71,16 @@ type TaskMessage struct { // Time when the task completed in success in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // This field is populated if result_ttl > 0 upon completion. - CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"` + CompletedAt int64 `protobuf:"varint,13,opt,name=completed_at,json=completedAt,proto3" json:"completed_at,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *TaskMessage) Reset() { *x = TaskMessage{} - if protoimpl.UnsafeEnabled { - mi := &file_asynq_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_asynq_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *TaskMessage) String() string { @@ -91,7 +91,7 @@ func (*TaskMessage) ProtoMessage() {} func (x *TaskMessage) ProtoReflect() protoreflect.Message { mi := &file_asynq_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -120,6 +120,13 @@ func (x *TaskMessage) GetPayload() []byte { return nil } +func (x *TaskMessage) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + func (x *TaskMessage) GetId() string { if x != nil { return x.Id @@ -206,10 +213,7 @@ func (x *TaskMessage) GetCompletedAt() int64 { // ServerInfo holds information about a running server. type ServerInfo struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Host machine the server is running on. Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` // PID of the server process. @@ -221,7 +225,7 @@ type ServerInfo struct { // List of queue names with their priorities. // The server will consume tasks from the queues and prioritize // queues with higher priority numbers. - Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` // If set, the server will always consume tasks from a queue with higher // priority. StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"` @@ -231,15 +235,15 @@ type ServerInfo struct { StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` // Number of workers currently processing tasks. ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ServerInfo) Reset() { *x = ServerInfo{} - if protoimpl.UnsafeEnabled { - mi := &file_asynq_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_asynq_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *ServerInfo) String() string { @@ -250,7 +254,7 @@ func (*ServerInfo) ProtoMessage() {} func (x *ServerInfo) ProtoReflect() protoreflect.Message { mi := &file_asynq_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -330,10 +334,7 @@ func (x *ServerInfo) GetActiveWorkerCount() int32 { // WorkerInfo holds information about a running worker. type WorkerInfo struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Host matchine this worker is running on. Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` // PID of the process in which this worker is running. @@ -352,16 +353,16 @@ type WorkerInfo struct { StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` // Deadline by which the worker needs to complete processing // the task. If worker exceeds the deadline, the task will fail. - Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"` + Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *WorkerInfo) Reset() { *x = WorkerInfo{} - if protoimpl.UnsafeEnabled { - mi := &file_asynq_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_asynq_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *WorkerInfo) String() string { @@ -372,7 +373,7 @@ func (*WorkerInfo) ProtoMessage() {} func (x *WorkerInfo) ProtoReflect() protoreflect.Message { mi := &file_asynq_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -453,10 +454,7 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp { // SchedulerEntry holds information about a periodic task registered // with a scheduler. type SchedulerEntry struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Identifier of the scheduler entry. Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Periodic schedule spec of the entry. @@ -472,15 +470,15 @@ type SchedulerEntry struct { // Last time the task was enqueued. // Zero time if task was never enqueued. PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *SchedulerEntry) Reset() { *x = SchedulerEntry{} - if protoimpl.UnsafeEnabled { - mi := &file_asynq_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_asynq_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *SchedulerEntry) String() string { @@ -491,7 +489,7 @@ func (*SchedulerEntry) ProtoMessage() {} func (x *SchedulerEntry) ProtoReflect() protoreflect.Message { mi := &file_asynq_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -558,23 +556,20 @@ func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp { // SchedulerEnqueueEvent holds information about an enqueue event // by a scheduler. type SchedulerEnqueueEvent struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // ID of the task that was enqueued. TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // Time the task was enqueued. - EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"` + EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *SchedulerEnqueueEvent) Reset() { *x = SchedulerEnqueueEvent{} - if protoimpl.UnsafeEnabled { - mi := &file_asynq_proto_msgTypes[4] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_asynq_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *SchedulerEnqueueEvent) String() string { @@ -585,7 +580,7 @@ func (*SchedulerEnqueueEvent) ProtoMessage() {} func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message { mi := &file_asynq_proto_msgTypes[4] - if protoimpl.UnsafeEnabled && x != nil { + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -616,146 +611,106 @@ func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp { var File_asynq_proto protoreflect.FileDescriptor -var file_asynq_proto_rawDesc = []byte{ - 0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61, - 0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, - 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74, - 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12, - 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66, - 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, - 0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, - 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, - 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, - 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, - 0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, - 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, - 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c, - 0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, - 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, - 0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, - 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, - 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, - 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, - 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, - 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, - 0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, - 0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72, - 0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, - 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, - 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, - 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, - 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, - 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05, - 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, - 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a, - 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61, - 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, - 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09, - 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, - 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f, - 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, - 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e, - 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65, - 0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a, - 0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, - 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, - 0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, - 0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, - 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79, - 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} +const file_asynq_proto_rawDesc = "" + + "\n" + + "\vasynq.proto\x12\x05asynq\x1a\x1fgoogle/protobuf/timestamp.proto\"\xfe\x03\n" + + "\vTaskMessage\x12\x12\n" + + "\x04type\x18\x01 \x01(\tR\x04type\x12\x18\n" + + "\apayload\x18\x02 \x01(\fR\apayload\x129\n" + + "\aheaders\x18\x0f \x03(\v2\x1f.asynq.TaskMessage.HeadersEntryR\aheaders\x12\x0e\n" + + "\x02id\x18\x03 \x01(\tR\x02id\x12\x14\n" + + "\x05queue\x18\x04 \x01(\tR\x05queue\x12\x14\n" + + "\x05retry\x18\x05 \x01(\x05R\x05retry\x12\x18\n" + + "\aretried\x18\x06 \x01(\x05R\aretried\x12\x1b\n" + + "\terror_msg\x18\a \x01(\tR\berrorMsg\x12$\n" + + "\x0elast_failed_at\x18\v \x01(\x03R\flastFailedAt\x12\x18\n" + + "\atimeout\x18\b \x01(\x03R\atimeout\x12\x1a\n" + + "\bdeadline\x18\t \x01(\x03R\bdeadline\x12\x1d\n" + + "\n" + + "unique_key\x18\n" + + " \x01(\tR\tuniqueKey\x12\x1b\n" + + "\tgroup_key\x18\x0e \x01(\tR\bgroupKey\x12\x1c\n" + + "\tretention\x18\f \x01(\x03R\tretention\x12!\n" + + "\fcompleted_at\x18\r \x01(\x03R\vcompletedAt\x1a:\n" + + "\fHeadersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x8f\x03\n" + + "\n" + + "ServerInfo\x12\x12\n" + + "\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" + + "\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" + + "\tserver_id\x18\x03 \x01(\tR\bserverId\x12 \n" + + "\vconcurrency\x18\x04 \x01(\x05R\vconcurrency\x125\n" + + "\x06queues\x18\x05 \x03(\v2\x1d.asynq.ServerInfo.QueuesEntryR\x06queues\x12'\n" + + "\x0fstrict_priority\x18\x06 \x01(\bR\x0estrictPriority\x12\x16\n" + + "\x06status\x18\a \x01(\tR\x06status\x129\n" + + "\n" + + "start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x12.\n" + + "\x13active_worker_count\x18\t \x01(\x05R\x11activeWorkerCount\x1a9\n" + + "\vQueuesEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\x05R\x05value:\x028\x01\"\xb1\x02\n" + + "\n" + + "WorkerInfo\x12\x12\n" + + "\x04host\x18\x01 \x01(\tR\x04host\x12\x10\n" + + "\x03pid\x18\x02 \x01(\x05R\x03pid\x12\x1b\n" + + "\tserver_id\x18\x03 \x01(\tR\bserverId\x12\x17\n" + + "\atask_id\x18\x04 \x01(\tR\x06taskId\x12\x1b\n" + + "\ttask_type\x18\x05 \x01(\tR\btaskType\x12!\n" + + "\ftask_payload\x18\x06 \x01(\fR\vtaskPayload\x12\x14\n" + + "\x05queue\x18\a \x01(\tR\x05queue\x129\n" + + "\n" + + "start_time\x18\b \x01(\v2\x1a.google.protobuf.TimestampR\tstartTime\x126\n" + + "\bdeadline\x18\t \x01(\v2\x1a.google.protobuf.TimestampR\bdeadline\"\xad\x02\n" + + "\x0eSchedulerEntry\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x12\n" + + "\x04spec\x18\x02 \x01(\tR\x04spec\x12\x1b\n" + + "\ttask_type\x18\x03 \x01(\tR\btaskType\x12!\n" + + "\ftask_payload\x18\x04 \x01(\fR\vtaskPayload\x12'\n" + + "\x0fenqueue_options\x18\x05 \x03(\tR\x0eenqueueOptions\x12F\n" + + "\x11next_enqueue_time\x18\x06 \x01(\v2\x1a.google.protobuf.TimestampR\x0fnextEnqueueTime\x12F\n" + + "\x11prev_enqueue_time\x18\a \x01(\v2\x1a.google.protobuf.TimestampR\x0fprevEnqueueTime\"o\n" + + "\x15SchedulerEnqueueEvent\x12\x17\n" + + "\atask_id\x18\x01 \x01(\tR\x06taskId\x12=\n" + + "\fenqueue_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\venqueueTimeB)Z'github.com/hibiken/asynq/internal/protob\x06proto3" var ( file_asynq_proto_rawDescOnce sync.Once - file_asynq_proto_rawDescData = file_asynq_proto_rawDesc + file_asynq_proto_rawDescData []byte ) func file_asynq_proto_rawDescGZIP() []byte { file_asynq_proto_rawDescOnce.Do(func() { - file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData) + file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc))) }) return file_asynq_proto_rawDescData } -var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_asynq_proto_goTypes = []any{ (*TaskMessage)(nil), // 0: asynq.TaskMessage (*ServerInfo)(nil), // 1: asynq.ServerInfo (*WorkerInfo)(nil), // 2: asynq.WorkerInfo (*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry (*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent - nil, // 5: asynq.ServerInfo.QueuesEntry - (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp + nil, // 5: asynq.TaskMessage.HeadersEntry + nil, // 6: asynq.ServerInfo.QueuesEntry + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp } var file_asynq_proto_depIdxs = []int32{ - 5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry - 6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp - 6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp - 6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp - 6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp - 6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp - 6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp - 7, // [7:7] is the sub-list for method output_type - 7, // [7:7] is the sub-list for method input_type - 7, // [7:7] is the sub-list for extension type_name - 7, // [7:7] is the sub-list for extension extendee - 0, // [0:7] is the sub-list for field type_name + 5, // 0: asynq.TaskMessage.headers:type_name -> asynq.TaskMessage.HeadersEntry + 6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry + 7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp + 7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp + 7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp + 7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp + 7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp + 7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_asynq_proto_init() } @@ -763,75 +718,13 @@ func file_asynq_proto_init() { if File_asynq_proto != nil { return } - if !protoimpl.UnsafeEnabled { - file_asynq_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*TaskMessage); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_asynq_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*ServerInfo); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_asynq_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*WorkerInfo); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_asynq_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*SchedulerEntry); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_asynq_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*SchedulerEnqueueEvent); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_asynq_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_asynq_proto_rawDesc), len(file_asynq_proto_rawDesc)), NumEnums: 0, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, @@ -840,7 +733,6 @@ func file_asynq_proto_init() { MessageInfos: file_asynq_proto_msgTypes, }.Build() File_asynq_proto = out.File - file_asynq_proto_rawDesc = nil file_asynq_proto_goTypes = nil file_asynq_proto_depIdxs = nil } diff --git a/internal/proto/asynq.proto b/internal/proto/asynq.proto index 777aa98..87be2c0 100644 --- a/internal/proto/asynq.proto +++ b/internal/proto/asynq.proto @@ -11,7 +11,7 @@ option go_package = "github.com/hibiken/asynq/internal/proto"; // TaskMessage is the internal representation of a task with additional // metadata fields. -// Next ID: 15 +// Next ID: 16 message TaskMessage { // Type indicates the kind of the task to be performed. string type = 1; @@ -19,6 +19,9 @@ message TaskMessage { // Payload holds data needed to process the task. bytes payload = 2; + // Headers holds additional metadata for the task. + map headers = 15; + // Unique identifier for the task. string id = 3; diff --git a/processor.go b/processor.go index fa810d6..8d0ca6e 100644 --- a/processor.go +++ b/processor.go @@ -230,6 +230,7 @@ func (p *processor) exec() { ctx: ctx, }, ) + task.headers = msg.Headers resCh <- p.perform(ctx, task) }() @@ -333,7 +334,7 @@ var RevokeTask = errors.New("revoke task") func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) { if p.errHandler != nil { - p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) + p.errHandler.HandleError(ctx, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers), err) } switch { case errors.Is(err, RevokeTask): @@ -354,7 +355,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu } ctx, cancel := context.WithDeadline(context.Background(), l.Deadline()) defer cancel() - d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) + d := p.retryDelayFunc(msg.Retried, e, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers)) retryAt := time.Now().Add(d) err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure) if err != nil { diff --git a/recoverer.go b/recoverer.go index e350aa7..9899bf9 100644 --- a/recoverer.go +++ b/recoverer.go @@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() { } func (r *recoverer) retry(msg *base.TaskMessage, err error) { - delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload)) + delay := r.retryDelayFunc(msg.Retried, err, NewTaskWithHeaders(msg.Type, msg.Payload, msg.Headers)) retryAt := time.Now().Add(delay) if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil { r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)