2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-06 06:45:52 +08:00

Compare commits

..

10 Commits

Author SHA1 Message Date
Ken Hibino
461a6f2587 v0.3.0 2020-02-04 06:29:57 -08:00
Ken Hibino
ba9b42f428 Clean up cli description text 2020-02-04 06:16:29 -08:00
Ken Hibino
3744072e9b Sort processes by host and pid in ps output 2020-02-03 06:59:02 -08:00
Ken Hibino
eadfd5f8b4 [ci skip] Update changelog 2020-02-02 20:59:00 -08:00
Ken Hibino
96f06ac89b Add ps command to asynqmon 2020-02-02 20:56:33 -08:00
Ken Hibino
d03fa34eaf Add hearbeater 2020-02-01 09:35:49 -08:00
Ken Hibino
489e695433 [ci skip] Change font used in demo.gif 2020-01-30 21:47:08 -08:00
Ken Hibino
9ae4be8184 Run and compare benchmarks after successful ci-build 2020-01-30 21:38:16 -08:00
Ken Hibino
36af486303 [ci skip] Add prettier gif for demo 2020-01-29 21:36:17 -08:00
Ken Hibino
a46eb97e6f [ci skip] Shorten readme 2020-01-26 20:06:52 -08:00
24 changed files with 833 additions and 364 deletions

View File

@@ -7,3 +7,5 @@ env:
go: [1.12.x, 1.13.x]
services:
- redis-server
after_success:
- bash ./.travis/benchcmp.sh

15
.travis/benchcmp.sh Executable file
View File

@@ -0,0 +1,15 @@
if [ "${TRAVIS_PULL_REQUEST_BRANCH:-$TRAVIS_BRANCH}" != "master" ]; then
REMOTE_URL="$(git config --get remote.origin.url)";
cd ${TRAVIS_BUILD_DIR}/.. && \
git clone ${REMOTE_URL} "${TRAVIS_REPO_SLUG}-bench" && \
cd "${TRAVIS_REPO_SLUG}-bench" && \
# Benchmark master
git checkout master && \
go test -run=XXX -bench=. ./... > master.txt && \
# Benchmark feature branch
git checkout ${TRAVIS_COMMIT} && \
go test -run=XXX -bench=. ./... > feature.txt && \
go get -u golang.org/x/tools/cmd/benchcmp && \
# compare two benchmarks
benchcmp master.txt feature.txt;
fi

View File

@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.3.0] - 2020-02-04
### Added
- `asynqmon ps` was added to list all background worker processes
## [0.2.2] - 2020-01-26
### Fixed

363
README.md
View File

