mirror of
https://github.com/hibiken/asynq.git
synced 2026-06-19 10:37:39 +08:00
Compare commits
2 Commits
sohail/rel
...
v0.25.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd3eb86d95 | ||
|
|
3dbda60333 |
@@ -10,9 +10,9 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/errors"
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"github.com/spf13/cast"
|
"github.com/spf13/cast"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1832,6 +1832,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
|||||||
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
|
if err := r.client.SRem(context.Background(), base.AllQueues, qname).Err(); err != nil {
|
||||||
return errors.E(op, errors.Unknown, err)
|
return errors.E(op, errors.Unknown, err)
|
||||||
}
|
}
|
||||||
|
r.queuesPublished.Delete(qname)
|
||||||
return nil
|
return nil
|
||||||
case -1:
|
case -1:
|
||||||
return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
|
return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@@ -26,8 +27,9 @@ const LeaseDuration = 30 * time.Second
|
|||||||
|
|
||||||
// RDB is a client interface to query and mutate task queues.
|
// RDB is a client interface to query and mutate task queues.
|
||||||
type RDB struct {
|
type RDB struct {
|
||||||
client redis.UniversalClient
|
client redis.UniversalClient
|
||||||
clock timeutil.Clock
|
clock timeutil.Clock
|
||||||
|
queuesPublished sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRDB returns a new instance of RDB.
|
// NewRDB returns a new instance of RDB.
|
||||||
@@ -112,8 +114,11 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
|
}
|
||||||
|
r.queuesPublished.Store(msg.Queue, true)
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID),
|
base.TaskKey(msg.Queue, msg.ID),
|
||||||
@@ -174,8 +179,11 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Internal, "cannot encode task message: %v", err)
|
return errors.E(op, errors.Internal, "cannot encode task message: %v", err)
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
|
}
|
||||||
|
r.queuesPublished.Store(msg.Queue, true)
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
msg.UniqueKey,
|
msg.UniqueKey,
|
||||||
@@ -529,8 +537,11 @@ func (r *RDB) AddToGroup(ctx context.Context, msg *base.TaskMessage, groupKey st
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
|
}
|
||||||
|
r.queuesPublished.Store(msg.Queue, true)
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID),
|
base.TaskKey(msg.Queue, msg.ID),
|
||||||
@@ -591,8 +602,11 @@ func (r *RDB) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, group
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
|
}
|
||||||
|
r.queuesPublished.Store(msg.Queue, true)
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID),
|
base.TaskKey(msg.Queue, msg.ID),
|
||||||
@@ -648,8 +662,11 @@ func (r *RDB) Schedule(ctx context.Context, msg *base.TaskMessage, processAt tim
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
|
}
|
||||||
|
r.queuesPublished.Store(msg.Queue, true)
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID),
|
base.TaskKey(msg.Queue, msg.ID),
|
||||||
@@ -707,8 +724,11 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
if _, found := r.queuesPublished.Load(msg.Queue); !found {
|
||||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
if err := r.client.SAdd(ctx, base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
|
}
|
||||||
|
r.queuesPublished.Store(msg.Queue, true)
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
msg.UniqueKey,
|
msg.UniqueKey,
|
||||||
|
|||||||
@@ -160,6 +160,59 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEnqueueQueueCache(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
t1 := h.NewTaskMessageWithQueue("sync1", nil, "low")
|
||||||
|
|
||||||
|
enqueueTime := time.Now()
|
||||||
|
clock := timeutil.NewSimulatedClock(enqueueTime)
|
||||||
|
r.SetClock(clock)
|
||||||
|
|
||||||
|
err := r.Enqueue(context.Background(), t1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check queue is in the AllQueues set.
|
||||||
|
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
|
||||||
|
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
|
||||||
|
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("remove-queue", func(t *testing.T) {
|
||||||
|
err := r.RemoveQueue(t1.Queue, true)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil", t1.Queue, true, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := r.queuesPublished.Load(t1.Queue); ok {
|
||||||
|
t.Fatalf("%q is still cached in queuesPublished", t1.Queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
|
||||||
|
t.Fatalf("%q is a member of SET %q", t1.Queue, base.AllQueues)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = r.Enqueue(context.Background(), t1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check queue is in the AllQueues set.
|
||||||
|
if !r.client.SIsMember(context.Background(), base.AllQueues, t1.Queue).Val() {
|
||||||
|
t.Fatalf("%q is not a member of SET %q", t1.Queue, base.AllQueues)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := r.queuesPublished.Load(t1.Queue); !ok {
|
||||||
|
t.Fatalf("%q is not cached in queuesPublished", t1.Queue)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestEnqueueUnique(t *testing.T) {
|
func TestEnqueueUnique(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|||||||
Reference in New Issue
Block a user