mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-09 08:15:53 +08:00
Compare commits
2 Commits
sohail/rel
...
v0.25.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd3eb86d95 | ||
|
|
3dbda60333 |
@@ -10,9 +10,9 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/spf13/cast"
|
||||
)
|
||||
|
||||
@@ -1832,6 +1832,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
||||
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
r.queuesPublished.Delete(qname)
|
||||
return nil
|
||||
case -1:
|
||||
return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -26,8 +27,9 @@ const LeaseDuration = 30 * time.Second
|
||||
|
||||
// RDB is a client interface to query and mutate task queues.
|
||||
type RDB struct {
|
||||
client redis.UniversalClient
|
||||
clock timeutil.Clock
|
||||
client redis.UniversalClient
|
||||
clock timeutil.Clock
|
||||
queuesPublished sync.Map
|
||||
}
|
||||
|
||||
// NewRDB returns a new instance of RDB.
|
||||
@@ -112,8 +114,11 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
r.queuesPublished.Store(msg.Queue, true)
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(msg.Queue, msg.ID),
|
||||
@@ -174,8 +179,11 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Internal, "cannot encode task message: %v", err)
|
||||
}
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
r.queuesPublished.Store(msg.Queue, true)
|
||||
}
|
||||
keys := []string{
|
||||
msg.UniqueKey,
|
||||
@@ -529,8 +537,11 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
r.queuesPublished.Store(msg.Queue, true)
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(msg.Queue, msg.ID),
|
||||
@@ -591,8 +602,11 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
r.queuesPublished.Store(msg.Queue, true)
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(msg.Queue, msg.ID),
|
||||
@@ -648,8 +662,11 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
r.queuesPublished.Store(msg.Queue, true)
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(msg.Queue, msg.ID),
|
||||
@@ -707,8 +724,11 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
|
||||
}
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
r.queuesPublished.Store(msg.Queue, true)
|
||||
}
|
||||
keys := []string{
|
||||
msg.UniqueKey,
|
||||
|
||||
@@ -160,6 +160,59 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnqueueQueueCache(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
t1 := h.NewTaskMessageWithQueue("sync1", nil, "low")
|
||||
|
||||
enqueueTime := time.Now()
|
||||
clock := timeutil.NewSimulatedClock(enqueueTime)
|
||||
r.SetClock(clock)
|
||||
|
||||
err := r.Enqueue(context.Background(), t1)
|
||||
if err != nil {
|
||||
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||
}
|
||||
|
||||
// Check queue is in the AllQueues set.
|
||||
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
|
||||
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
|
||||
}
|
||||
|
||||
if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
|
||||
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
|
||||
}
|
||||
|
||||
t.Run("remove-queue", func(t *testing.T) {
|
||||
err := r.RemoveQueue(t1.Queue, true)
|
||||
if err != nil {
|
||||
t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil", t1.Queue, true, err)
|
||||
}
|
||||
|
||||
if _, ok := r.queuesPublished.Load(t1.Queue); ok {
|
||||
t.Fatalf("%q is still cached in queuesPublished", t1.Queue)
|
||||
}
|
||||
|
||||
if r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
|
||||
t.Fatalf("%q is a member of SET %q", t1.Queue, base.AllQueues)
|
||||
}
|
||||
|
||||
err = r.Enqueue(context.Background(), t1)
|
||||
if err != nil {
|
||||
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||
}
|
||||
|
||||
// Check queue is in the AllQueues set.
|
||||
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
|
||||
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
|
||||
}
|
||||
|
||||
if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
|
||||
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestEnqueueUnique(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
Reference in New Issue
Block a user