mirror of
https://github.com/hibiken/asynq.git
synced 2026-07-02 09:23:46 +08:00
Add tests for BatchEnqueue: multiple tasks, empty batch, duplicate IDs
This commit is contained in:
@@ -160,6 +160,83 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchEnqueue(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}))
|
||||
t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv")
|
||||
t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
|
||||
|
||||
enqueueTime := time.Now()
|
||||
r.SetClock(timeutil.NewSimulatedClock(enqueueTime))
|
||||
|
||||
t.Run("enqueue multiple tasks", func(t *testing.T) {
|
||||
h.FlushDB(t, r.client)
|
||||
msgs := []*base.TaskMessage{t1, t2, t3}
|
||||
|
||||
n, err := r.BatchEnqueue(context.Background(), msgs)
|
||||
if err != nil {
|
||||
t.Fatalf("BatchEnqueue returned error: %v", err)
|
||||
}
|
||||
if n != 3 {
|
||||
t.Errorf("BatchEnqueue returned %d, want 3", n)
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
pendingKey := base.PendingKey(msg.Queue)
|
||||
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
|
||||
found := false
|
||||
for _, id := range pendingIDs {
|
||||
if id == msg.ID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("task %s not found in pending list %s", msg.ID, pendingKey)
|
||||
}
|
||||
|
||||
taskKey := base.TaskKey(msg.Queue, msg.ID)
|
||||
state := r.client.HGet(context.Background(), taskKey, "state").Val()
|
||||
if state != "pending" {
|
||||
t.Errorf("state for task %s = %q, want %q", msg.ID, state, "pending")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("empty batch", func(t *testing.T) {
|
||||
h.FlushDB(t, r.client)
|
||||
|
||||
n, err := r.BatchEnqueue(context.Background(), nil)
|
||||
if err != nil {
|
||||
t.Fatalf("BatchEnqueue(nil) returned error: %v", err)
|
||||
}
|
||||
if n != 0 {
|
||||
t.Errorf("BatchEnqueue(nil) returned %d, want 0", n)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("duplicate IDs skipped", func(t *testing.T) {
|
||||
h.FlushDB(t, r.client)
|
||||
|
||||
if err := r.Enqueue(context.Background(), t1); err != nil {
|
||||
t.Fatalf("pre-enqueue failed: %v", err)
|
||||
}
|
||||
|
||||
dup := *t1
|
||||
newMsg := h.NewTaskMessage("new_task", nil)
|
||||
|
||||
n, err := r.BatchEnqueue(context.Background(), []*base.TaskMessage{&dup, newMsg})
|
||||
if err != nil {
|
||||
t.Fatalf("BatchEnqueue returned error: %v", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Errorf("BatchEnqueue returned %d, want 1 (duplicate should be skipped)", n)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestEnqueueQueueCache(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
Reference in New Issue
Block a user