2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-30 17:15:53 +08:00

Compare commits

...

7 Commits

Author SHA1 Message Date
Ken Hibino
84b0c76c8b v0.7.1 2020-04-05 14:56:06 -07:00
Ken Hibino
60b887b8e3 Fix singnal handling for different systems 2020-04-05 14:37:23 -07:00
Ken Hibino
7864bea55c Update readme
Add features section
2020-03-28 08:44:06 -07:00
Apos Spanos
47220554ca Correct typo 2020-03-23 13:47:05 -07:00
Ken Hibino
f91c05b92c v0.7.0 2020-03-22 12:04:37 -07:00
Ken Hibino
9b4438347e Fix comment 2020-03-21 11:44:26 -07:00
Ken Hibino
c33dd447ac Allow client to enqueue a task with unique option
Changes:

- Added Unique option for clients
- Require go v.13 or above (to use new errors wrapping functions)
- Fixed adding queue key to all-queues set (asynq:queues) when scheduling.
2020-03-21 11:40:40 -07:00
12 changed files with 629 additions and 45 deletions

View File

@@ -2,9 +2,7 @@ language: go
go_import_path: github.com/hibiken/asynq go_import_path: github.com/hibiken/asynq
git: git:
depth: 1 depth: 1
env: go: [1.13.x, 1.14.x]
- GO111MODULE=on # go modules are the default
go: [1.12.x, 1.13.x, 1.14.x]
script: script:
- go test -race -v -coverprofile=coverage.txt -covermode=atomic ./... - go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
services: services:

View File

@@ -7,6 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.7.1] - 2020-04-05
### Fixed
- Fixed signal handling for windows.
## [0.7.0] - 2020-03-22
### Changed
- Support Go v1.13+, dropped support for go v1.12
### Added
- `Unique` option was added to allow client to enqueue a task only if it's unique within a certain time period.
## [0.6.2] - 2020-03-15 ## [0.6.2] - 2020-03-15
### Added ### Added

View File

