diff --git a/asynq_test.go b/asynq_test.go index 08afa82..fcba8e7 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -10,6 +10,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -198,3 +199,30 @@ func TestParseRedisURIErrors(t *testing.T) { } } } + +func TestTaskOptions(t *testing.T) { + opts := []Option{ + MaxRetry(3), + Queue("critical"), + Timeout(5 * time.Minute), + } + task := NewTask("mytask", []byte("payload"), opts...) + + got := task.Options() + if len(got) != len(opts) { + t.Fatalf("task.Options() returned %d options, want %d", len(got), len(opts)) + } + for i, o := range opts { + if got[i].String() != o.String() { + t.Errorf("task.Options()[%d] = %v, want %v", i, got[i], o) + } + } +} + +func TestTaskOptionsNil(t *testing.T) { + task := NewTask("mytask", []byte("payload")) + got := task.Options() + if got != nil { + t.Errorf("task.Options() = %v, want nil for task with no options", got) + } +} diff --git a/client_test.go b/client_test.go index 8f5935f..8bfde15 100644 --- a/client_test.go +++ b/client_test.go @@ -13,6 +13,8 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq/internal/testbroker" h "github.com/hibiken/asynq/internal/testutil" "github.com/redis/go-redis/v9" ) @@ -1785,3 +1787,85 @@ func TestBatchEnqueueContext_MixedBatch(t *testing.T) { t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled)) } } + +func TestBatchEnqueueContext_ValidationErrors(t *testing.T) { + setup(t) + client := NewClient(getRedisConnOpt(t)) + defer client.Close() + + tests := []struct { + desc string + tasks []*Task + opts []Option + }{ + { + desc: "nil task", + tasks: []*Task{nil}, + }, + { + desc: "empty task typename", + tasks: []*Task{NewTask("", []byte("payload"))}, + }, + { + desc: "blank task typename", + tasks: []*Task{NewTask(" ", []byte("payload"))}, + }, + { + desc: "invalid option: unique TTL less than 1s", + tasks: []*Task{NewTask("foo", nil)}, + opts: []Option{Unique(300 * time.Millisecond)}, + }, + { + desc: "group task rejected", + tasks: []*Task{NewTask("foo", nil, Group("mygroup"))}, + }, + { + desc: "unique task rejected", + tasks: []*Task{NewTask("foo", nil, Unique(time.Hour))}, + }, + } + + for _, tc := range tests { + results := client.BatchEnqueueContext(context.Background(), tc.tasks, tc.opts...) + if len(results) != len(tc.tasks) { + t.Errorf("%s: got %d results, want %d", tc.desc, len(results), len(tc.tasks)) + continue + } + for i, res := range results { + if res.Err == nil { + t.Errorf("%s: results[%d].Err = nil, want non-nil error", tc.desc, i) + } + if res.TaskInfo != nil { + t.Errorf("%s: results[%d].TaskInfo = %v, want nil", tc.desc, i, res.TaskInfo) + } + } + } +} + +func TestBatchEnqueueContext_BrokerError(t *testing.T) { + r := rdb.NewRDB(setup(t)) + defer r.Close() + testBroker := testbroker.NewTestBroker(r) + client := &Client{broker: testBroker, sharedConnection: true} + + tasks := []*Task{ + NewTask("task1", []byte("p1")), + NewTask("task2", []byte("p2")), + } + + testBroker.Sleep() + results := client.BatchEnqueueContext(context.Background(), tasks) + testBroker.Wakeup() + + if len(results) != 2 { + t.Fatalf("BatchEnqueueContext returned %d results, want 2", len(results)) + } + for i, res := range results { + if res.Err == nil { + t.Errorf("results[%d].Err = nil, want non-nil error when broker is down", i) + } + if res.TaskInfo != nil { + t.Errorf("results[%d].TaskInfo = %v, want nil on broker error", i, res.TaskInfo) + } + } +} diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index a1a873c..6cf2b85 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -293,6 +293,21 @@ func TestBatchEnqueue(t *testing.T) { t.Errorf("state for scheduled task %s = %q, want %q", s1.ID, state, "scheduled") } }) + + t.Run("pipeline error from cancelled context", func(t *testing.T) { + h.FlushDB(t, r.client) + + msg := h.NewTaskMessage("pipeline_error_task", nil) + items := []base.BatchEnqueueItem{{Msg: msg}} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := r.BatchEnqueue(ctx, items) + if err == nil { + t.Error("BatchEnqueue with cancelled context returned nil error, want non-nil") + } + }) } func TestEnqueueQueueCache(t *testing.T) {