mirror of
https://github.com/hibiken/asynq.git
synced 2026-06-13 11:58:33 +08:00
test: add coverage for BatchEnqueue error paths
Add tests for all uncovered BatchEnqueueContext error paths: nil task, empty/blank typename, invalid options, group rejection, unique rejection, and broker-down (sleeping broker). Add pipeline error test for rdb.BatchEnqueue via cancelled context, and TestTaskOptions/TestTaskOptionsNil for the new Options() accessor on Task.
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
@@ -198,3 +199,30 @@ func TestParseRedisURIErrors(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskOptions(t *testing.T) {
|
||||
opts := []Option{
|
||||
MaxRetry(3),
|
||||
Queue("critical"),
|
||||
Timeout(5 * time.Minute),
|
||||
}
|
||||
task := NewTask("mytask", []byte("payload"), opts...)
|
||||
|
||||
got := task.Options()
|
||||
if len(got) != len(opts) {
|
||||
t.Fatalf("task.Options() returned %d options, want %d", len(got), len(opts))
|
||||
}
|
||||
for i, o := range opts {
|
||||
if got[i].String() != o.String() {
|
||||
t.Errorf("task.Options()[%d] = %v, want %v", i, got[i], o)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskOptionsNil(t *testing.T) {
|
||||
task := NewTask("mytask", []byte("payload"))
|
||||
got := task.Options()
|
||||
if got != nil {
|
||||
t.Errorf("task.Options() = %v, want nil for task with no options", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/hibiken/asynq/internal/testbroker"
|
||||
h "github.com/hibiken/asynq/internal/testutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
@@ -1785,3 +1787,85 @@ func TestBatchEnqueueContext_MixedBatch(t *testing.T) {
|
||||
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchEnqueueContext_ValidationErrors(t *testing.T) {
|
||||
setup(t)
|
||||
client := NewClient(getRedisConnOpt(t))
|
||||
defer client.Close()
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
tasks []*Task
|
||||
opts []Option
|
||||
}{
|
||||
{
|
||||
desc: "nil task",
|
||||
tasks: []*Task{nil},
|
||||
},
|
||||
{
|
||||
desc: "empty task typename",
|
||||
tasks: []*Task{NewTask("", []byte("payload"))},
|
||||
},
|
||||
{
|
||||
desc: "blank task typename",
|
||||
tasks: []*Task{NewTask(" ", []byte("payload"))},
|
||||
},
|
||||
{
|
||||
desc: "invalid option: unique TTL less than 1s",
|
||||
tasks: []*Task{NewTask("foo", nil)},
|
||||
opts: []Option{Unique(300 * time.Millisecond)},
|
||||
},
|
||||
{
|
||||
desc: "group task rejected",
|
||||
tasks: []*Task{NewTask("foo", nil, Group("mygroup"))},
|
||||
},
|
||||
{
|
||||
desc: "unique task rejected",
|
||||
tasks: []*Task{NewTask("foo", nil, Unique(time.Hour))},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
results := client.BatchEnqueueContext(context.Background(), tc.tasks, tc.opts...)
|
||||
if len(results) != len(tc.tasks) {
|
||||
t.Errorf("%s: got %d results, want %d", tc.desc, len(results), len(tc.tasks))
|
||||
continue
|
||||
}
|
||||
for i, res := range results {
|
||||
if res.Err == nil {
|
||||
t.Errorf("%s: results[%d].Err = nil, want non-nil error", tc.desc, i)
|
||||
}
|
||||
if res.TaskInfo != nil {
|
||||
t.Errorf("%s: results[%d].TaskInfo = %v, want nil", tc.desc, i, res.TaskInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchEnqueueContext_BrokerError(t *testing.T) {
|
||||
r := rdb.NewRDB(setup(t))
|
||||
defer r.Close()
|
||||
testBroker := testbroker.NewTestBroker(r)
|
||||
client := &Client{broker: testBroker, sharedConnection: true}
|
||||
|
||||
tasks := []*Task{
|
||||
NewTask("task1", []byte("p1")),
|
||||
NewTask("task2", []byte("p2")),
|
||||
}
|
||||
|
||||
testBroker.Sleep()
|
||||
results := client.BatchEnqueueContext(context.Background(), tasks)
|
||||
testBroker.Wakeup()
|
||||
|
||||
if len(results) != 2 {
|
||||
t.Fatalf("BatchEnqueueContext returned %d results, want 2", len(results))
|
||||
}
|
||||
for i, res := range results {
|
||||
if res.Err == nil {
|
||||
t.Errorf("results[%d].Err = nil, want non-nil error when broker is down", i)
|
||||
}
|
||||
if res.TaskInfo != nil {
|
||||
t.Errorf("results[%d].TaskInfo = %v, want nil on broker error", i, res.TaskInfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -293,6 +293,21 @@ func TestBatchEnqueue(t *testing.T) {
|
||||
t.Errorf("state for scheduled task %s = %q, want %q", s1.ID, state, "scheduled")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("pipeline error from cancelled context", func(t *testing.T) {
|
||||
h.FlushDB(t, r.client)
|
||||
|
||||
msg := h.NewTaskMessage("pipeline_error_task", nil)
|
||||
items := []base.BatchEnqueueItem{{Msg: msg}}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
_, err := r.BatchEnqueue(ctx, items)
|
||||
if err == nil {
|
||||
t.Error("BatchEnqueue with cancelled context returned nil error, want non-nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestEnqueueQueueCache(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user