@@ -10,10 +10,23 @@
Asynq is a simple Go library for queueing tasks and processing them in the background with workers. Asynq is a simple Go library for queueing tasks and processing them in the background with workers.
It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily. It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
![Task Queue Diagram](/docs/assets/task-queue.png) ![Task Queue Diagram](/docs/assets/task-queue.png)
## Features
- Guaranteed at least one execution of a task
- Scheduling of tasks
- Durability since tasks are written to Redis
- Retries of failed tasks
- Concurrency management via configuration
- Weighted priority queues
- Strict priority queues
- Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option
- Allow timeout and deadline per task
- Flexible handler interface with support for middlewares
- CLI to inspect and remote-control queues and tasks
## Quickstart ## Quickstart
First, make sure you are running a Redis server locally. First, make sure you are running a Redis server locally.
@@ -22,7 +35,7 @@ First, make sure you are running a Redis server locally.
$ redis-server $ redis-server
``` ```
Next, write a package that encapslates task creation and task handling. Next, write a package that encapsulates task creation and task handling.
```go ```go
package tasks package tasks
@@ -205,7 +218,13 @@ go get -u github.com/hibiken/asynq/tools/asynqmon
| Dependency | Version | | Dependency | Version |
| -------------------------- | ------- | | -------------------------- | ------- |
| [Redis](https://redis.io/) | v2.8+ | | [Redis](https://redis.io/) | v2.8+ |
| [Go](https://golang.org/) | v1.12+ | | [Go](https://golang.org/) | v1.13+ |
## Stability and Compatibility
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
The library is getting close to a stable release, and we are trying to minimize breaking API changes. However, we cannot make any guarantees at this time.
## Contributing ## Contributing

View File

@@ -10,9 +10,7 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"os/signal"
"sync" "sync"
"syscall"
"time" "time"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
@@ -252,18 +250,7 @@ func (bg *Background) Run(handler Handler) {
bg.logger.Info("Send signal TSTP to stop processing new tasks") bg.logger.Info("Send signal TSTP to stop processing new tasks")
bg.logger.Info("Send signal TERM or INT to terminate the process") bg.logger.Info("Send signal TERM or INT to terminate the process")
// Wait for a signal to terminate. bg.waitForSignals()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
for {
sig := <-sigs
if sig == syscall.SIGTSTP {
bg.processor.stop()
bg.ps.SetStatus(base.StatusStopped)
continue
}
break
}
fmt.Println() fmt.Println()
bg.logger.Info("Starting graceful shutdown") bg.logger.Info("Starting graceful shutdown")
} }

106
client.go
View File

@@ -5,6 +5,9 @@
package asynq package asynq
import ( import (
"errors"
"fmt"
"sort"
"strings" "strings"
"time" "time"
@@ -38,6 +41,7 @@ type (
queueOption string queueOption string
timeoutOption time.Duration timeoutOption time.Duration
deadlineOption time.Time deadlineOption time.Time
uniqueOption time.Duration
) )
// MaxRetry returns an option to specify the max number of times // MaxRetry returns an option to specify the max number of times
@@ -70,11 +74,30 @@ func Deadline(t time.Time) Option {
return deadlineOption(t) return deadlineOption(t)
} }
// Unique returns an option to enqueue a task only if the given task is unique.
// Task enqueued with this option is guaranteed to be unique within the given ttl.
// Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued.
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
var ErrDuplicateTask = errors.New("task already exists")
type option struct { type option struct {
retry int retry int
queue string queue string
timeout time.Duration timeout time.Duration
deadline time.Time deadline time.Time
uniqueTTL time.Duration
} }
func composeOptions(opts ...Option) option { func composeOptions(opts ...Option) option {
@@ -94,6 +117,8 @@ func composeOptions(opts ...Option) option {
res.timeout = time.Duration(opt) res.timeout = time.Duration(opt)
case deadlineOption: case deadlineOption:
res.deadline = time.Time(opt) res.deadline = time.Time(opt)
case uniqueOption:
res.uniqueTTL = time.Duration(opt)
default: default:
// ignore unexpected option // ignore unexpected option
} }
@@ -101,6 +126,39 @@ func composeOptions(opts ...Option) option {
return res return res
} }
// uniqueKey computes the redis key used for the given task.
// It returns an empty string if ttl is zero.
func uniqueKey(t *Task, ttl time.Duration, qname string) string {
if ttl == 0 {
return ""
}
return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname)
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
}
const ( const (
// Max retry count by default // Max retry count by default
defaultMaxRetry = 25 defaultMaxRetry = 25
@@ -115,15 +173,25 @@ const (
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
opt := composeOptions(opts...) opt := composeOptions(opts...)
msg := &base.TaskMessage{ msg := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: task.Type, Type: task.Type,
Payload: task.Payload.data, Payload: task.Payload.data,
Queue: opt.queue, Queue: opt.queue,
Retry: opt.retry, Retry: opt.retry,
Timeout: opt.timeout.String(), Timeout: opt.timeout.String(),
Deadline: opt.deadline.Format(time.RFC3339), Deadline: opt.deadline.Format(time.RFC3339),
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
} }
return c.enqueue(msg, t) var err error
if time.Now().After(t) {
err = c.enqueue(msg, opt.uniqueTTL)
} else {
err = c.schedule(msg, t, opt.uniqueTTL)
}
if err == rdb.ErrDuplicateTask {
return fmt.Errorf("%w", ErrDuplicateTask)
}
return err
} }
// Enqueue enqueues task to be processed immediately. // Enqueue enqueues task to be processed immediately.
@@ -146,9 +214,17 @@ func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now().Add(d), task, opts...) return c.EnqueueAt(time.Now().Add(d), task, opts...)
} }
func (c *Client) enqueue(msg *base.TaskMessage, t time.Time) error { func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
if time.Now().After(t) { if uniqueTTL > 0 {
return c.rdb.Enqueue(msg) return c.rdb.EnqueueUnique(msg, uniqueTTL)
}
return c.rdb.Enqueue(msg)
}
func (c *Client) schedule(msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
ttl := t.Add(uniqueTTL).Sub(time.Now())
return c.rdb.ScheduleUnique(msg, t, ttl)
} }
return c.rdb.Schedule(msg, t) return c.rdb.Schedule(msg, t)
} }

View File