@@ -6,37 +6,99 @@
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
Simple and efficient asynchronous task processing library in Go.
Asynq is a simple Go library for queueing tasks and processing them in the background with workers.
It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
## Table of Contents
![Gif](/docs/assets/demo.gif)
- [Overview](#overview)
- [Requirements](#requirements)
- [Installation](#installation)
- [Getting Started](#getting-started)
- [Command Line Tool](#command-line-tool)
- [Acknowledgements](#acknowledgements)
- [License](#license)
## Installation
## Overview
To install `asynq` library, run the following command:
![Gif](/docs/assets/asynqmon_stats.gif)
```sh
go get -u github.com/hibiken/asynq
```
Package asynq provides a framework for asynchronous task processing.
## Quick Start
Asynq provides:
First, make sure you are running a Redis server locally.
- Clear separation of task producer and consumer
- Ability to process multiple tasks concurrently
- Ability to schedule task processing in the future
- Automatic retry of failed tasks with exponential backoff
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) task retry count and retry delay
- Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
- [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
- [Command line tool](/tools/asynqmon/README.md) to query tasks for monitoring and troubleshooting purposes
```sh
redis-server
```
To create and schedule tasks, use `Client` and provide a task and when to process the task.
```go
func main() {
r := &asynq.RedisClientOpt{
Addr: "localhost:6379",
}
client := asynq.NewClient(r)
// Create a task with task type and payload
t1 := asynq.NewTask("send_welcome_email", map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42})
// Process immediately
err := client.Schedule(t1, time.Now())
// Process 24 hrs later
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
// If processing fails, retry up to 10 times (Default is 25)
err = client.Schedule(t1, time.Now(), asynq.Retry(10))
// Use custom queue called "critical"
err = client.Schedule(t1, time.Now(), asynq.Queue("critical"))
}
```
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
```go
func main() {
r := &asynq.RedisClientOpt{
Addr: "localhost:6379",
}
bg := asynq.NewBackground(r, &asynq.Config{
// Specify how many concurrent workers to use
Concurrency: 10,
// You can optionally create multiple queues
// with different priority level
Queues: map[string]uint{
"critical": 6,
"default": 3,
"low": 1,
},
// See the godoc for other configuration options
})
bg.Run(handler)
}
```
`Handler` is an interface with one method `ProcessTask` with the following signature.
```go
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
type Handler interface {
ProcessTask(*Task) error
}
```
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started).
To Learn more about `asynq` features and APIs, see our [Wiki pages](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
## Requirements
@@ -45,263 +107,6 @@ Asynq provides:
| [Redis](https://redis.io/) | v2.8+ |
| [Go](https://golang.org/) | v1.12+ |
## Installation
To install both `asynq` library and `asynqmon` command line tool, run the following command:
```
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
```
## Getting Started
In this quick tour of `asynq`, we are going to create two programs.
- `producer.go` will create and schedule tasks to be processed asynchronously by the consumer.
- `consumer.go` will process the tasks created by the producer.
**This guide assumes that you are running a Redis server at `localhost:6379`**.
Before we start, make sure you have Redis installed and running.
The first thing we need to do is create two main files.
```sh
mkdir producer consumer
touch producer/producer.go consumer/consumer.go
```
Import `asynq` in both files.
```go
import "github.com/hibiken/asynq"
```
Asynq uses Redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis.
We are going to use `RedisClientOpt` here.
```go
// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
```
In `producer.go`, we are going to create a `Client` instance to create and schedule tasks.
In `asynq`, a unit of work to be performed is encapsluated in a struct called `Task`.
Which has two fields: `Type` and `Payload`.
```go
// Task represents a task to be performed.
type Task struct {
// Type indicates the type of task to be performed.
Type string
// Payload holds data needed to perform the task.
Payload Payload
}
```
To create a task, use `NewTask` function and pass type and payload for the task.
You schedule a task by calling `Client.Schedule` passing in the task and the timethe task neeeds to be processed.
```go
// producer.go
func main() {
client := asynq.NewClient(redis)
// Create a task with typename and payload.
t1 := asynq.NewTask(
"send_welcome_email",
map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask(
"send_reminder_email",
map[string]interface{}{"user_id": 42})
// Process the task immediately.
err := client.Schedule(t1, time.Now())
if err != nil {
log.Fatal(err)
}
// Process the task 24 hours later.
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
if err != nil {
log.Fatal(err)
}
}
```
In `consumer.go`, create a `Background` instance to process the tasks.
`NewBackground` function takes `RedisConnOpt` and `Config`.
You can take a look at documentation on `Config` to see the available options.
We are only going to specify the concurrency in this example.
```go
// consumer.go
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(handler)
}
```
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
```go
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
ProcessTask(*Task) error
}
```
The simplest way to implement a handler is to define a function with the same signature and use `asynq.HandlerFunc` adapter type when passing it to `Run`.
```go
func handler(t *asynq.Task) error {
switch t.Type {
case "send_welcome_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
case "send_reminder_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
default:
return fmt.Errorf("unexpected task type: %s", t.Type)
}
return nil
}
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
// Use asynq.HandlerFunc adapter for a handler function
bg.Run(asynq.HandlerFunc(handler))
}
```
We could keep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
```go
// consumer.go
// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
mapping map[string]asynq.HandlerFunc
}
// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
d.mapping[taskType] = fn
}
// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
fn, ok := d.mapping[task.Type]
if !ok {
return fmt.Errorf("no handler registered for %q", task.Type)
}
return fn(task)
}
func main() {
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
d.HandleFunc("send_reminder_email", sendReminderEmail)
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(d)
}
func sendWelcomeEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func sendReminderEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
```
Now that we have both task producer and consumer, we can run both programs.
```sh
go run producer.go
```
This will create two tasks: One that should processed immediately and another to be processed 24 hours later.
Let's use `asynqmon` tool to inspect the tasks.
```sh
asynqmon stats
```
You should able to see that there's one task in **Enqueued** state and another in **Scheduled** state.
Note: To understand the meaning of each state, see [Life of a Task](https://github.com/hibiken/asynq/wiki/Life-of-a-Task) on our Wiki page.
Let's run `asynqmon` with `watch` command so that we can continuously run the command to see the changes.
```sh
watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds
```
And finally, let's start the consumer program to process scheduled tasks.
```sh
go run consumer.go
```
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
You should be able to see text printed in your terminal indicating that the task was processed successfully.
This was a whirlwind tour of `asynq` basics. To learn more about all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see our [Wiki page](https://github.com/hibiken/asynq/wiki).
## Command Line Tool
Asynq ships with a command line tool to inspect the state of queues and tasks.

View File

@@ -33,10 +33,12 @@ type Background struct {
mu sync.Mutex
running bool
rdb *rdb.RDB
scheduler *scheduler
processor *processor
syncer *syncer
pinfo *base.ProcessInfo
rdb *rdb.RDB
scheduler *scheduler
processor *processor
syncer *syncer
heartbeater *heartbeater
}
// Config specifies the background-task processing behavior.
@@ -107,18 +109,27 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
if queues == nil || len(queues) == 0 {
queues = defaultQueueConfig
}
qcfg := normalizeQueueCfg(queues)
host, err := os.Hostname()
if err != nil {
host = "unknown-host"
}
pid := os.Getpid()
pinfo := base.NewProcessInfo(host, pid, n, queues, cfg.StrictPriority)
rdb := rdb.NewRDB(createRedisClient(r))
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, 5*time.Second)
rdb := rdb.NewRDB(createRedisClient(r))
scheduler := newScheduler(rdb, 5*time.Second, qcfg)
processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
heartbeater := newHeartbeater(rdb, pinfo, 5*time.Second)
scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, pinfo, delayFunc, syncRequestCh)
return &Background{
rdb: rdb,
scheduler: scheduler,
processor: processor,
syncer: syncer,
pinfo: pinfo,
rdb: rdb,
scheduler: scheduler,
processor: processor,
syncer: syncer,
heartbeater: heartbeater,
}
}
@@ -165,6 +176,7 @@ func (bg *Background) Run(handler Handler) {
sig := <-sigs
if sig == syscall.SIGTSTP {
bg.processor.stop()
bg.pinfo.SetState("stopped")
continue
}
break
@@ -184,6 +196,7 @@ func (bg *Background) start(handler Handler) {
bg.running = true
bg.processor.handler = handler
bg.heartbeater.start()
bg.syncer.start()
bg.scheduler.start()
bg.processor.start()
@@ -202,42 +215,12 @@ func (bg *Background) stop() {
// Note: processor and all worker goroutines need to be exited
// before shutting down syncer to avoid goroutine leak.
bg.syncer.terminate()
bg.heartbeater.terminate()
bg.rdb.ClearProcessInfo(bg.pinfo)
bg.rdb.Close()
bg.processor.handler = nil
bg.running = false
logger.info("Bye!")
}
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
var xs []uint
for _, x := range queueCfg {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]uint)
for q, x := range queueCfg {
res[q] = x / d
}
return res
}
func gcd(xs ...uint) uint {
fn := func(x, y uint) uint {
for y > 0 {
x, y = y, x%y
}
return x
}
res := xs[0]
for i := 0; i < len(xs); i++ {
res = fn(xs[i], res)
if res == 1 {
return 1
}
}
return res
}

BIN
docs/assets/demo.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 983 KiB

67
heartbeat.go Normal file
View File

@@ -0,0 +1,67 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
// heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up.
type heartbeater struct {
rdb *rdb.RDB
pinfo *base.ProcessInfo
// channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{}
// interval between heartbeats.
interval time.Duration
}
func newHeartbeater(rdb *rdb.RDB, pinfo *base.ProcessInfo, interval time.Duration) *heartbeater {
return &heartbeater{
rdb: rdb,
pinfo: pinfo,
done: make(chan struct{}),
interval: interval,
}
}
func (h *heartbeater) terminate() {
logger.info("Heartbeater shutting down...")
// Signal the heartbeater goroutine to stop.
h.done <- struct{}{}
}
func (h *heartbeater) start() {
h.pinfo.SetStarted(time.Now())
h.pinfo.SetState("running")
go func() {
h.beat()
for {
select {
case <-h.done:
logger.info("Heartbeater done")
return
case <-time.After(h.interval):
h.beat()
}
}
}()
}
func (h *heartbeater) beat() {
// Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2)
if err != nil {
logger.error("could not write heartbeat data: %v", err)
}
}

88
heartbeat_test.go Normal file
View File

@@ -0,0 +1,88 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
func TestHeartbeater(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
tests := []struct {
interval time.Duration
host string
pid int
queues map[string]uint
concurrency int
}{
{time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10},
}
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
for _, tc := range tests {
h.FlushDB(t, r)
pi := base.NewProcessInfo(tc.host, tc.pid, tc.concurrency, tc.queues, false)
hb := newHeartbeater(rdbClient, pi, tc.interval)
want := &base.ProcessInfo{
Host: tc.host,
PID: tc.pid,
Queues: tc.queues,
Concurrency: tc.concurrency,
Started: time.Now(),
State: "running",
}
hb.start()
// allow for heartbeater to write to redis
time.Sleep(tc.interval * 2)
got, err := rdbClient.ReadProcessInfo(tc.host, tc.pid)
if err != nil {
t.Errorf("could not read process status from redis: %v", err)
hb.terminate()
continue
}
if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff)
hb.terminate()
continue
}
// state change
pi.SetState("stopped")
// allow for heartbeater to write to redis
time.Sleep(tc.interval * 2)
want.State = "stopped"
got, err = rdbClient.ReadProcessInfo(tc.host, tc.pid)
if err != nil {
t.Errorf("could not read process status from redis: %v", err)
hb.terminate()
continue
}
if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff)
hb.terminate()
continue
}
hb.terminate()
}
}

View File

@@ -41,6 +41,18 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [
return out
})
// SortProcessInfoOpt is a cmp.Option to sort base.ProcessInfo for comparing slice of process info.
var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.ProcessInfo) []*base.ProcessInfo {
out := append([]*base.ProcessInfo(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
if out[i].Host != out[j].Host {
return out[i].Host < out[j].Host
}
return out[i].PID < out[j].PID
})
return out
})
// IgnoreIDOpt is an cmp.Option to ignore ID field in task messages when comparing.
var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")

View File

@@ -6,7 +6,9 @@
package base
import (
"fmt"
"strings"
"sync"
"time"
"github.com/rs/xid"
@@ -17,6 +19,8 @@ const DefaultQueueName = "default"
// Redis keys
const (
psPrefix = "asynq:ps:" // HASH
AllProcesses = "asynq:ps" // ZSET
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
@@ -45,6 +49,11 @@ func FailureKey(t time.Time) string {
return failurePrefix + t.UTC().Format("2006-01-02")
}
// ProcessInfoKey returns a redis key string for process info.
func ProcessInfoKey(hostname string, pid int) string {
return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid)
}
// TaskMessage is the internal representation of a task with additional metadata fields.
// Serialized data of this type gets written to redis.
type TaskMessage struct {
@@ -69,3 +78,48 @@ type TaskMessage struct {
// ErrorMsg holds the error message from the last failure.
ErrorMsg string
}
// ProcessInfo holds information about running background worker process.
type ProcessInfo struct {
mu sync.Mutex
Concurrency int
Queues map[string]uint
StrictPriority bool
PID int
Host string
State string
Started time.Time
ActiveWorkerCount int
}
// NewProcessInfo returns a new instance of ProcessInfo.
func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo {
return &ProcessInfo{
Host: host,
PID: pid,
Concurrency: concurrency,
Queues: queues,
StrictPriority: strict,
}
}
// SetState set the state field of the process info.
func (p *ProcessInfo) SetState(state string) {
p.mu.Lock()
defer p.mu.Unlock()
p.State = state
}
// SetStarted set the started field of the process info.
func (p *ProcessInfo) SetStarted(t time.Time) {
p.mu.Lock()
defer p.mu.Unlock()
p.Started = t
}
// IncrActiveWorkerCount increments active worker count by delta.
func (p *ProcessInfo) IncrActiveWorkerCount(delta int) {
p.mu.Lock()
defer p.mu.Unlock()
p.ActiveWorkerCount += delta
}

View File

@@ -5,6 +5,7 @@
package base
import (
"sync"
"testing"
"time"
)
@@ -60,3 +61,48 @@ func TestFailureKey(t *testing.T) {
}
}
}
func TestProcessInfoKey(t *testing.T) {
tests := []struct {
hostname string
pid int
want string
}{
{"localhost", 9876, "asynq:ps:localhost:9876"},
{"127.0.0.1", 1234, "asynq:ps:127.0.0.1:1234"},
}
for _, tc := range tests {
got := ProcessInfoKey(tc.hostname, tc.pid)
if got != tc.want {
t.Errorf("ProcessInfoKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want)
}
}
}
// Note: Run this test with -race flag to check for data race.
func TestProcessInfoSetter(t *testing.T) {
pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false)
var wg sync.WaitGroup
wg.Add(3)
go func() {
pi.SetState("runnning")
wg.Done()
}()
go func() {
pi.SetStarted(time.Now())
pi.IncrActiveWorkerCount(1)
wg.Done()
}()
go func() {
pi.SetState("stopped")
wg.Done()
}()
wg.Wait()
}

View File

@@ -755,3 +755,40 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
}
return nil
}
// ListProcesses returns the list of process statuses.
func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
// Note: Script also removes stale keys.
script := redis.NewScript(`
local res = {}
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
for _, key in ipairs(keys) do
local ps = redis.call("GET", key)
if ps then
table.insert(res, ps)
end
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res
`)
res, err := script.Run(r.client,
[]string{base.AllProcesses}, time.Now().UTC().Unix()).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var processes []*base.ProcessInfo
for _, s := range data {
var ps base.ProcessInfo
err := json.Unmarshal([]byte(s), &ps)
if err != nil {
continue // skip bad data
}
processes = append(processes, &ps)
}
return processes, nil
}

