mirror of
https://github.com/hibiken/asynq.git
synced 2026-06-18 08:22:30 +08:00
Document atomicity guarantees and fix NOSCRIPT bug in BatchEnqueue
Add comprehensive doc comments to BatchEnqueueContext, the Broker interface method, and the RDB implementation explaining that the batch uses a Redis pipeline (not MULTI/EXEC), so partial success is possible and individual Lua scripts are atomic but the batch is not. Fix a bug where Script.Run inside a pipeline only sends EVALSHA without the automatic EVAL fallback that non-pipeline calls get. On a fresh Redis (or after SCRIPT FLUSH), this caused NOSCRIPT errors for every pipeline-batched script invocation. The fix preloads the required Lua scripts before building the pipeline. Also roll back the in-memory queuesPublished cache when the pipeline fails, preventing stale entries from suppressing future SADD calls.
This commit is contained in:
37
client.go
37
client.go
@@ -426,10 +426,39 @@ type BatchEnqueueResult struct {
|
|||||||
Err error
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// BatchEnqueueContext enqueues all given tasks using a single Redis pipeline round-trip.
|
// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip,
|
||||||
// Each task gets its own result so callers can handle partial success.
|
// returning a per-task result slice aligned with the input tasks slice.
|
||||||
// Immediate and scheduled tasks are supported; unique and group tasks are
|
//
|
||||||
// rejected with an error in the corresponding BatchEnqueueResult.
|
// # Atomicity Guarantees
|
||||||
|
//
|
||||||
|
// There is no all-or-nothing guarantee across the batch. Each task is executed as
|
||||||
|
// an independent Lua script inside a Redis pipeline. Individual scripts are atomic
|
||||||
|
// (the existence check, hash write, and list/sorted-set push for one task cannot
|
||||||
|
// be partially applied), but the pipeline as a whole is not wrapped in a
|
||||||
|
// MULTI/EXEC transaction. This means:
|
||||||
|
//
|
||||||
|
// - Partial success is possible: some tasks may be enqueued while others are not.
|
||||||
|
// - A task whose ID already exists in Redis is silently skipped (treated as a
|
||||||
|
// no-op by the Lua script), and its result will still show success.
|
||||||
|
// - If the Redis pipeline call itself fails (e.g. connection lost, context
|
||||||
|
// cancelled), every task that passed client-side validation receives that
|
||||||
|
// error — none of them can be assumed to have been enqueued.
|
||||||
|
//
|
||||||
|
// # Validation Errors (pre-pipeline)
|
||||||
|
//
|
||||||
|
// The following are caught before any Redis call and rejected in the
|
||||||
|
// corresponding BatchEnqueueResult.Err without affecting other tasks:
|
||||||
|
//
|
||||||
|
// - nil task
|
||||||
|
// - empty task type name
|
||||||
|
// - invalid options
|
||||||
|
// - group tasks (not supported in batch mode)
|
||||||
|
// - unique tasks (not supported in batch mode)
|
||||||
|
//
|
||||||
|
// # Supported Task Types
|
||||||
|
//
|
||||||
|
// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported.
|
||||||
|
// Group and unique tasks are rejected as described above.
|
||||||
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
|
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
|
||||||
results := make([]BatchEnqueueResult, len(tasks))
|
results := make([]BatchEnqueueResult, len(tasks))
|
||||||
if len(tasks) == 0 {
|
if len(tasks) == 0 {
|
||||||
|
|||||||
@@ -700,6 +700,11 @@ type Broker interface {
|
|||||||
Close() error
|
Close() error
|
||||||
Enqueue(ctx context.Context, msg *TaskMessage) error
|
Enqueue(ctx context.Context, msg *TaskMessage) error
|
||||||
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
||||||
|
// BatchEnqueue enqueues multiple tasks in a single round-trip. It returns the
|
||||||
|
// count of newly enqueued tasks; duplicate IDs are silently skipped. The error
|
||||||
|
// is non-nil only on infrastructure failure (e.g. lost connection), in which
|
||||||
|
// case the count is meaningless. Individual task scripts are atomic but the
|
||||||
|
// batch as a whole is not transactional — partial success is possible.
|
||||||
BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error)
|
BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error)
|
||||||
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
||||||
Done(ctx context.Context, msg *TaskMessage) error
|
Done(ctx context.Context, msg *TaskMessage) error
|
||||||
|
|||||||
@@ -139,23 +139,62 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BatchEnqueue adds all given tasks to their respective pending lists using a
|
|
||||||
// single Redis pipeline round-trip. It returns the number of newly enqueued
|
|
||||||
// messages (tasks whose IDs already exist in Redis are silently skipped).
|
|
||||||
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
|
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
|
||||||
// Each item is either enqueued immediately or scheduled based on its ProcessAt field.
|
// Each item is either enqueued immediately (ProcessAt is zero) or added to the
|
||||||
|
// scheduled sorted set.
|
||||||
|
//
|
||||||
|
// The pipeline executes independent Lua scripts per task — there is no
|
||||||
|
// MULTI/EXEC wrapping the batch, so individual tasks may succeed or fail
|
||||||
|
// independently. A task whose ID already exists in Redis is skipped by the Lua
|
||||||
|
// script (returns 0) and does not count toward the returned enqueued total.
|
||||||
|
//
|
||||||
|
// The returned int is the number of tasks that were actually written to Redis
|
||||||
|
// (i.e. whose IDs did not already exist). The returned error is non-nil only
|
||||||
|
// when the pipeline call itself fails (network error, context cancellation,
|
||||||
|
// etc.), in which case no individual result should be trusted.
|
||||||
|
//
|
||||||
|
// Message encoding errors cause an immediate return before any Redis I/O.
|
||||||
func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
|
func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
|
||||||
var op errors.Op = "rdb.BatchEnqueue"
|
var op errors.Op = "rdb.BatchEnqueue"
|
||||||
if len(items) == 0 {
|
if len(items) == 0 {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preload Lua scripts so that EVALSHA inside the pipeline does not fail with
|
||||||
|
// NOSCRIPT. Script.Run on a pipeline only sends EVALSHA (unlike non-pipeline
|
||||||
|
// Run which retries with EVAL on NOSCRIPT).
|
||||||
|
needsEnqueue, needsSchedule := false, false
|
||||||
|
for _, item := range items {
|
||||||
|
if item.ProcessAt.IsZero() {
|
||||||
|
needsEnqueue = true
|
||||||
|
} else {
|
||||||
|
needsSchedule = true
|
||||||
|
}
|
||||||
|
if needsEnqueue && needsSchedule {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if needsEnqueue {
|
||||||
|
if err := enqueueCmd.Load(ctx, r.client).Err(); err != nil {
|
||||||
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load enqueue script: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if needsSchedule {
|
||||||
|
if err := scheduleCmd.Load(ctx, r.client).Err(); err != nil {
|
||||||
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load schedule script: %v", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pipe := r.client.Pipeline()
|
pipe := r.client.Pipeline()
|
||||||
|
|
||||||
type cmdIndex struct{ pipeIdx int }
|
// Track which pipeline slot holds each item's script result.
|
||||||
scriptCmds := make([]cmdIndex, 0, len(items))
|
scriptIdxs := make([]int, 0, len(items))
|
||||||
pipeLen := 0
|
pipeLen := 0
|
||||||
|
|
||||||
|
// Track queues we add to AllQueues in this pipeline so we can roll back the
|
||||||
|
// in-memory cache on failure.
|
||||||
|
var newQueues []string
|
||||||
|
|
||||||
now := r.clock.Now().UnixNano()
|
now := r.clock.Now().UnixNano()
|
||||||
|
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
@@ -166,6 +205,7 @@ func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (
|
|||||||
if _, found := r.queuesPublished.Load(item.Msg.Queue); !found {
|
if _, found := r.queuesPublished.Load(item.Msg.Queue); !found {
|
||||||
pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue)
|
pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue)
|
||||||
r.queuesPublished.Store(item.Msg.Queue, true)
|
r.queuesPublished.Store(item.Msg.Queue, true)
|
||||||
|
newQueues = append(newQueues, item.Msg.Queue)
|
||||||
pipeLen++
|
pipeLen++
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,21 +224,24 @@ func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (
|
|||||||
argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID}
|
argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID}
|
||||||
scheduleCmd.Run(ctx, pipe, keys, argv...)
|
scheduleCmd.Run(ctx, pipe, keys, argv...)
|
||||||
}
|
}
|
||||||
scriptCmds = append(scriptCmds, cmdIndex{pipeIdx: pipeLen})
|
scriptIdxs = append(scriptIdxs, pipeLen)
|
||||||
pipeLen++
|
pipeLen++
|
||||||
}
|
}
|
||||||
|
|
||||||
cmds, err := pipe.Exec(ctx)
|
cmds, err := pipe.Exec(ctx)
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
|
for _, q := range newQueues {
|
||||||
|
r.queuesPublished.Delete(q)
|
||||||
|
}
|
||||||
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err))
|
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
enqueued := 0
|
enqueued := 0
|
||||||
for _, sc := range scriptCmds {
|
for _, idx := range scriptIdxs {
|
||||||
if sc.pipeIdx >= len(cmds) {
|
if idx >= len(cmds) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
res, err := cmds[sc.pipeIdx].(*redis.Cmd).Result()
|
res, err := cmds[idx].(*redis.Cmd).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user