@@ -5,10 +5,12 @@
package asynq package asynq
import ( import (
"errors"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
@@ -361,3 +363,210 @@ func TestClientEnqueueIn(t *testing.T) {
} }
} }
} }
func TestUniqueKey(t *testing.T) {
tests := []struct {
desc string
task *Task
ttl time.Duration
qname string
want string
}{
{
"with zero TTL",
NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}),
0,
"default",
"",
},
{
"with primitive types",
NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}),
10 * time.Minute,
"default",
"email:send:a=123,b=hello,c=true:default",
},
{
"with unsorted keys",
NewTask("email:send", map[string]interface{}{"b": "hello", "c": true, "a": 123}),
10 * time.Minute,
"default",
"email:send:a=123,b=hello,c=true:default",
},
{
"with composite types",
NewTask("email:send",
map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}}),
10 * time.Minute,
"default",
"email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]:default",
},
{
"with complex types",
NewTask("email:send",
map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour}),
10 * time.Minute,
"default",
"email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC:default",
},
{
"with nil payload",
NewTask("reindex", nil),
10 * time.Minute,
"default",
"reindex:nil:default",
},
}
for _, tc := range tests {
got := uniqueKey(tc.task, tc.ttl, tc.qname)
if got != tc.want {
t.Errorf("%s: uniqueKey(%v, %v, %q) = %q, want %q", tc.desc, tc.task, tc.ttl, tc.qname, got, tc.want)
}
}
}
func TestEnqueueUnique(t *testing.T) {
r := setup(t)
c := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
task *Task
ttl time.Duration
}{
{
NewTask("email", map[string]interface{}{"user_id": 123}),
time.Hour,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
err := c.Enqueue(tc.task, Unique(tc.ttl))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
continue
}
// Enqueue the task again. It should fail.
err = c.Enqueue(tc.task, Unique(tc.ttl))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}
func TestEnqueueInUnique(t *testing.T) {
r := setup(t)
c := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
task *Task
d time.Duration
ttl time.Duration
}{
{
NewTask("reindex", nil),
time.Hour,
10 * time.Minute,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
continue
}
// Enqueue the task again. It should fail.
err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}
func TestEnqueueAtUnique(t *testing.T) {
r := setup(t)
c := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
task *Task
at time.Time
ttl time.Duration
}{
{
NewTask("reindex", nil),
time.Now().Add(time.Hour),
10 * time.Minute,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
continue
}
// Enqueue the task again. It should fail.
err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}

2
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/rs/xid v1.2.1 github.com/rs/xid v1.2.1
github.com/spf13/cast v1.3.1 github.com/spf13/cast v1.3.1
go.uber.org/goleak v0.10.0 go.uber.org/goleak v0.10.0
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/yaml.v2 v2.2.7 // indirect gopkg.in/yaml.v2 v2.2.7 // indirect
) )

View File

@@ -97,6 +97,11 @@ type TaskMessage struct {
// //
// time.Time's zero value means no deadline. // time.Time's zero value means no deadline.
Deadline string Deadline string
// UniqueKey holds the redis key used for uniqueness lock for this task.
//
// Empty string indicates that no uniqueness lock was used.
UniqueKey string
} }
// ProcessState holds process level information. // ProcessState holds process level information.

View File