View File

@@ -2050,3 +2050,56 @@ func TestRemoveQueueError(t *testing.T) {
}
}
}
func TestListProcesses(t *testing.T) {
r := setup(t)
ps1 := &base.ProcessInfo{
Concurrency: 10,
Queues: map[string]uint{"default": 1},
Host: "do.droplet1",
PID: 1234,
State: "running",
Started: time.Now().Add(-time.Hour),
ActiveWorkerCount: 5,
}
ps2 := &base.ProcessInfo{
Concurrency: 20,
Queues: map[string]uint{"email": 1},
Host: "do.droplet2",
PID: 9876,
State: "stopped",
Started: time.Now().Add(-2 * time.Hour),
ActiveWorkerCount: 20,
}
tests := []struct {
processes []*base.ProcessInfo
}{
{processes: []*base.ProcessInfo{}},
{processes: []*base.ProcessInfo{ps1}},
{processes: []*base.ProcessInfo{ps1, ps2}},
}
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, ps := range tc.processes {
if err := r.WriteProcessInfo(ps, 5*time.Second); err != nil {
t.Fatal(err)
}
}
got, err := r.ListProcesses()
if err != nil {
t.Errorf("r.ListProcesses returned an error: %v", err)
}
if diff := cmp.Diff(tc.processes, got, h.SortProcessInfoOpt, ignoreOpt); diff != "" {
t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s",
got, tc.processes, diff)
}
}
}

