2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-09 08:15:53 +08:00

Compare commits

...

4 Commits

Author SHA1 Message Date
Ken Hibino
e253211a60 v0.2.1 2020-01-22 06:45:16 -08:00
Ken Hibino
44c657bec6 Rate limit error logs 2020-01-22 06:36:18 -08:00
Ken Hibino
db8e9d05c3 Add custom logger 2020-01-22 06:02:53 -08:00
Ken Hibino
b02e4e6b09 [ci skip] Update readme 2020-01-21 17:48:55 -08:00
10 changed files with 330 additions and 47 deletions

View File

@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.2.1] - 2020-01-22
### Fixed
- More structured log messages
- Prevent spamming logs with a bunch of errors when Redis connection is lost
- Fixed and updated README doc
## [0.2.0] - 2020-01-19
### Added

150
README.md
View File

@@ -1,6 +1,10 @@
# Asynq
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq) [![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq) [![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
[![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq)
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
Simple and efficent asynchronous task processing library in Go.
@@ -29,12 +33,12 @@ Asynq provides:
- Clear separation of task producer and consumer
- Ability to schedule task processing in the future
- Automatic retry of failed tasks with exponential backoff
- Automatic failover using Redis sentinels
- Ability to configure max retry count per task
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) max retry count per task
- Ability to configure max number of worker goroutines to process tasks
- Support for priority queues
- Unix signal handling to gracefully shutdown background processing
- CLI tool to query and mutate queues state for mointoring and administrative purposes
- Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
- [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
- [CLI tool](/tools/asynqmon/README.md) to query and mutate queues state for mointoring and administrative purposes
## Requirements
@@ -45,22 +49,35 @@ Asynq provides:
## Installation
To install both `asynq` library and `asynqmon` CLI tool, run the following command:
```
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
```
## Getting Started
1. Import `asynq` in your file.
In this quick tour of `asynq`, we are going to create two programs.
- `producer.go` will create and schedule tasks to be processed asynchronously by the consumer.
- `consumer.go` will process the tasks created by the producer.
**This guide assumes that you are running a Redis server at `localhost:6379`**.
Before we start, make sure you have Redis installed and running.
1. Import `asynq` in both files.
```go
import "github.com/hibiken/asynq"
```
2. Asynq uses redis as a message broker.
2. Asynq uses Redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis.
We are going to use `RedisClientOpt` here.
```go
// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
@@ -71,9 +88,10 @@ var redis = &asynq.RedisClientOpt{
}
```
3. Create a `Client` instance to create and schedule tasks.
3. In `producer.go`, create a `Client` instance to create and schedule tasks.
```go
// producer.go
func main() {
client := asynq.NewClient(redis)
@@ -88,32 +106,31 @@ func main() {
// Process the task immediately.
err := client.Schedule(t1, time.Now())
if err != nil {
log.Fatal(err)
}
// Process the task 24 hours later.
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
// Specify the max number of retry (default: 25)
err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1))
if err != nil {
log.Fatal(err)
}
}
```
4. Create a `Background` instance to process tasks.
4. In `consumer.go`, create a `Background` instance to process tasks.
```go
// consumer.go
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
// Blocks until signal TERM or INT is received.
// For graceful shutdown, send signal TSTP to stop processing more tasks
// before sending TERM or INT signal to terminate the process.
bg.Run(handler)
}
```
Note that `Client` and `Background` are intended to be used in separate executable binaries.
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
```go
@@ -137,9 +154,14 @@ func handler(t *asynq.Task) error {
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to %d\n", id)
fmt.Printf("Send Welcome Email to User %d\n", id)
// ... handle other types ...
case "send_reminder_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
default:
return fmt.Errorf("unexpected task type: %s", t.Type)
@@ -157,6 +179,94 @@ func main() {
}
```
We could kep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
```go
// consumer.go
// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
mapping map[string]asynq.HandlerFunc
}
// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
d.mapping[taskType] = fn
}
// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
fn, ok := d.mapping[task.Type]
if !ok {
return fmt.Errorf("no handler registered for %q", task.Type)
}
return fn(task)
}
func main() {
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
d.HandleFunc("send_reminder_email", sendReminderEmail)
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(d)
}
func sendWelcomeEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func sendReminderEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
```
Now that we have both task producer and consumer, we can run both programs.
```sh
go run consumer.go
```
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
With our consumer running, also run
```sh
go run producer.go
```
This will create a task and the first task will get processed immediately by the consumer. The second task will be processed 24 hours later.
Let's use `asynqmon` tool to inspect the tasks.
```sh
asynqmon stats
```
This command will show the number of tasks in each state and stats for the current date as well as redis information.
To understand the meaning of each state, see [Life of a Task Wiki page](https://github.com/hibiken/asynq/wiki/Life-of-a-Task).
For in-depth guide on `asynqmon` tool, see the [README](/tools/asynqmon/README.md) for the CLI.
This was a quick tour of `asynq` basics. To see all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see [the Wiki page](https://github.com/hibiken/asynq/wiki).
## Monitoring CLI
Asynq ships with a CLI tool to inspect the state of queues and tasks.

View File

@@ -6,7 +6,6 @@ package asynq
import (
"fmt"
"log"
"math"
"math/rand"
"os"
@@ -150,9 +149,15 @@ func (fn HandlerFunc) ProcessTask(task *Task) error {
// a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks.
func (bg *Background) Run(handler Handler) {
logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
logger.info("Starting processing")
bg.start(handler)
defer bg.stop()
logger.info("Send signal TSTP to stop processing new tasks")
logger.info("Send signal TERM or INT to terminate the process")
// Wait for a signal to terminate.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
@@ -165,7 +170,7 @@ func (bg *Background) Run(handler Handler) {
break
}
fmt.Println()
log.Println("[INFO] Starting graceful shutdown...")
logger.info("Starting graceful shutdown")
}
// starts the background-task processing.
@@ -201,6 +206,8 @@ func (bg *Background) stop() {
bg.rdb.Close()
bg.processor.handler = nil
bg.running = false
logger.info("Bye!")
}
// normalizeQueueCfg divides priority numbers by their

1
go.mod
View File

@@ -17,5 +17,6 @@ require (
go.uber.org/goleak v0.10.0
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/yaml.v2 v2.2.7 // indirect
)

1
go.sum
View File

@@ -179,6 +179,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

35
logger.go Normal file
View File

@@ -0,0 +1,35 @@
package asynq
import (
"io"
"log"
"os"
)
// global logger used in asynq package.
var logger = newLogger(os.Stderr)
func newLogger(out io.Writer) *asynqLogger {
return &asynqLogger{
log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC),
}
}
type asynqLogger struct {
*log.Logger
}
func (l *asynqLogger) info(format string, args ...interface{}) {
format = "INFO: " + format
l.Printf(format, args...)
}
func (l *asynqLogger) warn(format string, args ...interface{}) {
format = "WARN: " + format
l.Printf(format, args...)
}
func (l *asynqLogger) error(format string, args ...interface{}) {
format = "ERROR: " + format
l.Printf(format, args...)
}

117
logger_test.go Normal file
View File

@@ -0,0 +1,117 @@
package asynq
import (
"bytes"
"fmt"
"regexp"
"testing"
)
// regexp for timestamps
const (
rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]`
rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]`
rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]`
)
type tester struct {
desc string
message string
wantPattern string // regexp that log output must match
}
func TestLoggerInfo(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger.info(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
func TestLoggerWarn(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger.warn(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
func TestLoggerError(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger.error(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}

View File

@@ -6,7 +6,6 @@ package asynq
import (
"fmt"
"log"
"math/rand"
"sort"
"sync"
@@ -14,6 +13,7 @@ import (
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"golang.org/x/time/rate"
)
type processor struct {
@@ -31,6 +31,9 @@ type processor struct {
// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest
// rate limiter to prevent spamming logs with a bunch of errors.
errLogLimiter *rate.Limiter
// sema is a counting semaphore to ensure the number of active workers
// does not exceed the limit.
sema chan struct{}
@@ -67,6 +70,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
orderedQueues: orderedQueues,
retryDelayFunc: fn,
syncRequestCh: syncRequestCh,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, n),
done: make(chan struct{}),
abort: make(chan struct{}),
@@ -79,7 +83,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
// It's safe to call this method multiple times.
func (p *processor) stop() {
p.once.Do(func() {
log.Println("[INFO] Processor shutting down...")
logger.info("Processor shutting down...")
// Unblock if processor is waiting for sema token.
close(p.abort)
// Signal the processor goroutine to stop processing tasks
@@ -95,12 +99,12 @@ func (p *processor) terminate() {
// IDEA: Allow user to customize this timeout value.
const timeout = 8 * time.Second
time.AfterFunc(timeout, func() { close(p.quit) })
log.Println("[INFO] Waiting for all workers to finish...")
logger.info("Waiting for all workers to finish...")
// block until all workers have released the token
for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{}
}
log.Println("[INFO] All workers have finished.")
logger.info("All workers have finished")
p.restore() // move any unfinished tasks back to the queue.
}
@@ -112,7 +116,7 @@ func (p *processor) start() {
for {
select {
case <-p.done:
log.Println("[INFO] Processor done.")
logger.info("Processor done")
return
default:
p.exec()
@@ -137,7 +141,9 @@ func (p *processor) exec() {
return
}
if err != nil {
log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err)
if p.errLogLimiter.Allow() {
logger.error("Dequeue error: %v", err)
}
return
}
@@ -159,7 +165,7 @@ func (p *processor) exec() {
select {
case <-p.quit:
// time is up, quit this worker goroutine.
log.Printf("[WARN] Terminating in-progress task %+v\n", msg)
logger.warn("Quitting worker to process task id=%s", msg.ID)
return
case resErr := <-resCh:
// Note: One of three things should happen.
@@ -185,25 +191,25 @@ func (p *processor) exec() {
func (p *processor) restore() {
n, err := p.rdb.RestoreUnfinished()
if err != nil {
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
logger.error("Could not restore unfinished tasks: %v", err)
}
if n > 0 {
log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n)
logger.info("Restored %d unfinished tasks back to queue", n)
}
}
func (p *processor) requeue(msg *base.TaskMessage) {
err := p.rdb.Requeue(msg)
if err != nil {
log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err)
logger.error("Could not push task id=%s back to queue: %v", msg.ID, err)
}
}
func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.rdb.Done(msg)
if err != nil {
errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue)
log.Printf("[WARN] %s; will retry\n", errMsg)
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Done(msg)
@@ -218,8 +224,8 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
retryAt := time.Now().Add(d)
err := p.rdb.Retry(msg, retryAt, e.Error())
if err != nil {
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue)
log.Printf("[WARN] %s; will retry\n", errMsg)
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Retry(msg, retryAt, e.Error())
@@ -230,11 +236,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
}
func (p *processor) kill(msg *base.TaskMessage, e error) {
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
logger.warn("Retry exhausted for task id=%s", msg.ID)
err := p.rdb.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue)
log.Printf("[WARN] %s; will retry\n", errMsg)
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Kill(msg, e.Error())

View File

@@ -5,7 +5,6 @@
package asynq
import (
"log"
"time"
"github.com/hibiken/asynq/internal/rdb"
@@ -38,7 +37,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *
}
func (s *scheduler) terminate() {
log.Println("[INFO] Scheduler shutting down...")
logger.info("Scheduler shutting down...")
// Signal the scheduler goroutine to stop polling.
s.done <- struct{}{}
}
@@ -49,7 +48,7 @@ func (s *scheduler) start() {
for {
select {
case <-s.done:
log.Println("[INFO] Scheduler done.")
logger.info("Scheduler done")
return
case <-time.After(s.avgInterval):
s.exec()
@@ -60,6 +59,6 @@ func (s *scheduler) start() {
func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
logger.error("Could not enqueue scheduled tasks: %v", err)
}
}

View File

@@ -5,7 +5,6 @@
package asynq
import (
"log"
"time"
)
@@ -35,7 +34,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
}
func (s *syncer) terminate() {
log.Println("[INFO] Syncer shutting down...")
logger.info("Syncer shutting down...")
// Signal the syncer goroutine to stop.
s.done <- struct{}{}
}
@@ -49,10 +48,10 @@ func (s *syncer) start() {
// Try sync one last time before shutting down.
for _, req := range requests {
if err := req.fn(); err != nil {
log.Printf("[ERROR] %s\n", req.errMsg)
logger.error(req.errMsg)
}
}
log.Println("[INFO] Syncer done.")
logger.info("Syncer done")
return
case req := <-s.requestsCh:
requests = append(requests, req)