mirror of
https://github.com/hibiken/asynq.git
synced 2026-05-10 17:33:47 +08:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e253211a60 | ||
|
|
44c657bec6 | ||
|
|
db8e9d05c3 | ||
|
|
b02e4e6b09 |
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.2.1] - 2020-01-22
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- More structured log messages
|
||||||
|
- Prevent spamming logs with a bunch of errors when Redis connection is lost
|
||||||
|
- Fixed and updated README doc
|
||||||
|
|
||||||
## [0.2.0] - 2020-01-19
|
## [0.2.0] - 2020-01-19
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|||||||
150
README.md
150
README.md
@@ -1,6 +1,10 @@
|
|||||||
# Asynq
|
# Asynq
|
||||||
|
|
||||||
[](https://travis-ci.com/hibiken/asynq) [](https://opensource.org/licenses/MIT) [](https://goreportcard.com/report/github.com/hibiken/asynq) [](https://godoc.org/github.com/hibiken/asynq) [](https://gitter.im/go-asynq/community)
|
[](https://travis-ci.com/hibiken/asynq)
|
||||||
|
[](https://opensource.org/licenses/MIT)
|
||||||
|
[](https://goreportcard.com/report/github.com/hibiken/asynq)
|
||||||
|
[](https://godoc.org/github.com/hibiken/asynq)
|
||||||
|
[](https://gitter.im/go-asynq/community)
|
||||||
|
|
||||||
Simple and efficent asynchronous task processing library in Go.
|
Simple and efficent asynchronous task processing library in Go.
|
||||||
|
|
||||||
@@ -29,12 +33,12 @@ Asynq provides:
|
|||||||
- Clear separation of task producer and consumer
|
- Clear separation of task producer and consumer
|
||||||
- Ability to schedule task processing in the future
|
- Ability to schedule task processing in the future
|
||||||
- Automatic retry of failed tasks with exponential backoff
|
- Automatic retry of failed tasks with exponential backoff
|
||||||
- Automatic failover using Redis sentinels
|
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
|
||||||
- Ability to configure max retry count per task
|
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) max retry count per task
|
||||||
- Ability to configure max number of worker goroutines to process tasks
|
- Ability to configure max number of worker goroutines to process tasks
|
||||||
- Support for priority queues
|
- Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
|
||||||
- Unix signal handling to gracefully shutdown background processing
|
- [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
|
||||||
- CLI tool to query and mutate queues state for mointoring and administrative purposes
|
- [CLI tool](/tools/asynqmon/README.md) to query and mutate queues state for mointoring and administrative purposes
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
@@ -45,22 +49,35 @@ Asynq provides:
|
|||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
|
To install both `asynq` library and `asynqmon` CLI tool, run the following command:
|
||||||
|
|
||||||
```
|
```
|
||||||
go get -u github.com/hibiken/asynq
|
go get -u github.com/hibiken/asynq
|
||||||
|
go get -u github.com/hibiken/asynq/tools/asynqmon
|
||||||
```
|
```
|
||||||
|
|
||||||
## Getting Started
|
## Getting Started
|
||||||
|
|
||||||
1. Import `asynq` in your file.
|
In this quick tour of `asynq`, we are going to create two programs.
|
||||||
|
|
||||||
|
- `producer.go` will create and schedule tasks to be processed asynchronously by the consumer.
|
||||||
|
- `consumer.go` will process the tasks created by the producer.
|
||||||
|
|
||||||
|
**This guide assumes that you are running a Redis server at `localhost:6379`**.
|
||||||
|
Before we start, make sure you have Redis installed and running.
|
||||||
|
|
||||||
|
1. Import `asynq` in both files.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import "github.com/hibiken/asynq"
|
import "github.com/hibiken/asynq"
|
||||||
```
|
```
|
||||||
|
|
||||||
2. Asynq uses redis as a message broker.
|
2. Asynq uses Redis as a message broker.
|
||||||
Use one of `RedisConnOpt` types to specify how to connect to Redis.
|
Use one of `RedisConnOpt` types to specify how to connect to Redis.
|
||||||
|
We are going to use `RedisClientOpt` here.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
// both in producer.go and consumer.go
|
||||||
var redis = &asynq.RedisClientOpt{
|
var redis = &asynq.RedisClientOpt{
|
||||||
Addr: "localhost:6379",
|
Addr: "localhost:6379",
|
||||||
// Omit if no password is required
|
// Omit if no password is required
|
||||||
@@ -71,9 +88,10 @@ var redis = &asynq.RedisClientOpt{
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Create a `Client` instance to create and schedule tasks.
|
3. In `producer.go`, create a `Client` instance to create and schedule tasks.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
// producer.go
|
||||||
func main() {
|
func main() {
|
||||||
client := asynq.NewClient(redis)
|
client := asynq.NewClient(redis)
|
||||||
|
|
||||||
@@ -88,32 +106,31 @@ func main() {
|
|||||||
|
|
||||||
// Process the task immediately.
|
// Process the task immediately.
|
||||||
err := client.Schedule(t1, time.Now())
|
err := client.Schedule(t1, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Process the task 24 hours later.
|
// Process the task 24 hours later.
|
||||||
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
|
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
|
||||||
|
if err != nil {
|
||||||
// Specify the max number of retry (default: 25)
|
log.Fatal(err)
|
||||||
err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1))
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
4. Create a `Background` instance to process tasks.
|
4. In `consumer.go`, create a `Background` instance to process tasks.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
// consumer.go
|
||||||
func main() {
|
func main() {
|
||||||
bg := asynq.NewBackground(redis, &asynq.Config{
|
bg := asynq.NewBackground(redis, &asynq.Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Blocks until signal TERM or INT is received.
|
|
||||||
// For graceful shutdown, send signal TSTP to stop processing more tasks
|
|
||||||
// before sending TERM or INT signal to terminate the process.
|
|
||||||
bg.Run(handler)
|
bg.Run(handler)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Note that `Client` and `Background` are intended to be used in separate executable binaries.
|
|
||||||
|
|
||||||
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
|
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
@@ -137,9 +154,14 @@ func handler(t *asynq.Task) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fmt.Printf("Send Welcome Email to %d\n", id)
|
fmt.Printf("Send Welcome Email to User %d\n", id)
|
||||||
|
|
||||||
// ... handle other types ...
|
case "send_reminder_email":
|
||||||
|
id, err := t.Payload.GetInt("user_id")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("Send Reminder Email to User %d\n", id)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unexpected task type: %s", t.Type)
|
return fmt.Errorf("unexpected task type: %s", t.Type)
|
||||||
@@ -157,6 +179,94 @@ func main() {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
We could kep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
|
||||||
|
|
||||||
|
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
|
||||||
|
|
||||||
|
```go
|
||||||
|
// consumer.go
|
||||||
|
|
||||||
|
// Dispatcher is used to dispatch tasks to registered handlers.
|
||||||
|
type Dispatcher struct {
|
||||||
|
mapping map[string]asynq.HandlerFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleFunc registers a task handler
|
||||||
|
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
|
||||||
|
d.mapping[taskType] = fn
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessTask processes a task.
|
||||||
|
//
|
||||||
|
// NOTE: Dispatcher satisfies asynq.Handler interface.
|
||||||
|
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
|
||||||
|
fn, ok := d.mapping[task.Type]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("no handler registered for %q", task.Type)
|
||||||
|
}
|
||||||
|
return fn(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
|
||||||
|
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
|
||||||
|
d.HandleFunc("send_reminder_email", sendReminderEmail)
|
||||||
|
|
||||||
|
bg := asynq.NewBackground(redis, &asynq.Config{
|
||||||
|
Concurrency: 10,
|
||||||
|
})
|
||||||
|
bg.Run(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendWelcomeEmail(t *asynq.Task) error {
|
||||||
|
id, err := t.Payload.GetInt("user_id")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("Send Welcome Email to User %d\n", id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendReminderEmail(t *asynq.Task) error {
|
||||||
|
id, err := t.Payload.GetInt("user_id")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("Send Welcome Email to User %d\n", id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Now that we have both task producer and consumer, we can run both programs.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
go run consumer.go
|
||||||
|
```
|
||||||
|
|
||||||
|
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
|
||||||
|
|
||||||
|
With our consumer running, also run
|
||||||
|
|
||||||
|
```sh
|
||||||
|
go run producer.go
|
||||||
|
```
|
||||||
|
|
||||||
|
This will create a task and the first task will get processed immediately by the consumer. The second task will be processed 24 hours later.
|
||||||
|
|
||||||
|
Let's use `asynqmon` tool to inspect the tasks.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
asynqmon stats
|
||||||
|
```
|
||||||
|
|
||||||
|
This command will show the number of tasks in each state and stats for the current date as well as redis information.
|
||||||
|
|
||||||
|
To understand the meaning of each state, see [Life of a Task Wiki page](https://github.com/hibiken/asynq/wiki/Life-of-a-Task).
|
||||||
|
|
||||||
|
For in-depth guide on `asynqmon` tool, see the [README](/tools/asynqmon/README.md) for the CLI.
|
||||||
|
|
||||||
|
This was a quick tour of `asynq` basics. To see all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see [the Wiki page](https://github.com/hibiken/asynq/wiki).
|
||||||
|
|
||||||
## Monitoring CLI
|
## Monitoring CLI
|
||||||
|
|
||||||
Asynq ships with a CLI tool to inspect the state of queues and tasks.
|
Asynq ships with a CLI tool to inspect the state of queues and tasks.
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
@@ -150,9 +149,15 @@ func (fn HandlerFunc) ProcessTask(task *Task) error {
|
|||||||
// a signal, it gracefully shuts down all pending workers and other
|
// a signal, it gracefully shuts down all pending workers and other
|
||||||
// goroutines to process the tasks.
|
// goroutines to process the tasks.
|
||||||
func (bg *Background) Run(handler Handler) {
|
func (bg *Background) Run(handler Handler) {
|
||||||
|
logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
|
||||||
|
logger.info("Starting processing")
|
||||||
|
|
||||||
bg.start(handler)
|
bg.start(handler)
|
||||||
defer bg.stop()
|
defer bg.stop()
|
||||||
|
|
||||||
|
logger.info("Send signal TSTP to stop processing new tasks")
|
||||||
|
logger.info("Send signal TERM or INT to terminate the process")
|
||||||
|
|
||||||
// Wait for a signal to terminate.
|
// Wait for a signal to terminate.
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
|
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
|
||||||
@@ -165,7 +170,7 @@ func (bg *Background) Run(handler Handler) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
log.Println("[INFO] Starting graceful shutdown...")
|
logger.info("Starting graceful shutdown")
|
||||||
}
|
}
|
||||||
|
|
||||||
// starts the background-task processing.
|
// starts the background-task processing.
|
||||||
@@ -201,6 +206,8 @@ func (bg *Background) stop() {
|
|||||||
bg.rdb.Close()
|
bg.rdb.Close()
|
||||||
bg.processor.handler = nil
|
bg.processor.handler = nil
|
||||||
bg.running = false
|
bg.running = false
|
||||||
|
|
||||||
|
logger.info("Bye!")
|
||||||
}
|
}
|
||||||
|
|
||||||
// normalizeQueueCfg divides priority numbers by their
|
// normalizeQueueCfg divides priority numbers by their
|
||||||
|
|||||||
1
go.mod
1
go.mod
@@ -17,5 +17,6 @@ require (
|
|||||||
go.uber.org/goleak v0.10.0
|
go.uber.org/goleak v0.10.0
|
||||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect
|
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect
|
||||||
golang.org/x/text v0.3.2 // indirect
|
golang.org/x/text v0.3.2 // indirect
|
||||||
|
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||||
gopkg.in/yaml.v2 v2.2.7 // indirect
|
gopkg.in/yaml.v2 v2.2.7 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
1
go.sum
1
go.sum
@@ -179,6 +179,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
|||||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||||
|
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
|
||||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||||
|
|||||||
35
logger.go
Normal file
35
logger.go
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package asynq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// global logger used in asynq package.
|
||||||
|
var logger = newLogger(os.Stderr)
|
||||||
|
|
||||||
|
func newLogger(out io.Writer) *asynqLogger {
|
||||||
|
return &asynqLogger{
|
||||||
|
log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type asynqLogger struct {
|
||||||
|
*log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *asynqLogger) info(format string, args ...interface{}) {
|
||||||
|
format = "INFO: " + format
|
||||||
|
l.Printf(format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *asynqLogger) warn(format string, args ...interface{}) {
|
||||||
|
format = "WARN: " + format
|
||||||
|
l.Printf(format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *asynqLogger) error(format string, args ...interface{}) {
|
||||||
|
format = "ERROR: " + format
|
||||||
|
l.Printf(format, args...)
|
||||||
|
}
|
||||||
117
logger_test.go
Normal file
117
logger_test.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
package asynq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"regexp"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// regexp for timestamps
|
||||||
|
const (
|
||||||
|
rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]`
|
||||||
|
rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]`
|
||||||
|
rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]`
|
||||||
|
)
|
||||||
|
|
||||||
|
type tester struct {
|
||||||
|
desc string
|
||||||
|
message string
|
||||||
|
wantPattern string // regexp that log output must match
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoggerInfo(t *testing.T) {
|
||||||
|
tests := []tester{
|
||||||
|
{
|
||||||
|
desc: "without trailing newline, logger adds newline",
|
||||||
|
message: "hello, world!",
|
||||||
|
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with trailing newline, logger preserves newline",
|
||||||
|
message: "hello, world!\n",
|
||||||
|
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
logger := newLogger(&buf)
|
||||||
|
|
||||||
|
logger.info(tc.message)
|
||||||
|
|
||||||
|
got := buf.String()
|
||||||
|
matched, err := regexp.MatchString(tc.wantPattern, got)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("pattern did not compile:", err)
|
||||||
|
}
|
||||||
|
if !matched {
|
||||||
|
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
|
||||||
|
tc.message, got, tc.wantPattern)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoggerWarn(t *testing.T) {
|
||||||
|
tests := []tester{
|
||||||
|
{
|
||||||
|
desc: "without trailing newline, logger adds newline",
|
||||||
|
message: "hello, world!",
|
||||||
|
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with trailing newline, logger preserves newline",
|
||||||
|
message: "hello, world!\n",
|
||||||
|
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
logger := newLogger(&buf)
|
||||||
|
|
||||||
|
logger.warn(tc.message)
|
||||||
|
|
||||||
|
got := buf.String()
|
||||||
|
matched, err := regexp.MatchString(tc.wantPattern, got)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("pattern did not compile:", err)
|
||||||
|
}
|
||||||
|
if !matched {
|
||||||
|
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
|
||||||
|
tc.message, got, tc.wantPattern)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoggerError(t *testing.T) {
|
||||||
|
tests := []tester{
|
||||||
|
{
|
||||||
|
desc: "without trailing newline, logger adds newline",
|
||||||
|
message: "hello, world!",
|
||||||
|
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with trailing newline, logger preserves newline",
|
||||||
|
message: "hello, world!\n",
|
||||||
|
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
logger := newLogger(&buf)
|
||||||
|
|
||||||
|
logger.error(tc.message)
|
||||||
|
|
||||||
|
got := buf.String()
|
||||||
|
matched, err := regexp.MatchString(tc.wantPattern, got)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("pattern did not compile:", err)
|
||||||
|
}
|
||||||
|
if !matched {
|
||||||
|
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
|
||||||
|
tc.message, got, tc.wantPattern)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
40
processor.go
40
processor.go
@@ -6,7 +6,6 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -14,6 +13,7 @@ import (
|
|||||||
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
type processor struct {
|
type processor struct {
|
||||||
@@ -31,6 +31,9 @@ type processor struct {
|
|||||||
// channel via which to send sync requests to syncer.
|
// channel via which to send sync requests to syncer.
|
||||||
syncRequestCh chan<- *syncRequest
|
syncRequestCh chan<- *syncRequest
|
||||||
|
|
||||||
|
// rate limiter to prevent spamming logs with a bunch of errors.
|
||||||
|
errLogLimiter *rate.Limiter
|
||||||
|
|
||||||
// sema is a counting semaphore to ensure the number of active workers
|
// sema is a counting semaphore to ensure the number of active workers
|
||||||
// does not exceed the limit.
|
// does not exceed the limit.
|
||||||
sema chan struct{}
|
sema chan struct{}
|
||||||
@@ -67,6 +70,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
|
|||||||
orderedQueues: orderedQueues,
|
orderedQueues: orderedQueues,
|
||||||
retryDelayFunc: fn,
|
retryDelayFunc: fn,
|
||||||
syncRequestCh: syncRequestCh,
|
syncRequestCh: syncRequestCh,
|
||||||
|
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
|
||||||
sema: make(chan struct{}, n),
|
sema: make(chan struct{}, n),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
abort: make(chan struct{}),
|
abort: make(chan struct{}),
|
||||||
@@ -79,7 +83,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
|
|||||||
// It's safe to call this method multiple times.
|
// It's safe to call this method multiple times.
|
||||||
func (p *processor) stop() {
|
func (p *processor) stop() {
|
||||||
p.once.Do(func() {
|
p.once.Do(func() {
|
||||||
log.Println("[INFO] Processor shutting down...")
|
logger.info("Processor shutting down...")
|
||||||
// Unblock if processor is waiting for sema token.
|
// Unblock if processor is waiting for sema token.
|
||||||
close(p.abort)
|
close(p.abort)
|
||||||
// Signal the processor goroutine to stop processing tasks
|
// Signal the processor goroutine to stop processing tasks
|
||||||
@@ -95,12 +99,12 @@ func (p *processor) terminate() {
|
|||||||
// IDEA: Allow user to customize this timeout value.
|
// IDEA: Allow user to customize this timeout value.
|
||||||
const timeout = 8 * time.Second
|
const timeout = 8 * time.Second
|
||||||
time.AfterFunc(timeout, func() { close(p.quit) })
|
time.AfterFunc(timeout, func() { close(p.quit) })
|
||||||
log.Println("[INFO] Waiting for all workers to finish...")
|
logger.info("Waiting for all workers to finish...")
|
||||||
// block until all workers have released the token
|
// block until all workers have released the token
|
||||||
for i := 0; i < cap(p.sema); i++ {
|
for i := 0; i < cap(p.sema); i++ {
|
||||||
p.sema <- struct{}{}
|
p.sema <- struct{}{}
|
||||||
}
|
}
|
||||||
log.Println("[INFO] All workers have finished.")
|
logger.info("All workers have finished")
|
||||||
p.restore() // move any unfinished tasks back to the queue.
|
p.restore() // move any unfinished tasks back to the queue.
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,7 +116,7 @@ func (p *processor) start() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-p.done:
|
case <-p.done:
|
||||||
log.Println("[INFO] Processor done.")
|
logger.info("Processor done")
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
p.exec()
|
p.exec()
|
||||||
@@ -137,7 +141,9 @@ func (p *processor) exec() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err)
|
if p.errLogLimiter.Allow() {
|
||||||
|
logger.error("Dequeue error: %v", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,7 +165,7 @@ func (p *processor) exec() {
|
|||||||
select {
|
select {
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
// time is up, quit this worker goroutine.
|
// time is up, quit this worker goroutine.
|
||||||
log.Printf("[WARN] Terminating in-progress task %+v\n", msg)
|
logger.warn("Quitting worker to process task id=%s", msg.ID)
|
||||||
return
|
return
|
||||||
case resErr := <-resCh:
|
case resErr := <-resCh:
|
||||||
// Note: One of three things should happen.
|
// Note: One of three things should happen.
|
||||||
@@ -185,25 +191,25 @@ func (p *processor) exec() {
|
|||||||
func (p *processor) restore() {
|
func (p *processor) restore() {
|
||||||
n, err := p.rdb.RestoreUnfinished()
|
n, err := p.rdb.RestoreUnfinished()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
|
logger.error("Could not restore unfinished tasks: %v", err)
|
||||||
}
|
}
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n)
|
logger.info("Restored %d unfinished tasks back to queue", n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) requeue(msg *base.TaskMessage) {
|
func (p *processor) requeue(msg *base.TaskMessage) {
|
||||||
err := p.rdb.Requeue(msg)
|
err := p.rdb.Requeue(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err)
|
logger.error("Could not push task id=%s back to queue: %v", msg.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) markAsDone(msg *base.TaskMessage) {
|
func (p *processor) markAsDone(msg *base.TaskMessage) {
|
||||||
err := p.rdb.Done(msg)
|
err := p.rdb.Done(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue)
|
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
|
||||||
log.Printf("[WARN] %s; will retry\n", errMsg)
|
logger.warn("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.rdb.Done(msg)
|
return p.rdb.Done(msg)
|
||||||
@@ -218,8 +224,8 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
|
|||||||
retryAt := time.Now().Add(d)
|
retryAt := time.Now().Add(d)
|
||||||
err := p.rdb.Retry(msg, retryAt, e.Error())
|
err := p.rdb.Retry(msg, retryAt, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
|
||||||
log.Printf("[WARN] %s; will retry\n", errMsg)
|
logger.warn("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.rdb.Retry(msg, retryAt, e.Error())
|
return p.rdb.Retry(msg, retryAt, e.Error())
|
||||||
@@ -230,11 +236,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) kill(msg *base.TaskMessage, e error) {
|
func (p *processor) kill(msg *base.TaskMessage, e error) {
|
||||||
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
|
logger.warn("Retry exhausted for task id=%s", msg.ID)
|
||||||
err := p.rdb.Kill(msg, e.Error())
|
err := p.rdb.Kill(msg, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
||||||
log.Printf("[WARN] %s; will retry\n", errMsg)
|
logger.warn("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.rdb.Kill(msg, e.Error())
|
return p.rdb.Kill(msg, e.Error())
|
||||||
|
|||||||
@@ -5,7 +5,6 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
@@ -38,7 +37,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) terminate() {
|
func (s *scheduler) terminate() {
|
||||||
log.Println("[INFO] Scheduler shutting down...")
|
logger.info("Scheduler shutting down...")
|
||||||
// Signal the scheduler goroutine to stop polling.
|
// Signal the scheduler goroutine to stop polling.
|
||||||
s.done <- struct{}{}
|
s.done <- struct{}{}
|
||||||
}
|
}
|
||||||
@@ -49,7 +48,7 @@ func (s *scheduler) start() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
log.Println("[INFO] Scheduler done.")
|
logger.info("Scheduler done")
|
||||||
return
|
return
|
||||||
case <-time.After(s.avgInterval):
|
case <-time.After(s.avgInterval):
|
||||||
s.exec()
|
s.exec()
|
||||||
@@ -60,6 +59,6 @@ func (s *scheduler) start() {
|
|||||||
|
|
||||||
func (s *scheduler) exec() {
|
func (s *scheduler) exec() {
|
||||||
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
|
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
|
||||||
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
|
logger.error("Could not enqueue scheduled tasks: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,6 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -35,7 +34,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncer) terminate() {
|
func (s *syncer) terminate() {
|
||||||
log.Println("[INFO] Syncer shutting down...")
|
logger.info("Syncer shutting down...")
|
||||||
// Signal the syncer goroutine to stop.
|
// Signal the syncer goroutine to stop.
|
||||||
s.done <- struct{}{}
|
s.done <- struct{}{}
|
||||||
}
|
}
|
||||||
@@ -49,10 +48,10 @@ func (s *syncer) start() {
|
|||||||
// Try sync one last time before shutting down.
|
// Try sync one last time before shutting down.
|
||||||
for _, req := range requests {
|
for _, req := range requests {
|
||||||
if err := req.fn(); err != nil {
|
if err := req.fn(); err != nil {
|
||||||
log.Printf("[ERROR] %s\n", req.errMsg)
|
logger.error(req.errMsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Println("[INFO] Syncer done.")
|
logger.info("Syncer done")
|
||||||
return
|
return
|
||||||
case req := <-s.requestsCh:
|
case req := <-s.requestsCh:
|
||||||
requests = append(requests, req)
|
requests = append(requests, req)
|
||||||
|
|||||||
Reference in New Issue
Block a user