View File

@@ -346,3 +346,53 @@ func (r *RDB) forwardSingle(src, dst string) error {
return script.Run(r.client,
[]string{src, dst}, now).Err()
}
// WriteProcessInfo writes process information to redis with expiration
// set to the value ttl.
func (r *RDB) WriteProcessInfo(ps *base.ProcessInfo, ttl time.Duration) error {
bytes, err := json.Marshal(ps)
if err != nil {
return err
}
// Note: Add key to ZSET with expiration time as score.
// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996
exp := time.Now().Add(ttl).UTC()
key := base.ProcessInfoKey(ps.Host, ps.PID)
// KEYS[1] -> asynq:ps
// KEYS[2] -> asynq:ps:<host:pid>
// ARGV[1] -> expiration time
// ARGV[2] -> TTL in seconds
// ARGV[3] -> process info
script := redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
redis.call("SETEX", KEYS[2], ARGV[2], ARGV[3])
return redis.status_reply("OK")
`)
return script.Run(r.client, []string{base.AllProcesses, key}, float64(exp.Unix()), ttl.Seconds(), string(bytes)).Err()
}
// ReadProcessInfo reads process information stored in redis.
func (r *RDB) ReadProcessInfo(host string, pid int) (*base.ProcessInfo, error) {
key := base.ProcessInfoKey(host, pid)
data, err := r.client.Get(key).Result()
if err != nil {
return nil, err
}
var pinfo base.ProcessInfo
err = json.Unmarshal([]byte(data), &pinfo)
if err != nil {
return nil, err
}
return &pinfo, nil
}
// ClearProcessInfo deletes process information from redis.
func (r *RDB) ClearProcessInfo(ps *base.ProcessInfo) error {
key := base.ProcessInfoKey(ps.Host, ps.PID)
script := redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")
`)
return script.Run(r.client, []string{base.AllProcesses, key}).Err()
}

