2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-22 14:05:52 +08:00

Compare commits

...

6 Commits

Author SHA1 Message Date
Ken Hibino
c934ef115b v0.2.2 2020-01-26 16:07:44 -08:00
Ken Hibino
6fbaa2ed6c (fix): RestoreUnfinished to select correct queue 2020-01-26 16:05:46 -08:00
Ken Hibino
166497748b (fix): Requeue to select correct queue 2020-01-26 16:05:46 -08:00
Ken Hibino
31123fd42a Paginate tasks with asynqmon ls command
Changes:
* Added --page and --size flags to ls command
* By default, the command will show first 30 tasks from the specified
queue
2020-01-26 13:12:01 -08:00
Ken Hibino
3ed155b45b [ci skip] Update readme 2020-01-25 08:08:13 -08:00
Ken Hibino
58d2ed94e7 [ci skip] Fix typo 2020-01-23 06:05:18 -08:00
10 changed files with 643 additions and 214 deletions

View File

@@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.2.2] - 2020-01-26
### Fixed
- Fixed restoring unfinished tasks back to correct queues.
### Changed
- `asynqmon ls` command is now paginated (default 30 tasks from first page)
- `asynqmon ls enqueued:[queue name]` requires queue name to be specified
## [0.2.1] - 2020-01-22
### Fixed

105
README.md
View File

