2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-10 17:33:47 +08:00

Compare commits

...

4 Commits

Author SHA1 Message Date
Ken Hibino
e253211a60 v0.2.1 2020-01-22 06:45:16 -08:00
Ken Hibino
44c657bec6 Rate limit error logs 2020-01-22 06:36:18 -08:00
Ken Hibino
db8e9d05c3 Add custom logger 2020-01-22 06:02:53 -08:00
Ken Hibino
b02e4e6b09 [ci skip] Update readme 2020-01-21 17:48:55 -08:00
10 changed files with 330 additions and 47 deletions

View File

@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.2.1] - 2020-01-22
### Fixed
- More structured log messages
- Prevent spamming logs with a bunch of errors when Redis connection is lost
- Fixed and updated README doc
## [0.2.0] - 2020-01-19 ## [0.2.0] - 2020-01-19
### Added ### Added

150
README.md
View File

@@ -1,6 +1,10 @@
# Asynq # Asynq
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq) [![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq) [![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community) [![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
[![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq)
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
Simple and efficient asynchronous task processing library in Go. Simple and efficient asynchronous task processing library in Go.
@@ -29,12 +33,12 @@ Asynq provides:
- Clear separation of task producer and consumer - Clear separation of task producer and consumer
- Ability to schedule task processing in the future - Ability to schedule task processing in the future
- Automatic retry of failed tasks with exponential backoff - Automatic retry of failed tasks with exponential backoff
- Automatic failover using Redis sentinels - [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
- Ability to configure max retry count per task - [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) max retry count per task
- Ability to configure max number of worker goroutines to process tasks - Ability to configure max number of worker goroutines to process tasks
- Support for priority queues - Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
- Unix signal handling to gracefully shutdown background processing - [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
- CLI tool to query and mutate queues state for monitoring and administrative purposes - [CLI tool](/tools/asynqmon/README.md) to query and mutate queues state for monitoring and administrative purposes
## Requirements ## Requirements
@@ -45,22 +49,35 @@ Asynq provides:
## Installation ## Installation
To install both `asynq` library and `asynqmon` CLI tool, run the following command:
``` ```
go get -u github.com/hibiken/asynq go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
``` ```
## Getting Started ## Getting Started
1. Import `asynq` in your file. In this quick tour of `asynq`, we are going to create two programs.
- `producer.go` will create and schedule tasks to be processed asynchronously by the consumer.
- `consumer.go` will process the tasks created by the producer.
**This guide assumes that you are running a Redis server at `localhost:6379`**.
Before we start, make sure you have Redis installed and running.
1. Import `asynq` in both files.
```go ```go
import "github.com/hibiken/asynq" import "github.com/hibiken/asynq"
``` ```
2. Asynq uses redis as a message broker. 2. Asynq uses Redis as a message broker.
Use one of `RedisConnOpt` types to specify how to connect to Redis. Use one of `RedisConnOpt` types to specify how to connect to Redis.
We are going to use `RedisClientOpt` here.
```go ```go
// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{ var redis = &asynq.RedisClientOpt{
Addr: "localhost:6379", Addr: "localhost:6379",
// Omit if no password is required // Omit if no password is required
@@ -71,9 +88,10 @@ var redis = &asynq.RedisClientOpt{
} }
``` ```
3. Create a `Client` instance to create and schedule tasks. 3. In `producer.go`, create a `Client` instance to create and schedule tasks.
```go ```go
// producer.go
func main() { func main() {
client := asynq.NewClient(redis) client := asynq.NewClient(redis)
@@ -88,32 +106,31 @@ func main() {
// Process the task immediately. // Process the task immediately.
err := client.Schedule(t1, time.Now()) err := client.Schedule(t1, time.Now())
if err != nil {
log.Fatal(err)
}
// Process the task 24 hours later. // Process the task 24 hours later.
err = client.Schedule(t2, time.Now().Add(24 * time.Hour)) err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
if err != nil {
// Specify the max number of retry (default: 25) log.Fatal(err)
err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1)) }
} }
``` ```
4. Create a `Background` instance to process tasks. 4. In `consumer.go`, create a `Background` instance to process tasks.
```go ```go
// consumer.go
func main() { func main() {
bg := asynq.NewBackground(redis, &asynq.Config{ bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10, Concurrency: 10,
}) })
// Blocks until signal TERM or INT is received.
// For graceful shutdown, send signal TSTP to stop processing more tasks
// before sending TERM or INT signal to terminate the process.
bg.Run(handler) bg.Run(handler)
} }
``` ```
Note that `Client` and `Background` are intended to be used in separate executable binaries.
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`. The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
```go ```go
@@ -137,9 +154,14 @@ func handler(t *asynq.Task) error {
if err != nil { if err != nil {
return err return err
} }
fmt.Printf("Send Welcome Email to %d\n", id) fmt.Printf("Send Welcome Email to User %d\n", id)
// ... handle other types ... case "send_reminder_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
default: default:
return fmt.Errorf("unexpected task type: %s", t.Type) return fmt.Errorf("unexpected task type: %s", t.Type)
@@ -157,6 +179,94 @@ func main() {
} }
``` ```
We could keep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
```go
// consumer.go
// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
mapping map[string]asynq.HandlerFunc
}
// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
d.mapping[taskType] = fn
}
// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
fn, ok := d.mapping[task.Type]
if !ok {
return fmt.Errorf("no handler registered for %q", task.Type)
}
return fn(task)
}
func main() {
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
d.HandleFunc("send_reminder_email", sendReminderEmail)
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(d)
}
func sendWelcomeEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func sendReminderEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
return nil
}
```
Now that we have both task producer and consumer, we can run both programs.
```sh
go run consumer.go
```
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
With our consumer running, also run
```sh
go run producer.go
```
This will create two tasks: the first will be processed immediately by the consumer, and the second will be processed 24 hours later.
Let's use `asynqmon` tool to inspect the tasks.
```sh
asynqmon stats
```
This command will show the number of tasks in each state and stats for the current date as well as Redis information.
To understand the meaning of each state, see [Life of a Task Wiki page](https://github.com/hibiken/asynq/wiki/Life-of-a-Task).
For in-depth guide on `asynqmon` tool, see the [README](/tools/asynqmon/README.md) for the CLI.
This was a quick tour of `asynq` basics. To see all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see [the Wiki page](https://github.com/hibiken/asynq/wiki).
## Monitoring CLI ## Monitoring CLI
Asynq ships with a CLI tool to inspect the state of queues and tasks. Asynq ships with a CLI tool to inspect the state of queues and tasks.

View File

@@ -6,7 +6,6 @@ package asynq
import ( import (
"fmt" "fmt"
"log"
"math" "math"
"math/rand" "math/rand"
"os" "os"
@@ -150,9 +149,15 @@ func (fn HandlerFunc) ProcessTask(task *Task) error {
// a signal, it gracefully shuts down all pending workers and other // a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
func (bg *Background) Run(handler Handler) { func (bg *Background) Run(handler Handler) {
logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
logger.info("Starting processing")
bg.start(handler) bg.start(handler)
defer bg.stop() defer bg.stop()
logger.info("Send signal TSTP to stop processing new tasks")
logger.info("Send signal TERM or INT to terminate the process")
// Wait for a signal to terminate. // Wait for a signal to terminate.
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
@@ -165,7 +170,7 @@ func (bg *Background) Run(handler Handler) {
break break
} }
fmt.Println() fmt.Println()
log.Println("[INFO] Starting graceful shutdown...") logger.info("Starting graceful shutdown")
} }
// starts the background-task processing. // starts the background-task processing.
@@ -201,6 +206,8 @@ func (bg *Background) stop() {
bg.rdb.Close() bg.rdb.Close()
bg.processor.handler = nil bg.processor.handler = nil
bg.running = false bg.running = false
logger.info("Bye!")
} }
// normalizeQueueCfg divides priority numbers by their // normalizeQueueCfg divides priority numbers by their

1
go.mod
View File

@@ -17,5 +17,6 @@ require (
go.uber.org/goleak v0.10.0 go.uber.org/goleak v0.10.0
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect
golang.org/x/text v0.3.2 // indirect golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
gopkg.in/yaml.v2 v2.2.7 // indirect gopkg.in/yaml.v2 v2.2.7 // indirect
) )

1
go.sum
View File

@@ -179,6 +179,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

35
logger.go Normal file
View File

@@ -0,0 +1,35 @@
package asynq
import (
"io"
"log"
"os"
)
// logger is the package-wide logger used throughout asynq.
var logger = newLogger(os.Stderr)

// newLogger returns an asynqLogger that writes timestamped lines
// (date, time, microseconds, UTC) to out.
func newLogger(out io.Writer) *asynqLogger {
	base := log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC)
	return &asynqLogger{base}
}

// asynqLogger wraps log.Logger and tags every message with a severity level.
type asynqLogger struct {
	*log.Logger
}

// printLevel writes one formatted message prefixed with the given severity tag.
func (l *asynqLogger) printLevel(level, format string, args ...interface{}) {
	l.Printf(level+": "+format, args...)
}

// info logs a message at INFO severity.
func (l *asynqLogger) info(format string, args ...interface{}) {
	l.printLevel("INFO", format, args...)
}

// warn logs a message at WARN severity.
func (l *asynqLogger) warn(format string, args ...interface{}) {
	l.printLevel("WARN", format, args...)
}

// error logs a message at ERROR severity.
func (l *asynqLogger) error(format string, args ...interface{}) {
	l.printLevel("ERROR", format, args...)
}

117
logger_test.go Normal file
View File

@@ -0,0 +1,117 @@
package asynq
import (
"bytes"
"fmt"
"regexp"
"testing"
)
// regexp for timestamps
// Regexps for the timestamp components produced by the logger's
// log.Ldate|log.Ltime|log.Lmicroseconds flags (see logger.go).
const (
	rgxdate         = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]` // YYYY/MM/DD
	rgxtime         = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]`           // HH:MM:SS
	rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]`           // .NNNNNN (six digits)
)