View File

@@ -6,11 +6,13 @@ package rdb
import (
"fmt"
"strconv"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
@@ -738,3 +740,82 @@ func TestCheckAndEnqueue(t *testing.T) {
}
}
}
func TestReadWriteClearProcessInfo(t *testing.T) {
r := setup(t)
pinfo := &base.ProcessInfo{
Concurrency: 10,
Queues: map[string]uint{"default": 2, "email": 5, "low": 1},
PID: 98765,
Host: "localhost",
State: "running",
Started: time.Now(),
ActiveWorkerCount: 1,
}
tests := []struct {
pi *base.ProcessInfo
ttl time.Duration
}{
{pinfo, 5 * time.Second},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
err := r.WriteProcessInfo(tc.pi, tc.ttl)
if err != nil {
t.Errorf("r.WriteProcessInfo returned an error: %v", err)
continue
}
got, err := r.ReadProcessInfo(tc.pi.Host, tc.pi.PID)
if err != nil {
t.Errorf("r.ReadProcessInfo returned an error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
if diff := cmp.Diff(tc.pi, got, ignoreOpt); diff != "" {
t.Errorf("r.ReadProcessInfo(%q, %d) = %+v, want %+v; (-want,+got)\n%s",
tc.pi.Host, tc.pi.PID, got, tc.pi, diff)
}
key := base.ProcessInfoKey(tc.pi.Host, tc.pi.PID)
gotTTL := r.client.TTL(key).Val()
if !cmp.Equal(tc.ttl, gotTTL, timeCmpOpt) {
t.Errorf("redis TTL %q returned %v, want %v", key, gotTTL, tc.ttl)
}
now := time.Now().UTC()
allKeys, err := r.client.ZRangeByScore(base.AllProcesses, &redis.ZRangeBy{
Min: strconv.Itoa(int(now.Unix())),
Max: "+inf",
}).Result()
if err != nil {
t.Errorf("redis ZRANGEBYSCORE %q %d +inf returned an error: %v",
base.AllProcesses, now.Unix(), err)
continue
}
wantAllKeys := []string{key}
if diff := cmp.Diff(wantAllKeys, allKeys); diff != "" {
t.Errorf("all keys = %v, want %v; (-want,+got)\n%s", allKeys, wantAllKeys, diff)
}
if err := r.ClearProcessInfo(tc.pi); err != nil {
t.Errorf("r.ClearProcessInfo returned an error: %v", err)
continue
}
// 1 means key exists
if r.client.Exists(key).Val() == 1 {
t.Errorf("expected %q to be deleted", key)
}
if r.client.ZCard(base.AllProcesses).Val() != 0 {
t.Errorf("expected %q to be empty", base.AllProcesses)
}
}
}

View File

@@ -19,6 +19,8 @@ import (
type processor struct {
rdb *rdb.RDB
pinfo *base.ProcessInfo
handler Handler
queueConfig map[string]uint
@@ -53,25 +55,21 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor.
//
// r is an instance of RDB used by the processor.
// n specifies the max number of concurrenct worker goroutines.
// qfcg is a mapping of queue names to associated priority level.
// strict specifies whether queue priority should be treated strictly.
// fn is a function to compute retry delay.
func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
qcfg := normalizeQueueCfg(pinfo.Queues)
orderedQueues := []string(nil)
if strict {
if pinfo.StrictPriority {
orderedQueues = sortByPriority(qcfg)
}
return &processor{
rdb: r,
pinfo: pinfo,
queueConfig: qcfg,
orderedQueues: orderedQueues,
retryDelayFunc: fn,
syncRequestCh: syncRequestCh,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, n),
sema: make(chan struct{}, pinfo.Concurrency),
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
@@ -153,8 +151,12 @@ func (p *processor) exec() {
p.requeue(msg)
return
case p.sema <- struct{}{}: // acquire token
p.pinfo.IncrActiveWorkerCount(1)
go func() {
defer func() { <-p.sema /* release token */ }()
defer func() {
<-p.sema /* release token */
p.pinfo.IncrActiveWorkerCount(-1)
}()
resCh := make(chan error, 1)
task := NewTask(msg.Type, msg.Payload)
@@ -331,3 +333,35 @@ type byPriority []*queue
func (x byPriority) Len() int { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
var xs []uint
for _, x := range queueCfg {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]uint)
for q, x := range queueCfg {
res[q] = x / d
}
return res
}
func gcd(xs ...uint) uint {
fn := func(x, y uint) uint {
for y > 0 {
x, y = y, x%y
}
return x
}
res := xs[0]
for i := 0; i < len(xs); i++ {
res = fn(xs[i], res)
if res == 1 {
return 1
}
}
return res
}

View File

@@ -65,7 +65,8 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task)
return nil
}
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
p := newProcessor(rdbClient, pi, defaultDelayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()
@@ -148,7 +149,8 @@ func TestProcessorRetry(t *testing.T) {
handler := func(task *Task) error {
return fmt.Errorf(errMsg)
}
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
p := newProcessor(rdbClient, pi, delayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()
@@ -207,7 +209,8 @@ func TestProcessorQueues(t *testing.T) {
}
for _, tc := range tests {
p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(nil, pi, defaultDelayFunc, nil)
got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -273,7 +276,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"low": 1,
}
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil)
pi := base.NewProcessInfo("localhost", 1234, 1 /*concurrency */, queueCfg, true /* strict */)
p := newProcessor(rdbClient, pi, defaultDelayFunc, nil)
p.handler = HandlerFunc(handler)
p.start()

View File

@@ -87,6 +87,7 @@ func TestSyncerRetry(t *testing.T) {
t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress))
}
// FIXME: This assignment introduces data race and running the test with -race will fail.
// simualate failover.
rdbClient = rdb.NewRDB(goodClient)

View File

@@ -65,7 +65,7 @@ Example:
asynqmon ls retry
asynqmon ls scheduled
asynqmon ls dead
asynqmon ls enqueued
asynqmon ls enqueued:default
asynqmon ls inprogress
### Enqueue

View File

@@ -19,7 +19,7 @@ var delallValidArgs = []string{"scheduled", "retry", "dead"}
// delallCmd represents the delall command
var delallCmd = &cobra.Command{
Use: "delall [state]",
Short: "Deletes all tasks from the specified state",
Short: "Deletes all tasks in the specified state",
Long: `Delall (asynqmon delall) will delete all tasks in the specified state.
The argument should be one of "scheduled", "retry", or "dead".

View File

@@ -19,7 +19,7 @@ var killallValidArgs = []string{"scheduled", "retry"}
// killallCmd represents the killall command
var killallCmd = &cobra.Command{
Use: "killall [state]",
Short: "Update all tasks to dead state from the specified state",
Short: "Kills all tasks in the specified state",
Long: `Killall (asynqmon killall) will update all tasks from the specified state to dead state.
The argument should be either "scheduled" or "retry".

View File

@@ -10,7 +10,6 @@ import (
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/go-redis/redis/v7"
@@ -215,18 +214,3 @@ func listDead(r *rdb.RDB) {
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
var headers []interface{}
var seps []interface{}
for _, name := range cols {
headers = append(headers, name)
seps = append(seps, strings.Repeat("-", len(name)))
}
fmt.Fprintf(tw, format, headers...)
fmt.Fprintf(tw, format, seps...)
printRows(tw, format)
tw.Flush()
}

118
tools/asynqmon/cmd/ps.go Normal file
View File

@@ -0,0 +1,118 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package cmd
import (
"fmt"
"io"
"os"
"sort"
"strings"
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// psCmd represents the ps command
var psCmd = &cobra.Command{
Use: "ps",
Short: "Shows all background worker processes",
Long: `Ps (asynqmon ps) will show all background worker processes
backed by the specified redis instance.
The command shows the following for each process:
* Host and PID of the process
* Number of active workers out of worker pool
* Queue configuration
* State of the worker process ("running" | "stopped")
* Time the process was started
A "running" process is processing tasks in queues.
A "stopped" process is no longer processing new tasks.`,
Args: cobra.NoArgs,
Run: ps,
}
func init() {
rootCmd.AddCommand(psCmd)
}
func ps(cmd *cobra.Command, args []string) {
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
processes, err := r.ListProcesses()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(processes) == 0 {
fmt.Println("No processes")
return
}
// sort by hostname and pid
sort.Slice(processes, func(i, j int) bool {
x, y := processes[i], processes[j]
if x.Host != y.Host {
return x.Host < y.Host
}
return x.PID < y.PID
})
// print processes
cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"}
printRows := func(w io.Writer, tmpl string) {
for _, ps := range processes {
fmt.Fprintf(w, tmpl,
ps.Host, ps.PID, ps.State,
fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency),
formatQueues(ps.Queues), timeAgo(ps.Started))
}
}
printTable(cols, printRows)
}
// timeAgo takes a time and returns a string of the format "<duration> ago".
func timeAgo(since time.Time) string {
d := time.Since(since).Round(time.Second)
return fmt.Sprintf("%v ago", d)
}
func formatQueues(qmap map[string]uint) string {
// sort queues by priority and name
type queue struct {
name string
priority uint
}
var queues []*queue
for qname, p := range qmap {
queues = append(queues, &queue{qname, p})
}
sort.Slice(queues, func(i, j int) bool {
x, y := queues[i], queues[j]
if x.priority != y.priority {
return x.priority > y.priority
}
return x.name < y.name
})
var b strings.Builder
l := len(queues)
for _, q := range queues {
fmt.Fprintf(&b, "%s:%d", q.name, q.priority)
l--
if l > 0 {
b.WriteString(" ")
}
}
return b.String()
}

View File

@@ -6,7 +6,10 @@ package cmd
import (
"fmt"
"io"
"os"
"strings"
"text/tabwriter"
"github.com/spf13/cobra"
@@ -25,14 +28,7 @@ var password string
var rootCmd = &cobra.Command{
Use: "asynqmon",
Short: "A monitoring tool for asynq queues",
Long: `Asynqmon is a CLI tool to inspect tasks and queues managed by asynq package.
Use commands to query and mutate the current state of tasks and queues.
Monitoring commands such as "stats" and "ls" can be used in conjunction with the
"watch" command to continuously run the command at a certain interval.
Example: watch -n 5 asynqmon stats`,
Long: `Asynqmon is a montoring CLI to inspect tasks and queues managed by asynq.`,
}
// Execute adds all child commands to the root command and sets flags appropriately.
@@ -81,3 +77,36 @@ func initConfig() {
fmt.Println("Using config file:", viper.ConfigFileUsed())
}
}
// printTable is a helper function to print data in table format.
//
// cols is a list of headers and printRow specifies how to print rows.
//
// Example:
// type User struct {
// Name string
// Addr string
// Age int
// }
// data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
// cols := []string{"Name", "Addr", "Age"}
// printRows := func(w io.Writer, tmpl string) {
// for _, u := range data {
// fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
// }
// }
// printTable(cols, printRows)
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
format := strings.Repeat("%v\t", len(cols)) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
var headers []interface{}
var seps []interface{}
for _, name := range cols {
headers = append(headers, name)
seps = append(seps, strings.Repeat("-", len(name)))
}
fmt.Fprintf(tw, format, headers...)
fmt.Fprintf(tw, format, seps...)
printRows(tw, format)
tw.Flush()
}