@@ -6,9 +6,9 @@
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
Simple and efficent asynchronous task processing library in Go.
Simple and efficient asynchronous task processing library in Go.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before the release of verson 1.0.0.
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
## Table of Contents
@@ -16,7 +16,7 @@ Simple and efficent asynchronous task processing library in Go.
- [Requirements](#requirements)
- [Installation](#installation)
- [Getting Started](#getting-started)
- [Monitoring CLI](#monitoring-cli)
- [Command Line Tool](#command-line-tool)
- [Acknowledgements](#acknowledgements)
- [License](#license)
@@ -24,21 +24,19 @@ Simple and efficent asynchronous task processing library in Go.
![Gif](/docs/assets/asynqmon_stats.gif)
Asynq provides a simple interface to asynchronous task processing.
It also ships with a tool to monitor the queues and take manual actions if needed.
Package asynq provides a framework for asynchronous task processing.
Asynq provides:
- Clear separation of task producer and consumer
- Ability to process multiple tasks concurrently
- Ability to schedule task processing in the future
- Automatic retry of failed tasks with exponential backoff
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) max retry count per task
- Ability to configure max number of worker goroutines to process tasks
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) task retry count and retry delay
- Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
- [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
- [CLI tool](/tools/asynqmon/README.md) to query and mutate queues state for mointoring and administrative purposes
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
- [Command line tool](/tools/asynqmon/README.md) to query tasks for monitoring and troubleshooting purposes
## Requirements
@@ -49,7 +47,7 @@ Asynq provides:
## Installation
To install both `asynq` library and `asynqmon` CLI tool, run the following command:
To install both `asynq` library and `asynqmon` command line tool, run the following command:
```
go get -u github.com/hibiken/asynq
@@ -66,15 +64,22 @@ In this quick tour of `asynq`, we are going to create two programs.
**This guide assumes that you are running a Redis server at `localhost:6379`**.
Before we start, make sure you have Redis installed and running.
1. Import `asynq` in both files.
The first thing we need to do is create two main files.
```sh
mkdir producer consumer
touch producer/producer.go consumer/consumer.go
```
Import `asynq` in both files.
```go
import "github.com/hibiken/asynq"
```
2. Asynq uses Redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis.
We are going to use `RedisClientOpt` here.
Asynq uses Redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis.
We are going to use `RedisClientOpt` here.
```go
// both in producer.go and consumer.go
@@ -88,7 +93,25 @@ var redis = &asynq.RedisClientOpt{
}
```
3. In `producer.go`, create a `Client` instance to create and schedule tasks.
In `producer.go`, we are going to create a `Client` instance to create and schedule tasks.
In `asynq`, a unit of work to be performed is encapsluated in a struct called `Task`.
Which has two fields: `Type` and `Payload`.
```go
// Task represents a task to be performed.
type Task struct {
// Type indicates the type of task to be performed.
Type string
// Payload holds data needed to perform the task.
Payload Payload
}
```
To create a task, use `NewTask` function and pass type and payload for the task.
You schedule a task by calling `Client.Schedule` passing in the task and the timethe task neeeds to be processed.
```go
// producer.go
@@ -118,7 +141,13 @@ func main() {
}
```
4. In `consumer.go`, create a `Background` instance to process tasks.
In `consumer.go`, create a `Background` instance to process the tasks.
`NewBackground` function takes `RedisConnOpt` and `Config`.
You can take a look at documentation on `Config` to see the available options.
We are only going to specify the concurrency in this example.
```go
// consumer.go
@@ -179,7 +208,7 @@ func main() {
}
```
We could kep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
We could keep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
@@ -239,19 +268,11 @@ func sendReminderEmail(t *asynq.Task) error {
Now that we have both task producer and consumer, we can run both programs.
```sh
go run consumer.go
```
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
With our consumer running, also run
```sh
go run producer.go
```
This will create a task and the first task will get processed immediately by the consumer. The second task will be processed 24 hours later.
This will create two tasks: One that should processed immediately and another to be processed 24 hours later.
Let's use `asynqmon` tool to inspect the tasks.
@@ -259,23 +280,37 @@ Let's use `asynqmon` tool to inspect the tasks.
asynqmon stats
```
This command will show the number of tasks in each state and stats for the current date as well as redis information.
You should able to see that there's one task in **Enqueued** state and another in **Scheduled** state.
To understand the meaning of each state, see [Life of a Task Wiki page](https://github.com/hibiken/asynq/wiki/Life-of-a-Task).
Note: To understand the meaning of each state, see [Life of a Task](https://github.com/hibiken/asynq/wiki/Life-of-a-Task) on our Wiki page.
For in-depth guide on `asynqmon` tool, see the [README](/tools/asynqmon/README.md) for the CLI.
Let's run `asynqmon` with `watch` command so that we can continuously run the command to see the changes.
This was a quick tour of `asynq` basics. To see all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see [the Wiki page](https://github.com/hibiken/asynq/wiki).
```sh
watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds
```
## Monitoring CLI
And finally, let's start the consumer program to process scheduled tasks.
Asynq ships with a CLI tool to inspect the state of queues and tasks.
```sh
go run consumer.go
```
To install the CLI, run the following command:
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
You should be able to see text printed in your terminal indicating that the task was processed successfully.
This was a whirlwind tour of `asynq` basics. To learn more about all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see our [Wiki page](https://github.com/hibiken/asynq/wiki).
## Command Line Tool
Asynq ships with a command line tool to inspect the state of queues and tasks.
To install, run the following command:
go get github.com/hibiken/asynq/tools/asynqmon
For details on how to use the tool, see the [README](/tools/asynqmon/README.md) for the asynqmon CLI.
For details on how to use the tool, refer to the tool's [README](/tools/asynqmon/README.md).
## Acknowledgements

2
doc.go
View File

@@ -3,7 +3,7 @@
// that can be found in the LICENSE file.
/*
Package asynq provides a framework for background task processing.
Package asynq provides a framework for asynchronous task processing.
Asynq uses Redis as a message broker. To connect to redis server,
specify the options using one of RedisConnOpt types.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 MiB

View File

@@ -235,67 +235,46 @@ func (r *RDB) RedisInfo() (map[string]string, error) {
return info, nil
}
func reverse(x []string) {
for i := len(x)/2 - 1; i >= 0; i-- {
opp := len(x) - 1 - i
x[i], x[opp] = x[opp], x[i]
}
}
// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {
// Number of items in the page.
Size uint
// Page number starting from zero.
Page uint
}
func (p Pagination) start() int64 {
return int64(p.Size * p.Page)
}
func (p Pagination) stop() int64 {
return int64(p.Size*p.Page + p.Size - 1)
}
// ListEnqueued returns enqueued tasks that are ready to be processed.
//
// Queue names can be optionally passed to query only the specified queues.
// If none are passed, it will query all queues.
func (r *RDB) ListEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
if len(qnames) == 0 {
return r.listAllEnqueued()
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) {
qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listEnqueued(qnames...)
}
func (r *RDB) listAllEnqueued() ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
local queues = redis.call("SMEMBERS", KEYS[1])
for _, qkey in ipairs(queues) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
res, err := script.Run(r.client, []string{base.AllQueues}).Result()
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(qkey, start, stop).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func (r *RDB) listEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
for _, qkey in ipairs(KEYS) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
res, err := script.Run(r.client, keys).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
reverse(data)
var tasks []*EnqueuedTask
for _, s := range data {
var msg base.TaskMessage
@@ -314,11 +293,16 @@ func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
}
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
data, err := r.client.LRange(base.InProgressQueue, 0, -1).Result()
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var tasks []*InProgressTask
for _, s := range data {
var msg base.TaskMessage
@@ -337,8 +321,8 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
// ListScheduled returns all tasks that are scheduled to be processed
// in the future.
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Result()
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -368,8 +352,8 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry() ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Result()
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -401,8 +385,8 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
}
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead() ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Result()
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}

View File

@@ -5,6 +5,7 @@
package rdb
import (
"fmt"
"sort"
"testing"
"time"
@@ -231,25 +232,24 @@ func TestListEnqueued(t *testing.T) {
t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, Queue: m1.Queue}
t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, Queue: m2.Queue}
t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, Queue: m3.Queue}
t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue}
tests := []struct {
enqueued map[string][]*base.TaskMessage
qnames []string
qname string
want []*EnqueuedTask
}{
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
},
qnames: []string{},
want: []*EnqueuedTask{t1, t2},
qname: base.DefaultQueueName,
want: []*EnqueuedTask{t1, t2},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
qnames: []string{},
want: []*EnqueuedTask{},
qname: base.DefaultQueueName,
want: []*EnqueuedTask{},
},
{
enqueued: map[string][]*base.TaskMessage{
@@ -257,8 +257,8 @@ func TestListEnqueued(t *testing.T) {
"critical": {m3},
"low": {m4},
},
qnames: []string{},
want: []*EnqueuedTask{t1, t2, t3, t4},
qname: base.DefaultQueueName,
want: []*EnqueuedTask{t1, t2},
},
{
enqueued: map[string][]*base.TaskMessage{
@@ -266,17 +266,8 @@ func TestListEnqueued(t *testing.T) {
"critical": {m3},
"low": {m4},
},
qnames: []string{"critical"},
want: []*EnqueuedTask{t3},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
"critical": {m3},
"low": {m4},
},
qnames: []string{"critical", "low"},
want: []*EnqueuedTask{t3, t4},
qname: "critical",
want: []*EnqueuedTask{t3},
},
}
@@ -286,9 +277,10 @@ func TestListEnqueued(t *testing.T) {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
got, err := r.ListEnqueued(tc.qnames...)
got, err := r.ListEnqueued(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*EnqueuedTask) []*EnqueuedTask {
@@ -299,11 +291,76 @@ func TestListEnqueued(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListEnqueuedPagination(t *testing.T) {
r := setup(t)
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
// create 100 tasks in default queue
h.SeedEnqueuedQueue(t, r.client, msgs)
msgs = []*base.TaskMessage(nil) // empty list
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("custom %d", i), nil)
msgs = append(msgs, msg)
}
// create 100 tasks in custom queue
h.SeedEnqueuedQueue(t, r.client, msgs, "custom")
tests := []struct {
desc string
qname string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
{"second page with custom queue", "custom", 1, 20, 20, "custom 20", "custom 39"},
}
for _, tc := range tests {
got, err := r.ListEnqueued(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned a list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListInProgress(t *testing.T) {
r := setup(t)
@@ -330,9 +387,10 @@ func TestListInProgress(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
got, err := r.ListInProgress()
got, err := r.ListInProgress(Pagination{Size: 20, Page: 0})
op := "r.ListInProgress(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListInProgress() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*InProgressTask) []*InProgressTask {
@@ -343,12 +401,67 @@ func TestListInProgress(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("r.ListInProgress() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListInProgressPagination(t *testing.T) {
r := setup(t)
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
h.SeedInProgressQueue(t, r.client, msgs)
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListInProgress(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListInProgress(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListScheduled(t *testing.T) {
r := setup(t)
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
@@ -379,9 +492,10 @@ func TestListScheduled(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedScheduledQueue(t, r.client, tc.scheduled)
got, err := r.ListScheduled()
got, err := r.ListScheduled(Pagination{Size: 20, Page: 0})
op := "r.ListScheduled(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListScheduled() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*ScheduledTask) []*ScheduledTask {
@@ -392,12 +506,68 @@ func TestListScheduled(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("r.ListScheduled() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListScheduledPagination(t *testing.T) {
r := setup(t)
// create 100 tasks with an increasing number of wait time.
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
if err := r.Schedule(msg, time.Now().Add(time.Duration(i)*time.Second)); err != nil {
t.Fatal(err)
}
}
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListScheduled(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListScheduled(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListRetry(t *testing.T) {
r := setup(t)
m1 := &base.TaskMessage{
@@ -464,9 +634,10 @@ func TestListRetry(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedRetryQueue(t, r.client, tc.retry)
got, err := r.ListRetry()
got, err := r.ListRetry(Pagination{Size: 20, Page: 0})
op := "r.ListRetry(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListRetry() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*RetryTask) []*RetryTask {
@@ -478,12 +649,68 @@ func TestListRetry(t *testing.T) {
})
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("r.ListRetry() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListRetryPagination(t *testing.T) {
r := setup(t)
// create 100 tasks with an increasing number of wait time.
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
if err := r.Retry(msg, time.Now().Add(time.Duration(i)*time.Second), "error"); err != nil {
t.Fatal(err)
}
}
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListRetry(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListRetry(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListDead(t *testing.T) {
r := setup(t)
m1 := &base.TaskMessage{
@@ -542,9 +769,10 @@ func TestListDead(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDeadQueue(t, r.client, tc.dead)
got, err := r.ListDead()
got, err := r.ListDead(Pagination{Size: 20, Page: 0})
op := "r.ListDead(Pagination{Size: 20, Page: 0})"
if err != nil {
t.Errorf("r.ListDead() = %v, %v, want %v, nil", got, err, tc.want)
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortMsg", func(in []*DeadTask) []*DeadTask {
@@ -555,12 +783,67 @@ func TestListDead(t *testing.T) {
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("r.ListDead() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
func TestListDeadPagination(t *testing.T) {
r := setup(t)
var entries []h.ZSetEntry
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
entries = append(entries, h.ZSetEntry{Msg: msg, Score: float64(i)})
}
h.SeedDeadQueue(t, r.client, entries)
tests := []struct {
desc string
page uint
size uint
wantSize int
wantFirst string
wantLast string
}{
{"first page", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListDead(Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})", tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
var timeCmpOpt = cmpopts.EquateApproxTime(time.Second)
func TestEnqueueDeadTask(t *testing.T) {

View File

@@ -156,7 +156,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
return redis.status_reply("OK")
`)
return script.Run(r.client,
[]string{base.InProgressQueue, base.DefaultQueue},
[]string{base.InProgressQueue, base.QueueKey(msg.Queue)},
string(bytes)).Err()
}
@@ -273,13 +273,16 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
// and reports the number of tasks restored.
func (r *RDB) RestoreUnfinished() (int64, error) {
script := redis.NewScript(`
local len = redis.call("LLEN", KEYS[1])
for i = len, 1, -1 do
redis.call("RPOPLPUSH", KEYS[1], KEYS[2])
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
local qkey = ARGV[1] .. decoded["Queue"]
redis.call("LREM", KEYS[1], 0, msg)
redis.call("RPUSH", qkey, msg)
end
return len
return table.getn(msgs)
`)
res, err := script.Run(r.client, []string{base.InProgressQueue, base.DefaultQueue}).Result()
res, err := script.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result()
if err != nil {
return 0, err
}

View File

@@ -236,33 +236,57 @@ func TestRequeue(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessageWithQueue("send_email", nil, "critical")
tests := []struct {
enqueued []*base.TaskMessage // initial state of the default queue
inProgress []*base.TaskMessage // initial state of the in-progress list
target *base.TaskMessage // task to requeue
wantEnqueued []*base.TaskMessage // final state of the default queue
wantInProgress []*base.TaskMessage // final state of the in-progress list
enqueued map[string][]*base.TaskMessage // initial state of queues
inProgress []*base.TaskMessage // initial state of the in-progress list
target *base.TaskMessage // task to requeue
wantEnqueued map[string][]*base.TaskMessage // final state of queues
wantInProgress []*base.TaskMessage // final state of the in-progress list
}{
{
enqueued: []*base.TaskMessage{},
inProgress: []*base.TaskMessage{t1, t2},
target: t1,
wantEnqueued: []*base.TaskMessage{t1},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
inProgress: []*base.TaskMessage{t1, t2},
target: t1,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
wantInProgress: []*base.TaskMessage{t2},
},
{
enqueued: []*base.TaskMessage{t1},
inProgress: []*base.TaskMessage{t2},
target: t2,
wantEnqueued: []*base.TaskMessage{t1, t2},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
inProgress: []*base.TaskMessage{t2},
target: t2,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2},
},
wantInProgress: []*base.TaskMessage{},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
"critical": {},
},
inProgress: []*base.TaskMessage{t2, t3},
target: t3,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
"critical": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
h.SeedInProgressQueue(t, r.client, tc.inProgress)
err := r.Requeue(tc.target)
@@ -271,9 +295,11 @@ func TestRequeue(t *testing.T) {
continue
}
gotEnqueued := h.GetEnqueuedMessages(t, r.client)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
gotInProgress := h.GetInProgressMessages(t, r.client)
@@ -518,41 +544,72 @@ func TestRestoreUnfinished(t *testing.T) {
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessage("sync_stuff", nil)
t4 := h.NewTaskMessageWithQueue("important", nil, "critical")
t5 := h.NewTaskMessageWithQueue("minor", nil, "low")
tests := []struct {
inProgress []*base.TaskMessage
enqueued []*base.TaskMessage
enqueued map[string][]*base.TaskMessage
want int64
wantInProgress []*base.TaskMessage
wantEnqueued []*base.TaskMessage
wantEnqueued map[string][]*base.TaskMessage
}{
{
inProgress: []*base.TaskMessage{t1, t2, t3},
enqueued: []*base.TaskMessage{},
inProgress: []*base.TaskMessage{t1, t2, t3},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
want: 3,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: []*base.TaskMessage{t1, t2, t3},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
},
{
inProgress: []*base.TaskMessage{},
enqueued: []*base.TaskMessage{t1, t2, t3},
inProgress: []*base.TaskMessage{},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
want: 0,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: []*base.TaskMessage{t1, t2, t3},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
},
{
inProgress: []*base.TaskMessage{t2, t3},
enqueued: []*base.TaskMessage{t1},
inProgress: []*base.TaskMessage{t2, t3},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
want: 2,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: []*base.TaskMessage{t1, t2, t3},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
},
},
{
inProgress: []*base.TaskMessage{t2, t3, t4, t5},
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
"critical": {},
"low": {},
},
want: 4,
wantInProgress: []*base.TaskMessage{},
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2, t3},
"critical": {t4},
"low": {t5},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
got, err := r.RestoreUnfinished()
if got != tc.want || err != nil {
@@ -565,9 +622,11 @@ func TestRestoreUnfinished(t *testing.T) {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff)
}
gotEnqueued := h.GetEnqueuedMessages(t, r.client)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.QueueKey(qname), diff)
}
}
}
}

View File

@@ -1,6 +1,6 @@
# Asynqmon
Asynqmon is a CLI tool to monitor the queues managed by `asynq` package.
Asynqmon is a command line tool to monitor the tasks managed by `asynq` package.
## Table of Contents
@@ -24,7 +24,7 @@ This will create the asynqmon executable under your `$GOPATH/bin` directory.
## Quick Start
Asynqmon tool has a few commands to inspect the state of tasks and queues.
The tool has a few commands to inspect the state of tasks and queues.
Run `asynqmon help` to see all the available commands.
@@ -34,7 +34,7 @@ By default, Asynqmon will try to connect to a redis server running at `localhost
### Stats
Stats command gives the overview of the current state of tasks and queues. Run it in conjunction with `watch` command to repeatedly run `stats`.
Stats command gives the overview of the current state of tasks and queues. You can run it in conjunction with `watch` command to repeatedly run `stats`.
Example:
@@ -46,35 +46,93 @@ This will run `asynqmon stats` command every 3 seconds.
### History
TODO: Add discription
History command shows the number of processed and failed tasks from the last x days.
By default, it shows the stats from the last 10 days. Use `--days` to specify the number of days.
Example:
asynqmon history --days=30
![Gif](/docs/assets/asynqmon_history.gif)
### List
TODO: Add discription
List command shows all tasks in the specified state in a table format
Example:
asynqmon ls retry
asynqmon ls scheduled
asynqmon ls dead
asynqmon ls enqueued
asynqmon ls inprogress
### Enqueue
TODO: Add discription
There are two commands to enqueue tasks.
Command `enq` takes a task ID and moves the task to **Enqueued** state. You can obtain the task ID by running `ls` command.
Example:
asynqmon enq d:1575732274:bnogo8gt6toe23vhef0g
Command `enqall` moves all tasks to **Enqueued** state from the specified state.
Example:
asynqmon enqall retry
Running the above command will move all **Retry** tasks to **Enqueued** state.
### Delete
TODO: Add discription
There are two commands for task deletion.
Command `del` takes a task ID and deletes the task. You can obtain the task ID by running `ls` command.
Example:
asynqmon del r:1575732274:bnogo8gt6toe23vhef0g
Command `delall` deletes all tasks which are in the specified state.
Example:
asynqmon delall retry
Running the above command will delete all **Retry** tasks.
### Kill
TODO: Add discription
There are two commands to kill (i.e. move to dead state) tasks.
Command `kill` takes a task ID and kills the task. You can obtain the task ID by running `ls` command.
Example:
asynqmon kill r:1575732274:bnogo8gt6toe23vhef0g
Command `killall` kills all tasks which are in the specified state.
Example:
asynqmon killall retry
Running the above command will move all **Retry** tasks to **Dead** state.
## Config File
You can use a config file to set default values for flags.
You can use a config file to set default values for the flags.
This is useful, for example when you have to connect to a remote redis server.
By default, `asynqmon` will try to read config file located in
`$HOME/.asynqmon.(yml|json)`. You can specify the file location via `--config` flag.
`$HOME/.asynqmon.(yaml|json)`. You can specify the file location via `--config` flag.
Config file example:
```yml
```yaml
uri: 127.0.0.1:6379
db: 2
password: mypassword

View File

@@ -35,26 +35,23 @@ The argument value should be one of "enqueued", "inprogress", "scheduled",
Example:
asynqmon ls dead -> Lists all tasks in dead state
Enqueued tasks can optionally be filtered by providing queue names after ":"
Enqueued tasks requires a queue name after ":"
Example:
asynqmon ls enqueued:critical -> List tasks from critical queue only
asynqmon ls enqueued:default -> List tasks from default queue
asynqmon ls enqueued:critical -> List tasks from critical queue
`,
Args: cobra.ExactValidArgs(1),
Run: ls,
}
// Flags
var pageSize uint
var pageNum uint
func init() {
rootCmd.AddCommand(lsCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// lsCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// lsCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
lsCmd.Flags().UintVar(&pageSize, "size", 30, "page size")
lsCmd.Flags().UintVar(&pageNum, "page", 0, "page number - zero indexed (default 0)")
}
func ls(cmd *cobra.Command, args []string) {
@@ -67,7 +64,11 @@ func ls(cmd *cobra.Command, args []string) {
parts := strings.Split(args[0], ":")
switch parts[0] {
case "enqueued":
listEnqueued(r, parts[1:]...)
if len(parts) != 2 {
fmt.Printf("error: Missing queue name\n`asynqmon ls enqueued:[queue name]`\n")
os.Exit(1)
}
listEnqueued(r, parts[1])
case "inprogress":
listInProgress(r)
case "scheduled":
@@ -77,7 +78,7 @@ func ls(cmd *cobra.Command, args []string) {
case "dead":
listDead(r)
default:
fmt.Printf("error: `asynqmon ls [state]` only accepts %v as the argument.\n", lsValidArgs)
fmt.Printf("error: `asynqmon ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs)
os.Exit(1)
}
}
@@ -113,24 +114,14 @@ func parseQueryID(queryID string) (id xid.ID, score int64, qtype string, err err
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB, qnames ...string) {
tasks, err := r.ListEnqueued(qnames...)
func listEnqueued(r *rdb.RDB, qname string) {
tasks, err := r.ListEnqueued(qname, rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
msg := "No enqueued tasks"
if len(qnames) > 0 {
msg += " in"
for i, q := range qnames {
msg += fmt.Sprintf(" %q queue", q)
if i != len(qnames)-1 {
msg += ","
}
}
}
fmt.Println(msg)
fmt.Printf("No enqueued tasks in %q queue\n", qname)
return
}
cols := []string{"ID", "Type", "Payload", "Queue"}
@@ -140,10 +131,11 @@ func listEnqueued(r *rdb.RDB, qnames ...string) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listInProgress(r *rdb.RDB) {
tasks, err := r.ListInProgress()
tasks, err := r.ListInProgress(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -159,10 +151,11 @@ func listInProgress(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listScheduled(r *rdb.RDB) {
tasks, err := r.ListScheduled()
tasks, err := r.ListScheduled(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -179,10 +172,11 @@ func listScheduled(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listRetry(r *rdb.RDB) {
tasks, err := r.ListRetry()
tasks, err := r.ListRetry(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -199,10 +193,11 @@ func listRetry(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listDead(r *rdb.RDB) {
tasks, err := r.ListDead()
tasks, err := r.ListDead(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -218,6 +213,7 @@ func listDead(r *rdb.RDB) {
}
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {