2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-07 07:05:51 +08:00

Compare commits

..

16 Commits

Author SHA1 Message Date
Ken Hibino
461a6f2587 v0.3.0 2020-02-04 06:29:57 -08:00
Ken Hibino
ba9b42f428 Clean up cli description text 2020-02-04 06:16:29 -08:00
Ken Hibino
3744072e9b Sort processes by host and pid in ps output 2020-02-03 06:59:02 -08:00
Ken Hibino
eadfd5f8b4 [ci skip] Update changelog 2020-02-02 20:59:00 -08:00
Ken Hibino
96f06ac89b Add ps command to asynqmon 2020-02-02 20:56:33 -08:00
Ken Hibino
d03fa34eaf Add hearbeater 2020-02-01 09:35:49 -08:00
Ken Hibino
489e695433 [ci skip] Change font used in demo.gif 2020-01-30 21:47:08 -08:00
Ken Hibino
9ae4be8184 Run and compare benchmarks after successful ci-build 2020-01-30 21:38:16 -08:00
Ken Hibino
36af486303 [ci skip] Add prettier gif for demo 2020-01-29 21:36:17 -08:00
Ken Hibino
a46eb97e6f [ci skip] Shorten readme 2020-01-26 20:06:52 -08:00
Ken Hibino
c934ef115b v0.2.2 2020-01-26 16:07:44 -08:00
Ken Hibino
6fbaa2ed6c (fix): RestoreUnfinished to select correct queue 2020-01-26 16:05:46 -08:00
Ken Hibino
166497748b (fix): Requeue to select correct queue 2020-01-26 16:05:46 -08:00
Ken Hibino
31123fd42a Paginate tasks with asynqmon ls command
Changes:
* Added --page and --size flags to ls command
* By default, the command will show first 30 tasks from the specified
queue
2020-01-26 13:12:01 -08:00
Ken Hibino
3ed155b45b [ci skip] Update readme 2020-01-25 08:08:13 -08:00
Ken Hibino
58d2ed94e7 [ci skip] Fix typo 2020-01-23 06:05:18 -08:00
26 changed files with 1409 additions and 511 deletions

View File

@@ -7,3 +7,5 @@ env:
go: [1.12.x, 1.13.x]
services:
- redis-server
after_success:
- bash ./.travis/benchcmp.sh

15
.travis/benchcmp.sh Executable file
View File

@@ -0,0 +1,15 @@
if [ "${TRAVIS_PULL_REQUEST_BRANCH:-$TRAVIS_BRANCH}" != "master" ]; then
REMOTE_URL="$(git config --get remote.origin.url)";
cd ${TRAVIS_BUILD_DIR}/.. && \
git clone ${REMOTE_URL} "${TRAVIS_REPO_SLUG}-bench" && \
cd "${TRAVIS_REPO_SLUG}-bench" && \
# Benchmark master
git checkout master && \
go test -run=XXX -bench=. ./... > master.txt && \
# Benchmark feature branch
git checkout ${TRAVIS_COMMIT} && \
go test -run=XXX -bench=. ./... > feature.txt && \
go get -u golang.org/x/tools/cmd/benchcmp && \
# compare two benchmarks
benchcmp master.txt feature.txt;
fi

View File

@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.3.0] - 2020-02-04
### Added
- `asynqmon ps` was added to list all background worker processes
## [0.2.2] - 2020-01-26
### Fixed
- Fixed restoring unfinished tasks back to correct queues.
### Changed
- `asynqmon ls` command is now paginated (default 30 tasks from first page)
- `asynqmon ls enqueued:[queue name]` requires queue name to be specified
## [0.2.1] - 2020-01-22
### Fixed

336
README.md
View File

@@ -6,39 +6,99 @@
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
Simple and efficent asynchronous task processing library in Go.
Asynq is a simple Go library for queueing tasks and processing them in the background with workers.
It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before the release of verson 1.0.0.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
## Table of Contents
![Gif](/docs/assets/demo.gif)
- [Overview](#overview)
- [Requirements](#requirements)
- [Installation](#installation)
- [Getting Started](#getting-started)
- [Monitoring CLI](#monitoring-cli)
- [Acknowledgements](#acknowledgements)
- [License](#license)
## Installation
## Overview
To install `asynq` library, run the following command:
![Gif](/docs/assets/asynqmon_stats.gif)
```sh
go get -u github.com/hibiken/asynq
```
Asynq provides a simple interface to asynchronous task processing.
## Quick Start
It also ships with a tool to monitor the queues and take manual actions if needed.
First, make sure you are running a Redis server locally.
Asynq provides:
```sh
redis-server
```
- Clear separation of task producer and consumer
- Ability to schedule task processing in the future
- Automatic retry of failed tasks with exponential backoff
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) max retry count per task
- Ability to configure max number of worker goroutines to process tasks
- Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
- [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
- [CLI tool](/tools/asynqmon/README.md) to query and mutate queues state for mointoring and administrative purposes
To create and schedule tasks, use `Client` and provide a task and when to process the task.
```go
func main() {
r := &asynq.RedisClientOpt{
Addr: "localhost:6379",
}
client := asynq.NewClient(r)
// Create a task with task type and payload
t1 := asynq.NewTask("send_welcome_email", map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42})
// Process immediately
err := client.Schedule(t1, time.Now())
// Process 24 hrs later
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
// If processing fails, retry up to 10 times (Default is 25)
err = client.Schedule(t1, time.Now(), asynq.Retry(10))
// Use custom queue called "critical"
err = client.Schedule(t1, time.Now(), asynq.Queue("critical"))
}
```
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
```go
func main() {
r := &asynq.RedisClientOpt{
Addr: "localhost:6379",
}
bg := asynq.NewBackground(r, &asynq.Config{
// Specify how many concurrent workers to use
Concurrency: 10,
// You can optionally create multiple queues
// with different priority level
Queues: map[string]uint{
"critical": 6,
"default": 3,
"low": 1,
},
// See the godoc for other configuration options
})
bg.Run(handler)
}
```
`Handler` is an interface with one method `ProcessTask` with the following signature.
```go
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
type Handler interface {
ProcessTask(*Task) error
}
```
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started).
To Learn more about `asynq` features and APIs, see our [Wiki pages](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
## Requirements
@@ -47,235 +107,15 @@ Asynq provides:
| [Redis](https://redis.io/) | v2.8+ |
| [Go](https://golang.org/) | v1.12+ |
## Installation
## Command Line Tool
To install both `asynq` library and `asynqmon` CLI tool, run the following command:
Asynq ships with a command line tool to inspect the state of queues and tasks.
```
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
```
## Getting Started
In this quick tour of `asynq`, we are going to create two programs.
- `producer.go` will create and schedule tasks to be processed asynchronously by the consumer.
- `consumer.go` will process the tasks created by the producer.
**This guide assumes that you are running a Redis server at `localhost:6379`**.
Before we start, make sure you have Redis installed and running.
1. Import `asynq` in both files.
```go
import "github.com/hibiken/asynq"
```
2. Asynq uses Redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis.
We are going to use `RedisClientOpt` here.
```go
// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
```
3. In `producer.go`, create a `Client` instance to create and schedule tasks.
```go
// producer.go
func main() {
client := asynq.NewClient(redis)
// Create a task with typename and payload.
t1 := asynq.NewTask(
"send_welcome_email",
map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask(
"send_reminder_email",
map[string]interface{}{"user_id": 42})
// Process the task immediately.
err := client.Schedule(t1, time.Now())
if err != nil {
log.Fatal(err)
}
// Process the task 24 hours later.
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
if err != nil {
log.Fatal(err)
}
}
```
4. In `consumer.go`, create a `Background` instance to process tasks.
```go
// consumer.go
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(handler)
}
```
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
```go
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
ProcessTask(*Task) error
}
```
The simplest way to implement a handler is to define a function with the same signature and use `asynq.HandlerFunc` adapter type when passing it to `Run`.
```go
func handler(t *asynq.Task) error {
switch t.Type {
case "send_welcome_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
case "send_reminder_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
default:
return fmt.Errorf("unexpected task type: %s", t.Type)
}
return nil
}
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
// Use asynq.HandlerFunc adapter for a handler function
bg.Run(asynq.HandlerFunc(handler))
}
```
We could kep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
```go
// consumer.go
// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
mapping map[string]asynq.HandlerFunc
}
// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
d.mapping[taskType] = fn
}
// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
fn, ok := d.mapping[task.Type]
if !ok {
return fmt.Errorf("no handler registered for %q", task.Type)
}
return fn(task)
}
func main() {
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
d.HandleFunc("send_reminder_email", sendReminderEmail)
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(d)
}
func sendWelcomeEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func sendReminderEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
```
Now that we have both task producer and consumer, we can run both programs.
```sh
go run consumer.go
```
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
With our consumer running, also run
```sh
go run producer.go
```
This will create a task and the first task will get processed immediately by the consumer. The second task will be processed 24 hours later.
Let's use `asynqmon` tool to inspect the tasks.
```sh
asynqmon stats
```
This command will show the number of tasks in each state and stats for the current date as well as redis information.
To understand the meaning of each state, see [Life of a Task Wiki page](https://github.com/hibiken/asynq/wiki/Life-of-a-Task).
For in-depth guide on `asynqmon` tool, see the [README](/tools/asynqmon/README.md) for the CLI.
This was a quick tour of `asynq` basics. To see all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see [the Wiki page](https://github.com/hibiken/asynq/wiki).
## Monitoring CLI
Asynq ships with a CLI tool to inspect the state of queues and tasks.
To install the CLI, run the following command:
To install, run the following command:
go get github.com/hibiken/asynq/tools/asynqmon
For details on how to use the tool, see the [README](/tools/asynqmon/README.md) for the asynqmon CLI.
For details on how to use the tool, refer to the tool's [README](/tools/asynqmon/README.md).
## Acknowledgements

View File

@@ -33,10 +33,12 @@ type Background struct {
mu sync.Mutex
running bool
rdb *rdb.RDB
scheduler *scheduler
processor *processor
syncer *syncer
pinfo *base.ProcessInfo
rdb *rdb.RDB
scheduler *scheduler
processor *processor
syncer *syncer
heartbeater *heartbeater
}
// Config specifies the background-task processing behavior.
@@ -107,18 +109,27 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
if queues == nil || len(queues) == 0 {
queues = defaultQueueConfig
}
qcfg := normalizeQueueCfg(queues)
host, err := os.Hostname()
if err != nil {
host = "unknown-host"
}
pid := os.Getpid()
pinfo := base.NewProcessInfo(host, pid, n, queues, cfg.StrictPriority)
rdb := rdb.NewRDB(createRedisClient(r))
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, 5*time.Second)
rdb := rdb.NewRDB(createRedisClient(r))
scheduler := newScheduler(rdb, 5*time.Second, qcfg)
processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
heartbeater := newHeartbeater(rdb, pinfo, 5*time.Second)
scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, pinfo, delayFunc, syncRequestCh)
return &Background{
rdb: rdb,
scheduler: scheduler,
processor: processor,
syncer: syncer,
pinfo: pinfo,
rdb: rdb,
scheduler: scheduler,
processor: processor,
syncer: syncer,
heartbeater: heartbeater,
}
}
@@ -165,6 +176,7 @@ func (bg *Background) Run(handler Handler) {
sig := <-sigs
if sig == syscall.SIGTSTP {
bg.processor.stop()
bg.pinfo.SetState("stopped")
continue
}
break
@@ -184,6 +196,7 @@ func (bg *Background) start(handler Handler) {
bg.running = true
bg.processor.handler = handler
bg.heartbeater.start()
bg.syncer.start()
bg.scheduler.start()
bg.processor.start()
@@ -202,42 +215,12 @@ func (bg *Background) stop() {
// Note: processor and all worker goroutines need to be exited
// before shutting down syncer to avoid goroutine leak.
bg.syncer.terminate()
bg.heartbeater.terminate()
bg.rdb.ClearProcessInfo(bg.pinfo)
bg.rdb.Close()
bg.processor.handler = nil
bg.running = false
logger.info("Bye!")
}
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
var xs []uint
for _, x := range queueCfg {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]uint)
for q, x := range queueCfg {
res[q] = x / d
}
return res
}
func gcd(xs ...uint) uint {
fn := func(x, y uint) uint {
for y > 0 {
x, y = y, x%y
}
return x
}
res := xs[0]
for i := 0; i < len(xs); i++ {
res = fn(xs[i], res)
if res == 1 {
return 1
}
}
return res
}

2
doc.go
View File

@@ -3,7 +3,7 @@
// that can be found in the LICENSE file.
/*
Package asynq provides a framework for background task processing.
Package asynq provides a framework for asynchronous task processing.
Asynq uses Redis as a message broker. To connect to redis server,
specify the options using one of RedisConnOpt types.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

BIN
docs/assets/demo.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 983 KiB

67
heartbeat.go Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
// heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up.
type heartbeater struct {
rdb *rdb.RDB
pinfo *base.ProcessInfo
// channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{}
// interval between heartbeats.
interval time.Duration
}
func newHeartbeater(rdb *rdb.RDB, pinfo *base.ProcessInfo, interval time.Duration) *heartbeater {
return &heartbeater{
rdb: rdb,
pinfo: pinfo,
done: make(chan struct{}),
interval: interval,
}
}
func (h *heartbeater) terminate() {
logger.info("Heartbeater shutting down...")
// Signal the heartbeater goroutine to stop.
h.done <- struct{}{}
}
func (h *heartbeater) start() {
h.pinfo.SetStarted(time.Now())
h.pinfo.SetState("running")
go func() {
h.beat()
for {
select {
case <-h.done:
logger.info("Heartbeater done")
return
case <-time.After(h.interval):
h.beat()
}
}
}()
}
func (h *heartbeater) beat() {
// Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2)
if err != nil {
logger.error("could not write heartbeat data: %v", err)
}
}

88
heartbeat_test.go Normal file
View File

@@ -0,0 +1,88 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
func TestHeartbeater(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
tests := []struct {
interval time.Duration
host string
pid int
queues map[string]uint
concurrency int
}{
{time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10},
}
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
for _, tc := range tests {
h.FlushDB(t, r)
pi := base.NewProcessInfo(tc.host, tc.pid, tc.concurrency, tc.queues, false)
hb := newHeartbeater(rdbClient, pi, tc.interval)
want := &base.ProcessInfo{
Host: tc.host,
PID: tc.pid,
Queues: tc.queues,
Concurrency: tc.concurrency,
Started: time.Now(),
State: "running",
}
hb.start()
// allow for heartbeater to write to redis
time.Sleep(tc.interval * 2)
got, err := rdbClient.ReadProcessInfo(tc.host, tc.pid)
if err != nil {
t.Errorf("could not read process status from redis: %v", err)
hb.terminate()
continue
}
if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff)
hb.terminate()
continue
}
// state change
pi.SetState("stopped")
// allow for heartbeater to write to redis
time.Sleep(tc.interval * 2)
want.State = "stopped"
got, err = rdbClient.ReadProcessInfo(tc.host, tc.pid)
if err != nil {
t.Errorf("could not read process status from redis: %v", err)
hb.terminate()
continue
}
if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff)
hb.terminate()
continue
}
hb.terminate()
}
}

View File

@@ -41,6 +41,18 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [
return out
})
// SortProcessInfoOpt is a cmp.Option to sort base.ProcessInfo for comparing slice of process info.
var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.ProcessInfo) []*base.ProcessInfo {
out := append([]*base.ProcessInfo(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
if out[i].Host != out[j].Host {
return out[i].Host < out[j].Host
}
return out[i].PID < out[j].PID
})
return out
})
// IgnoreIDOpt is an cmp.Option to ignore ID field in task messages when comparing.
var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")

View File

@@ -6,7 +6,9 @@
package base
import (
"fmt"
"strings"
"sync"
"time"
"github.com/rs/xid"
@@ -17,6 +19,8 @@ const DefaultQueueName = "default"
// Redis keys
const (
psPrefix = "asynq:ps:" // HASH
AllProcesses = "asynq:ps" // ZSET
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
@@ -45,6 +49,11 @@ func FailureKey(t time.Time) string {
return failurePrefix + t.UTC().Format("2006-01-02")
}
// ProcessInfoKey returns a redis key string for process info.
func ProcessInfoKey(hostname string, pid int) string {
return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid)
}
// TaskMessage is the internal representation of a task with additional metadata fields.
// Serialized data of this type gets written to redis.
type TaskMessage struct {
@@ -69,3 +78,48 @@ type TaskMessage struct {
// ErrorMsg holds the error message from the last failure.
ErrorMsg string
}
// ProcessInfo holds information about running background worker process.
type ProcessInfo struct {
mu sync.Mutex
Concurrency int
Queues map[string]uint
StrictPriority bool
PID int
Host string
State string
Started time.Time
ActiveWorkerCount int
}
// NewProcessInfo returns a new instance of ProcessInfo.
func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo {
return &ProcessInfo{
Host: host,
PID: pid,
Concurrency: concurrency,
Queues: queues,
StrictPriority: strict,
}
}
// SetState set the state field of the process info.
func (p *ProcessInfo) SetState(state string) {
p.mu.Lock()
defer p.mu.Unlock()
p.State = state
}
// SetStarted set the started field of the process info.
func (p *ProcessInfo) SetStarted(t time.Time) {
p.mu.Lock()
defer p.mu.Unlock()
p.Started = t
}
// IncrActiveWorkerCount increments active worker count by delta.
func (p *ProcessInfo) IncrActiveWorkerCount(delta int) {
p.mu.Lock()
defer p.mu.Unlock()
p.ActiveWorkerCount += delta
}

View File

@@ -5,6 +5,7 @@
package base
import (
"sync"
"testing"
"time"
)
@@ -60,3 +61,48 @@ func TestFailureKey(t *testing.T) {
}
}
}
func TestProcessInfoKey(t *testing.T) {
tests := []struct {
hostname string
pid int
want string
}{
{"localhost", 9876, "asynq:ps:localhost:9876"},
{"127.0.0.1", 1234, "asynq:ps:127.0.0.1:1234"},
}
for _, tc := range tests {
got := ProcessInfoKey(tc.hostname, tc.pid)
if got != tc.want {
t.Errorf("ProcessInfoKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want)
}
}
}
// Note: Run this test with -race flag to check for data race.
func TestProcessInfoSetter(t *testing.T) {
pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false)
var wg sync.WaitGroup
wg.Add(3)
go func() {
pi.SetState("runnning")
wg.Done()
}()
go func() {
pi.SetStarted(time.Now())
pi.IncrActiveWorkerCount(1)
wg.Done()
}()
go func() {
pi.SetState("stopped")
wg.Done()
}()
wg.Wait()
}

View File

@@ -235,67 +235,46 @@ func (r *RDB) RedisInfo() (map[string]string, error) {
return info, nil
}
func reverse(x []string) {
for i := len(x)/2 - 1; i >= 0; i-- {
opp := len(x) - 1 - i
x[i], x[opp] = x[opp], x[i]
}
}
// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {
// Number of items in the page.
Size uint
// Page number starting from zero.
Page uint
}
func (p Pagination) start() int64 {
return int64(p.Size * p.Page)
}
func (p Pagination) stop() int64 {
return int64(p.Size*p.Page + p.Size - 1)
}
// ListEnqueued returns enqueued tasks that are ready to be processed.
//
// Queue names can be optionally passed to query only the specified queues.
// If none are passed, it will query all queues.
func (r *RDB) ListEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
if len(qnames) == 0 {
return r.listAllEnqueued()
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) {
qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listEnqueued(qnames...)
}
func (r *RDB) listAllEnqueued() ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
local queues = redis.call("SMEMBERS", KEYS[1])
for _, qkey in ipairs(queues) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
res, err := script.Run(r.client, []string{base.AllQueues}).Result()
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(qkey, start, stop).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func (r *RDB) listEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
for _, qkey in ipairs(KEYS) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
res, err := script.Run(r.client, keys).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
reverse(data)
var tasks []*EnqueuedTask
for _, s := range data {
var msg base.TaskMessage
@@ -314,11 +293,16 @@ func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
}
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
data, err := r.client.LRange(base.InProgressQueue, 0, -1).Result()
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var tasks []*InProgressTask
for _, s := range data {
var msg base.TaskMessage
@@ -337,8 +321,8 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
// ListScheduled returns all tasks that are scheduled to be processed
// in the future.
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Result()
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -368,8 +352,8 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry() ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Result()
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -401,8 +385,8 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
}
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead() ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Result()
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -771,3 +755,40 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
}
return nil
}
// ListProcesses returns the list of process statuses.
func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
// Note: Script also removes stale keys.
script := redis.NewScript(`
local res = {}
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
for _, key in ipairs(keys) do
local ps = redis.call("GET", key)
if ps then
table.insert(res, ps)
end
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res
`)
res, err := script.Run(r.client,
[]string{base.AllProcesses}, time.Now().UTC().Unix()).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var processes []*base.ProcessInfo
for _, s := range data {
var ps base.ProcessInfo
err := json.Unmarshal([]byte(s), &ps)
if err != nil {
continue // skip bad data
}
processes = append(processes, &ps)
}
return processes, nil
}

View File

@@ -5,6 +5,7 @@
package rdb
import (
"fmt"
"sort"
"testing"
"time"
@@ -231,25 +232,24 @@ func TestListEnqueued(t *testing.T) {
t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, Queue: m1.Queue}
t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, Queue: m2.Queue}
t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, Queue: m3.Queue}
t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue}
tests := []struct {
enqueued map[string][]*base.TaskMessage
qnames []string
qname string
want []*EnqueuedTask
}{
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
},
qnames: []string{},
want: []*EnqueuedTask{t1, t2},
qname: base.DefaultQueueName,
want: []*EnqueuedTask{t1, t2},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
qnames: []string{},
want: []*EnqueuedTask{},
qname: base.DefaultQueueName,
want: []*EnqueuedTask{},
},
{
enqueued: map[string][]*base.TaskMessage{
@@ -257,8 +257,8 @@ func TestListEnqueued(t *testing.T) {
"critical": {m3},
"low": {m4},
},
qnames: []string{},
want: []*EnqueuedTask{t1, t2, t3, t4},
qname: base.DefaultQueueName,
want: []*EnqueuedTask{t1, t2},
},
{
enqueued: map[string][]*base.TaskMessage{
@@ -266,17 +266,8 @@ func TestListEnqueued(t *testing.T) {
"critical": {m3},
"low": {m4},
},
qnames: []string{"critical"},
want: []*EnqueuedTask{t3},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
"critical": {m3},
"low": {m4},
},
qnames: []string{"critical", "low"},
want: []*EnqueuedTask{t3, t4},
qname: "critical",
want: []*EnqueuedTask{t3},
},
}
@@ -286,9 +277,10 @@ func TestListEnqueued(t *testing.T) {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
got, err := r.ListEnqueued(tc.qnames...)
got, err := r.ListEnqueued(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*EnqueuedTask) []*EnqueuedTask {
@@ -299,11 +291,76 @@ func TestListEnqueued(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListEnqueuedPagination(t *testing.T) {
r := setup(t)
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
// create 100 tasks in default queue
h.SeedEnqueuedQueue(t, r.client, msgs)
msgs = []*base.TaskMessage(nil) // empty list
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("custom %d", i), nil)
msgs = append(msgs, msg)
}
// create 100 tasks in custom queue
h.SeedEnqueuedQueue(t, r.client, msgs, "custom")
tests := []struct {
desc string
qname string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
{"second page with custom queue", "custom", 1, 20, 20, "custom 20", "custom 39"},
}
for _, tc := range tests {
got, err := r.ListEnqueued(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned a list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListInProgress(t *testing.T) {
r := setup(t)
@@ -330,9 +387,10 @@ func TestListInProgress(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
got, err := r.ListInProgress()
got, err := r.ListInProgress(Pagination{Size: 20, Page: 0})
op := "r.ListInProgress(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListInProgress() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*InProgressTask) []*InProgressTask {
@@ -343,12 +401,67 @@ func TestListInProgress(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("r.ListInProgress() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListInProgressPagination(t *testing.T) {
r := setup(t)
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
h.SeedInProgressQueue(t, r.client, msgs)
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListInProgress(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListInProgress(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListScheduled(t *testing.T) {
r := setup(t)
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
@@ -379,9 +492,10 @@ func TestListScheduled(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedScheduledQueue(t, r.client, tc.scheduled)
got, err := r.ListScheduled()
got, err := r.ListScheduled(Pagination{Size: 20, Page: 0})
op := "r.ListScheduled(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListScheduled() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*ScheduledTask) []*ScheduledTask {
@@ -392,12 +506,68 @@ func TestListScheduled(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("r.ListScheduled() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListScheduledPagination(t *testing.T) {
r := setup(t)
// create 100 tasks with an increasing number of wait time.
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
if err := r.Schedule(msg, time.Now().Add(time.Duration(i)*time.Second)); err != nil {
t.Fatal(err)
}
}
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListScheduled(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListScheduled(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListRetry(t *testing.T) {
r := setup(t)
m1 := &base.TaskMessage{
@@ -464,9 +634,10 @@ func TestListRetry(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedRetryQueue(t, r.client, tc.retry)
got, err := r.ListRetry()
got, err := r.ListRetry(Pagination{Size: 20, Page: 0})
op := "r.ListRetry(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListRetry() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*RetryTask) []*RetryTask {
@@ -478,12 +649,68 @@ func TestListRetry(t *testing.T) {
})
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("r.ListRetry() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListRetryPagination(t *testing.T) {
r := setup(t)
// create 100 tasks with an increasing number of wait time.
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
if err := r.Retry(msg, time.Now().Add(time.Duration(i)*time.Second), "error"); err != nil {
t.Fatal(err)
}
}
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListRetry(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListRetry(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListDead(t *testing.T) {
r := setup(t)
m1 := &base.TaskMessage{
@@ -542,9 +769,10 @@ func TestListDead(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDeadQueue(t, r.client, tc.dead)
got, err := r.ListDead()
got, err := r.ListDead(Pagination{Size: 20, Page: 0})
op := "r.ListDead(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListDead() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*DeadTask) []*DeadTask {
@@ -555,12 +783,67 @@ func TestListDead(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("r.ListDead() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListDeadPagination(t *testing.T) {
r := setup(t)
var entries []h.ZSetEntry
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
entries = append(entries, h.ZSetEntry{Msg: msg, Score: float64(i)})
}
h.SeedDeadQueue(t, r.client, entries)
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListDead(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
var timeCmpOpt = cmpopts.EquateApproxTime(time.Second)
func TestEnqueueDeadTask(t *testing.T) {
@@ -1767,3 +2050,56 @@ func TestRemoveQueueError(t *testing.T) {
}
}
}
func TestListProcesses(t *testing.T) {
r := setup(t)
ps1 := &base.ProcessInfo{
Concurrency: 10,
Queues: map[string]uint{"default": 1},
Host: "do.droplet1",
PID: 1234,
State: "running",
Started: time.Now().Add(-time.Hour),
ActiveWorkerCount: 5,
}
ps2 := &base.ProcessInfo{
Concurrency: 20,
Queues: map[string]uint{"email": 1},
Host: "do.droplet2",
PID: 9876,
State: "stopped",
Started: time.Now().Add(-2 * time.Hour),
ActiveWorkerCount: 20,
}
tests := []struct {
processes []*base.ProcessInfo
}{
{processes: []*base.ProcessInfo{}},
{processes: []*base.ProcessInfo{ps1}},
{processes: []*base.ProcessInfo{ps1, ps2}},
}
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, ps := range tc.processes {
if err := r.WriteProcessInfo(ps, 5*time.Second); err != nil {
t.Fatal(err)
}
}
got, err := r.ListProcesses()
if err != nil {
t.Errorf("r.ListProcesses returned an error: %v", err)
}
if diff := cmp.Diff(tc.processes, got, h.SortProcessInfoOpt, ignoreOpt); diff != "" {
t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s",
got, tc.processes, diff)
}
}
}

View File

@@ -156,7 +156,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
return redis.status_reply("OK")
`)
return script.Run(r.client,
[]string{base.InProgressQueue, base.DefaultQueue},
[]string{base.InProgressQueue, base.QueueKey(msg.Queue)},
string(bytes)).Err()
}
@@ -273,13 +273,16 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
// and reports the number of tasks restored.
func (r *RDB) RestoreUnfinished() (int64, error) {
script := redis.NewScript(`
local len = redis.call("LLEN", KEYS[1])
for i = len, 1, -1 do
redis.call("RPOPLPUSH", KEYS[1], KEYS[2])
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
local qkey = ARGV[1] .. decoded["Queue"]
redis.call("LREM", KEYS[1], 0, msg)
redis.call("RPUSH", qkey, msg)
end
return len
return table.getn(msgs)
`)
res, err := script.Run(r.client, []string{base.InProgressQueue, base.DefaultQueue}).Result()
res, err := script.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result()
if err != nil {
return 0, err
}
@@ -343,3 +346,53 @@ func (r *RDB) forwardSingle(src, dst string) error {
return script.Run(r.client,
[]string{src, dst}, now).Err()
}
// WriteProcessInfo writes process information to redis with expiration
// set to the value ttl.
func (r *RDB) WriteProcessInfo(ps *base.ProcessInfo, ttl time.Duration) error {
bytes, err := json.Marshal(ps)
if err != nil {
return err
}
// Note: Add key to ZSET with expiration time as score.
// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996
exp := time.Now().Add(ttl).UTC()
key := base.ProcessInfoKey(ps.Host, ps.PID)
// KEYS[1] -> asynq:ps
// KEYS[2] -> asynq:ps:<host:pid>
// ARGV[1] -> expiration time
// ARGV[2] -> TTL in seconds
// ARGV[3] -> process info
script := redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
redis.call("SETEX", KEYS[2], ARGV[2], ARGV[3])
return redis.status_reply("OK")
`)
return script.Run(r.client, []string{base.AllProcesses, key}, float64(exp.Unix()), ttl.Seconds(), string(bytes)).Err()
}
// ReadProcessInfo reads process information stored in redis.
func (r *RDB) ReadProcessInfo(host string, pid int) (*base.ProcessInfo, error) {
key := base.ProcessInfoKey(host, pid)
data, err := r.client.Get(key).Result()
if err != nil {
return nil, err
}
var pinfo base.ProcessInfo
err = json.Unmarshal([]byte(data), &pinfo)
if err != nil {
return nil, err
}
return &pinfo, nil
}
// ClearProcessInfo deletes process information from redis.
func (r *RDB) ClearProcessInfo(ps *base.ProcessInfo) error {
key := base.ProcessInfoKey(ps.Host, ps.PID)
script := redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")
`)
return script.Run(r.client, []string{base.AllProcesses, key}).Err()
}

View File

@@ -6,11 +6,13 @@ package rdb
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
@@ -236,33 +238,57 @@ func TestRequeue(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessageWithQueue("send_email", nil, "critical")
tests := []struct {
enqueued []*base.TaskMessage // initial state of the default queue
inProgress []*base.TaskMessage // initial state of the in-progress list
target *base.TaskMessage // task to requeue
wantEnqueued []*base.TaskMessage // final state of the default queue
wantInProgress []*base.TaskMessage // final state of the in-progress list
enqueued map[string][]*base.TaskMessage // initial state of queues
inProgress []*base.TaskMessage // initial state of the in-progress list
target *base.TaskMessage // task to requeue
wantEnqueued map[string][]*base.TaskMessage // final state of queues
wantInProgress []*base.TaskMessage // final state of the in-progress list
}{
{
enqueued: []*base.TaskMessage{},
inProgress: []*base.TaskMessage{t1, t2},
target: t1,
wantEnqueued: []*base.TaskMessage{t1},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
inProgress: []*base.TaskMessage{t1, t2},
target: t1,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
wantInProgress: []*base.TaskMessage{t2},
},
{
enqueued: []*base.TaskMessage{t1},
inProgress: []*base.TaskMessage{t2},
target: t2,
wantEnqueued: []*base.TaskMessage{t1, t2},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
inProgress: []*base.TaskMessage{t2},
target: t2,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2},
},
wantInProgress: []*base.TaskMessage{},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
"critical": {},
},
inProgress: []*base.TaskMessage{t2, t3},
target: t3,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
"critical": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
h.SeedInProgressQueue(t, r.client, tc.inProgress)
err := r.Requeue(tc.target)
@@ -271,9 +297,11 @@ func TestRequeue(t *testing.T) {
continue
}
gotEnqueued := h.GetEnqueuedMessages(t, r.client)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
gotInProgress := h.GetInProgressMessages(t, r.client)
@@ -518,41 +546,72 @@ func TestRestoreUnfinished(t *testing.T) {
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessage("sync_stuff", nil)
t4 := h.NewTaskMessageWithQueue("important", nil, "critical")
t5 := h.NewTaskMessageWithQueue("minor", nil, "low")
tests := []struct {
inProgress []*base.TaskMessage
enqueued []*base.TaskMessage
enqueued map[string][]*base.TaskMessage
want int64
wantInProgress []*base.TaskMessage
wantEnqueued []*base.TaskMessage
wantEnqueued map[string][]*base.TaskMessage
}{
{
inProgress: []*base.TaskMessage{t1, t2, t3},
enqueued: []*base.TaskMessage{},
inProgress: []*base.TaskMessage{t1, t2, t3},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
want: 3,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: []*base.TaskMessage{t1, t2, t3},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
},
{
inProgress: []*base.TaskMessage{},
enqueued: []*base.TaskMessage{t1, t2, t3},
inProgress: []*base.TaskMessage{},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
want: 0,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: []*base.TaskMessage{t1, t2, t3},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
},
{
inProgress: []*base.TaskMessage{t2, t3},
enqueued: []*base.TaskMessage{t1},
inProgress: []*base.TaskMessage{t2, t3},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
want: 2,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: []*base.TaskMessage{t1, t2, t3},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
},
{
inProgress: []*base.TaskMessage{t2, t3, t4, t5},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
"critical": {},
"low": {},
},
want: 4,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
"critical": {t4},
"low": {t5},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
got, err := r.RestoreUnfinished()
if got != tc.want || err != nil {
@@ -565,9 +624,11 @@ func TestRestoreUnfinished(t *testing.T) {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff)
}
gotEnqueued := h.GetEnqueuedMessages(t, r.client)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.QueueKey(qname), diff)
}
}
}
}
@@ -679,3 +740,82 @@ func TestCheckAndEnqueue(t *testing.T) {
}
}
}
func TestReadWriteClearProcessInfo(t *testing.T) {
r := setup(t)
pinfo := &base.ProcessInfo{
Concurrency: 10,
Queues: map[string]uint{"default": 2, "email": 5, "low": 1},
PID: 98765,
Host: "localhost",
State: "running",
Started: time.Now(),
ActiveWorkerCount: 1,
}
tests := []struct {
pi *base.ProcessInfo
ttl time.Duration
}{
{pinfo, 5 * time.Second},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
err := r.WriteProcessInfo(tc.pi, tc.ttl)
if err != nil {
t.Errorf("r.WriteProcessInfo returned an error: %v", err)
continue
}
got, err := r.ReadProcessInfo(tc.pi.Host, tc.pi.PID)
if err != nil {
t.Errorf("r.ReadProcessInfo returned an error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
if diff := cmp.Diff(tc.pi, got, ignoreOpt); diff != "" {
t.Errorf("r.ReadProcessInfo(%q, %d) = %+v, want %+v; (-want,+got)\n%s",
tc.pi.Host, tc.pi.PID, got, tc.pi, diff)
}
key := base.ProcessInfoKey(tc.pi.Host, tc.pi.PID)
gotTTL := r.client.TTL(key).Val()
if !cmp.Equal(tc.ttl, gotTTL, timeCmpOpt) {
t.Errorf("redis TTL %q returned %v, want %v", key, gotTTL, tc.ttl)
}
now := time.Now().UTC()
allKeys, err := r.client.ZRangeByScore(base.AllProcesses, &redis.ZRangeBy{
Min: strconv.Itoa(int(now.Unix())),
Max: "+inf",
}).Result()
if err != nil {
t.Errorf("redis ZRANGEBYSCORE %q %d +inf returned an error: %v",
base.AllProcesses, now.Unix(), err)
continue
}
wantAllKeys := []string{key}
if diff := cmp.Diff(wantAllKeys, allKeys); diff != "" {
t.Errorf("all keys = %v, want %v; (-want,+got)\n%s", allKeys, wantAllKeys, diff)
}
if err := r.ClearProcessInfo(tc.pi); err != nil {
t.Errorf("r.ClearProcessInfo returned an error: %v", err)
continue
}
// 1 means key exists
if r.client.Exists(key).Val() == 1 {
t.Errorf("expected %q to be deleted", key)
}
if r.client.ZCard(base.AllProcesses).Val() != 0 {
t.Errorf("expected %q to be empty", base.AllProcesses)
}
}
}

View File

@@ -19,6 +19,8 @@ import (
type processor struct {
rdb *rdb.RDB
pinfo *base.ProcessInfo
handler Handler
queueConfig map[string]uint
@@ -53,25 +55,21 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor.
//
// r is an instance of RDB used by the processor.
// n specifies the max number of concurrenct worker goroutines.
// qfcg is a mapping of queue names to associated priority level.
// strict specifies whether queue priority should be treated strictly.
// fn is a function to compute retry delay.
func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
qcfg := normalizeQueueCfg(pinfo.Queues)
orderedQueues := []string(nil)
if strict {
if pinfo.StrictPriority {
orderedQueues = sortByPriority(qcfg)
}
return &processor{
rdb: r,
pinfo: pinfo,
queueConfig: qcfg,
orderedQueues: orderedQueues,
retryDelayFunc: fn,
syncRequestCh: syncRequestCh,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, n),
sema: make(chan struct{}, pinfo.Concurrency),
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
@@ -153,8 +151,12 @@ func (p *processor) exec() {
p.requeue(msg)
return
case p.sema <- struct{}{}: // acquire token
p.pinfo.IncrActiveWorkerCount(1)
go func() {
defer func() { <-p.sema /* release token */ }()
defer func() {
<-p.sema /* release token */
p.pinfo.IncrActiveWorkerCount(-1)
}()
resCh := make(chan error, 1)
task := NewTask(msg.Type, msg.Payload)
@@ -331,3 +333,35 @@ type byPriority []*queue
func (x byPriority) Len() int { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
var xs []uint
for _, x := range queueCfg {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]uint)
for q, x := range queueCfg {
res[q] = x / d
}
return res
}
func gcd(xs ...uint) uint {
fn := func(x, y uint) uint {
for y > 0 {
x, y = y, x%y
}
return x
}
res := xs[0]
for i := 0; i < len(xs); i++ {
res = fn(xs[i], res)
if res == 1 {
return 1
}
}
return res
}

View File

@@ -65,7 +65,8 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task)
return nil
}
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
p := newProcessor(rdbClient, pi, defaultDelayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()
@@ -148,7 +149,8 @@ func TestProcessorRetry(t *testing.T) {
handler := func(task *Task) error {
return fmt.Errorf(errMsg)
}
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
p := newProcessor(rdbClient, pi, delayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()
@@ -207,7 +209,8 @@ func TestProcessorQueues(t *testing.T) {
}
for _, tc := range tests {
p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(nil, pi, defaultDelayFunc, nil)
got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -273,7 +276,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"low": 1,
}
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 1 /*concurrency */, queueCfg, true /* strict */)
p := newProcessor(rdbClient, pi, defaultDelayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()

View File

@@ -87,6 +87,7 @@ func TestSyncerRetry(t *testing.T) {
t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress))
}
// FIXME: This assignment introduces data race and running the test with -race will fail.
// simualate failover.
rdbClient = rdb.NewRDB(goodClient)

View File

@@ -1,6 +1,6 @@
# Asynqmon
Asynqmon is a CLI tool to monitor the queues managed by `asynq` package.
Asynqmon is a command line tool to monitor the tasks managed by `asynq` package.
## Table of Contents
@@ -24,7 +24,7 @@ This will create the asynqmon executable under your `$GOPATH/bin` directory.
## Quick Start
Asynqmon tool has a few commands to inspect the state of tasks and queues.
The tool has a few commands to inspect the state of tasks and queues.
Run `asynqmon help` to see all the available commands.
@@ -34,7 +34,7 @@ By default, Asynqmon will try to connect to a redis server running at `localhost
### Stats
Stats command gives the overview of the current state of tasks and queues. Run it in conjunction with `watch` command to repeatedly run `stats`.
Stats command gives the overview of the current state of tasks and queues. You can run it in conjunction with `watch` command to repeatedly run `stats`.
Example:
@@ -46,35 +46,93 @@ This will run `asynqmon stats` command every 3 seconds.
### History
TODO: Add discription
History command shows the number of processed and failed tasks from the last x days.
By default, it shows the stats from the last 10 days. Use `--days` to specify the number of days.
Example:
asynqmon history --days=30
![Gif](/docs/assets/asynqmon_history.gif)
### List
TODO: Add discription
List command shows all tasks in the specified state in a table format
Example:
asynqmon ls retry
asynqmon ls scheduled
asynqmon ls dead
asynqmon ls enqueued:default
asynqmon ls inprogress
### Enqueue
TODO: Add discription
There are two commands to enqueue tasks.
Command `enq` takes a task ID and moves the task to **Enqueued** state. You can obtain the task ID by running `ls` command.
Example:
asynqmon enq d:1575732274:bnogo8gt6toe23vhef0g
Command `enqall` moves all tasks to **Enqueued** state from the specified state.
Example:
asynqmon enqall retry
Running the above command will move all **Retry** tasks to **Enqueued** state.
### Delete
TODO: Add discription
There are two commands for task deletion.
Command `del` takes a task ID and deletes the task. You can obtain the task ID by running `ls` command.
Example:
asynqmon del r:1575732274:bnogo8gt6toe23vhef0g
Command `delall` deletes all tasks which are in the specified state.
Example:
asynqmon delall retry
Running the above command will delete all **Retry** tasks.
### Kill
TODO: Add discription
There are two commands to kill (i.e. move to dead state) tasks.
Command `kill` takes a task ID and kills the task. You can obtain the task ID by running `ls` command.
Example:
asynqmon kill r:1575732274:bnogo8gt6toe23vhef0g
Command `killall` kills all tasks which are in the specified state.
Example:
asynqmon killall retry
Running the above command will move all **Retry** tasks to **Dead** state.
## Config File
You can use a config file to set default values for flags.
You can use a config file to set default values for the flags.
This is useful, for example when you have to connect to a remote redis server.
By default, `asynqmon` will try to read config file located in
`$HOME/.asynqmon.(yml|json)`. You can specify the file location via `--config` flag.
`$HOME/.asynqmon.(yaml|json)`. You can specify the file location via `--config` flag.
Config file example:
```yml
```yaml
uri: 127.0.0.1:6379
db: 2
password: mypassword

View File

@@ -19,7 +19,7 @@ var delallValidArgs = []string{"scheduled", "retry", "dead"}
// delallCmd represents the delall command
var delallCmd = &cobra.Command{
Use: "delall [state]",
Short: "Deletes all tasks from the specified state",
Short: "Deletes all tasks in the specified state",
Long: `Delall (asynqmon delall) will delete all tasks in the specified state.
The argument should be one of "scheduled", "retry", or "dead".

View File

@@ -19,7 +19,7 @@ var killallValidArgs = []string{"scheduled", "retry"}
// killallCmd represents the killall command
var killallCmd = &cobra.Command{
Use: "killall [state]",
Short: "Update all tasks to dead state from the specified state",
Short: "Kills all tasks in the specified state",
Long: `Killall (asynqmon killall) will update all tasks from the specified state to dead state.
The argument should be either "scheduled" or "retry".

View File

@@ -10,7 +10,6 @@ import (
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/go-redis/redis/v7"
@@ -35,26 +34,23 @@ The argument value should be one of "enqueued", "inprogress", "scheduled",
Example:
asynqmon ls dead -> Lists all tasks in dead state
Enqueued tasks can optionally be filtered by providing queue names after ":"
Enqueued tasks requires a queue name after ":"
Example:
asynqmon ls enqueued:critical -> List tasks from critical queue only
asynqmon ls enqueued:default -> List tasks from default queue
asynqmon ls enqueued:critical -> List tasks from critical queue
`,
Args: cobra.ExactValidArgs(1),
Run: ls,
}
// Flags
var pageSize uint
var pageNum uint
func init() {
rootCmd.AddCommand(lsCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// lsCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// lsCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
lsCmd.Flags().UintVar(&pageSize, "size", 30, "page size")
lsCmd.Flags().UintVar(&pageNum, "page", 0, "page number - zero indexed (default 0)")
}
func ls(cmd *cobra.Command, args []string) {
@@ -67,7 +63,11 @@ func ls(cmd *cobra.Command, args []string) {
parts := strings.Split(args[0], ":")
switch parts[0] {
case "enqueued":
listEnqueued(r, parts[1:]...)
if len(parts) != 2 {
fmt.Printf("error: Missing queue name\n`asynqmon ls enqueued:[queue name]`\n")
os.Exit(1)
}
listEnqueued(r, parts[1])
case "inprogress":
listInProgress(r)
case "scheduled":
@@ -77,7 +77,7 @@ func ls(cmd *cobra.Command, args []string) {
case "dead":
listDead(r)
default:
fmt.Printf("error: `asynqmon ls [state]` only accepts %v as the argument.\n", lsValidArgs)
fmt.Printf("error: `asynqmon ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs)
os.Exit(1)
}
}
@@ -113,24 +113,14 @@ func parseQueryID(queryID string) (id xid.ID, score int64, qtype string, err err
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB, qnames ...string) {
tasks, err := r.ListEnqueued(qnames...)
func listEnqueued(r *rdb.RDB, qname string) {
tasks, err := r.ListEnqueued(qname, rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
msg := "No enqueued tasks"
if len(qnames) > 0 {
msg += " in"
for i, q := range qnames {
msg += fmt.Sprintf(" %q queue", q)
if i != len(qnames)-1 {
msg += ","
}
}
}
fmt.Println(msg)
fmt.Printf("No enqueued tasks in %q queue\n", qname)
return
}
cols := []string{"ID", "Type", "Payload", "Queue"}
@@ -140,10 +130,11 @@ func listEnqueued(r *rdb.RDB, qnames ...string) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listInProgress(r *rdb.RDB) {
tasks, err := r.ListInProgress()
tasks, err := r.ListInProgress(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -159,10 +150,11 @@ func listInProgress(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listScheduled(r *rdb.RDB) {
tasks, err := r.ListScheduled()
tasks, err := r.ListScheduled(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -179,10 +171,11 @@ func listScheduled(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listRetry(r *rdb.RDB) {
tasks, err := r.ListRetry()
tasks, err := r.ListRetry(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -199,10 +192,11 @@ func listRetry(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listDead(r *rdb.RDB) {
tasks, err := r.ListDead()
tasks, err := r.ListDead(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -218,19 +212,5 @@ func listDead(r *rdb.RDB) {
}
}
printTable(cols, printRows)
}
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
var headers []interface{}
var seps []interface{}
for _, name := range cols {
headers = append(headers, name)
seps = append(seps, strings.Repeat("-", len(name)))
}
fmt.Fprintf(tw, format, headers...)
fmt.Fprintf(tw, format, seps...)
printRows(tw, format)
tw.Flush()
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}

118
tools/asynqmon/cmd/ps.go Normal file
View File

@@ -0,0 +1,118 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package cmd
import (
"fmt"
"io"
"os"
"sort"
"strings"
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// psCmd represents the ps command
var psCmd = &cobra.Command{
Use: "ps",
Short: "Shows all background worker processes",
Long: `Ps (asynqmon ps) will show all background worker processes
backed by the specified redis instance.
The command shows the following for each process:
* Host and PID of the process
* Number of active workers out of worker pool
* Queue configuration
* State of the worker process ("running" | "stopped")
* Time the process was started
A "running" process is processing tasks in queues.
A "stopped" process is no longer processing new tasks.`,
Args: cobra.NoArgs,
Run: ps,
}
func init() {
rootCmd.AddCommand(psCmd)
}
func ps(cmd *cobra.Command, args []string) {
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
processes, err := r.ListProcesses()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(processes) == 0 {
fmt.Println("No processes")
return
}
// sort by hostname and pid
sort.Slice(processes, func(i, j int) bool {
x, y := processes[i], processes[j]
if x.Host != y.Host {
return x.Host < y.Host
}
return x.PID < y.PID
})
// print processes
cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"}
printRows := func(w io.Writer, tmpl string) {
for _, ps := range processes {
fmt.Fprintf(w, tmpl,
ps.Host, ps.PID, ps.State,
fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency),
formatQueues(ps.Queues), timeAgo(ps.Started))
}
}
printTable(cols, printRows)
}
// timeAgo takes a time and returns a string of the format "<duration> ago".
func timeAgo(since time.Time) string {
d := time.Since(since).Round(time.Second)
return fmt.Sprintf("%v ago", d)
}
func formatQueues(qmap map[string]uint) string {
// sort queues by priority and name
type queue struct {
name string
priority uint
}
var queues []*queue
for qname, p := range qmap {
queues = append(queues, &queue{qname, p})
}
sort.Slice(queues, func(i, j int) bool {
x, y := queues[i], queues[j]
if x.priority != y.priority {
return x.priority > y.priority
}
return x.name < y.name
})
var b strings.Builder
l := len(queues)
for _, q := range queues {
fmt.Fprintf(&b, "%s:%d", q.name, q.priority)
l--
if l > 0 {
b.WriteString(" ")
}
}
return b.String()
}

View File

@@ -6,7 +6,10 @@ package cmd
import (
"fmt"
"io"
"os"
"strings"
"text/tabwriter"
"github.com/spf13/cobra"
@@ -25,14 +28,7 @@ var password string
var rootCmd = &cobra.Command{
Use: "asynqmon",
Short: "A monitoring tool for asynq queues",
Long: `Asynqmon is a CLI tool to inspect tasks and queues managed by asynq package.
Use commands to query and mutate the current state of tasks and queues.
Monitoring commands such as "stats" and "ls" can be used in conjunction with the
"watch" command to continuously run the command at a certain interval.
Example: watch -n 5 asynqmon stats`,
Long: `Asynqmon is a montoring CLI to inspect tasks and queues managed by asynq.`,
}
// Execute adds all child commands to the root command and sets flags appropriately.
@@ -81,3 +77,36 @@ func initConfig() {
fmt.Println("Using config file:", viper.ConfigFileUsed())
}
}
// printTable is a helper function to print data in table format.
//
// cols is a list of headers and printRow specifies how to print rows.
//
// Example:
// type User struct {
// Name string
// Addr string
// Age int
// }
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
// cols := []string{"Name", "Addr", "Age"}
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
// printTable(cols, printRows)
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
var headers []interface{}
var seps []interface{}
for _, name := range cols {
headers = append(headers, name)
seps = append(seps, strings.Repeat("-", len(name)))
}
fmt.Fprintf(tw, format, headers...)
fmt.Fprintf(tw, format, seps...)
printRows(tw, format)
tw.Flush()
}