2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-06-18 02:17:01 +08:00

feat: Add an option to specify headers

This commit is contained in:
kayoch1n
2026-03-17 12:29:58 +08:00
parent d704b68a42
commit 5c391f3ffb
4 changed files with 110 additions and 1 deletions

View File

@@ -6,7 +6,9 @@ package asynq
import (
"context"
"encoding/json"
"fmt"
"maps"
"strings"
"time"
@@ -60,6 +62,7 @@ const (
TaskIDOpt
RetentionOpt
GroupOpt
HeaderOpt
)
// Option specifies the task processing behavior.
@@ -86,6 +89,7 @@ type (
processInOption time.Duration
retentionOption time.Duration
groupOption string
headerOption [2]string
)
// MaxRetry returns an option to specify the max number of times
@@ -217,6 +221,19 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) }
// Header returns an option to associate the key-value header to the task.
func Header(key, value string) Option {
return headerOption{key, value}
}
func (h headerOption) String() string {
var bytes []byte
bytes, _ = json.Marshal(h)
return fmt.Sprintf("Header(%s)", bytes)
}
func (h headerOption) Type() OptionType { return HeaderOpt }
func (h headerOption) Value() interface{} { return [2]string{h[0], h[1]} }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
@@ -237,6 +254,7 @@ type option struct {
processAt time.Time
retention time.Duration
group string
headers map[string]string
}
// composeOptions merges user provided options into the default options
@@ -251,6 +269,7 @@ func composeOptions(opts ...Option) (option, error) {
timeout: 0, // do not set to defaultTimeout here
deadline: time.Time{},
processAt: time.Now(),
headers: make(map[string]string),
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -290,6 +309,9 @@ func composeOptions(opts ...Option) (option, error) {
return option{}, errors.New("group key cannot be empty")
}
res.group = key
case headerOption:
key, value := opt[0], opt[1]
res.headers[key] = value
default:
// ignore unexpected option
}
@@ -385,7 +407,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Headers: maps.Clone(task.Headers()),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
@@ -394,6 +416,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()),
}
if len(opt.headers) > 0 {
if msg.Headers == nil {
msg.Headers = make(map[string]string)
}
maps.Copy(msg.Headers, opt.headers)
}
now := time.Now()
var state base.TaskState
if opt.processAt.After(now) {

View File

@@ -1339,6 +1339,70 @@ func TestClientEnqueueWithHeaders(t *testing.T) {
},
},
},
{
desc: "Task with header option",
task: NewTask("store_data", []byte("data"), Header("channel", "email")),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 25,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email"},
Retry: 25,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Enqueue task with header option",
task: NewTask("store_data", []byte("data")),
opts: []Option{Header("channel", "email"), MaxRetry(5)},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email"},
Retry: 5,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {

View File

@@ -5,6 +5,7 @@
package asynq
import (
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -1002,6 +1003,13 @@ func parseOption(s string) (Option, error) {
return nil, err
}
return Retention(d), nil
case "Header":
var h [2]string
err := json.Unmarshal([]byte(arg), &h)
if err != nil {
return nil, err
}
return Header(h[0], h[1]), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}

View File

@@ -3526,6 +3526,7 @@ func TestParseOption(t *testing.T) {
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
{`Retention(24h)`, RetentionOpt, 24 * time.Hour},
{`Header(["email", "hello@example.com"])`, HeaderOpt, [2]string{"email", "hello@example.com"}},
}
for _, tc := range tests {
@@ -3573,6 +3574,14 @@ func TestParseOption(t *testing.T) {
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case HeaderOpt:
gotVal, ok := got.Value().([2]string)
if !ok {
t.Fatal("returned Option with non array value")
}
if gotVal != tc.wantVal.([2]string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
default:
t.Fatalf("returned Option with unexpected type: %v", got.Type())
}