// tester describes one logger test case.
type tester struct {
	desc        string // human-readable description of the case
	message     string // message passed to the logger method under test
	wantPattern string // regexp that log output must match
}
// TestLoggerInfo verifies that info output carries the timestamp prefix,
// the "INFO: " tag, and exactly one trailing newline.
func TestLoggerInfo(t *testing.T) {
	tests := []tester{
		{
			desc:        "without trailing newline, logger adds newline",
			message:     "hello, world!",
			wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
		},
		{
			desc:        "with trailing newline, logger preserves newline",
			message:     "hello, world!\n",
			wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
		},
	}
	for _, tc := range tests {
		// Run each case as a subtest so failures identify the case by desc
		// (previously the desc field was never used).
		t.Run(tc.desc, func(t *testing.T) {
			var buf bytes.Buffer
			logger := newLogger(&buf)
			logger.info(tc.message)
			got := buf.String()
			matched, err := regexp.MatchString(tc.wantPattern, got)
			if err != nil {
				t.Fatal("pattern did not compile:", err)
			}
			if !matched {
				t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
					tc.message, got, tc.wantPattern)
			}
		})
	}
}
// TestLoggerWarn verifies that warn output carries the timestamp prefix,
// the "WARN: " tag, and exactly one trailing newline.
func TestLoggerWarn(t *testing.T) {
	tests := []tester{
		{
			desc:        "without trailing newline, logger adds newline",
			message:     "hello, world!",
			wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
		},
		{
			desc:        "with trailing newline, logger preserves newline",
			message:     "hello, world!\n",
			wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
		},
	}
	for _, tc := range tests {
		// Run each case as a subtest so failures identify the case by desc.
		t.Run(tc.desc, func(t *testing.T) {
			var buf bytes.Buffer
			logger := newLogger(&buf)
			logger.warn(tc.message)
			got := buf.String()
			matched, err := regexp.MatchString(tc.wantPattern, got)
			if err != nil {
				t.Fatal("pattern did not compile:", err)
			}
			// Fixed copy-paste bug: the failure message previously said
			// "logger.info" even though this test exercises logger.warn.
			if !matched {
				t.Errorf("logger.warn(%q) outputted %q, should match pattern %q",
					tc.message, got, tc.wantPattern)
			}
		})
	}
}
// TestLoggerError verifies that error output carries the timestamp prefix,
// the "ERROR: " tag, and exactly one trailing newline.
func TestLoggerError(t *testing.T) {
	tests := []tester{
		{
			desc:        "without trailing newline, logger adds newline",
			message:     "hello, world!",
			wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
		},
		{
			desc:        "with trailing newline, logger preserves newline",
			message:     "hello, world!\n",
			wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
		},
	}
	for _, tc := range tests {
		// Run each case as a subtest so failures identify the case by desc.
		t.Run(tc.desc, func(t *testing.T) {
			var buf bytes.Buffer
			logger := newLogger(&buf)
			logger.error(tc.message)
			got := buf.String()
			matched, err := regexp.MatchString(tc.wantPattern, got)
			if err != nil {
				t.Fatal("pattern did not compile:", err)
			}
			// Fixed copy-paste bug: the failure message previously said
			// "logger.info" even though this test exercises logger.error.
			if !matched {
				t.Errorf("logger.error(%q) outputted %q, should match pattern %q",
					tc.message, got, tc.wantPattern)
			}
		})
	}
}

