2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-06 06:45:52 +08:00

Compare commits

...

41 Commits

Author SHA1 Message Date
Ken Hibino
f43c51ce8b v0.2.0 2020-01-19 15:28:24 -08:00
Ken Hibino
04983dc00f [ci skip] Clarify terminology around 'state' and 'queue' 2020-01-19 15:24:02 -08:00
Ken Hibino
9e872a4cb4 Change asynqmon history command to take no arguments 2020-01-19 15:24:02 -08:00
Ken Hibino
959c9fd01a [ci skip] Allow config file to set default values for flags 2020-01-19 09:10:48 -08:00
Ken Hibino
d37f2a09ab [ci skip] Add readme for asynqmon tool 2020-01-18 21:00:33 -08:00
Ken Hibino
207a6d2d1a Fix benchmark tests 2020-01-18 15:07:15 -08:00
Ken Hibino
c29200b1fc Add syncer to retry failed redis commands 2020-01-18 15:07:15 -08:00
Ken Hibino
5c806676de [ci skip] Add link to gitter chat room 2020-01-16 21:35:03 -08:00
Ken Hibino
fd8eb51440 [ci skip] Add badges to readme 2020-01-16 21:13:34 -08:00
Ken Hibino
f66a65d6ca Fix tests 2020-01-16 21:04:46 -08:00
Ken Hibino
d1f516d8f1 [ci skip] Update changelog 2020-01-16 21:04:46 -08:00
Ken Hibino
0c2591ad7e [ci skip] Update docs 2020-01-16 21:04:46 -08:00
Ken Hibino
43d7591250 Remove redis.Client type from asynq package API 2020-01-16 21:04:46 -08:00
Ken Hibino
cb2ebf18ac [performance] Skip the overhead of json decoding when scheduling to one
queue
2020-01-14 20:46:47 -08:00
Ken Hibino
5a6f737589 [performance] Use BRPOPLPUSH if one queue is used 2020-01-14 20:46:47 -08:00
Ken Hibino
f0251be5d2 [ci skip] Update changelog 2020-01-14 06:05:21 -08:00
Ken Hibino
858b0325bd Add rmq command to asynqmon 2020-01-14 06:02:00 -08:00
Ken Hibino
874d8e8843 Add RDB.RemoveQueue method 2020-01-14 06:02:00 -08:00
Ken Hibino
84eef4ed0b Add strict-priority option 2020-01-12 18:39:57 -08:00
Ken Hibino
97316d6766 Fix flaky tests
Some tests were failing due to mismatch in Score in ZSetEntry.
Changed ZSetEntry Score to float64 type so that we can use
cmpopts.EquateApprox to allow for margin when comparing.
2020-01-11 10:09:15 -08:00
Ken Hibino
2631672575 Allow filtering results of asynqmon ls enqueued by providing queue
name
2020-01-11 10:09:15 -08:00
Ken Hibino
cf78a12866 Add "Queue" column to the output of asynqmon ls 2020-01-11 10:09:15 -08:00
Ken Hibino
c5b215e3b9 Update RDB.ListEnqueued to list tasks from all queues 2020-01-11 10:09:15 -08:00
Ken Hibino
2ff847d520 Include each queue counts in stats command output 2020-01-11 10:09:15 -08:00
Ken Hibino
89843ac565 Change RDB.CurrentStats to be multi-queue aware 2020-01-11 10:09:15 -08:00
Ken Hibino
67f381269a Maintain a set of queue names in redis set 2020-01-11 10:09:15 -08:00
Ken Hibino
390eb13149 Remove stale field in processor struct 2020-01-11 10:09:15 -08:00
Ken Hibino
718336ff44 Update RDB.Enqueue* methods to be multi queue aware 2020-01-11 10:09:15 -08:00
Ken Hibino
8ff5c5101e [ci skip] Update changelog 2020-01-07 21:55:18 -08:00
Ken Hibino
4f5d115b3e [ci skip] Upgrade github.com/google/go-cmp to v0.4.0 2020-01-07 21:55:18 -08:00
Ken Hibino
24bb45b36b [ci skip] Normalize queue priority numbers 2020-01-07 21:55:18 -08:00
Ken Hibino
8d9a2d1313 Update processor to query queues based on priority 2020-01-07 21:55:18 -08:00
Ken Hibino
53d0902808 Change RDB.Dequeue to query multiple queues 2020-01-07 21:55:18 -08:00
Ken Hibino
2af9eb2c88 Delete stale code 2020-01-07 21:55:18 -08:00
Ken Hibino
28d698c24e Update CheckAndEnqueue to enqueue tasks to specified queue 2020-01-07 21:55:18 -08:00
Ken Hibino
1d99d99692 Update comment 2020-01-07 21:55:18 -08:00
Ken Hibino
03cb6eef09 Add Queues field to Config 2020-01-07 21:55:18 -08:00
Ken Hibino
ca78b92078 Add Queue option to allow user to specify queue from client
Added base.QueueKey method to get redis key for given queue name.
Changed asynqtest.GetEnqueuedMessages to optionally take queue name.
2020-01-07 21:55:18 -08:00
Ken Hibino
29ad70c36a [ci skip] Update readme 2020-01-05 09:55:39 -08:00
Ken Hibino
00b03e1287 Add test for payload key not exist 2020-01-05 09:55:39 -08:00
Ken Hibino
f3a23b9b12 Make Task type immutable
This change makes it impossible to mutate payload within Handler or
RetryDelayFunc.
2020-01-05 09:55:39 -08:00
41 changed files with 2454 additions and 970 deletions

5
.gitignore vendored
View File

@@ -15,4 +15,7 @@
/examples
# Ignore command binary
/tools/asynqmon/asynqmon
/tools/asynqmon/asynqmon
# Ignore asynqmon config file
.asynqmon.*

View File

@@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.2.0] - 2020-01-19
### Added
- NewTask constructor
- `Queues` option in `Config` to specify mutiple queues with priority level
- `Client` can schedule a task with `asynq.Queue(name)` to specify which queue to use
- `StrictPriority` option in `Config` to specify whether the priority should be followed strictly
- `RedisConnOpt` to abstract away redis client implementation
- [CLI] `asynqmon rmq` command to remove queue
### Changed
- `Client` and `Background` constructors take `RedisConnOpt` as their first argument.
- `asynqmon stats` now shows the total of all enqueued tasks under "Enqueued"
- `asynqmon stats` now shows each queue's task count
- `asynqmon history` now doesn't take any arguments and shows data from the last 10 days by default (use `--days` flag to change the number of days)
- Task type is now immutable (i.e., Payload is read-only)
## [0.1.0] - 2020-01-04
### Added

133
README.md
View File

