diff --git a/client.go b/client.go index 544b0d7..945381c 100644 --- a/client.go +++ b/client.go @@ -426,10 +426,39 @@ type BatchEnqueueResult struct { Err error } -// BatchEnqueueContext enqueues all given tasks using a single Redis pipeline round-trip. -// Each task gets its own result so callers can handle partial success. -// Immediate and scheduled tasks are supported; unique and group tasks are -// rejected with an error in the corresponding BatchEnqueueResult. +// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip, +// returning a per-task result slice aligned with the input tasks slice. +// +// # Atomicity Guarantees +// +// There is no all-or-nothing guarantee across the batch. Each task is executed as +// an independent Lua script inside a Redis pipeline. Individual scripts are atomic +// (the existence check, hash write, and list/sorted-set push for one task cannot +// be partially applied), but the pipeline as a whole is not wrapped in a +// MULTI/EXEC transaction. This means: +// +// - Partial success is possible: some tasks may be enqueued while others are not. +// - A task whose ID already exists in Redis is silently skipped (treated as a +// no-op by the Lua script), and its result will still show success. +// - If the Redis pipeline call itself fails (e.g. connection lost, context +// cancelled), every task that passed client-side validation receives that +// error — none of them can be assumed to have been enqueued. +// +// # Validation Errors (pre-pipeline) +// +// The following are caught before any Redis call and rejected in the +// corresponding BatchEnqueueResult.Err without affecting other tasks: +// +// - nil task +// - empty task type name +// - invalid options +// - group tasks (not supported in batch mode) +// - unique tasks (not supported in batch mode) +// +// # Supported Task Types +// +// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported. +// Group and unique tasks are rejected as described above. func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult { results := make([]BatchEnqueueResult, len(tasks)) if len(tasks) == 0 { diff --git a/internal/base/base.go b/internal/base/base.go index f5e3609..b70b30f 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -700,6 +700,11 @@ type Broker interface { Close() error Enqueue(ctx context.Context, msg *TaskMessage) error EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error + // BatchEnqueue enqueues multiple tasks in a single round-trip. It returns the + // count of newly enqueued tasks; duplicate IDs are silently skipped. The error + // is non-nil only on infrastructure failure (e.g. lost connection), in which + // case the count is meaningless. Individual task scripts are atomic but the + // batch as a whole is not transactional — partial success is possible. BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error) Dequeue(qnames ...string) (*TaskMessage, time.Time, error) Done(ctx context.Context, msg *TaskMessage) error diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 13aae29..5281086 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -139,23 +139,62 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error { return nil } -// BatchEnqueue adds all given tasks to their respective pending lists using a -// single Redis pipeline round-trip. It returns the number of newly enqueued -// messages (tasks whose IDs already exist in Redis are silently skipped). // BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip. -// Each item is either enqueued immediately or scheduled based on its ProcessAt field. +// Each item is either enqueued immediately (ProcessAt is zero) or added to the +// scheduled sorted set. +// +// The pipeline executes independent Lua scripts per task — there is no +// MULTI/EXEC wrapping the batch, so individual tasks may succeed or fail +// independently. A task whose ID already exists in Redis is skipped by the Lua +// script (returns 0) and does not count toward the returned enqueued total. +// +// The returned int is the number of tasks that were actually written to Redis +// (i.e. whose IDs did not already exist). The returned error is non-nil only +// when the pipeline call itself fails (network error, context cancellation, +// etc.), in which case no individual result should be trusted. +// +// Message encoding errors cause an immediate return before any Redis I/O. func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) { var op errors.Op = "rdb.BatchEnqueue" if len(items) == 0 { return 0, nil } + // Preload Lua scripts so that EVALSHA inside the pipeline does not fail with + // NOSCRIPT. Script.Run on a pipeline only sends EVALSHA (unlike non-pipeline + // Run which retries with EVAL on NOSCRIPT). + needsEnqueue, needsSchedule := false, false + for _, item := range items { + if item.ProcessAt.IsZero() { + needsEnqueue = true + } else { + needsSchedule = true + } + if needsEnqueue && needsSchedule { + break + } + } + if needsEnqueue { + if err := enqueueCmd.Load(ctx, r.client).Err(); err != nil { + return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load enqueue script: %v", err)) + } + } + if needsSchedule { + if err := scheduleCmd.Load(ctx, r.client).Err(); err != nil { + return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load schedule script: %v", err)) + } + } + pipe := r.client.Pipeline() - type cmdIndex struct{ pipeIdx int } - scriptCmds := make([]cmdIndex, 0, len(items)) + // Track which pipeline slot holds each item's script result. + scriptIdxs := make([]int, 0, len(items)) pipeLen := 0 + // Track queues we add to AllQueues in this pipeline so we can roll back the + // in-memory cache on failure. + var newQueues []string + now := r.clock.Now().UnixNano() for _, item := range items { @@ -166,6 +205,7 @@ func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) ( if _, found := r.queuesPublished.Load(item.Msg.Queue); !found { pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue) r.queuesPublished.Store(item.Msg.Queue, true) + newQueues = append(newQueues, item.Msg.Queue) pipeLen++ } @@ -184,21 +224,24 @@ func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) ( argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID} scheduleCmd.Run(ctx, pipe, keys, argv...) } - scriptCmds = append(scriptCmds, cmdIndex{pipeIdx: pipeLen}) + scriptIdxs = append(scriptIdxs, pipeLen) pipeLen++ } cmds, err := pipe.Exec(ctx) if err != nil && err != redis.Nil { + for _, q := range newQueues { + r.queuesPublished.Delete(q) + } return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err)) } enqueued := 0 - for _, sc := range scriptCmds { - if sc.pipeIdx >= len(cmds) { + for _, idx := range scriptIdxs { + if idx >= len(cmds) { continue } - res, err := cmds[sc.pipeIdx].(*redis.Cmd).Result() + res, err := cmds[idx].(*redis.Cmd).Result() if err != nil { continue }