View File

@@ -6,7 +6,6 @@ package asynq
import ( import (
"fmt" "fmt"
"log"
"math/rand" "math/rand"
"sort" "sort"
"sync" "sync"
@@ -14,6 +13,7 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"golang.org/x/time/rate"
) )
type processor struct { type processor struct {
@@ -31,6 +31,9 @@ type processor struct {
// channel via which to send sync requests to syncer. // channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest syncRequestCh chan<- *syncRequest
// rate limiter to prevent spamming logs with a bunch of errors.
errLogLimiter *rate.Limiter
// sema is a counting semaphore to ensure the number of active workers // sema is a counting semaphore to ensure the number of active workers
// does not exceed the limit. // does not exceed the limit.
sema chan struct{} sema chan struct{}
@@ -67,6 +70,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
retryDelayFunc: fn, retryDelayFunc: fn,
syncRequestCh: syncRequestCh, syncRequestCh: syncRequestCh,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, n), sema: make(chan struct{}, n),
done: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
@@ -79,7 +83,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
// It's safe to call this method multiple times. // It's safe to call this method multiple times.
func (p *processor) stop() { func (p *processor) stop() {
p.once.Do(func() { p.once.Do(func() {
log.Println("[INFO] Processor shutting down...") logger.info("Processor shutting down...")
// Unblock if processor is waiting for sema token. // Unblock if processor is waiting for sema token.
close(p.abort) close(p.abort)
// Signal the processor goroutine to stop processing tasks // Signal the processor goroutine to stop processing tasks
@@ -95,12 +99,12 @@ func (p *processor) terminate() {
// IDEA: Allow user to customize this timeout value. // IDEA: Allow user to customize this timeout value.
const timeout = 8 * time.Second const timeout = 8 * time.Second
time.AfterFunc(timeout, func() { close(p.quit) }) time.AfterFunc(timeout, func() { close(p.quit) })
log.Println("[INFO] Waiting for all workers to finish...") logger.info("Waiting for all workers to finish...")
// block until all workers have released the token // block until all workers have released the token
for i := 0; i < cap(p.sema); i++ { for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{} p.sema <- struct{}{}
} }
log.Println("[INFO] All workers have finished.") logger.info("All workers have finished")
p.restore() // move any unfinished tasks back to the queue. p.restore() // move any unfinished tasks back to the queue.
} }
@@ -112,7 +116,7 @@ func (p *processor) start() {
for { for {
select { select {
case <-p.done: case <-p.done:
log.Println("[INFO] Processor done.") logger.info("Processor done")
return return
default: default:
p.exec() p.exec()
@@ -137,7 +141,9 @@ func (p *processor) exec() {
return return
} }
if err != nil { if err != nil {
log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err) if p.errLogLimiter.Allow() {
logger.error("Dequeue error: %v", err)
}
return return
} }
@@ -159,7 +165,7 @@ func (p *processor) exec() {
select { select {
case <-p.quit: case <-p.quit:
// time is up, quit this worker goroutine. // time is up, quit this worker goroutine.
log.Printf("[WARN] Terminating in-progress task %+v\n", msg) logger.warn("Quitting worker to process task id=%s", msg.ID)
return return
case resErr := <-resCh: case resErr := <-resCh:
// Note: One of three things should happen. // Note: One of three things should happen.
@@ -185,25 +191,25 @@ func (p *processor) exec() {
func (p *processor) restore() { func (p *processor) restore() {
n, err := p.rdb.RestoreUnfinished() n, err := p.rdb.RestoreUnfinished()
if err != nil { if err != nil {
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) logger.error("Could not restore unfinished tasks: %v", err)
} }
if n > 0 { if n > 0 {
log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n) logger.info("Restored %d unfinished tasks back to queue", n)
} }
} }
func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) requeue(msg *base.TaskMessage) {
err := p.rdb.Requeue(msg) err := p.rdb.Requeue(msg)
if err != nil { if err != nil {
log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err) logger.error("Could not push task id=%s back to queue: %v", msg.ID, err)
} }
} }
func (p *processor) markAsDone(msg *base.TaskMessage) { func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.rdb.Done(msg) err := p.rdb.Done(msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue) errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
log.Printf("[WARN] %s; will retry\n", errMsg) logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Done(msg) return p.rdb.Done(msg)
@@ -218,8 +224,8 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.rdb.Retry(msg, retryAt, e.Error()) err := p.rdb.Retry(msg, retryAt, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
log.Printf("[WARN] %s; will retry\n", errMsg) logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Retry(msg, retryAt, e.Error()) return p.rdb.Retry(msg, retryAt, e.Error())
@@ -230,11 +236,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
} }
func (p *processor) kill(msg *base.TaskMessage, e error) { func (p *processor) kill(msg *base.TaskMessage, e error) {
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) logger.warn("Retry exhausted for task id=%s", msg.ID)
err := p.rdb.Kill(msg, e.Error()) err := p.rdb.Kill(msg, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
log.Printf("[WARN] %s; will retry\n", errMsg) logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Kill(msg, e.Error()) return p.rdb.Kill(msg, e.Error())

View File

@@ -5,7 +5,6 @@
package asynq package asynq
import ( import (
"log"
"time" "time"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
@@ -38,7 +37,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *
} }
func (s *scheduler) terminate() { func (s *scheduler) terminate() {
log.Println("[INFO] Scheduler shutting down...") logger.info("Scheduler shutting down...")
// Signal the scheduler goroutine to stop polling. // Signal the scheduler goroutine to stop polling.
s.done <- struct{}{} s.done <- struct{}{}
} }
@@ -49,7 +48,7 @@ func (s *scheduler) start() {
for { for {
select { select {
case <-s.done: case <-s.done:
log.Println("[INFO] Scheduler done.") logger.info("Scheduler done")
return return
case <-time.After(s.avgInterval): case <-time.After(s.avgInterval):
s.exec() s.exec()
@@ -60,6 +59,6 @@ func (s *scheduler) start() {
func (s *scheduler) exec() { func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) logger.error("Could not enqueue scheduled tasks: %v", err)
} }
} }

View File

@@ -5,7 +5,6 @@
package asynq package asynq
import ( import (
"log"
"time" "time"
) )
@@ -35,7 +34,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
} }
func (s *syncer) terminate() { func (s *syncer) terminate() {
log.Println("[INFO] Syncer shutting down...") logger.info("Syncer shutting down...")
// Signal the syncer goroutine to stop. // Signal the syncer goroutine to stop.
s.done <- struct{}{} s.done <- struct{}{}
} }
@@ -49,10 +48,10 @@ func (s *syncer) start() {
// Try sync one last time before shutting down. // Try sync one last time before shutting down.
for _, req := range requests { for _, req := range requests {
if err := req.fn(); err != nil { if err := req.fn(); err != nil {
log.Printf("[ERROR] %s\n", req.errMsg) logger.error(req.errMsg)
} }
} }
log.Println("[INFO] Syncer done.") logger.info("Syncer done")
return return
case req := <-s.requestsCh: case req := <-s.requestsCh:
requests = append(requests, req) requests = append(requests, req)