@@ -1,6 +1,10 @@
# Asynq [![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq)
# Asynq
Simple, efficent asynchronous task processing library in Go.
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq) [![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq) [![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
Simple and efficent asynchronous task processing library in Go.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before the release of verson 1.0.0.
## Table of Contents
@@ -8,37 +12,41 @@ Simple, efficent asynchronous task processing library in Go.
- [Requirements](#requirements)
- [Installation](#installation)
- [Getting Started](#getting-started)
- [Monitoring CLI](#monitoring-cli)
- [Acknowledgements](#acknowledgements)
- [License](#license)
## Overview
![Gif](/docs/assets/asynqmon_stats.gif)
Asynq provides a simple interface to asynchronous task processing.
Asynq also ships with a CLI to monitor the queues and take manual actions if needed.
It also ships with a tool to monitor the queues and take manual actions if needed.
Asynq provides:
- Clear separation of task producer and consumer
- Ability to schedule task processing in the future
- Automatic retry of failed tasks with exponential backoff
- Automatic failover using Redis sentinels
- Ability to configure max retry count per task
- Ability to configure max number of worker goroutines to process tasks
- Unix signal handling to safely shutdown background processing
- Enhanced reliability TODO(hibiken): link to wiki page describing this.
- CLI to query and mutate queues state for mointoring and administrative purposes
- Support for priority queues
- Unix signal handling to gracefully shutdown background processing
- CLI tool to query and mutate queues state for mointoring and administrative purposes
## Requirements
| Dependency | Version |
| -------------------------------------------------------------- | ------- |
| [Redis](https://redis.io/) | v2.6+ |
| [Go](https://golang.org/) | v1.12+ |
| [github.com/go-redis/redis](https://github.com/go-redis/redis) | v.7.0+ |
| Dependency | Version |
| -------------------------- | ------- |
| [Redis](https://redis.io/) | v2.8+ |
| [Go](https://golang.org/) | v1.12+ |
## Installation
```
go get github.com/hibiken/asynq
go get -u github.com/hibiken/asynq
```
## Getting Started
@@ -49,58 +57,63 @@ go get github.com/hibiken/asynq
import "github.com/hibiken/asynq"
```
2. Create a `Client` instance to create tasks.
2. Asynq uses redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis.
```go
func main() {
r := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}
client := asynq.NewClient(r)
t1 := asynq.Task{
Type: "send_welcome_email",
Payload: map[string]interface{}{
"recipient_id": 1234,
},
}
t2 := asynq.Task{
Type: "send_reminder_email",
Payload: map[string]interface{}{
"recipient_id": 1234,
},
}
// process the task immediately.
err := client.Schedule(&t1, time.Now())
// process the task 24 hours later.
err = client.Schedule(&t2, time.Now().Add(24 * time.Hour))
// specify the max number of retry (default: 25)
err = client.Schedule(&t1, time.Now(), asynq.MaxRetry(1))
var redis = &asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
```
3. Create a `Background` instance to process tasks.
3. Create a `Client` instance to create and schedule tasks.
```go
func main() {
r := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}
bg := asynq.NewBackground(r, &asynq.Config{
Concurrency: 20,
client := asynq.NewClient(redis)
// Create a task with typename and payload.
t1 := asynq.NewTask(
"send_welcome_email",
map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask(
"send_reminder_email",
map[string]interface{}{"user_id": 42})
// Process the task immediately.
err := client.Schedule(t1, time.Now())
// Process the task 24 hours later.
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
// Specify the max number of retry (default: 25)
err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1))
}
```
4. Create a `Background` instance to process tasks.
```go
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
// Blocks until signal TERM or INT is received.
// For graceful shutdown, send signal TSTP to stop processing more tasks
// before sending TERM or INT signal.
// before sending TERM or INT signal to terminate the process.
bg.Run(handler)
}
```
Note that `Client` and `Background` are intended to be used in separate executable binaries.
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
```go
@@ -120,7 +133,7 @@ The simplest way to implement a handler is to define a function with the same si
func handler(t *asynq.Task) error {
switch t.Type {
case "send_welcome_email":
id, err := t.Payload.GetInt("recipient_id")
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
@@ -135,11 +148,8 @@ func handler(t *asynq.Task) error {
}
func main() {
r := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}
bg := asynq.NewBackground(r, &asynq.Config{
Concurrency: 20,
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
// Use asynq.HandlerFunc adapter for a handler function
@@ -147,6 +157,21 @@ func main() {
}
```
## Monitoring CLI
Asynq ships with a CLI tool to inspect the state of queues and tasks.
To install the CLI, run the following command:
go get github.com/hibiken/asynq/tools/asynqmon
For details on how to use the tool, see the [README](/tools/asynqmon/README.md) for the asynqmon CLI.
## Acknowledgements
- [Sidekiq](https://github.com/mperham/sidekiq) : Many of the design ideas are taken from sidekiq and its Web UI
- [Cobra](https://github.com/spf13/cobra) : Asynqmon CLI is built with cobra
## License
Asynq is released under the MIT license. See [LICENSE](https://github.com/hibiken/asynq/blob/master/LICENSE).

136
asynq.go
View File

@@ -4,16 +4,140 @@
package asynq
/*
TODOs:
- [P0] Go docs
*/
import (
"crypto/tls"
"fmt"
"github.com/go-redis/redis/v7"
)
// Task represents a task to be performed.
type Task struct {
// Type indicates the kind of the task to be performed.
// Type indicates the type of task to be performed.
Type string
// Payload holds data needed to process the task.
// Payload holds data needed to perform the task.
Payload Payload
}
// NewTask returns a new Task. The typename and payload argument set Type
// and Payload field respectively.
//
// The payload must be serializable to JSON.
func NewTask(typename string, payload map[string]interface{}) *Task {
return &Task{
Type: typename,
Payload: Payload{payload},
}
}
// RedisConnOpt is a discriminated union of redis-client-option types.
//
// RedisConnOpt represents a sum of following types:
//
// RedisClientOpt | *RedisClientOpt | RedisFailoverClientOpt | *RedisFailoverClientOpt
//
// Passing unexpected type to a RedisConnOpt variable can cause panic.
type RedisConnOpt interface{}
// RedisClientOpt is used to create a redis client that connects
// to a redis server directly.
type RedisClientOpt struct {
// Network type to use, either tcp or unix.
// Default is tcp.
Network string
// Redis server address in "host:port" format.
Addr string
// Redis server password.
Password string
// Redis DB to select after connecting to the server.
// See: https://redis.io/commands/select.
DB int
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
// TLS Config used to connect to the server.
// TLS will be negotiated only if this field is set.
TLSConfig *tls.Config
}
// RedisFailoverClientOpt is used to creates a redis client that talks
// to redis sentinels for service discovery and has automatic failover
// capability.
type RedisFailoverClientOpt struct {
// Redis master name that monitored by sentinels.
MasterName string
// Addresses of sentinels in "host:port" format.
// Use at least three sentinels to avoid problems described in
// https://redis.io/topics/sentinel.
SentinelAddrs []string
// Redis sentinel password.
SentinelPassword string
// Redis server password.
Password string
// Redis DB to select after connecting to the server.
// See: https://redis.io/commands/select.
DB int
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
// TLS Config used to connect to the server.
// TLS will be negotiated only if this field is set.
TLSConfig *tls.Config
}
func createRedisClient(r RedisConnOpt) *redis.Client {
switch r := r.(type) {
case *RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case *RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
default:
panic(fmt.Sprintf("unexpected type %T for RedisConnOpt", r))
}
}

View File

@@ -16,11 +16,17 @@ import (
// This file defines test helper functions used by
// other test files.
// redis used for package testing.
const (
redisAddr = "localhost:6379"
redisDB = 14
)
func setup(tb testing.TB) *redis.Client {
tb.Helper()
r := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 14,
Addr: redisAddr,
DB: redisDB,
})
// Start each test with a clean slate.
h.FlushDB(tb, r)

View File

@@ -15,16 +15,16 @@ import (
"syscall"
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
// Background is responsible for managing the background-task processing.
//
// Background manages background queues to process tasks and retry if
// necessary. If the processing of a task is unsuccessful, background will
// schedule it for a retry with an exponential backoff until either the task
// gets processed successfully or it exhausts its max retry count.
// Background manages task queues to process tasks.
// If the processing of a task is unsuccessful, background will
// schedule it for a retry until either the task gets processed successfully
// or it exhausts its max retry count.
//
// Once a task exhausts its retries, it will be moved to the "dead" queue and
// will be kept in the queue for some time until a certain condition is met
@@ -37,11 +37,12 @@ type Background struct {
rdb *rdb.RDB
scheduler *scheduler
processor *processor
syncer *syncer
}
// Config specifies the background-task processing behavior.
type Config struct {
// Maximum number of concurrent workers to process tasks.
// Maximum number of concurrent processing of tasks.
//
// If set to zero or negative value, NewBackground will overwrite the value to one.
Concurrency int
@@ -52,8 +53,33 @@ type Config struct {
//
// n is the number of times the task has been retried.
// e is the error returned by the task handler.
// t is the task in question. t is read-only, the function should not mutate t.
// t is the task in question.
RetryDelayFunc func(n int, e error, t *Task) time.Duration
// List of queues to process with given priority level. Keys are the names of the
// queues and values are associated priority level.
//
// If set to nil or not specified, the background will process only the "default" queue.
//
// Priority is treated as follows to avoid starving low priority queues.
//
// Example:
// Queues: map[string]uint{
// "critical": 6,
// "default": 3,
// "low": 1,
// }
// With the above config and given that all queues are not empty, the tasks
// in "critical", "default", "low" should be processed 60%, 30%, 10% of
// the time respectively.
Queues map[string]uint
// StrictPriority indicates whether the queue priority should be treated strictly.
//
// If set to true, tasks in the queue with the highest priority is processed first.
// The tasks in lower priority queues are processed only when those queues with
// higher priorities are empty.
StrictPriority bool
}
// Formula taken from https://github.com/mperham/sidekiq.
@@ -63,9 +89,13 @@ func defaultDelayFunc(n int, e error, t *Task) time.Duration {
return time.Duration(s) * time.Second
}
// NewBackground returns a new Background instance given a redis client
var defaultQueueConfig = map[string]uint{
base.DefaultQueueName: 1,
}
// NewBackground returns a new Background given a redis connection option
// and background processing configuration.
func NewBackground(r *redis.Client, cfg *Config) *Background {
func NewBackground(r RedisConnOpt, cfg *Config) *Background {
n := cfg.Concurrency
if n < 1 {
n = 1
@@ -74,13 +104,22 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
if delayFunc == nil {
delayFunc = defaultDelayFunc
}
rdb := rdb.NewRDB(r)
scheduler := newScheduler(rdb, 5*time.Second)
processor := newProcessor(rdb, n, delayFunc)
queues := cfg.Queues
if queues == nil || len(queues) == 0 {
queues = defaultQueueConfig
}
qcfg := normalizeQueueCfg(queues)
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, 5*time.Second)
rdb := rdb.NewRDB(createRedisClient(r))
scheduler := newScheduler(rdb, 5*time.Second, qcfg)
processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
return &Background{
rdb: rdb,
scheduler: scheduler,
processor: processor,
syncer: syncer,
}
}
@@ -91,9 +130,6 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
//
// Note: The argument task is ready only, ProcessTask should
// not mutate the task.
type Handler interface {
ProcessTask(*Task) error
}
@@ -143,6 +179,7 @@ func (bg *Background) start(handler Handler) {
bg.running = true
bg.processor.handler = handler
bg.syncer.start()
bg.scheduler.start()
bg.processor.start()
}
@@ -157,8 +194,43 @@ func (bg *Background) stop() {
bg.scheduler.terminate()
bg.processor.terminate()
// Note: processor and all worker goroutines need to be exited
// before shutting down syncer to avoid goroutine leak.
bg.syncer.terminate()
bg.rdb.Close()
bg.processor.handler = nil
bg.running = false
}
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
var xs []uint
for _, x := range queueCfg {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]uint)
for q, x := range queueCfg {
res[q] = x / d
}
return res
}
func gcd(xs ...uint) uint {
fn := func(x, y uint) uint {
for y > 0 {
x, y = y, x%y
}
return x
}
res := xs[0]
for i := 0; i < len(xs); i++ {
res = fn(xs[i], res)
if res == 1 {
return 1
}
}
return res
}

View File

@@ -8,7 +8,7 @@ import (
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"go.uber.org/goleak"
)
@@ -17,10 +17,10 @@ func TestBackground(t *testing.T) {
ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNoLeaks(t, ignoreOpt)
r := redis.NewClient(&redis.Options{
r := &RedisClientOpt{
Addr: "localhost:6379",
DB: 15,
})
}
client := NewClient(r)
bg := NewBackground(r, &Config{
Concurrency: 10,
@@ -33,15 +33,89 @@ func TestBackground(t *testing.T) {
bg.start(HandlerFunc(h))
client.Schedule(&Task{
Type: "send_email",
Payload: map[string]interface{}{"recipient_id": 123},
}, time.Now())
client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 123}), time.Now())
client.Schedule(&Task{
Type: "send_email",
Payload: map[string]interface{}{"recipient_id": 456},
}, time.Now().Add(time.Hour))
client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), time.Now().Add(time.Hour))
bg.stop()
}
func TestGCD(t *testing.T) {
tests := []struct {
input []uint
want uint
}{
{[]uint{6, 2, 12}, 2},
{[]uint{3, 3, 3}, 3},
{[]uint{6, 3, 1}, 1},
{[]uint{1}, 1},
{[]uint{1, 0, 2}, 1},
{[]uint{8, 0, 4}, 4},
{[]uint{9, 12, 18, 30}, 3},
}
for _, tc := range tests {
got := gcd(tc.input...)
if got != tc.want {
t.Errorf("gcd(%v) = %d, want %d", tc.input, got, tc.want)
}
}
}
func TestNormalizeQueueCfg(t *testing.T) {
tests := []struct {
input map[string]uint
want map[string]uint
}{
{
input: map[string]uint{
"high": 100,
"default": 20,
"low": 5,
},
want: map[string]uint{
"high": 20,
"default": 4,
"low": 1,
},
},
{
input: map[string]uint{
"default": 10,
},
want: map[string]uint{
"default": 1,
},
},
{
input: map[string]uint{
"critical": 5,
"default": 1,
},
want: map[string]uint{
"critical": 5,
"default": 1,
},
},
{
input: map[string]uint{
"critical": 6,
"default": 3,
"low": 0,
},
want: map[string]uint{
"critical": 2,
"default": 1,
"low": 0,
},
},
}
for _, tc := range tests {
got := normalizeQueueCfg(tc.input)
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("normalizeQueueCfg(%v) = %v, want %v; (-want, +got):\n%s",
tc.input, got, tc.want, diff)
}
}
}

View File

@@ -18,9 +18,13 @@ func BenchmarkEndToEndSimple(b *testing.B) {
const count = 100000
for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup
r := setup(b)
client := NewClient(r)
bg := NewBackground(r, &Config{
setup(b)
redis := &RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
}
client := NewClient(redis)
bg := NewBackground(redis, &Config{
Concurrency: 10,
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Second
@@ -28,8 +32,8 @@ func BenchmarkEndToEndSimple(b *testing.B) {
})
// Create a bunch of tasks
for i := 0; i < count; i++ {
t := Task{Type: fmt.Sprintf("task%d", i), Payload: Payload{"data": i}}
client.Schedule(&t, time.Now())
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now())
}
var wg sync.WaitGroup
@@ -55,9 +59,13 @@ func BenchmarkEndToEnd(b *testing.B) {
for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup
rand.Seed(time.Now().UnixNano())
r := setup(b)
client := NewClient(r)
bg := NewBackground(r, &Config{
setup(b)
redis := &RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
}
client := NewClient(redis)
bg := NewBackground(redis, &Config{
Concurrency: 10,
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Second
@@ -65,12 +73,12 @@ func BenchmarkEndToEnd(b *testing.B) {
})
// Create a bunch of tasks
for i := 0; i < count; i++ {
t := Task{Type: fmt.Sprintf("task%d", i), Payload: Payload{"data": i}}
client.Schedule(&t, time.Now())
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now())
}
for i := 0; i < count; i++ {
t := Task{Type: fmt.Sprintf("scheduled%d", i), Payload: Payload{"data": i}}
client.Schedule(&t, time.Now().Add(time.Second))
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now().Add(time.Second))
}
var wg sync.WaitGroup

View File

@@ -5,9 +5,9 @@
package asynq
import (
"strings"
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
@@ -23,20 +23,23 @@ type Client struct {
rdb *rdb.RDB
}
// NewClient and returns a new Client given a redis configuration.
func NewClient(r *redis.Client) *Client {
rdb := rdb.NewRDB(r)
// NewClient and returns a new Client given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r))
return &Client{rdb}
}
// Option specifies the processing behavior for the associated task.
// Option specifies the task processing behavior.
type Option interface{}
// max number of times a task will be retried.
type retryOption int
// Internal option representations.
type (
retryOption int
queueOption string
)
// MaxRetry returns an option to specify the max number of times
// a task will be retried.
// the task will be retried.
//
// Negative retry count is treated as zero retry.
func MaxRetry(n int) Option {
@@ -46,18 +49,29 @@ func MaxRetry(n int) Option {
return retryOption(n)
}
// Queue returns an option to specify the queue to enqueue the task into.
//
// Queue name is case-insensitive and the lowercased version is used.
func Queue(name string) Option {
return queueOption(strings.ToLower(name))
}
type option struct {
retry int
queue string
}
func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
queue: base.DefaultQueueName,
}
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res.retry = int(opt)
case queueOption:
res.queue = string(opt)
default:
// ignore unexpected option
}
@@ -73,17 +87,17 @@ const (
// Schedule registers a task to be processed at the specified time.
//
// Schedule returns nil if the task is registered successfully,
// otherwise returns non-nil error.
// otherwise returns a non-nil error.
//
// opts specifies the behavior of task processing. If there are conflicting
// Option the last one overrides the ones before.
// Option values the last one overrides others.
func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error {
opt := composeOptions(opts...)
msg := &base.TaskMessage{
ID: xid.New(),
Type: task.Type,
Payload: task.Payload,
Queue: "default",
Payload: task.Payload.data,
Queue: opt.queue,
Retry: opt.retry,
}
return c.enqueue(msg, processAt)

View File

@@ -15,16 +15,19 @@ import (
func TestClient(t *testing.T) {
r := setup(t)
client := NewClient(r)
client := NewClient(&RedisClientOpt{
Addr: "localhost:6379",
DB: 14,
})
task := &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantEnqueued []*base.TaskMessage
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry
}{
{
@@ -32,12 +35,14 @@ func TestClient(t *testing.T) {
task: task,
processAt: time.Now(),
opts: []Option{},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: defaultMaxRetry,
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@@ -52,11 +57,11 @@ func TestClient(t *testing.T) {
{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
},
Score: time.Now().Add(2 * time.Hour).Unix(),
Score: float64(time.Now().Add(2 * time.Hour).Unix()),
},
},
},
@@ -67,12 +72,14 @@ func TestClient(t *testing.T) {
opts: []Option{
MaxRetry(3),
},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 3,
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@@ -84,12 +91,14 @@ func TestClient(t *testing.T) {
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 0, // Retry count should be set to zero
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@@ -102,12 +111,52 @@ func TestClient(t *testing.T) {
MaxRetry(2),
MaxRetry(10),
},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 10, // Last option takes precedence
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "With queue option",
task: task,
processAt: time.Now(),
opts: []Option{
Queue("custom"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"custom": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Queue option should be case-insensitive",
task: task,
processAt: time.Now(),
opts: []Option{
Queue("HIGH"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"high": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@@ -123,9 +172,11 @@ func TestClient(t *testing.T) {
continue
}
gotEnqueued := h.GetEnqueuedMessages(t, r)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
gotScheduled := h.GetScheduledEntries(t, r)

29
doc.go
View File

@@ -5,16 +5,27 @@
/*
Package asynq provides a framework for background task processing.
Asynq uses Redis as a message broker. To connect to redis server,
specify the options using one of RedisConnOpt types.
redis = &asynq.RedisClientOpt{
Addr: "localhost:6379",
Password: "secretpassword",
DB: 3,
}
The Client is used to register a task to be processed at the specified time.
client := asynq.NewClient(redis)
Task is created with two parameters: its type and payload.
t := asynq.Task{
Type: "send_email",
Payload: map[string]interface{}{"user_id": 42},
}
client := asynq.NewClient(redis)
err := client.Schedule(&t, time.Now().Add(time.Minute))
t := asynq.NewTask(
"send_email",
map[string]interface{}{"user_id": 42})
// Schedule the task t to be processed a minute from now.
err := client.Schedule(t, time.Now().Add(time.Minute))
The Background is used to run the background task processing with a given
handler.
@@ -27,7 +38,7 @@ handler.
Handler is an interface with one method ProcessTask which
takes a task and returns an error. Handler should return nil if
the processing is successful, otherwise return a non-nil error.
If handler returns a non-nil error, the task will be retried in the future.
If handler panics or returns a non-nil error, the task will be retried in the future.
Example of a type that implements the Handler interface.
type TaskHandler struct {
@@ -39,11 +50,9 @@ Example of a type that implements the Handler interface.
case "send_email":
id, err := task.Payload.GetInt("user_id")
// send email
case "generate_thumbnail":
// generate thumbnail image
//...
default:
return fmt.Errorf("unepected task type %q", task.Type)
return fmt.Errorf("unexpected task type %q", task.Type)
}
return nil
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.13
require (
github.com/go-redis/redis/v7 v7.0.0-beta.4
github.com/google/go-cmp v0.3.1
github.com/google/go-cmp v0.4.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/rs/xid v1.2.1

6
go.sum
View File

@@ -39,8 +39,8 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@@ -185,6 +185,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=

View File

@@ -20,7 +20,7 @@ import (
// ZSetEntry is an entry in redis sorted set.
type ZSetEntry struct {
Msg *base.TaskMessage
Score int64
Score float64
}
// SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages.
@@ -49,7 +49,19 @@ func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskM
return &base.TaskMessage{
ID: xid.New(),
Type: taskType,
Queue: "default",
Queue: base.DefaultQueueName,
Retry: 25,
Payload: payload,
}
}
// NewTaskMessageWithQueue returns a new instance of TaskMessage given a
// task type, payload and queue name.
func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qname string) *base.TaskMessage {
return &base.TaskMessage{
ID: xid.New(),
Type: taskType,
Queue: qname,
Retry: 25,
Payload: payload,
}
@@ -108,10 +120,17 @@ func FlushDB(tb testing.TB, r *redis.Client) {
}
}
// SeedDefaultQueue initializes the default queue with the given messages.
func SeedDefaultQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
// SeedEnqueuedQueue initializes the specified queue with the given messages.
//
// If queue name option is not passed, it defaults to the default queue.
func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, queueOpt ...string) {
tb.Helper()
seedRedisList(tb, r, base.DefaultQueue, msgs)
queue := base.DefaultQueue
if len(queueOpt) > 0 {
queue = base.QueueKey(queueOpt[0])
}
r.SAdd(base.AllQueues, queue)
seedRedisList(tb, r, queue, msgs)
}
// SeedInProgressQueue initializes the in-progress queue with the given messages.
@@ -156,10 +175,16 @@ func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry
}
}
// GetEnqueuedMessages returns all task messages in the default queue.
func GetEnqueuedMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
// GetEnqueuedMessages returns all task messages in the specified queue.
//
// If queue name option is not passed, it defaults to the default queue.
func GetEnqueuedMessages(tb testing.TB, r *redis.Client, queueOpt ...string) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.DefaultQueue)
queue := base.DefaultQueue
if len(queueOpt) > 0 {
queue = base.QueueKey(queueOpt[0])
}
return getListMessages(tb, r, queue)
}
// GetInProgressMessages returns all task messages in the in-progress queue.
@@ -220,7 +245,7 @@ func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []ZSetEntry {
for _, z := range data {
entries = append(entries, ZSetEntry{
Msg: MustUnmarshal(tb, z.Member.(string)),
Score: int64(z.Score),
Score: z.Score,
})
}
return entries

View File

@@ -6,24 +6,34 @@
package base
import (
"strings"
"time"
"github.com/rs/xid"
)
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
// Redis keys
const (
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
DefaultQueue = QueuePrefix + "default" // LIST
ScheduledQueue = "asynq:scheduled" // ZSET
RetryQueue = "asynq:retry" // ZSET
DeadQueue = "asynq:dead" // ZSET
InProgressQueue = "asynq:in_progress" // LIST
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
AllQueues = "asynq:queues" // SET
DefaultQueue = QueuePrefix + DefaultQueueName // LIST
ScheduledQueue = "asynq:scheduled" // ZSET
RetryQueue = "asynq:retry" // ZSET
DeadQueue = "asynq:dead" // ZSET
InProgressQueue = "asynq:in_progress" // LIST
)
// ProcessedKey returns a redis key string for procesed count
// QueueKey returns a redis key string for the given queue name.
func QueueKey(qname string) string {
return QueuePrefix + strings.ToLower(qname)
}
// ProcessedKey returns a redis key string for processed count
// for the given day.
func ProcessedKey(t time.Time) string {
return processedPrefix + t.UTC().Format("2006-01-02")

View File

@@ -9,6 +9,22 @@ import (
"time"
)
func TestQueueKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"custom", "asynq:queues:custom"},
}
for _, tc := range tests {
got := QueueKey(tc.qname)
if got != tc.want {
t.Errorf("QueueKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}
func TestProcessedKey(t *testing.T) {
tests := []struct {
input time.Time

View File

@@ -25,6 +25,7 @@ type Stats struct {
Dead int
Processed int
Failed int
Queues map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20)
Timestamp time.Time
}
@@ -40,6 +41,7 @@ type EnqueuedTask struct {
ID xid.ID
Type string
Payload map[string]interface{}
Queue string
}
// InProgressTask is a task that's currently being processed.
@@ -56,6 +58,7 @@ type ScheduledTask struct {
Payload map[string]interface{}
ProcessAt time.Time
Score int64
Queue string
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
@@ -69,6 +72,7 @@ type RetryTask struct {
Retried int
Retry int
Score int64
Queue string
}
// DeadTask is a task in that has exhausted all retries.
@@ -79,11 +83,12 @@ type DeadTask struct {
LastFailedAt time.Time
ErrorMsg string
Score int64
Queue string
}
// CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats() (*Stats, error) {
// KEYS[1] -> asynq:queues:default
// KEYS[1] -> asynq:queues
// KEYS[2] -> asynq:in_progress
// KEYS[3] -> asynq:scheduled
// KEYS[4] -> asynq:retry
@@ -91,27 +96,40 @@ func (r *RDB) CurrentStats() (*Stats, error) {
// KEYS[6] -> asynq:processed:<yyyy-mm-dd>
// KEYS[7] -> asynq:failure:<yyyy-mm-dd>
script := redis.NewScript(`
local qlen = redis.call("LLEN", KEYS[1])
local plen = redis.call("LLEN", KEYS[2])
local slen = redis.call("ZCARD", KEYS[3])
local rlen = redis.call("ZCARD", KEYS[4])
local dlen = redis.call("ZCARD", KEYS[5])
local res = {}
local queues = redis.call("SMEMBERS", KEYS[1])
for _, qkey in ipairs(queues) do
table.insert(res, qkey)
table.insert(res, redis.call("LLEN", qkey))
end
table.insert(res, KEYS[2])
table.insert(res, redis.call("LLEN", KEYS[2]))
table.insert(res, KEYS[3])
table.insert(res, redis.call("ZCARD", KEYS[3]))
table.insert(res, KEYS[4])
table.insert(res, redis.call("ZCARD", KEYS[4]))
table.insert(res, KEYS[5])
table.insert(res, redis.call("ZCARD", KEYS[5]))
local pcount = 0
local p = redis.call("GET", KEYS[6])
if p then
pcount = tonumber(p)
end
table.insert(res, "processed")
table.insert(res, pcount)
local fcount = 0
local f = redis.call("GET", KEYS[7])
if f then
fcount = tonumber(f)
end
return {qlen, plen, slen, rlen, dlen, pcount, fcount}
table.insert(res, "failed")
table.insert(res, fcount)
return res
`)
now := time.Now()
res, err := script.Run(r.client, []string{
base.DefaultQueue,
base.AllQueues,
base.InProgressQueue,
base.ScheduledQueue,
base.RetryQueue,
@@ -122,20 +140,37 @@ func (r *RDB) CurrentStats() (*Stats, error) {
if err != nil {
return nil, err
}
nums, err := cast.ToIntSliceE(res)
data, err := cast.ToSliceE(res)
if err != nil {
return nil, err
}
return &Stats{
Enqueued: nums[0],
InProgress: nums[1],
Scheduled: nums[2],
Retry: nums[3],
Dead: nums[4],
Processed: nums[5],
Failed: nums[6],
Timestamp: now,
}, nil
stats := &Stats{
Queues: make(map[string]int),
Timestamp: now,
}
for i := 0; i < len(data); i += 2 {
key := cast.ToString(data[i])
val := cast.ToInt(data[i+1])
switch {
case strings.HasPrefix(key, base.QueuePrefix):
stats.Enqueued += val
stats.Queues[strings.TrimPrefix(key, base.QueuePrefix)] = val
case key == base.InProgressQueue:
stats.InProgress = val
case key == base.ScheduledQueue:
stats.Scheduled = val
case key == base.RetryQueue:
stats.Retry = val
case key == base.DeadQueue:
stats.Dead = val
case key == "processed":
stats.Processed = val
case key == "failed":
stats.Failed = val
}
}
return stats, nil
}
// HistoricalStats returns a list of stats from the last n days.
@@ -200,24 +235,79 @@ func (r *RDB) RedisInfo() (map[string]string, error) {
return info, nil
}
// ListEnqueued returns all enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
data, err := r.client.LRange(base.DefaultQueue, 0, -1).Result()
// ListEnqueued returns enqueued tasks that are ready to be processed.
//
// Queue names can be optionally passed to query only the specified queues.
// If none are passed, it will query all queues.
func (r *RDB) ListEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
if len(qnames) == 0 {
return r.listAllEnqueued()
}
return r.listEnqueued(qnames...)
}
func (r *RDB) listAllEnqueued() ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
local queues = redis.call("SMEMBERS", KEYS[1])
for _, qkey in ipairs(queues) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
res, err := script.Run(r.client, []string{base.AllQueues}).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func (r *RDB) listEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
for _, qkey in ipairs(KEYS) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
res, err := script.Run(r.client, keys).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
var tasks []*EnqueuedTask
for _, s := range data {
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
// continue // bad data, ignore and continue
return nil, err
continue // bad data, ignore and continue
}
tasks = append(tasks, &EnqueuedTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
})
}
return tasks, nil
@@ -268,6 +358,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
@@ -301,6 +392,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
ErrorMsg: msg.ErrorMsg,
Retry: msg.Retry,
Retried: msg.Retried,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
@@ -331,6 +423,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Queue: msg.Queue,
LastFailedAt: lastFailedAt,
Score: int64(z.Score),
})
@@ -405,13 +498,14 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
redis.call("LPUSH", KEYS[2], msg)
local qkey = ARGV[3] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
return 1
end
end
return 0
`)
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}, score, id).Result()
res, err := script.Run(r.client, []string{zset}, score, id, base.QueuePrefix).Result()
if err != nil {
return 0, err
}
@@ -427,11 +521,13 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZREM", KEYS[1], msg)
redis.call("LPUSH", KEYS[2], msg)
local decoded = cjson.decode(msg)
local qkey = ARGV[1] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
end
return table.getn(msgs)
`)
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}).Result()
res, err := script.Run(r.client, []string{zset}, base.QueuePrefix).Result()
if err != nil {
return 0, err
}
@@ -610,3 +706,68 @@ func (r *RDB) DeleteAllRetryTasks() error {
func (r *RDB) DeleteAllScheduledTasks() error {
return r.client.Del(base.ScheduledQueue).Err()
}
// ErrQueueNotFound indicates specified queue does not exist.
type ErrQueueNotFound struct {
qname string
}
func (e *ErrQueueNotFound) Error() string {
return fmt.Sprintf("queue %q does not exist", e.qname)
}
// ErrQueueNotEmpty indicates specified queue is not empty.
type ErrQueueNotEmpty struct {
qname string
}
func (e *ErrQueueNotEmpty) Error() string {
return fmt.Sprintf("queue %q is not empty", e.qname)
}
// RemoveQueue removes the specified queue.
//
// If force is set to true, it will remove the queue regardless
// of whether the queue is empty.
// If force is set to false, it will only remove the queue if
// it is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error {
var script *redis.Script
if force {
script = redis.NewScript(`
local n = redis.call("SREM", KEYS[1], KEYS[2])
if n == 0 then
return redis.error_reply("LIST NOT FOUND")
end
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")
`)
} else {
script = redis.NewScript(`
local l = redis.call("LLEN", KEYS[2])
if l > 0 then
return redis.error_reply("LIST NOT EMPTY")
end
local n = redis.call("SREM", KEYS[1], KEYS[2])
if n == 0 then
return redis.error_reply("LIST NOT FOUND")
end
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")
`)
}
err := script.Run(r.client,
[]string{base.AllQueues, base.QueueKey(qname)},
force).Err()
if err != nil {
switch err.Error() {
case "LIST NOT FOUND":
return &ErrQueueNotFound{qname}
case "LIST NOT EMPTY":
return &ErrQueueNotEmpty{qname}
default:
return err
}
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -13,11 +13,12 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/spf13/cast"
)
var (
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")
// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
ErrNoProcessableTask = errors.New("no tasks are ready for processing")
// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
ErrTaskNotFound = errors.New("could not find a task")
@@ -46,18 +47,32 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
if err != nil {
return err
}
qname := base.QueuePrefix + msg.Queue
return r.client.LPush(qname, string(bytes)).Err()
key := base.QueueKey(msg.Queue)
script := redis.NewScript(`
redis.call("LPUSH", KEYS[1], ARGV[1])
redis.call("SADD", KEYS[2], KEYS[1])
return 1
`)
return script.Run(r.client, []string{key, base.AllQueues}, string(bytes)).Err()
}
// Dequeue blocks until there is a task available to be processed,
// once a task is available, it adds the task to "in progress" queue
// and returns the task. If there are no tasks for the entire timeout
// duration, it returns ErrDequeueTimeout.
func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) {
data, err := r.client.BRPopLPush(base.DefaultQueue, base.InProgressQueue, timeout).Result()
// Dequeue queries given queues in order and pops a task message if there
// is one and returns it. If all queues are empty, ErrNoProcessableTask
// error is returned.
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
var data string
var err error
if len(qnames) == 1 {
data, err = r.dequeueSingle(base.QueueKey(qnames[0]))
} else {
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
data, err = r.dequeue(keys...)
}
if err == redis.Nil {
return nil, ErrDequeueTimeout
return nil, ErrNoProcessableTask
}
if err != nil {
return nil, err
@@ -70,6 +85,33 @@ func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) {
return &msg, nil
}
func (r *RDB) dequeueSingle(queue string) (data string, err error) {
// timeout needed to avoid blocking forever
return r.client.BRPopLPush(queue, base.InProgressQueue, time.Second).Result()
}
func (r *RDB) dequeue(queues ...string) (data string, err error) {
var args []interface{}
for _, qkey := range queues {
args = append(args, qkey)
}
script := redis.NewScript(`
local res
for _, qkey in ipairs(ARGV) do
res = redis.call("RPOPLPUSH", qkey, KEYS[1])
if res then
return res
end
end
return res
`)
res, err := script.Run(r.client, []string{base.InProgressQueue}, args...).Result()
if err != nil {
return "", err
}
return cast.ToStringE(res)
}
// Done removes the task from in-progress queue to mark the task as done.
func (r *RDB) Done(msg *base.TaskMessage) error {
bytes, err := json.Marshal(msg)
@@ -77,26 +119,12 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
return err
}
// Note: LREM count ZERO means "remove all elements equal to val"
// Note: Script will try removing the message by exact match first,
// if the task is mutated and exact match is not found, it'll fallback
// to finding a match with ID.
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
script := redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if tonumber(x) == 0 then
local target = cjson.decode(ARGV[1])
local data = redis.call("LRANGE", KEYS[1], 0, -1)
for _, s in ipairs(data) do
local msg = cjson.decode(s)
if target["ID"] == msg["ID"] then
redis.call("LREM", KEYS[1], 0, s)
break
end
end
end
redis.call("LREM", KEYS[1], 0, ARGV[1])
local n = redis.call("INCR", KEYS[2])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[2], ARGV[2])
@@ -157,9 +185,6 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
if err != nil {
return err
}
// Note: Script will try removing the message by exact match first,
// if the task is mutated and exact match is not found, it'll fallback
// to finding a match with ID.
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:retry
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
@@ -169,18 +194,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
// ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp
script := redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if tonumber(x) == 0 then
local target = cjson.decode(ARGV[1])
local data = redis.call("LRANGE", KEYS[1], 0, -1)
for _, s in ipairs(data) do
local msg = cjson.decode(s)
if target["ID"] == msg["ID"] then
redis.call("LREM", KEYS[1], 0, s)
break
end
end
end
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
@@ -225,9 +239,6 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
processedKey := base.ProcessedKey(now)
failureKey := base.FailureKey(now)
expireAt := now.Add(statsTTL)
// Note: Script will try removing the message by exact match first,
// if the task is mutated and exact match is not found, it'll fallback
// to finding a match with ID.
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:dead
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
@@ -239,18 +250,7 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
// ARGV[6] -> stats expiration timestamp
script := redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if tonumber(x) == 0 then
local target = cjson.decode(ARGV[1])
local data = redis.call("LRANGE", KEYS[1], 0, -1)
for _, s in ipairs(data) do
local msg = cjson.decode(s)
if target["ID"] == msg["ID"] then
redis.call("LREM", KEYS[1], 0, s)
break
end
end
end
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
@@ -292,10 +292,18 @@ func (r *RDB) RestoreUnfinished() (int64, error) {
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
// have to be processed.
func (r *RDB) CheckAndEnqueue() error {
//
// qnames specifies to which queues to send tasks.
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
delayed := []string{base.ScheduledQueue, base.RetryQueue}
for _, zset := range delayed {
if err := r.forward(zset); err != nil {
var err error
if len(qnames) == 1 {
err = r.forwardSingle(zset, base.QueueKey(qnames[0]))
} else {
err = r.forward(zset)
}
if err != nil {
return err
}
}
@@ -303,8 +311,26 @@ func (r *RDB) CheckAndEnqueue() error {
}
// forward moves all tasks with a score less than the current unix time
// from the given zset to the default queue.
func (r *RDB) forward(from string) error {
// from the src zset.
func (r *RDB) forward(src string) error {
script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
redis.call("ZREM", KEYS[1], msg)
local decoded = cjson.decode(msg)
local qkey = ARGV[2] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
end
return msgs
`)
now := float64(time.Now().Unix())
return script.Run(r.client,
[]string{src}, now, base.QueuePrefix).Err()
}
// forwardSingle moves all tasks with a score less than the current unix time
// from the src zset to dst list.
func (r *RDB) forwardSingle(src, dst string) error {
script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
@@ -315,5 +341,5 @@ func (r *RDB) forward(from string) error {
`)
now := float64(time.Now().Unix())
return script.Run(r.client,
[]string{from, base.DefaultQueue}, now).Err()
[]string{src, dst}, now).Err()
}

View File

@@ -29,12 +29,18 @@ func setup(t *testing.T) *RDB {
func TestEnqueue(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})
t2 := h.NewTaskMessage("generate_csv", map[string]interface{}{})
t2.Queue = "csv"
t3 := h.NewTaskMessage("sync", nil)
t3.Queue = "low"
tests := []struct {
msg *base.TaskMessage
}{
{h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
{h.NewTaskMessage("generate_csv", map[string]interface{}{})},
{h.NewTaskMessage("sync", nil)},
{t1},
{t2},
{t3},
}
for _, tc := range tests {
@@ -42,54 +48,132 @@ func TestEnqueue(t *testing.T) {
err := r.Enqueue(tc.msg)
if err != nil {
t.Errorf("(*RDB).Enqueue = %v, want nil", err)
continue
t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err)
}
gotEnqueued := h.GetEnqueuedMessages(t, r.client)
qkey := base.QueueKey(tc.msg.Queue)
gotEnqueued := h.GetEnqueuedMessages(t, r.client, tc.msg.Queue)
if len(gotEnqueued) != 1 {
t.Errorf("%q has length %d, want 1", base.DefaultQueue, len(gotEnqueued))
t.Errorf("%q has length %d, want 1", qkey, len(gotEnqueued))
continue
}
if diff := cmp.Diff(tc.msg, gotEnqueued[0]); diff != "" {
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
}
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
t.Errorf("%q is not a member of SET %q", qkey, base.AllQueues)
}
}
}
func TestDequeue(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessage("reindex", nil)
tests := []struct {
enqueued []*base.TaskMessage
enqueued map[string][]*base.TaskMessage
args []string // list of queues to query
want *base.TaskMessage
err error
wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage
}{
{
enqueued: []*base.TaskMessage{t1},
want: t1,
err: nil,
enqueued: map[string][]*base.TaskMessage{
"default": {t1},
},
args: []string{"default"},
want: t1,
err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
},
wantInProgress: []*base.TaskMessage{t1},
},
{
enqueued: []*base.TaskMessage{},
want: nil,
err: ErrDequeueTimeout,
enqueued: map[string][]*base.TaskMessage{
"default": {},
},
args: []string{"default"},
want: nil,
err: ErrNoProcessableTask,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
},
wantInProgress: []*base.TaskMessage{},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {t2},
"low": {t3},
},
args: []string{"critical", "default", "low"},
want: t2,
err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {},
"low": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {},
"low": {t2, t3},
},
args: []string{"critical", "default", "low"},
want: t1,
err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {t2, t3},
},
wantInProgress: []*base.TaskMessage{t1},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
args: []string{"critical", "default", "low"},
want: nil,
err: ErrNoProcessableTask,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
wantInProgress: []*base.TaskMessage{},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued)
for queue, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
}
got, err := r.Dequeue(time.Second)
got, err := r.Dequeue(tc.args...)
if !cmp.Equal(got, tc.want) || err != tc.err {
t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v",
got, err, tc.want, tc.err)
t.Errorf("(*RDB).Dequeue(%v) = %v, %v; want %v, %v",
tc.args, got, err, tc.want, tc.err)
continue
}
for queue, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
}
}
gotInProgress := h.GetInProgressMessages(t, r.client)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff)
@@ -148,64 +232,6 @@ func TestDone(t *testing.T) {
}
}
// Note: User should not mutate task payload in Handler
// However, we should handle even if the user mutates the task
// in Handler. This test case is to make sure that we remove task
// from in-progress queue when we call Done for the task.
func TestDoneWithMutatedTask(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
t2 := h.NewTaskMessage("export_csv", map[string]interface{}{"subjct": "hola"})
tests := []struct {
inProgress []*base.TaskMessage // initial state of the in-progress list
target *base.TaskMessage // task to remove
wantInProgress []*base.TaskMessage // final state of the in-progress list
}{
{
inProgress: []*base.TaskMessage{t1, t2},
target: t1,
wantInProgress: []*base.TaskMessage{t2},
},
{
inProgress: []*base.TaskMessage{t1},
target: t1,
wantInProgress: []*base.TaskMessage{},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
// Mutate payload map!
tc.target.Payload["newkey"] = 123
err := r.Done(tc.target)
if err != nil {
t.Errorf("(*RDB).Done(task) = %v, want nil", err)
continue
}
gotInProgress := h.GetInProgressMessages(t, r.client)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff)
continue
}
processedKey := base.ProcessedKey(time.Now())
gotProcessed := r.client.Get(processedKey).Val()
if gotProcessed != "1" {
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
}
gotTTL := r.client.TTL(processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
}
}
}
func TestRequeue(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", nil)
@@ -236,7 +262,7 @@ func TestRequeue(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued)
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
h.SeedInProgressQueue(t, r.client, tc.inProgress)
err := r.Requeue(tc.target)
@@ -282,8 +308,8 @@ func TestSchedule(t *testing.T) {
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue)
continue
}
if gotScheduled[0].Score != tc.processAt.Unix() {
t.Errorf("%s inserted an item with score %d, want %d", desc, gotScheduled[0].Score, tc.processAt.Unix())
if int64(gotScheduled[0].Score) != tc.processAt.Unix() {
t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix())
continue
}
}
@@ -321,7 +347,7 @@ func TestRetry(t *testing.T) {
retry: []h.ZSetEntry{
{
Msg: t3,
Score: now.Add(time.Minute).Unix(),
Score: float64(now.Add(time.Minute).Unix()),
},
},
msg: t1,
@@ -331,11 +357,11 @@ func TestRetry(t *testing.T) {
wantRetry: []h.ZSetEntry{
{
Msg: t1AfterRetry,
Score: now.Add(5 * time.Minute).Unix(),
Score: float64(now.Add(5 * time.Minute).Unix()),
},
{
Msg: t3,
Score: now.Add(time.Minute).Unix(),
Score: float64(now.Add(time.Minute).Unix()),
},
},
},
@@ -384,104 +410,6 @@ func TestRetry(t *testing.T) {
}
}
func TestRetryWithMutatedTask(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})
t2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"})
t3 := h.NewTaskMessage("reindex", map[string]interface{}{})
t1.Retried = 10
errMsg := "SMTP server is not responding"
t1AfterRetry := &base.TaskMessage{
ID: t1.ID,
Type: t1.Type,
Payload: t1.Payload,
Queue: t1.Queue,
Retry: t1.Retry,
Retried: t1.Retried + 1,
ErrorMsg: errMsg,
}
now := time.Now()
tests := []struct {
inProgress []*base.TaskMessage
retry []h.ZSetEntry
msg *base.TaskMessage
processAt time.Time
errMsg string
wantInProgress []*base.TaskMessage
wantRetry []h.ZSetEntry
}{
{
inProgress: []*base.TaskMessage{t1, t2},
retry: []h.ZSetEntry{
{
Msg: t3,
Score: now.Add(time.Minute).Unix(),
},
},
msg: t1,
processAt: now.Add(5 * time.Minute),
errMsg: errMsg,
wantInProgress: []*base.TaskMessage{t2},
wantRetry: []h.ZSetEntry{
{
Msg: t1AfterRetry,
Score: now.Add(5 * time.Minute).Unix(),
},
{
Msg: t3,
Score: now.Add(time.Minute).Unix(),
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedRetryQueue(t, r.client, tc.retry)
// Mutate paylod map!
tc.msg.Payload["newkey"] = "newvalue"
err := r.Retry(tc.msg, tc.processAt, tc.errMsg)
if err != nil {
t.Errorf("(*RDB).Retry = %v, want nil", err)
continue
}
gotInProgress := h.GetInProgressMessages(t, r.client)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressQueue, diff)
}
gotRetry := h.GetRetryEntries(t, r.client)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
}
processedKey := base.ProcessedKey(time.Now())
gotProcessed := r.client.Get(processedKey).Val()
if gotProcessed != "1" {
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
}
gotTTL := r.client.TTL(processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
}
failureKey := base.FailureKey(time.Now())
gotFailure := r.client.Get(failureKey).Val()
if gotFailure != "1" {
t.Errorf("GET %q = %q, want 1", failureKey, gotFailure)
}
gotTTL = r.client.TTL(processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL)
}
}
}
func TestKill(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", nil)
@@ -512,7 +440,7 @@ func TestKill(t *testing.T) {
dead: []h.ZSetEntry{
{
Msg: t3,
Score: now.Add(-time.Hour).Unix(),
Score: float64(now.Add(-time.Hour).Unix()),
},
},
target: t1,
@@ -520,11 +448,11 @@ func TestKill(t *testing.T) {
wantDead: []h.ZSetEntry{
{
Msg: t1AfterKill,
Score: now.Unix(),
Score: float64(now.Unix()),
},
{
Msg: t3,
Score: now.Add(-time.Hour).Unix(),
Score: float64(now.Add(-time.Hour).Unix()),
},
},
},
@@ -536,7 +464,7 @@ func TestKill(t *testing.T) {
wantDead: []h.ZSetEntry{
{
Msg: t1AfterKill,
Score: now.Unix(),
Score: float64(now.Unix()),
},
},
},
@@ -585,112 +513,6 @@ func TestKill(t *testing.T) {
}
}
func TestKillWithMutatedTask(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
t2 := h.NewTaskMessage("reindex", map[string]interface{}{})
t3 := h.NewTaskMessage("generate_csv", map[string]interface{}{"path": "some/path/to/img"})
errMsg := "SMTP server not responding"
t1AfterKill := &base.TaskMessage{
ID: t1.ID,
Type: t1.Type,
Payload: t1.Payload,
Queue: t1.Queue,
Retry: t1.Retry,
Retried: t1.Retried,
ErrorMsg: errMsg,
}
now := time.Now()
// TODO(hibiken): add test cases for trimming
tests := []struct {
inProgress []*base.TaskMessage
dead []h.ZSetEntry
target *base.TaskMessage // task to kill
wantInProgress []*base.TaskMessage
wantDead []h.ZSetEntry
}{
{
inProgress: []*base.TaskMessage{t1, t2},
dead: []h.ZSetEntry{
{
Msg: t3,
Score: now.Add(-time.Hour).Unix(),
},
},
target: t1,
wantInProgress: []*base.TaskMessage{t2},
wantDead: []h.ZSetEntry{
{
Msg: t1AfterKill,
Score: now.Unix(),
},
{
Msg: t3,
Score: now.Add(-time.Hour).Unix(),
},
},
},
{
inProgress: []*base.TaskMessage{t1, t2, t3},
dead: []h.ZSetEntry{},
target: t1,
wantInProgress: []*base.TaskMessage{t2, t3},
wantDead: []h.ZSetEntry{
{
Msg: t1AfterKill,
Score: now.Unix(),
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedDeadQueue(t, r.client, tc.dead)
// Mutate payload map!
tc.target.Payload["newkey"] = "newvalue"
err := r.Kill(tc.target, errMsg)
if err != nil {
t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err)
continue
}
gotInProgress := h.GetInProgressMessages(t, r.client)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressQueue, diff)
}
gotDead := h.GetDeadEntries(t, r.client)
if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff)
}
processedKey := base.ProcessedKey(time.Now())
gotProcessed := r.client.Get(processedKey).Val()
if gotProcessed != "1" {
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
}
gotTTL := r.client.TTL(processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
}
failureKey := base.FailureKey(time.Now())
gotFailure := r.client.Get(failureKey).Val()
if gotFailure != "1" {
t.Errorf("GET %q = %q, want 1", failureKey, gotFailure)
}
gotTTL = r.client.TTL(processedKey).Val()
if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL)
}
}
}
func TestRestoreUnfinished(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", nil)
@@ -730,7 +552,7 @@ func TestRestoreUnfinished(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedDefaultQueue(t, r.client, tc.enqueued)
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
got, err := r.RestoreUnfinished()
if got != tc.want || err != nil {
@@ -755,47 +577,77 @@ func TestCheckAndEnqueue(t *testing.T) {
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("generate_csv", nil)
t3 := h.NewTaskMessage("gen_thumbnail", nil)
t4 := h.NewTaskMessage("important_task", nil)
t4.Queue = "critical"
t5 := h.NewTaskMessage("minor_task", nil)
t5.Queue = "low"
secondAgo := time.Now().Add(-time.Second)
hourFromNow := time.Now().Add(time.Hour)
tests := []struct {
scheduled []h.ZSetEntry
retry []h.ZSetEntry
wantQueued []*base.TaskMessage
qnames []string
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage
wantRetry []*base.TaskMessage
}{
{
scheduled: []h.ZSetEntry{
{Msg: t1, Score: secondAgo.Unix()},
{Msg: t2, Score: secondAgo.Unix()},
{Msg: t1, Score: float64(secondAgo.Unix())},
{Msg: t2, Score: float64(secondAgo.Unix())},
},
retry: []h.ZSetEntry{
{Msg: t3, Score: secondAgo.Unix()}},
wantQueued: []*base.TaskMessage{t1, t2, t3},
{Msg: t3, Score: float64(secondAgo.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
wantScheduled: []*base.TaskMessage{},
wantRetry: []*base.TaskMessage{},
},
{
scheduled: []h.ZSetEntry{
{Msg: t1, Score: hourFromNow.Unix()},
{Msg: t2, Score: secondAgo.Unix()}},
{Msg: t1, Score: float64(hourFromNow.Unix())},
{Msg: t2, Score: float64(secondAgo.Unix())}},
retry: []h.ZSetEntry{
{Msg: t3, Score: secondAgo.Unix()}},
wantQueued: []*base.TaskMessage{t2, t3},
{Msg: t3, Score: float64(secondAgo.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3},
},
wantScheduled: []*base.TaskMessage{t1},
wantRetry: []*base.TaskMessage{},
},
{
scheduled: []h.ZSetEntry{
{Msg: t1, Score: hourFromNow.Unix()},
{Msg: t2, Score: hourFromNow.Unix()}},
{Msg: t1, Score: float64(hourFromNow.Unix())},
{Msg: t2, Score: float64(hourFromNow.Unix())}},
retry: []h.ZSetEntry{
{Msg: t3, Score: hourFromNow.Unix()}},
wantQueued: []*base.TaskMessage{},
{Msg: t3, Score: float64(hourFromNow.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
},
wantScheduled: []*base.TaskMessage{t1, t2},
wantRetry: []*base.TaskMessage{t3},
},
{
scheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(secondAgo.Unix())},
{Msg: t4, Score: float64(secondAgo.Unix())},
},
retry: []h.ZSetEntry{
{Msg: t5, Score: float64(secondAgo.Unix())}},
qnames: []string{"default", "critical", "low"},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {t4},
"low": {t5},
},
wantScheduled: []*base.TaskMessage{},
wantRetry: []*base.TaskMessage{},
},
}
for _, tc := range tests {
@@ -803,15 +655,17 @@ func TestCheckAndEnqueue(t *testing.T) {
h.SeedScheduledQueue(t, r.client, tc.scheduled)
h.SeedRetryQueue(t, r.client, tc.retry)
err := r.CheckAndEnqueue()
err := r.CheckAndEnqueue(tc.qnames...)
if err != nil {
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
continue
}
gotEnqueued := h.GetEnqueuedMessages(t, r.client)
if diff := cmp.Diff(tc.wantQueued, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
gotScheduled := h.GetScheduledMessages(t, r.client)

View File

@@ -11,9 +11,10 @@ import (
"github.com/spf13/cast"
)
// Payload is an arbitrary data needed for task execution.
// The values have to be JSON serializable.
type Payload map[string]interface{}
// Payload holds arbitrary data needed for task execution.
type Payload struct {
data map[string]interface{}
}
type errKeyNotFound struct {
key string
@@ -25,14 +26,14 @@ func (e *errKeyNotFound) Error() string {
// Has reports whether key exists.
func (p Payload) Has(key string) bool {
_, ok := p[key]
_, ok := p.data[key]
return ok
}
// GetString returns a string value if a string type is associated with
// the key, otherwise reports an error.
func (p Payload) GetString(key string) (string, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return "", &errKeyNotFound{key}
}
@@ -42,7 +43,7 @@ func (p Payload) GetString(key string) (string, error) {
// GetInt returns an int value if a numeric type is associated with
// the key, otherwise reports an error.
func (p Payload) GetInt(key string) (int, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return 0, &errKeyNotFound{key}
}
@@ -52,7 +53,7 @@ func (p Payload) GetInt(key string) (int, error) {
// GetFloat64 returns a float64 value if a numeric type is associated with
// the key, otherwise reports an error.
func (p Payload) GetFloat64(key string) (float64, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return 0, &errKeyNotFound{key}
}
@@ -62,7 +63,7 @@ func (p Payload) GetFloat64(key string) (float64, error) {
// GetBool returns a boolean value if a boolean type is associated with
// the key, otherwise reports an error.
func (p Payload) GetBool(key string) (bool, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return false, &errKeyNotFound{key}
}
@@ -72,7 +73,7 @@ func (p Payload) GetBool(key string) (bool, error) {
// GetStringSlice returns a slice of strings if a string slice type is associated with
// the key, otherwise reports an error.
func (p Payload) GetStringSlice(key string) ([]string, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -82,7 +83,7 @@ func (p Payload) GetStringSlice(key string) ([]string, error) {
// GetIntSlice returns a slice of ints if a int slice type is associated with
// the key, otherwise reports an error.
func (p Payload) GetIntSlice(key string) ([]int, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -93,7 +94,7 @@ func (p Payload) GetIntSlice(key string) ([]int, error) {
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMap(key string) (map[string]interface{}, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -104,7 +105,7 @@ func (p Payload) GetStringMap(key string) (map[string]interface{}, error) {
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapString(key string) (map[string]string, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -115,7 +116,7 @@ func (p Payload) GetStringMapString(key string) (map[string]string, error) {
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -126,7 +127,7 @@ func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapInt(key string) (map[string]int, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -137,7 +138,7 @@ func (p Payload) GetStringMapInt(key string) (map[string]int, error) {
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapBool(key string) (map[string]bool, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
@@ -147,7 +148,7 @@ func (p Payload) GetStringMapBool(key string) (map[string]bool, error) {
// GetTime returns a time value if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetTime(key string) (time.Time, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return time.Time{}, &errKeyNotFound{key}
}
@@ -157,7 +158,7 @@ func (p Payload) GetTime(key string) (time.Time, error) {
// GetDuration returns a duration value if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetDuration(key string) (time.Duration, error) {
v, ok := p[key]
v, ok := p.data[key]
if !ok {
return 0, &errKeyNotFound{key}
}

View File

@@ -10,6 +10,8 @@ import (
"time"
"github.com/google/go-cmp/cmp"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
func TestPayloadGet(t *testing.T) {
@@ -19,7 +21,7 @@ func TestPayloadGet(t *testing.T) {
location := map[string]string{"address": "123 Main St.", "state": "NY", "zipcode": "10002"}
favs := map[string][]string{
"movies": []string{"forrest gump", "star wars"},
"tv_shows": []string{"game of throwns", "HIMYM", "breaking bad"},
"tv_shows": []string{"game of thrones", "HIMYM", "breaking bad"},
}
counter := map[string]int{
"a": 1,
@@ -34,7 +36,7 @@ func TestPayloadGet(t *testing.T) {
now := time.Now()
duration := 15 * time.Minute
payload := Payload{
data := map[string]interface{}{
"greeting": "Hello",
"user_id": 9876,
"pi": 3.1415,
@@ -49,6 +51,7 @@ func TestPayloadGet(t *testing.T) {
"timestamp": now,
"duration": duration,
}
payload := Payload{data}
gotStr, err := payload.GetString("greeting")
if gotStr != "Hello" || err != nil {
@@ -151,7 +154,7 @@ func TestPayloadGetWithMarshaling(t *testing.T) {
now := time.Now()
duration := 15 * time.Minute
in := Payload{
in := Payload{map[string]interface{}{
"subject": "Hello",
"recipient_id": 9876,
"pi": 3.14,
@@ -165,18 +168,19 @@ func TestPayloadGetWithMarshaling(t *testing.T) {
"features": features,
"timestamp": now,
"duration": duration,
}
// encode and then decode
data, err := json.Marshal(in)
}}
// encode and then decode task messsage
inMsg := h.NewTaskMessage("testing", in.data)
data, err := json.Marshal(inMsg)
if err != nil {
t.Fatal(err)
}
var out Payload
err = json.Unmarshal(data, &out)
var outMsg base.TaskMessage
err = json.Unmarshal(data, &outMsg)
if err != nil {
t.Fatal(err)
}
out := Payload{outMsg.Payload}
gotStr, err := out.GetString("subject")
if gotStr != "Hello" || err != nil {
@@ -257,11 +261,94 @@ func TestPayloadGetWithMarshaling(t *testing.T) {
}
}
func TestPayloadHas(t *testing.T) {
payload := Payload{
"user_id": 123,
func TestPayloadKeyNotFound(t *testing.T) {
payload := Payload{nil}
key := "something"
gotStr, err := payload.GetString(key)
if err == nil || gotStr != "" {
t.Errorf("Payload.GetString(%q) = %v, %v; want '', error",
key, gotStr, err)
}
gotInt, err := payload.GetInt(key)
if err == nil || gotInt != 0 {
t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error",
key, gotInt, err)
}
gotFloat, err := payload.GetFloat64(key)
if err == nil || gotFloat != 0 {
t.Errorf("Payload.GetFloat64(%q = %v, %v; want 0, error",
key, gotFloat, err)
}
gotBool, err := payload.GetBool(key)
if err == nil || gotBool != false {
t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error",
key, gotBool, err)
}
gotStrSlice, err := payload.GetStringSlice(key)
if err == nil || gotStrSlice != nil {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error",
key, gotStrSlice, err)
}
gotIntSlice, err := payload.GetIntSlice(key)
if err == nil || gotIntSlice != nil {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error",
key, gotIntSlice, err)
}
gotStrMap, err := payload.GetStringMap(key)
if err == nil || gotStrMap != nil {
t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error",
key, gotStrMap, err)
}
gotStrMapStr, err := payload.GetStringMapString(key)
if err == nil || gotStrMapStr != nil {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error",
key, gotStrMapStr, err)
}
gotStrMapStrSlice, err := payload.GetStringMapStringSlice(key)
if err == nil || gotStrMapStrSlice != nil {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error",
key, gotStrMapStrSlice, err)
}
gotStrMapInt, err := payload.GetStringMapInt(key)
if err == nil || gotStrMapInt != nil {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want nil, error",
key, gotStrMapInt, err)
}
gotStrMapBool, err := payload.GetStringMapBool(key)
if err == nil || gotStrMapBool != nil {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want nil, error",
key, gotStrMapBool, err)
}
gotTime, err := payload.GetTime(key)
if err == nil || !gotTime.IsZero() {
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, error",
key, gotTime, err, time.Time{})
}
gotDuration, err := payload.GetDuration(key)
if err == nil || gotDuration != 0 {
t.Errorf("Payload.GetDuration(%q) = %v, %v, want 0, error",
key, gotDuration, err)
}
}
func TestPayloadHas(t *testing.T) {
payload := Payload{map[string]interface{}{
"user_id": 123,
}}
if !payload.Has("user_id") {
t.Errorf("Payload.Has(%q) = false, want true", "user_id")
}

View File

@@ -7,6 +7,8 @@ package asynq
import (
"fmt"
"log"
"math/rand"
"sort"
"sync"
"time"
@@ -19,12 +21,15 @@ type processor struct {
handler Handler
queueConfig map[string]uint
// orderedQueues is set only in strict-priority mode.
orderedQueues []string
retryDelayFunc retryDelayFunc
// timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever
// in case of a program shutdown or additon of a new queue.
dequeueTimeout time.Duration
// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest
// sema is a counting semaphore to ensure the number of active workers
// does not exceed the limit.
@@ -44,11 +49,24 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor {
// newProcessor constructs a new processor.
//
// r is an instance of RDB used by the processor.
// n specifies the max number of concurrenct worker goroutines.
// qfcg is a mapping of queue names to associated priority level.
// strict specifies whether queue priority should be treated strictly.
// fn is a function to compute retry delay.
func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
orderedQueues := []string(nil)
if strict {
orderedQueues = sortByPriority(qcfg)
}
return &processor{
rdb: r,
queueConfig: qcfg,
orderedQueues: orderedQueues,
retryDelayFunc: fn,
dequeueTimeout: 2 * time.Second,
syncRequestCh: syncRequestCh,
sema: make(chan struct{}, n),
done: make(chan struct{}),
abort: make(chan struct{}),
@@ -106,9 +124,16 @@ func (p *processor) start() {
// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
func (p *processor) exec() {
msg, err := p.rdb.Dequeue(p.dequeueTimeout)
if err == rdb.ErrDequeueTimeout {
// timed out, this is a normal behavior.
qnames := p.queues()
msg, err := p.rdb.Dequeue(qnames...)
if err == rdb.ErrNoProcessableTask {
// queues are empty, this is a normal behavior.
if len(p.queueConfig) > 1 {
// sleep to avoid slamming redis and let scheduler move tasks into queues.
// Note: With multiple queues, we are not using blocking pop operation and
// polling queues instead. This adds significant load to redis.
time.Sleep(time.Second)
}
return
}
if err != nil {
@@ -126,7 +151,7 @@ func (p *processor) exec() {
defer func() { <-p.sema /* release token */ }()
resCh := make(chan error, 1)
task := &Task{Type: msg.Type, Payload: msg.Payload}
task := NewTask(msg.Type, msg.Payload)
go func() {
resCh <- perform(p.handler, task)
}()
@@ -177,16 +202,30 @@ func (p *processor) requeue(msg *base.TaskMessage) {
func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.rdb.Done(msg)
if err != nil {
log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err)
errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue)
log.Printf("[WARN] %s; will retry\n", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Done(msg)
},
errMsg: errMsg,
}
}
}
func (p *processor) retry(msg *base.TaskMessage, e error) {
d := p.retryDelayFunc(msg.Retried, e, &Task{Type: msg.Type, Payload: msg.Payload})
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d)
err := p.rdb.Retry(msg, retryAt, e.Error())
if err != nil {
log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err)
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue)
log.Printf("[WARN] %s; will retry\n", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Retry(msg, retryAt, e.Error())
},
errMsg: errMsg,
}
}
}
@@ -194,10 +233,44 @@ func (p *processor) kill(msg *base.TaskMessage, e error) {
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
err := p.rdb.Kill(msg, e.Error())
if err != nil {
log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err)
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue)
log.Printf("[WARN] %s; will retry\n", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Kill(msg, e.Error())
},
errMsg: errMsg,
}
}
}
// queues returns a list of queues to query.
// Order of the queue names is based on the priority of each queue.
// Queue names is sorted by their priority level if strict-priority is true.
// If strict-priority is false, then the order of queue names are roughly based on
// the priority level but randomized in order to avoid starving low priority queues.
func (p *processor) queues() []string {
// skip the overhead of generating a list of queue names
// if we are processing one queue.
if len(p.queueConfig) == 1 {
for qname := range p.queueConfig {
return []string{qname}
}
}
if p.orderedQueues != nil {
return p.orderedQueues
}
var names []string
for qname, priority := range p.queueConfig {
for i := 0; i < int(priority); i++ {
names = append(names, qname)
}
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
return uniq(names, len(p.queueConfig))
}
// perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error.
@@ -209,3 +282,46 @@ func perform(h Handler, task *Task) (err error) {
}()
return h.ProcessTask(task)
}
// uniq dedupes elements and returns a slice of unique names of length l.
// Order of the output slice is based on the input list.
func uniq(names []string, l int) []string {
var res []string
seen := make(map[string]struct{})
for _, s := range names {
if _, ok := seen[s]; !ok {
seen[s] = struct{}{}
res = append(res, s)
}
if len(res) == l {
break
}
}
return res
}
// sortByPriority returns a list of queue names sorted by
// their priority level in descending order.
func sortByPriority(qcfg map[string]uint) []string {
var queues []*queue
for qname, n := range qcfg {
queues = append(queues, &queue{qname, n})
}
sort.Sort(sort.Reverse(byPriority(queues)))
var res []string
for _, q := range queues {
res = append(res, q.name)
}
return res
}
type queue struct {
name string
priority uint
}
type byPriority []*queue
func (x byPriority) Len() int { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] }

View File

@@ -6,11 +6,13 @@ package asynq
import (
"fmt"
"sort"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
@@ -25,10 +27,10 @@ func TestProcessorSuccess(t *testing.T) {
m3 := h.NewTaskMessage("reindex", nil)
m4 := h.NewTaskMessage("sync", nil)
t1 := &Task{Type: m1.Type, Payload: m1.Payload}
t2 := &Task{Type: m2.Type, Payload: m2.Payload}
t3 := &Task{Type: m3.Type, Payload: m3.Payload}
t4 := &Task{Type: m4.Type, Payload: m4.Payload}
t1 := NewTask(m1.Type, m1.Payload)
t2 := NewTask(m2.Type, m2.Payload)
t3 := NewTask(m3.Type, m3.Payload)
t4 := NewTask(m4.Type, m4.Payload)
tests := []struct {
enqueued []*base.TaskMessage // initial default queue state
@@ -51,8 +53,8 @@ func TestProcessorSuccess(t *testing.T) {
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue.
h.FlushDB(t, r) // clean up db before each test case.
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
// instantiate a new processor
var mu sync.Mutex
@@ -63,9 +65,8 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task)
return nil
}
p := newProcessor(rdbClient, 10, defaultDelayFunc)
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil)
p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start()
for _, msg := range tc.incoming {
@@ -78,7 +79,7 @@ func TestProcessorSuccess(t *testing.T) {
time.Sleep(tc.wait)
p.terminate()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt); diff != "" {
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
}
@@ -128,17 +129,17 @@ func TestProcessorRetry(t *testing.T) {
delay: time.Minute,
wait: time.Second,
wantRetry: []h.ZSetEntry{
{Msg: &r2, Score: now.Add(time.Minute).Unix()},
{Msg: &r3, Score: now.Add(time.Minute).Unix()},
{Msg: &r4, Score: now.Add(time.Minute).Unix()},
{Msg: &r2, Score: float64(now.Add(time.Minute).Unix())},
{Msg: &r3, Score: float64(now.Add(time.Minute).Unix())},
{Msg: &r4, Score: float64(now.Add(time.Minute).Unix())},
},
wantDead: []*base.TaskMessage{&r1},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue.
h.FlushDB(t, r) // clean up db before each test case.
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
// instantiate a new processor
delayFunc := func(n int, e error, t *Task) time.Duration {
@@ -147,9 +148,8 @@ func TestProcessorRetry(t *testing.T) {
handler := func(task *Task) error {
return fmt.Errorf(errMsg)
}
p := newProcessor(rdbClient, 10, delayFunc)
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil)
p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start()
for _, msg := range tc.incoming {
@@ -162,8 +162,9 @@ func TestProcessorRetry(t *testing.T) {
time.Sleep(tc.wait)
p.terminate()
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score
gotRetry := h.GetRetryEntries(t, r)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt); diff != "" {
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff)
}
@@ -178,6 +179,117 @@ func TestProcessorRetry(t *testing.T) {
}
}
func TestProcessorQueues(t *testing.T) {
sortOpt := cmp.Transformer("SortStrings", func(in []string) []string {
out := append([]string(nil), in...) // Copy input to avoid mutating it
sort.Strings(out)
return out
})
tests := []struct {
queueCfg map[string]uint
want []string
}{
{
queueCfg: map[string]uint{
"high": 6,
"default": 3,
"low": 1,
},
want: []string{"high", "default", "low"},
},
{
queueCfg: map[string]uint{
"default": 1,
},
want: []string{"default"},
},
}
for _, tc := range tests {
p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil)
got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
tc.queueCfg, got, tc.want, diff)
}
}
}
func TestProcessorWithStrictPriority(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("send_email", nil)
m2 := h.NewTaskMessage("send_email", nil)
m3 := h.NewTaskMessage("send_email", nil)
m4 := h.NewTaskMessage("gen_thumbnail", nil)
m5 := h.NewTaskMessage("gen_thumbnail", nil)
m6 := h.NewTaskMessage("sync", nil)
m7 := h.NewTaskMessage("sync", nil)
t1 := NewTask(m1.Type, m1.Payload)
t2 := NewTask(m2.Type, m2.Payload)
t3 := NewTask(m3.Type, m3.Payload)
t4 := NewTask(m4.Type, m4.Payload)
t5 := NewTask(m5.Type, m5.Payload)
t6 := NewTask(m6.Type, m6.Payload)
t7 := NewTask(m7.Type, m7.Payload)
tests := []struct {
enqueued map[string][]*base.TaskMessage // initial queues state
wait time.Duration // wait duration between starting and stopping processor for this test case
wantProcessed []*Task // tasks to be processed at the end
}{
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m4, m5},
"critical": {m1, m2, m3},
"low": {m6, m7},
},
wait: time.Second,
wantProcessed: []*Task{t1, t2, t3, t4, t5, t6, t7},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r, msgs, qname)
}
// instantiate a new processor
var mu sync.Mutex
var processed []*Task
handler := func(task *Task) error {
mu.Lock()
defer mu.Unlock()
processed = append(processed, task)
return nil
}
queueCfg := map[string]uint{
"critical": 3,
base.DefaultQueueName: 2,
"low": 1,
}
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()
time.Sleep(tc.wait)
p.terminate()
if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
}
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
}
}
}
func TestPerform(t *testing.T) {
tests := []struct {
desc string
@@ -190,7 +302,7 @@ func TestPerform(t *testing.T) {
handler: func(t *Task) error {
return nil
},
task: &Task{Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "some/img/path"}},
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}),
wantErr: false,
},
{
@@ -198,7 +310,7 @@ func TestPerform(t *testing.T) {
handler: func(t *Task) error {
return fmt.Errorf("something went wrong")
},
task: &Task{Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "some/img/path"}},
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}),
wantErr: true,
},
{
@@ -206,7 +318,7 @@ func TestPerform(t *testing.T) {
handler: func(t *Task) error {
panic("something went terribly wrong")
},
task: &Task{Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "some/img/path"}},
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}),
wantErr: true,
},
}

View File

@@ -19,13 +19,21 @@ type scheduler struct {
// poll interval on average
avgInterval time.Duration
// list of queues to move the tasks into.
qnames []string
}
func newScheduler(r *rdb.RDB, avgInterval time.Duration) *scheduler {
func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *scheduler {
var qnames []string
for q := range qcfg {
qnames = append(qnames, q)
}
return &scheduler{
rdb: r,
done: make(chan struct{}),
avgInterval: avgInterval,
qnames: qnames,
}
}
@@ -51,7 +59,7 @@ func (s *scheduler) start() {
}
func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(); err != nil {
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
}
}

View File

@@ -18,7 +18,7 @@ func TestScheduler(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second
s := newScheduler(rdbClient, pollInterval)
s := newScheduler(rdbClient, pollInterval, defaultQueueConfig)
t1 := h.NewTaskMessage("gen_thumbnail", nil)
t2 := h.NewTaskMessage("send_email", nil)
t3 := h.NewTaskMessage("reindex", nil)
@@ -36,11 +36,11 @@ func TestScheduler(t *testing.T) {
}{
{
initScheduled: []h.ZSetEntry{
{Msg: t1, Score: now.Add(time.Hour).Unix()},
{Msg: t2, Score: now.Add(-2 * time.Second).Unix()},
{Msg: t1, Score: float64(now.Add(time.Hour).Unix())},
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
},
initRetry: []h.ZSetEntry{
{Msg: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()},
{Msg: t3, Score: float64(time.Now().Add(-500 * time.Millisecond).Unix())},
},
initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2,
@@ -50,9 +50,9 @@ func TestScheduler(t *testing.T) {
},
{
initScheduled: []h.ZSetEntry{
{Msg: t1, Score: now.Unix()},
{Msg: t2, Score: now.Add(-2 * time.Second).Unix()},
{Msg: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
{Msg: t1, Score: float64(now.Unix())},
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
{Msg: t3, Score: float64(now.Add(-500 * time.Millisecond).Unix())},
},
initRetry: []h.ZSetEntry{},
initQueue: []*base.TaskMessage{t4},
@@ -67,7 +67,7 @@ func TestScheduler(t *testing.T) {
h.FlushDB(t, r) // clean up db before each test case.
h.SeedScheduledQueue(t, r, tc.initScheduled) // initialize scheduled queue
h.SeedRetryQueue(t, r, tc.initRetry) // initialize retry queue
h.SeedDefaultQueue(t, r, tc.initQueue) // initialize default queue
h.SeedEnqueuedQueue(t, r, tc.initQueue) // initialize default queue
s.start()
time.Sleep(tc.wait)

70
syncer.go Normal file
View File

@@ -0,0 +1,70 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"log"
"time"
)
// syncer is responsible for queuing up failed requests to redis and retry
// those requests to sync state between the background process and redis.
type syncer struct {
requestsCh <-chan *syncRequest
// channel to communicate back to the long running "syncer" goroutine.
done chan struct{}
// interval between sync operations.
interval time.Duration
}
type syncRequest struct {
fn func() error // sync operation
errMsg string // error message
}
func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
return &syncer{
requestsCh: requestsCh,
done: make(chan struct{}),
interval: interval,
}
}
func (s *syncer) terminate() {
log.Println("[INFO] Syncer shutting down...")
// Signal the syncer goroutine to stop.
s.done <- struct{}{}
}
func (s *syncer) start() {
go func() {
var requests []*syncRequest
for {
select {
case <-s.done:
// Try sync one last time before shutting down.
for _, req := range requests {
if err := req.fn(); err != nil {
log.Printf("[ERROR] %s\n", req.errMsg)
}
}
log.Println("[INFO] Syncer done.")
return
case req := <-s.requestsCh:
requests = append(requests, req)
case <-time.After(s.interval):
var temp []*syncRequest
for _, req := range requests {
if err := req.fn(); err != nil {
temp = append(temp, req)
}
}
requests = temp
}
}
}()
}

99
syncer_test.go Normal file
View File

@@ -0,0 +1,99 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"testing"
"time"
"github.com/go-redis/redis/v7"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
func TestSyncer(t *testing.T) {
inProgress := []*base.TaskMessage{
h.NewTaskMessage("send_email", nil),
h.NewTaskMessage("reindex", nil),
h.NewTaskMessage("gen_thumbnail", nil),
}
r := setup(t)
rdbClient := rdb.NewRDB(r)
h.SeedInProgressQueue(t, r, inProgress)
const interval = time.Second
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, interval)
syncer.start()
defer syncer.terminate()
for _, msg := range inProgress {
m := msg
syncRequestCh <- &syncRequest{
fn: func() error {
return rdbClient.Done(m)
},
}
}
time.Sleep(2 * interval) // ensure that syncer runs at least once
gotInProgress := h.GetInProgressMessages(t, r)
if l := len(gotInProgress); l != 0 {
t.Errorf("%q has length %d; want 0", base.InProgressQueue, l)
}
}
func TestSyncerRetry(t *testing.T) {
inProgress := []*base.TaskMessage{
h.NewTaskMessage("send_email", nil),
h.NewTaskMessage("reindex", nil),
h.NewTaskMessage("gen_thumbnail", nil),
}
goodClient := setup(t)
h.SeedInProgressQueue(t, goodClient, inProgress)
// Simulate the situation where redis server is down
// by connecting to a wrong port.
badClient := redis.NewClient(&redis.Options{
Addr: "localhost:6390",
})
rdbClient := rdb.NewRDB(badClient)
const interval = time.Second
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, interval)
syncer.start()
defer syncer.terminate()
for _, msg := range inProgress {
m := msg
syncRequestCh <- &syncRequest{
fn: func() error {
return rdbClient.Done(m)
},
}
}
time.Sleep(2 * interval) // ensure that syncer runs at least once
// Sanity check to ensure that message was not successfully deleted
// from in-progress list.
gotInProgress := h.GetInProgressMessages(t, goodClient)
if l := len(gotInProgress); l != len(inProgress) {
t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress))
}
// simualate failover.
rdbClient = rdb.NewRDB(goodClient)
time.Sleep(2 * interval) // ensure that syncer runs at least once
gotInProgress = h.GetInProgressMessages(t, goodClient)
if l := len(gotInProgress); l != 0 {
t.Errorf("%q has length %d; want 0", base.InProgressQueue, l)
}
}

83
tools/asynqmon/README.md Normal file
View File

@@ -0,0 +1,83 @@
# Asynqmon
Asynqmon is a CLI tool to monitor the queues managed by `asynq` package.
## Table of Contents
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Stats](#stats)
- [History](#history)
- [List](#list)
- [Enqueue](#enqueue)
- [Delete](#delete)
- [Kill](#kill)
- [Config File](#config-file)
## Installation
In order to use the tool, compile it using the following command:
go get github.com/hibiken/asynq/tools/asynqmon
This will create the asynqmon executable under your `$GOPATH/bin` directory.
## Quick Start
Asynqmon tool has a few commands to inspect the state of tasks and queues.
Run `asynqmon help` to see all the available commands.
Asynqmon needs to connect to a redis-server to inspect the state of queues and tasks. Use flags to specify the options to connect to the redis-server used by your application.
By default, Asynqmon will try to connect to a redis server running at `localhost:6379`.
### Stats
Stats command gives the overview of the current state of tasks and queues. Run it in conjunction with `watch` command to repeatedly run `stats`.
Example:
watch -n 3 asynqmon stats
This will run `asynqmon stats` command every 3 seconds.
![Gif](/docs/assets/asynqmon_stats.gif)
### History
TODO: Add discription
### List
TODO: Add discription
### Enqueue
TODO: Add discription
### Delete
TODO: Add discription
### Kill
TODO: Add discription
## Config File
You can use a config file to set default values for flags.
This is useful, for example when you have to connect to a remote redis server.
By default, `asynqmon` will try to read config file located in
`$HOME/.asynqmon.(yml|json)`. You can specify the file location via `--config` flag.
Config file example:
```yml
uri: 127.0.0.1:6379
db: 2
password: mypassword
```
This will set the default values for `--uri`, `--db`, and `--password` flags.

View File

@@ -11,6 +11,7 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// delCmd represents the del command
@@ -20,7 +21,7 @@ var delCmd = &cobra.Command{
Long: `Del (asynqmon del) will delete a task given an identifier.
The command takes one argument which specifies the task to delete.
The task should be in either scheduled, retry or dead queue.
The task should be in either scheduled, retry or dead state.
Identifier for a task should be obtained by running "asynqmon ls" command.
Example: asynqmon enq d:1575732274:bnogo8gt6toe23vhef0g`,
@@ -49,8 +50,9 @@ func del(cmd *cobra.Command, args []string) {
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
switch qtype {
case "s":

View File

@@ -11,19 +11,20 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var delallValidArgs = []string{"scheduled", "retry", "dead"}
// delallCmd represents the delall command
var delallCmd = &cobra.Command{
Use: "delall [queue name]",
Short: "Deletes all tasks from the specified queue",
Long: `Delall (asynqmon delall) will delete all tasks from the specified queue.
Use: "delall [state]",
Short: "Deletes all tasks from the specified state",
Long: `Delall (asynqmon delall) will delete all tasks in the specified state.
The argument should be one of "scheduled", "retry", or "dead".
Example: asynqmon delall dead -> Deletes all tasks from the dead queue`,
Example: asynqmon delall dead -> Deletes all dead tasks`,
ValidArgs: delallValidArgs,
Args: cobra.ExactValidArgs(1),
Run: delall,
@@ -45,8 +46,9 @@ func init() {
func delall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
var err error
@@ -58,12 +60,12 @@ func delall(cmd *cobra.Command, args []string) {
case "dead":
err = r.DeleteAllDeadTasks()
default:
fmt.Printf("error: `asynqmon delall [queue name]` only accepts %v as the argument.\n", delallValidArgs)
fmt.Printf("error: `asynqmon delall [state]` only accepts %v as the argument.\n", delallValidArgs)
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Deleted all tasks from %q queue\n", args[0])
fmt.Printf("Deleted all tasks in %q state\n", args[0])
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// enqCmd represents the enq command
@@ -20,7 +21,7 @@ var enqCmd = &cobra.Command{
Long: `Enq (asynqmon enq) will enqueue a task given an identifier.
The command takes one argument which specifies the task to enqueue.
The task should be in either scheduled, retry or dead queue.
The task should be in either scheduled, retry or dead state.
Identifier for a task should be obtained by running "asynqmon ls" command.
The task enqueued by this command will be processed as soon as the task
@@ -52,8 +53,9 @@ func enq(cmd *cobra.Command, args []string) {
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
switch qtype {
case "s":

View File

@@ -11,22 +11,23 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var enqallValidArgs = []string{"scheduled", "retry", "dead"}
// enqallCmd represents the enqall command
var enqallCmd = &cobra.Command{
Use: "enqall [queue name]",
Short: "Enqueues all tasks from the specified queue",
Long: `Enqall (asynqmon enqall) will enqueue all tasks from the specified queue.
Use: "enqall [state]",
Short: "Enqueues all tasks in the specified state",
Long: `Enqall (asynqmon enqall) will enqueue all tasks in the specified state.
The argument should be one of "scheduled", "retry", or "dead".
The tasks enqueued by this command will be processed as soon as it
gets dequeued by a processor.
Example: asynqmon enqall dead -> Enqueues all tasks from the dead queue`,
Example: asynqmon enqall dead -> Enqueues all dead tasks`,
ValidArgs: enqallValidArgs,
Args: cobra.ExactValidArgs(1),
Run: enqall,
@@ -48,8 +49,9 @@ func init() {
func enqall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
var n int64
@@ -62,12 +64,12 @@ func enqall(cmd *cobra.Command, args []string) {
case "dead":
n, err = r.EnqueueAllDeadTasks()
default:
fmt.Printf("error: `asynqmon enqall [queue name]` only accepts %v as the argument.\n", enqallValidArgs)
fmt.Printf("error: `asynqmon enqall [state]` only accepts %v as the argument.\n", enqallValidArgs)
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Enqueued %d tasks from %q queue\n", n, args[0])
fmt.Printf("Enqueued %d tasks in %q state\n", n, args[0])
}

View File

@@ -7,60 +7,45 @@ package cmd
import (
"fmt"
"os"
"strconv"
"strings"
"text/tabwriter"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var days int
// historyCmd represents the history command
var historyCmd = &cobra.Command{
Use: "history [num of days]",
Use: "history",
Short: "Shows historical aggregate data",
Long: `History (asynqmon history) will show the number of processed tasks
as well as the error rate for the last n days.
Long: `History (asynqmon history) will show the number of processed and failed tasks
from the last x days.
Example: asynqmon history 7 -> Shows stats from the last 7 days`,
Args: cobra.ExactArgs(1),
By default, it will show the data from the last 10 days.
Example: asynqmon history -x=30 -> Shows stats from the last 30 days`,
Args: cobra.NoArgs,
Run: history,
}
func init() {
rootCmd.AddCommand(historyCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// historyCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// historyCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
historyCmd.Flags().IntVarP(&days, "days", "x", 10, "show data from last x days")
}
func history(cmd *cobra.Command, args []string) {
n, err := strconv.Atoi(args[0])
if err != nil {
fmt.Printf(`Error: Invalid argument. Argument has to be an integer.
Usage: asynqmon history [num of days]
`)
os.Exit(1)
}
if err != nil {
}
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
stats, err := r.HistoricalStats(n)
stats, err := r.HistoricalStats(days)
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -11,16 +11,17 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// killCmd represents the kill command
var killCmd = &cobra.Command{
Use: "kill [task id]",
Short: "Sends a task to dead queue given an identifier",
Long: `Kill (asynqmon kill) will send a task to dead queue given an identifier.
Short: "Kills a task given an identifier",
Long: `Kill (asynqmon kill) will put a task in dead state given an identifier.
The command takes one argument which specifies the task to kill.
The task should be in either scheduled or retry queue.
The task should be in either scheduled or retry state.
Identifier for a task should be obtained by running "asynqmon ls" command.
Example: asynqmon kill r:1575732274:bnogo8gt6toe23vhef0g`,
@@ -49,8 +50,9 @@ func kill(cmd *cobra.Command, args []string) {
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
switch qtype {
case "s":

View File

@@ -11,19 +11,20 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var killallValidArgs = []string{"scheduled", "retry"}
// killallCmd represents the killall command
var killallCmd = &cobra.Command{
Use: "killall [queue name]",
Short: "Sends all tasks to dead queue from the specified queue",
Long: `Killall (asynqmon killall) will moves all tasks from the specified queue to dead queue.
Use: "killall [state]",
Short: "Update all tasks to dead state from the specified state",
Long: `Killall (asynqmon killall) will update all tasks from the specified state to dead state.
The argument should be either "scheduled" or "retry".
Example: asynqmon killall retry -> Moves all tasks from retry queue to dead queue`,
Example: asynqmon killall retry -> Update all retry tasks to dead tasks`,
ValidArgs: killallValidArgs,
Args: cobra.ExactValidArgs(1),
Run: killall,
@@ -45,8 +46,9 @@ func init() {
func killall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
var n int64
@@ -57,12 +59,12 @@ func killall(cmd *cobra.Command, args []string) {
case "retry":
n, err = r.KillAllRetryTasks()
default:
fmt.Printf("error: `asynqmon killall [queue name]` only accepts %v as the argument.\n", killallValidArgs)
fmt.Printf("error: `asynqmon killall [state]` only accepts %v as the argument.\n", killallValidArgs)
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Sent %d tasks to \"dead\" queue from %q queue\n", n, args[0])
fmt.Printf("Successfully updated %d tasks to \"dead\" state\n", n)
}

View File

@@ -17,24 +17,30 @@ import (
"github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"}
// lsCmd represents the ls command
var lsCmd = &cobra.Command{
Use: "ls [queue name]",
Short: "Lists queue contents",
Long: `Ls (asynqmon ls) will list all tasks from the specified queue in a table format.
Use: "ls [state]",
Short: "Lists tasks in the specified state",
Long: `Ls (asynqmon ls) will list all tasks in the specified state in a table format.
The command takes one argument which specifies the queue to inspect. The value
of the argument should be one of "enqueued", "inprogress", "scheduled",
The command takes one argument which specifies the state of tasks.
The argument value should be one of "enqueued", "inprogress", "scheduled",
"retry", or "dead".
Example: asynqmon ls dead`,
ValidArgs: lsValidArgs,
Args: cobra.ExactValidArgs(1),
Run: ls,
Example:
asynqmon ls dead -> Lists all tasks in dead state
Enqueued tasks can optionally be filtered by providing queue names after ":"
Example:
asynqmon ls enqueued:critical -> List tasks from critical queue only
`,
Args: cobra.ExactValidArgs(1),
Run: ls,
}
func init() {
@@ -53,13 +59,15 @@ func init() {
func ls(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
switch args[0] {
parts := strings.Split(args[0], ":")
switch parts[0] {
case "enqueued":
listEnqueued(r)
listEnqueued(r, parts[1:]...)
case "inprogress":
listInProgress(r)
case "scheduled":
@@ -69,7 +77,7 @@ func ls(cmd *cobra.Command, args []string) {
case "dead":
listDead(r)
default:
fmt.Printf("error: `asynqmon ls [queue name]` only accepts %v as the argument.\n", lsValidArgs)
fmt.Printf("error: `asynqmon ls [state]` only accepts %v as the argument.\n", lsValidArgs)
os.Exit(1)
}
}
@@ -105,20 +113,30 @@ func parseQueryID(queryID string) (id xid.ID, score int64, qtype string, err err
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB) {
tasks, err := r.ListEnqueued()
func listEnqueued(r *rdb.RDB, qnames ...string) {
tasks, err := r.ListEnqueued(qnames...)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Println("No enqueued tasks")
msg := "No enqueued tasks"
if len(qnames) > 0 {
msg += " in"
for i, q := range qnames {
msg += fmt.Sprintf(" %q queue", q)
if i != len(qnames)-1 {
msg += ","
}
}
}
fmt.Println(msg)
return
}
cols := []string{"ID", "Type", "Payload"}
cols := []string{"ID", "Type", "Payload", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue)
}
}
printTable(cols, printRows)
@@ -153,11 +171,11 @@ func listScheduled(r *rdb.RDB) {
fmt.Println("No scheduled tasks")
return
}
cols := []string{"ID", "Type", "Payload", "Process In"}
cols := []string{"ID", "Type", "Payload", "Process In", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue)
}
}
printTable(cols, printRows)
@@ -173,11 +191,11 @@ func listRetry(r *rdb.RDB) {
fmt.Println("No retry tasks")
return
}
cols := []string{"ID", "Type", "Payload", "Retry In", "Last Error", "Retried", "Max Retry"}
cols := []string{"ID", "Type", "Payload", "Retry In", "Last Error", "Retried", "Max Retry", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry, t.Queue)
}
}
printTable(cols, printRows)
@@ -193,10 +211,10 @@ func listDead(r *rdb.RDB) {
fmt.Println("No dead tasks")
return
}
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
}
}
printTable(cols, printRows)

54
tools/asynqmon/cmd/rmq.go Normal file
View File

@@ -0,0 +1,54 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package cmd
import (
"fmt"
"os"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// rmqCmd represents the rmq command
var rmqCmd = &cobra.Command{
Use: "rmq [queue name]",
Short: "Removes the specified queue",
Long: `Rmq (asynqmon rmq) will remove the specified queue.
By default, it will remove the queue only if it's empty.
Use --force option to override this behavior.
Example: asynqmon rmq low -> Removes "low" queue`,
Args: cobra.ExactValidArgs(1),
Run: rmq,
}
var rmqForce bool
func init() {
rootCmd.AddCommand(rmqCmd)
rmqCmd.Flags().BoolVarP(&rmqForce, "force", "f", false, "remove the queue regardless of its size")
}
func rmq(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
err := r.RemoveQueue(args[0], rmqForce)
if err != nil {
if _, ok := err.(*rdb.ErrQueueNotEmpty); ok {
fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynqmon rmq --force %s'\n", err, args[0])
os.Exit(1)
}
fmt.Printf("error: %v", err)
os.Exit(1)
}
fmt.Printf("Successfully removed queue %q\n", args[0])
}

View File

@@ -6,9 +6,10 @@ package cmd
import (
"fmt"
"github.com/spf13/cobra"
"os"
"github.com/spf13/cobra"
homedir "github.com/mitchellh/go-homedir"
"github.com/spf13/viper"
)
@@ -18,22 +19,20 @@ var cfgFile string
// Flags
var uri string
var db int
var password string
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "asynqmon",
Short: "A monitoring tool for asynq queues",
Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package.
Long: `Asynqmon is a CLI tool to inspect tasks and queues managed by asynq package.
Asynqmon has a few commands to query and mutate the current state of the queues.
Use commands to query and mutate the current state of tasks and queues.
Monitoring commands such as "stats" and "ls" can be used in conjunction with the
"watch" command to continuously run the command at a certain interval.
Example: watch -n 5 asynqmon stats`,
// Uncomment the following line if your bare application
// has an action associated with it:
// Run: func(cmd *cobra.Command, args []string) { },
}
// Execute adds all child commands to the root command and sets flags appropriately.
@@ -48,13 +47,16 @@ func Execute() {
func init() {
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.asynqmon.yaml)")
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "Redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "Redis database number (default is 0)")
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file to set flag defaut values (default is $HOME/.asynqmon.yaml)")
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "redis database number (default is 0)")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "password to use when connecting to redis server")
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
}
// initConfig reads in config file and ENV variables if set.
// TODO(hibiken): Remove this if not necessary.
func initConfig() {
if cfgFile != "" {
// Use config file from the flag.

View File

@@ -7,25 +7,33 @@ package cmd
import (
"fmt"
"os"
"sort"
"strconv"
"strings"
"text/tabwriter"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// statsCmd represents the stats command
var statsCmd = &cobra.Command{
Use: "stats",
Short: "Shows current state of the queues",
Long: `Stats (aysnqmon stats) will show the number of tasks in each queue at that instant.
It also displays basic information about the running redis instance.
Short: "Shows current state of the tasks and queues",
Long: `Stats (aysnqmon stats) will show the overview of tasks and queues at that instant.
To monitor the queues continuously, it's recommended that you run this
Specifically, the command shows the following:
* Number of tasks in each state
* Number of tasks in each queue
* Aggregate data for the current day
* Basic information about the running redis instance
To monitor the tasks continuously, it's recommended that you run this
command in conjunction with the watch command.
Example: watch -n 5 asynqmon stats`,
Example: watch -n 3 asynqmon stats -> Shows current state of tasks every three seconds`,
Args: cobra.NoArgs,
Run: stats,
}
@@ -46,8 +54,9 @@ func init() {
func stats(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
@@ -61,8 +70,12 @@ func stats(cmd *cobra.Command, args []string) {
fmt.Println(err)
os.Exit(1)
}
fmt.Println("STATES")
printStates(stats)
fmt.Println()
fmt.Println("QUEUES")
printQueues(stats)
printQueues(stats.Queues)
fmt.Println()
fmt.Printf("STATS FOR %s UTC\n", stats.Timestamp.UTC().Format("2006-01-02"))
@@ -74,7 +87,7 @@ func stats(cmd *cobra.Command, args []string) {
fmt.Println()
}
func printQueues(s *rdb.Stats) {
func printStates(s *rdb.Stats) {
format := strings.Repeat("%v\t", 5) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, "InProgress", "Enqueued", "Scheduled", "Retry", "Dead")
@@ -83,6 +96,24 @@ func printQueues(s *rdb.Stats) {
tw.Flush()
}
func printQueues(queues map[string]int) {
var qnames, seps, counts []string
for q := range queues {
qnames = append(qnames, strings.Title(q))
}
sort.Strings(qnames) // sort for stable order
for _, q := range qnames {
seps = append(seps, strings.Repeat("-", len(q)))
counts = append(counts, strconv.Itoa(queues[strings.ToLower(q)]))
}
format := strings.Repeat("%v\t", len(qnames)) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, toInterfaceSlice(qnames)...)
fmt.Fprintf(tw, format, toInterfaceSlice(seps)...)
fmt.Fprintf(tw, format, toInterfaceSlice(counts)...)
tw.Flush()
}
func printStats(s *rdb.Stats) {
format := strings.Repeat("%v\t", 3) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
@@ -112,3 +143,11 @@ func printInfo(info map[string]string) {
)
tw.Flush()
}
func toInterfaceSlice(strs []string) []interface{} {
var res []interface{}
for _, s := range strs {
res = append(res, s)
}
return res
}