mirror of
https://github.com/hibiken/asynq.git
synced 2026-06-29 10:54:11 +08:00
Merge pull request #1094 from fanatics-live/batch-enqueue
Add BatchEnqueue for pipelined multi-task enqueue
This commit is contained in:
136
client.go
136
client.go
@@ -456,6 +456,142 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
||||
return newTaskInfo(msg, state, opt.processAt, nil), nil
|
||||
}
|
||||
|
||||
// BatchEnqueueResult holds the result of enqueuing a single task within a batch.
|
||||
type BatchEnqueueResult struct {
|
||||
TaskInfo *TaskInfo
|
||||
Err error
|
||||
}
|
||||
|
||||
// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip,
|
||||
// returning a per-task result slice aligned with the input tasks slice.
|
||||
//
|
||||
// # Atomicity Guarantees
|
||||
//
|
||||
// There is no all-or-nothing guarantee across the batch. Each task is executed as
|
||||
// an independent Lua script inside a Redis pipeline. Individual scripts are atomic
|
||||
// (the existence check, hash write, and list/sorted-set push for one task cannot
|
||||
// be partially applied), but the pipeline as a whole is not wrapped in a
|
||||
// MULTI/EXEC transaction. This means:
|
||||
//
|
||||
// - Partial success is possible: some tasks may be enqueued while others are not.
|
||||
// - A task whose ID already exists in Redis is silently skipped (treated as a
|
||||
// no-op by the Lua script), and its result will still show success.
|
||||
// - If the Redis pipeline call itself fails (e.g. connection lost, context
|
||||
// cancelled), every task that passed client-side validation receives that
|
||||
// error — none of them can be assumed to have been enqueued.
|
||||
//
|
||||
// # Validation Errors (pre-pipeline)
|
||||
//
|
||||
// The following are caught before any Redis call and rejected in the
|
||||
// corresponding BatchEnqueueResult.Err without affecting other tasks:
|
||||
//
|
||||
// - nil task
|
||||
// - empty task type name
|
||||
// - invalid options
|
||||
// - group tasks (not supported in batch mode)
|
||||
// - unique tasks (not supported in batch mode)
|
||||
//
|
||||
// # Supported Task Types
|
||||
//
|
||||
// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported.
|
||||
// Group and unique tasks are rejected as described above.
|
||||
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
|
||||
results := make([]BatchEnqueueResult, len(tasks))
|
||||
if len(tasks) == 0 {
|
||||
return results
|
||||
}
|
||||
|
||||
type itemMeta struct {
|
||||
state base.TaskState
|
||||
processAt time.Time
|
||||
}
|
||||
|
||||
items := make([]base.BatchEnqueueItem, 0, len(tasks))
|
||||
itemIndexes := make([]int, 0, len(tasks))
|
||||
itemMetas := make([]itemMeta, 0, len(tasks))
|
||||
|
||||
for i, task := range tasks {
|
||||
if task == nil {
|
||||
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task cannot be nil")}
|
||||
continue
|
||||
}
|
||||
if strings.TrimSpace(task.Type()) == "" {
|
||||
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task typename cannot be empty")}
|
||||
continue
|
||||
}
|
||||
merged := append(task.opts, opts...)
|
||||
opt, err := composeOptions(merged...)
|
||||
if err != nil {
|
||||
results[i] = BatchEnqueueResult{Err: err}
|
||||
continue
|
||||
}
|
||||
if opt.group != "" {
|
||||
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support group tasks")}
|
||||
continue
|
||||
}
|
||||
if opt.uniqueTTL > 0 {
|
||||
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support unique tasks")}
|
||||
continue
|
||||
}
|
||||
deadline := noDeadline
|
||||
if !opt.deadline.IsZero() {
|
||||
deadline = opt.deadline
|
||||
}
|
||||
timeout := noTimeout
|
||||
if opt.timeout != 0 {
|
||||
timeout = opt.timeout
|
||||
}
|
||||
if deadline.Equal(noDeadline) && timeout == noTimeout {
|
||||
timeout = defaultTimeout
|
||||
}
|
||||
msg := &base.TaskMessage{
|
||||
ID: opt.taskID,
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Headers: task.Headers(),
|
||||
Queue: opt.queue,
|
||||
Retry: opt.retry,
|
||||
Deadline: deadline.Unix(),
|
||||
Timeout: int64(timeout.Seconds()),
|
||||
Retention: int64(opt.retention.Seconds()),
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
scheduled := opt.processAt.After(now)
|
||||
|
||||
item := base.BatchEnqueueItem{Msg: msg}
|
||||
var meta itemMeta
|
||||
if scheduled {
|
||||
item.ProcessAt = opt.processAt
|
||||
meta = itemMeta{state: base.TaskStateScheduled, processAt: opt.processAt}
|
||||
} else {
|
||||
meta = itemMeta{state: base.TaskStatePending, processAt: now}
|
||||
}
|
||||
|
||||
items = append(items, item)
|
||||
itemIndexes = append(itemIndexes, i)
|
||||
itemMetas = append(itemMetas, meta)
|
||||
}
|
||||
|
||||
if len(items) == 0 {
|
||||
return results
|
||||
}
|
||||
|
||||
_, err := c.broker.BatchEnqueue(ctx, items)
|
||||
if err != nil {
|
||||
for _, idx := range itemIndexes {
|
||||
results[idx] = BatchEnqueueResult{Err: err}
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
for j, idx := range itemIndexes {
|
||||
info := newTaskInfo(items[j].Msg, itemMetas[j].state, itemMetas[j].processAt, nil)
|
||||
results[idx] = BatchEnqueueResult{TaskInfo: info}
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
// Ping performs a ping against the redis connection.
|
||||
func (c *Client) Ping() error {
|
||||
return c.broker.Ping()
|
||||
|
||||
Reference in New Issue
Block a user