@@ -22,6 +22,9 @@ var (
// ErrTaskNotFound indicates that a task that matches the given identifier was not found. // ErrTaskNotFound indicates that a task that matches the given identifier was not found.
ErrTaskNotFound = errors.New("could not find a task") ErrTaskNotFound = errors.New("could not find a task")
// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
ErrDuplicateTask = errors.New("task already exists")
) )
const statsTTL = 90 * 24 * time.Hour // 90 days const statsTTL = 90 * 24 * time.Hour // 90 days
@@ -59,6 +62,46 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err() return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err()
} }
// KEYS[1] -> unique key in the form <type>:<payload>:<qname>
// KEYS[2] -> asynq:queues:<qname>
// KEYS[2] -> asynq:queues
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[3])
redis.call("SADD", KEYS[3], KEYS[2])
return 1
`)
// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
key := base.QueueKey(msg.Queue)
res, err := enqueueUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, key, base.AllQueues},
msg.ID.String(), int(ttl.Seconds()), bytes).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("could not cast %v to int64", res)
}
if n == 0 {
return ErrDuplicateTask
}
return nil
}
// Dequeue queries given queues in order and pops a task message if there is one and returns it. // Dequeue queries given queues in order and pops a task message if there is one and returns it.
// If all queues are empty, ErrNoProcessableTask error is returned. // If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
@@ -118,8 +161,10 @@ func (r *RDB) dequeue(queues ...string) (data string, err error) {
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:processed:<yyyy-mm-dd> // KEYS[2] -> asynq:processed:<yyyy-mm-dd>
// KEYS[3] -> unique key in the format <type>:<payload>:<qname>
// ARGV[1] -> base.TaskMessage value // ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
// Note: LREM count ZERO means "remove all elements equal to val" // Note: LREM count ZERO means "remove all elements equal to val"
var doneCmd = redis.NewScript(` var doneCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("LREM", KEYS[1], 0, ARGV[1])
@@ -127,10 +172,14 @@ local n = redis.call("INCR", KEYS[2])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[2], ARGV[2]) redis.call("EXPIREAT", KEYS[2], ARGV[2])
end end
if string.len(KEYS[3]) > 0 and redis.call("GET", KEYS[3]) == ARGV[3] then
redis.call("DEL", KEYS[3])
end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
// Done removes the task from in-progress queue to mark the task as done. // Done removes the task from in-progress queue to mark the task as done.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(msg *base.TaskMessage) error { func (r *RDB) Done(msg *base.TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
@@ -140,8 +189,8 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
processedKey := base.ProcessedKey(now) processedKey := base.ProcessedKey(now)
expireAt := now.Add(statsTTL) expireAt := now.Add(statsTTL)
return doneCmd.Run(r.client, return doneCmd.Run(r.client,
[]string{base.InProgressQueue, processedKey}, []string{base.InProgressQueue, processedKey, msg.UniqueKey},
bytes, expireAt.Unix()).Err() bytes, expireAt.Unix(), msg.ID.String()).Err()
} }
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
@@ -164,15 +213,71 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
string(bytes)).Err() string(bytes)).Err()
} }
// KEYS[1] -> asynq:scheduled
// KEYS[2] -> asynq:queues
// ARGV[1] -> score (process_at timestamp)
// ARGV[2] -> task message
// ARGV[3] -> queue key
var scheduleCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("SADD", KEYS[2], ARGV[3])
return 1
`)
// Schedule adds the task to the backlog queue to be processed in the future. // Schedule adds the task to the backlog queue to be processed in the future.
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
qkey := base.QueueKey(msg.Queue)
score := float64(processAt.Unix()) score := float64(processAt.Unix())
return r.client.ZAdd(base.ScheduledQueue, return scheduleCmd.Run(r.client,
&redis.Z{Member: string(bytes), Score: score}).Err() []string{base.ScheduledQueue, base.AllQueues},
score, bytes, qkey).Err()
}
// KEYS[1] -> unique key in the format <type>:<payload>:<qname>
// KEYS[2] -> asynq:scheduled
// KEYS[3] -> asynq:queues
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
// ARGV[5] -> queue key
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
redis.call("SADD", KEYS[3], ARGV[5])
return 1
`)
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
qkey := base.QueueKey(msg.Queue)
score := float64(processAt.Unix())
res, err := scheduleUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues},
msg.ID.String(), int(ttl.Seconds()), score, bytes, qkey).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("could not cast %v to int64", res)
}
if n == 0 {
return ErrDuplicateTask
}
return nil
} }
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress

View File

@@ -16,6 +16,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/rs/xid"
) )
// TODO(hibiken): Get Redis address and db number from ENV variables. // TODO(hibiken): Get Redis address and db number from ENV variables.
@@ -69,6 +70,48 @@ func TestEnqueue(t *testing.T) {
} }
} }
func TestEnqueueUnique(t *testing.T) {
r := setup(t)
m1 := base.TaskMessage{
ID: xid.New(),
Type: "email",
Payload: map[string]interface{}{"user_id": 123},
Queue: base.DefaultQueueName,
UniqueKey: "email:user_id=123:default",
}
tests := []struct {
msg *base.TaskMessage
ttl time.Duration // uniqueness ttl
}{
{&m1, time.Minute},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case.
err := r.EnqueueUnique(tc.msg, tc.ttl)
if err != nil {
t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil",
tc.msg, tc.ttl, err)
continue
}
got := r.EnqueueUnique(tc.msg, tc.ttl)
if got != ErrDuplicateTask {
t.Errorf("Second message: (*RDB).EnqueueUnique(%v, %v) = %v, want %v",
tc.msg, tc.ttl, got, ErrDuplicateTask)
continue
}
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
}
}
}
func TestDequeue(t *testing.T) { func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
@@ -188,6 +231,13 @@ func TestDone(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("export_csv", nil) t2 := h.NewTaskMessage("export_csv", nil)
t3 := &base.TaskMessage{
ID: xid.New(),
Type: "reindex",
Payload: nil,
UniqueKey: "reindex:nil:default",
Queue: "default",
}
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage // initial state of the in-progress list inProgress []*base.TaskMessage // initial state of the in-progress list
@@ -204,11 +254,25 @@ func TestDone(t *testing.T) {
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
}, },
{
inProgress: []*base.TaskMessage{t1, t2, t3},
target: t3,
wantInProgress: []*base.TaskMessage{t1, t2},
},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedInProgressQueue(t, r.client, tc.inProgress)
for _, msg := range tc.inProgress {
// Set uniqueness lock if unique key is present.
if len(msg.UniqueKey) > 0 {
err := r.client.SetNX(msg.UniqueKey, msg.ID.String(), time.Minute).Err()
if err != nil {
t.Fatal(err)
}
}
}
err := r.Done(tc.target) err := r.Done(tc.target)
if err != nil { if err != nil {
@@ -232,6 +296,10 @@ func TestDone(t *testing.T) {
if gotTTL > statsTTL { if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
} }
if len(tc.target.UniqueKey) > 0 && r.client.Exists(tc.target.UniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", tc.target.UniqueKey)
}
} }
} }
@@ -344,6 +412,58 @@ func TestSchedule(t *testing.T) {
} }
} }
func TestScheduleUnique(t *testing.T) {
r := setup(t)
m1 := base.TaskMessage{
ID: xid.New(),
Type: "email",
Payload: map[string]interface{}{"user_id": 123},
Queue: base.DefaultQueueName,
UniqueKey: "email:user_id=123:default",
}
tests := []struct {
msg *base.TaskMessage
processAt time.Time
ttl time.Duration // uniqueness lock ttl
}{
{&m1, time.Now().Add(15 * time.Minute), time.Minute},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
desc := fmt.Sprintf("(*RDB).ScheduleUnique(%v, %v, %v)", tc.msg, tc.processAt, tc.ttl)
err := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
if err != nil {
t.Errorf("Frist task: %s = %v, want nil", desc, err)
continue
}
gotScheduled := h.GetScheduledEntries(t, r.client)
if len(gotScheduled) != 1 {
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue)
continue
}
if int64(gotScheduled[0].Score) != tc.processAt.Unix() {
t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix())
continue
}
got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
if got != ErrDuplicateTask {
t.Errorf("Second task: %s = %v, want %v",
desc, got, ErrDuplicateTask)
}
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
}
}
}
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})
@@ -784,8 +904,7 @@ func TestWriteProcessState(t *testing.T) {
} }
// Check ProcessInfo TTL was set correctly // Check ProcessInfo TTL was set correctly
gotTTL := r.client.TTL(pkey).Val() gotTTL := r.client.TTL(pkey).Val()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second) if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
if !cmp.Equal(ttl, gotTTL, timeCmpOpt) {
t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl) t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
} }
// Check ProcessInfo key was added to the set correctly // Check ProcessInfo key was added to the set correctly
@@ -858,8 +977,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
} }
// Check ProcessInfo TTL was set correctly // Check ProcessInfo TTL was set correctly
gotTTL := r.client.TTL(pkey).Val() gotTTL := r.client.TTL(pkey).Val()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second) if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
if !cmp.Equal(ttl, gotTTL, timeCmpOpt) {
t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl) t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
} }
// Check ProcessInfo key was added to the set correctly // Check ProcessInfo key was added to the set correctly

30
signals_unix.go Normal file
View File

@@ -0,0 +1,30 @@
// +build linux bsd darwin
package asynq
import (
"os"
"os/signal"
"golang.org/x/sys/unix"
"github.com/hibiken/asynq/internal/base"
)
// waitForSignals waits for signals and handles them.
// It handles SIGTERM, SIGINT, and SIGTSTP.
// SIGTERM and SIGINT will signal the process to exit.
// SIGTSTP will signal the process to stop processing new tasks.
func (bg *Background) waitForSignals() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
sig := <-sigs
if sig == unix.SIGTSTP {
bg.processor.stop()
bg.ps.SetStatus(base.StatusStopped)
continue
}
break
}
}

21
signals_windows.go Normal file
View File

@@ -0,0 +1,21 @@
// +build windows
package asynq
import (
"os"
"os/signal"
"golang.org/x/sys/windows"
)
// waitForSignals waits for signals and handles them.
// It handles SIGTERM and SIGINT.
// SIGTERM and SIGINT will signal the process to exit.
//
// Note: Currently SIGTSTP is not supported for windows build.
func (bg *Background) waitForSignals() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
<-sigs
}