mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-24 15:05:53 +08:00
Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2516c4baba | ||
|
|
ebe482a65c | ||
|
|
3e9fc2f972 | ||
|
|
63ce9ed0f9 | ||
|
|
32d3f329b9 | ||
|
|
544c301a8b | ||
|
|
8b997d2fab | ||
|
|
901105a8d7 | ||
|
|
aaa3f1d4fd | ||
|
|
4722ca2d3d | ||
|
|
6a9d9fd717 | ||
|
|
de28c1ea19 | ||
|
|
f618f5b1f5 | ||
|
|
aa936466b3 | ||
|
|
5d1ec70544 | ||
|
|
d1d3be9b00 |
2
.github/workflows/benchstat.yml
vendored
2
.github/workflows/benchstat.yml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: 1.15.x
|
||||
go-version: 1.16.x
|
||||
- name: Benchmark
|
||||
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt
|
||||
- name: Upload Benchmark
|
||||
|
||||
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest]
|
||||
go-version: [1.13.x, 1.14.x, 1.15.x]
|
||||
go-version: [1.13.x, 1.14.x, 1.15.x, 1.16.x]
|
||||
runs-on: ${{ matrix.os }}
|
||||
services:
|
||||
redis:
|
||||
|
||||
32
CHANGELOG.md
32
CHANGELOG.md
@@ -7,6 +7,36 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.17.2] - 2021-06-06
|
||||
|
||||
### Fixed
|
||||
|
||||
- Free unique lock when task is deleted (https://github.com/hibiken/asynq/issues/275).
|
||||
|
||||
## [0.17.1] - 2021-04-04
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix bug in internal `RDB.memoryUsage` method.
|
||||
|
||||
## [0.17.0] - 2021-03-24
|
||||
|
||||
### Added
|
||||
|
||||
- `DialTimeout`, `ReadTimeout`, and `WriteTimeout` options are added to `RedisConnOpt`.
|
||||
|
||||
## [0.16.1] - 2021-03-20
|
||||
|
||||
### Fixed
|
||||
|
||||
- Replace `KEYS` command with `SCAN` as recommended by [redis doc](https://redis.io/commands/KEYS).
|
||||
|
||||
## [0.16.0] - 2021-03-10
|
||||
|
||||
### Added
|
||||
|
||||
- `Unregister` method is added to `Scheduler` to remove a registered entry.
|
||||
|
||||
## [0.15.0] - 2021-01-31
|
||||
|
||||
**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"
|
||||
@@ -15,7 +45,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
|
||||
- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer,
|
||||
update your code to pass as a value.
|
||||
update your code to pass as a value.
|
||||
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
|
||||
|
||||
### Added
|
||||
|
||||
99
README.md
99
README.md
@@ -1,14 +1,14 @@
|
||||
# Asynq
|
||||
<img src="https://user-images.githubusercontent.com/11155743/114697792-ffbfa580-9d26-11eb-8e5b-33bef69476dc.png" alt="Asynq logo" width="360px" />
|
||||
|
||||
# Simple, reliable & efficient distributed task queue in Go
|
||||
|
||||

|
||||
[](https://godoc.org/github.com/hibiken/asynq)
|
||||
[](https://goreportcard.com/report/github.com/hibiken/asynq)
|
||||

|
||||
[](https://opensource.org/licenses/MIT)
|
||||
[](https://gitter.im/go-asynq/community)
|
||||
|
||||
## Overview
|
||||
|
||||
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.
|
||||
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by [Redis](https://redis.io/) and is designed to be scalable yet easy to get started.
|
||||
|
||||
Highlevel overview of how Asynq works:
|
||||
|
||||
@@ -16,16 +16,11 @@ Highlevel overview of how Asynq works:
|
||||
- Server pulls task off queues and starts a worker goroutine for each task
|
||||
- Tasks are processed concurrently by multiple workers
|
||||
|
||||
Task queues are used as a mechanism to distribute work across multiple machines.
|
||||
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
|
||||
Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
|
||||
|
||||

|
||||
**Example use case**
|
||||
|
||||
## Stability and Compatibility
|
||||
|
||||
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.
|
||||
|
||||
**Status**: The library is currently undergoing heavy development with frequent, breaking API changes.
|
||||

|
||||
|
||||
## Features
|
||||
|
||||
@@ -44,16 +39,27 @@ A system can consist of multiple worker servers and brokers, giving way to high
|
||||
- [Periodic Tasks](https://github.com/hibiken/asynq/wiki/Periodic-Tasks)
|
||||
- [Support Redis Cluster](https://github.com/hibiken/asynq/wiki/Redis-Cluster) for automatic sharding and high availability
|
||||
- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for high availability
|
||||
- [Web UI](#web-ui) to inspect and remote-control queues and tasks
|
||||
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks
|
||||
|
||||
## Stability and Compatibility
|
||||
|
||||
**Status**: The library is currently undergoing **heavy development** with frequent, breaking API changes.
|
||||
|
||||
> ☝️ **Important Note**: Current major version is zero (`v0.x.x`) to accomodate rapid development and fast iteration while getting early feedback from users (_feedback on APIs are appreciated!_). The public API could change without a major version update before `v1.0.0` release.
|
||||
|
||||
## Quickstart
|
||||
|
||||
First, make sure you are running a Redis server locally.
|
||||
Make sure you have Go installed ([download](https://golang.org/dl/)). Version `1.13` or higher is required.
|
||||
|
||||
Initialize your project by creating a folder and then running `go mod init github.com/your/repo` ([learn more](https://blog.golang.org/using-go-modules)) inside the folder. Then install Asynq library with the [`go get`](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) command:
|
||||
|
||||
```sh
|
||||
$ redis-server
|
||||
go get -u github.com/hibiken/asynq
|
||||
```
|
||||
|
||||
Make sure you're running a Redis server locally or from a [Docker](https://hub.docker.com/_/redis) container. Version `3.0` or higher is required.
|
||||
|
||||
Next, write a package that encapsulates task creation and task handling.
|
||||
|
||||
```go
|
||||
@@ -203,10 +209,9 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
Next, start a worker server to process these tasks in the background.
|
||||
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
||||
Next, start a worker server to process these tasks in the background. To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
||||
|
||||
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
|
||||
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`net/http`](https://golang.org/pkg/net/http/) Handler.
|
||||
|
||||
```go
|
||||
package main
|
||||
@@ -247,52 +252,52 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started).
|
||||
For a more detailed walk-through of the library, see our [Getting Started](https://github.com/hibiken/asynq/wiki/Getting-Started) guide.
|
||||
|
||||
To Learn more about `asynq` features and APIs, see our [Wiki](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
|
||||
To learn more about `asynq` features and APIs, see the package [godoc](https://godoc.org/github.com/hibiken/asynq).
|
||||
|
||||
## Web UI
|
||||
|
||||
[Asynqmon](https://github.com/hibiken/asynqmon) is a web based tool for monitoring and administrating Asynq queues and tasks.
|
||||
|
||||
Here's a few screenshots of the Web UI:
|
||||
|
||||
**Queues view**
|
||||
|
||||

|
||||
|
||||
**Tasks view**
|
||||
|
||||

|
||||
|
||||
**Settings and adaptive dark mode**
|
||||
|
||||

|
||||
|
||||
For details on how to use the tool, refer to the tool's [README](https://github.com/hibiken/asynqmon#readme).
|
||||
|
||||
## Command Line Tool
|
||||
|
||||
Asynq ships with a command line tool to inspect the state of queues and tasks.
|
||||
|
||||
Here's an example of running the `stats` command.
|
||||
|
||||

|
||||
|
||||
For details on how to use the tool, refer to the tool's [README](/tools/asynq/README.md).
|
||||
|
||||
## Installation
|
||||
|
||||
To install `asynq` library, run the following command:
|
||||
|
||||
```sh
|
||||
go get -u github.com/hibiken/asynq
|
||||
```
|
||||
|
||||
To install the CLI tool, run the following command:
|
||||
|
||||
```sh
|
||||
go get -u github.com/hibiken/asynq/tools/asynq
|
||||
```
|
||||
|
||||
## Requirements
|
||||
Here's an example of running the `asynq stats` command:
|
||||
|
||||
| Dependency | Version |
|
||||
| -------------------------- | ------- |
|
||||
| [Redis](https://redis.io/) | v3.0+ |
|
||||
| [Go](https://golang.org/) | v1.13+ |
|
||||

|
||||
|
||||
For details on how to use the tool, refer to the tool's [README](/tools/asynq/README.md).
|
||||
|
||||
## Contributing
|
||||
|
||||
We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community.
|
||||
We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on [Gitter channel](https://gitter.im/go-asynq/community), etc) made by the community.
|
||||
|
||||
Please see the [Contribution Guide](/CONTRIBUTING.md) before contributing.
|
||||
|
||||
## Acknowledgements
|
||||
|
||||
- [Sidekiq](https://github.com/mperham/sidekiq) : Many of the design ideas are taken from sidekiq and its Web UI
|
||||
- [RQ](https://github.com/rq/rq) : Client APIs are inspired by rq library.
|
||||
- [Cobra](https://github.com/spf13/cobra) : Asynq CLI is built with cobra
|
||||
|
||||
## License
|
||||
|
||||
Asynq is released under the MIT license. See [LICENSE](https://github.com/hibiken/asynq/blob/master/LICENSE).
|
||||
Copyright (c) 2019-present [Ken Hibino](https://github.com/hibiken) and [Contributors](https://github.com/hibiken/asynq/graphs/contributors). `Asynq` is free and open-source software licensed under the [MIT License](https://github.com/hibiken/asynq/blob/master/LICENSE). Official logo was created by [Vic Shóstak](https://github.com/koddr) and distributed under [Creative Commons](https://creativecommons.org/publicdomain/zero/1.0/) license (CC0 1.0 Universal).
|
||||
|
||||
86
asynq.go
86
asynq.go
@@ -10,6 +10,7 @@ import (
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
)
|
||||
@@ -68,6 +69,26 @@ type RedisClientOpt struct {
|
||||
// See: https://redis.io/commands/select.
|
||||
DB int
|
||||
|
||||
// Dial timeout for establishing new connections.
|
||||
// Default is 5 seconds.
|
||||
DialTimeout time.Duration
|
||||
|
||||
// Timeout for socket reads.
|
||||
// If timeout is reached, read commands will fail with a timeout error
|
||||
// instead of blocking.
|
||||
//
|
||||
// Use value -1 for no timeout and 0 for default.
|
||||
// Default is 3 seconds.
|
||||
ReadTimeout time.Duration
|
||||
|
||||
// Timeout for socket writes.
|
||||
// If timeout is reached, write commands will fail with a timeout error
|
||||
// instead of blocking.
|
||||
//
|
||||
// Use value -1 for no timeout and 0 for default.
|
||||
// Default is ReadTimout.
|
||||
WriteTimeout time.Duration
|
||||
|
||||
// Maximum number of socket connections.
|
||||
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
|
||||
PoolSize int
|
||||
@@ -79,13 +100,16 @@ type RedisClientOpt struct {
|
||||
|
||||
func (opt RedisClientOpt) MakeRedisClient() interface{} {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Network: opt.Network,
|
||||
Addr: opt.Addr,
|
||||
Username: opt.Username,
|
||||
Password: opt.Password,
|
||||
DB: opt.DB,
|
||||
PoolSize: opt.PoolSize,
|
||||
TLSConfig: opt.TLSConfig,
|
||||
Network: opt.Network,
|
||||
Addr: opt.Addr,
|
||||
Username: opt.Username,
|
||||
Password: opt.Password,
|
||||
DB: opt.DB,
|
||||
DialTimeout: opt.DialTimeout,
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
PoolSize: opt.PoolSize,
|
||||
TLSConfig: opt.TLSConfig,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -116,6 +140,26 @@ type RedisFailoverClientOpt struct {
|
||||
// See: https://redis.io/commands/select.
|
||||
DB int
|
||||
|
||||
// Dial timeout for establishing new connections.
|
||||
// Default is 5 seconds.
|
||||
DialTimeout time.Duration
|
||||
|
||||
// Timeout for socket reads.
|
||||
// If timeout is reached, read commands will fail with a timeout error
|
||||
// instead of blocking.
|
||||
//
|
||||
// Use value -1 for no timeout and 0 for default.
|
||||
// Default is 3 seconds.
|
||||
ReadTimeout time.Duration
|
||||
|
||||
// Timeout for socket writes.
|
||||
// If timeout is reached, write commands will fail with a timeout error
|
||||
// instead of blocking.
|
||||
//
|
||||
// Use value -1 for no timeout and 0 for default.
|
||||
// Default is ReadTimeout
|
||||
WriteTimeout time.Duration
|
||||
|
||||
// Maximum number of socket connections.
|
||||
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
|
||||
PoolSize int
|
||||
@@ -133,12 +177,15 @@ func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
|
||||
Username: opt.Username,
|
||||
Password: opt.Password,
|
||||
DB: opt.DB,
|
||||
DialTimeout: opt.DialTimeout,
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
PoolSize: opt.PoolSize,
|
||||
TLSConfig: opt.TLSConfig,
|
||||
})
|
||||
}
|
||||
|
||||
// RedisFailoverClientOpt is used to creates a redis client that connects to
|
||||
// RedisClusterClientOpt is used to creates a redis client that connects to
|
||||
// redis cluster.
|
||||
type RedisClusterClientOpt struct {
|
||||
// A seed list of host:port addresses of cluster nodes.
|
||||
@@ -157,6 +204,26 @@ type RedisClusterClientOpt struct {
|
||||
// See: https://redis.io/commands/auth.
|
||||
Password string
|
||||
|
||||
// Dial timeout for establishing new connections.
|
||||
// Default is 5 seconds.
|
||||
DialTimeout time.Duration
|
||||
|
||||
// Timeout for socket reads.
|
||||
// If timeout is reached, read commands will fail with a timeout error
|
||||
// instead of blocking.
|
||||
//
|
||||
// Use value -1 for no timeout and 0 for default.
|
||||
// Default is 3 seconds.
|
||||
ReadTimeout time.Duration
|
||||
|
||||
// Timeout for socket writes.
|
||||
// If timeout is reached, write commands will fail with a timeout error
|
||||
// instead of blocking.
|
||||
//
|
||||
// Use value -1 for no timeout and 0 for default.
|
||||
// Default is ReadTimeout.
|
||||
WriteTimeout time.Duration
|
||||
|
||||
// TLS Config used to connect to a server.
|
||||
// TLS will be negotiated only if this field is set.
|
||||
TLSConfig *tls.Config
|
||||
@@ -168,6 +235,9 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
|
||||
MaxRedirects: opt.MaxRedirects,
|
||||
Username: opt.Username,
|
||||
Password: opt.Password,
|
||||
DialTimeout: opt.DialTimeout,
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
TLSConfig: opt.TLSConfig,
|
||||
})
|
||||
}
|
||||
|
||||
BIN
docs/assets/asynqmon-queues-view.png
Normal file
BIN
docs/assets/asynqmon-queues-view.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 279 KiB |
BIN
docs/assets/asynqmon-task-view.png
Normal file
BIN
docs/assets/asynqmon-task-view.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 347 KiB |
@@ -19,7 +19,7 @@ import (
|
||||
)
|
||||
|
||||
// Version of asynq library and CLI.
|
||||
const Version = "0.15.0"
|
||||
const Version = "0.17.2"
|
||||
|
||||
// DefaultQueueName is the queue name used if none are specified by user.
|
||||
const DefaultQueueName = "default"
|
||||
|
||||
266
internal/rdb/benchmark_test.go
Normal file
266
internal/rdb/benchmark_test.go
Normal file
@@ -0,0 +1,266 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package rdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/asynqtest"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
)
|
||||
|
||||
func BenchmarkEnqueue(b *testing.B) {
|
||||
r := setup(b)
|
||||
msg := asynqtest.NewTaskMessage("task1", nil)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.Enqueue(msg); err != nil {
|
||||
b.Fatalf("Enqueue failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEnqueueUnique(b *testing.B) {
|
||||
r := setup(b)
|
||||
msg := &base.TaskMessage{
|
||||
Type: "task1",
|
||||
Payload: nil,
|
||||
Queue: base.DefaultQueueName,
|
||||
UniqueKey: base.UniqueKey("default", "task1", nil),
|
||||
}
|
||||
uniqueTTL := 5 * time.Minute
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.EnqueueUnique(msg, uniqueTTL); err != nil {
|
||||
b.Fatalf("EnqueueUnique failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSchedule(b *testing.B) {
|
||||
r := setup(b)
|
||||
msg := asynqtest.NewTaskMessage("task1", nil)
|
||||
processAt := time.Now().Add(3 * time.Minute)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.Schedule(msg, processAt); err != nil {
|
||||
b.Fatalf("Schedule failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkScheduleUnique(b *testing.B) {
|
||||
r := setup(b)
|
||||
msg := &base.TaskMessage{
|
||||
Type: "task1",
|
||||
Payload: nil,
|
||||
Queue: base.DefaultQueueName,
|
||||
UniqueKey: base.UniqueKey("default", "task1", nil),
|
||||
}
|
||||
processAt := time.Now().Add(3 * time.Minute)
|
||||
uniqueTTL := 5 * time.Minute
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.ScheduleUnique(msg, processAt, uniqueTTL); err != nil {
|
||||
b.Fatalf("EnqueueUnique failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDequeueSingleQueue(b *testing.B) {
|
||||
r := setup(b)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
for i := 0; i < 10; i++ {
|
||||
m := asynqtest.NewTaskMessageWithQueue(
|
||||
fmt.Sprintf("task%d", i), nil, base.DefaultQueueName)
|
||||
if err := r.Enqueue(m); err != nil {
|
||||
b.Fatalf("Enqueue failed: %v", err)
|
||||
}
|
||||
}
|
||||
b.StartTimer()
|
||||
|
||||
if _, _, err := r.Dequeue(base.DefaultQueueName); err != nil {
|
||||
b.Fatalf("Dequeue failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDequeueMultipleQueues(b *testing.B) {
|
||||
qnames := []string{"critical", "default", "low"}
|
||||
r := setup(b)
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
for i := 0; i < 10; i++ {
|
||||
for _, qname := range qnames {
|
||||
m := asynqtest.NewTaskMessageWithQueue(
|
||||
fmt.Sprintf("%s_task%d", qname, i), nil, qname)
|
||||
if err := r.Enqueue(m); err != nil {
|
||||
b.Fatalf("Enqueue failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
b.StartTimer()
|
||||
|
||||
if _, _, err := r.Dequeue(qnames...); err != nil {
|
||||
b.Fatalf("Dequeue failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDone(b *testing.B) {
|
||||
r := setup(b)
|
||||
m1 := asynqtest.NewTaskMessage("task1", nil)
|
||||
m2 := asynqtest.NewTaskMessage("task2", nil)
|
||||
m3 := asynqtest.NewTaskMessage("task3", nil)
|
||||
msgs := []*base.TaskMessage{m1, m2, m3}
|
||||
zs := []base.Z{
|
||||
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
|
||||
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
|
||||
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.Done(msgs[0]); err != nil {
|
||||
b.Fatalf("Done failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRetry(b *testing.B) {
|
||||
r := setup(b)
|
||||
m1 := asynqtest.NewTaskMessage("task1", nil)
|
||||
m2 := asynqtest.NewTaskMessage("task2", nil)
|
||||
m3 := asynqtest.NewTaskMessage("task3", nil)
|
||||
msgs := []*base.TaskMessage{m1, m2, m3}
|
||||
zs := []base.Z{
|
||||
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
|
||||
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
|
||||
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.Retry(msgs[0], time.Now().Add(1*time.Minute), "error"); err != nil {
|
||||
b.Fatalf("Retry failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkArchive(b *testing.B) {
|
||||
r := setup(b)
|
||||
m1 := asynqtest.NewTaskMessage("task1", nil)
|
||||
m2 := asynqtest.NewTaskMessage("task2", nil)
|
||||
m3 := asynqtest.NewTaskMessage("task3", nil)
|
||||
msgs := []*base.TaskMessage{m1, m2, m3}
|
||||
zs := []base.Z{
|
||||
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
|
||||
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
|
||||
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.Archive(msgs[0], "error"); err != nil {
|
||||
b.Fatalf("Archive failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRequeue(b *testing.B) {
|
||||
r := setup(b)
|
||||
m1 := asynqtest.NewTaskMessage("task1", nil)
|
||||
m2 := asynqtest.NewTaskMessage("task2", nil)
|
||||
m3 := asynqtest.NewTaskMessage("task3", nil)
|
||||
msgs := []*base.TaskMessage{m1, m2, m3}
|
||||
zs := []base.Z{
|
||||
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
|
||||
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
|
||||
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
|
||||
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.Requeue(msgs[0]); err != nil {
|
||||
b.Fatalf("Requeue failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCheckAndEnqueue(b *testing.B) {
|
||||
r := setup(b)
|
||||
now := time.Now()
|
||||
var zs []base.Z
|
||||
for i := -100; i < 100; i++ {
|
||||
msg := asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)
|
||||
score := now.Add(time.Duration(i) * time.Second).Unix()
|
||||
zs = append(zs, base.Z{Message: msg, Score: score})
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
asynqtest.FlushDB(b, r.client)
|
||||
asynqtest.SeedScheduledQueue(b, r.client, zs, base.DefaultQueueName)
|
||||
b.StartTimer()
|
||||
|
||||
if err := r.CheckAndEnqueue(base.DefaultQueueName); err != nil {
|
||||
b.Fatalf("CheckAndEnqueue failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,9 +172,21 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
}
|
||||
|
||||
func (r *RDB) memoryUsage(qname string) (int64, error) {
|
||||
keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
var (
|
||||
keys []string
|
||||
data []string
|
||||
cursor uint64
|
||||
err error
|
||||
)
|
||||
for {
|
||||
data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
keys = append(keys, data...)
|
||||
if cursor == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
var usg int64
|
||||
for _, k := range keys {
|
||||
@@ -723,6 +735,11 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
|
||||
if n == 0 {
|
||||
return ErrTaskNotFound
|
||||
}
|
||||
if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() {
|
||||
if err := r.client.Del(msg.UniqueKey).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -735,6 +752,9 @@ for _, msg in ipairs(msgs) do
|
||||
local decoded = cjson.decode(msg)
|
||||
if decoded["ID"] == ARGV[2] then
|
||||
redis.call("ZREM", KEYS[1], msg)
|
||||
if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then
|
||||
redis.call("DEL", decoded["UniqueKey"])
|
||||
end
|
||||
return 1
|
||||
end
|
||||
end
|
||||
@@ -757,9 +777,15 @@ func (r *RDB) deleteTask(key, id string, score float64) error {
|
||||
|
||||
// KEYS[1] -> queue to delete
|
||||
var deleteAllCmd = redis.NewScript(`
|
||||
local n = redis.call("ZCARD", KEYS[1])
|
||||
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
|
||||
for _, msg in ipairs(msgs) do
|
||||
local decoded = cjson.decode(msg)
|
||||
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
|
||||
redis.call("DEL", decoded["UniqueKey"])
|
||||
end
|
||||
end
|
||||
redis.call("DEL", KEYS[1])
|
||||
return n`)
|
||||
return table.getn(msgs)`)
|
||||
|
||||
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
|
||||
// and returns the number of tasks deleted.
|
||||
@@ -793,9 +819,15 @@ func (r *RDB) deleteAll(key string) (int64, error) {
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}
|
||||
var deleteAllPendingCmd = redis.NewScript(`
|
||||
local n = redis.call("LLEN", KEYS[1])
|
||||
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
|
||||
for _, msg in ipairs(msgs) do
|
||||
local decoded = cjson.decode(msg)
|
||||
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
|
||||
redis.call("DEL", decoded["UniqueKey"])
|
||||
end
|
||||
end
|
||||
redis.call("DEL", KEYS[1])
|
||||
return n`)
|
||||
return table.getn(msgs)`)
|
||||
|
||||
// DeleteAllPendingTasks deletes all pending tasks from the given queue
|
||||
// and returns the number of tasks deleted.
|
||||
|
||||
@@ -2445,7 +2445,68 @@ func TestDeleteScheduledTask(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteAllDeadTasks(t *testing.T) {
|
||||
func TestDeleteUniqueTask(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
m1 := &base.TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "reindex",
|
||||
Payload: nil,
|
||||
Timeout: 1800,
|
||||
Deadline: 0,
|
||||
UniqueKey: "asynq:{default}:unique:reindex:nil",
|
||||
Queue: "default",
|
||||
}
|
||||
t1 := time.Now().Add(5 * time.Minute)
|
||||
|
||||
tests := []struct {
|
||||
scheduled map[string][]base.Z
|
||||
qname string
|
||||
id uuid.UUID
|
||||
score int64
|
||||
uniqueKey string
|
||||
wantScheduled map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: t1.Unix()},
|
||||
},
|
||||
},
|
||||
qname: "default",
|
||||
id: m1.ID,
|
||||
score: t1.Unix(),
|
||||
uniqueKey: m1.UniqueKey,
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
if err := r.client.SetNX(tc.uniqueKey, tc.id.String(), time.Minute).Err(); err != nil {
|
||||
t.Fatalf("Could not set unique lock in redis: %v", err)
|
||||
}
|
||||
|
||||
if err := r.DeleteScheduledTask(tc.qname, tc.id, tc.score); err != nil {
|
||||
t.Errorf("r.DeleteScheduledTask(%q, %v, %v) returned error: %v", tc.qname, tc.id, tc.score, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantScheduled {
|
||||
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
|
||||
}
|
||||
}
|
||||
if r.client.Exists(tc.uniqueKey).Val() != 0 {
|
||||
t.Errorf("Uniqueness lock %q still exists", tc.uniqueKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
func TestDeleteAllArchivedTasks(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
m1 := h.NewTaskMessage("task1", nil)
|
||||
@@ -2507,6 +2568,89 @@ func TestDeleteAllDeadTasks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
m1 := &base.TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "task1",
|
||||
Payload: nil,
|
||||
Timeout: 1800,
|
||||
Deadline: 0,
|
||||
UniqueKey: "asynq:{default}:unique:task1:nil",
|
||||
Queue: "default",
|
||||
}
|
||||
m2 := &base.TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "task2",
|
||||
Payload: nil,
|
||||
Timeout: 1800,
|
||||
Deadline: 0,
|
||||
UniqueKey: "asynq:{default}:unique:task2:nil",
|
||||
Queue: "default",
|
||||
}
|
||||
m3 := h.NewTaskMessage("task3", nil)
|
||||
|
||||
tests := []struct {
|
||||
archived map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantArchived map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
archived: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: time.Now().Unix()},
|
||||
{Message: m2, Score: time.Now().Unix()},
|
||||
{Message: m3, Score: time.Now().Unix()},
|
||||
},
|
||||
},
|
||||
qname: "default",
|
||||
want: 3,
|
||||
wantArchived: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||
var uniqueKeys []string // list of unique keys set in redis
|
||||
for _, zs := range tc.archived {
|
||||
for _, z := range zs {
|
||||
if len(z.Message.UniqueKey) > 0 {
|
||||
err := r.client.SetNX(z.Message.UniqueKey, z.Message.ID.String(), time.Minute).Err()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set unique lock in redis: %v", err)
|
||||
}
|
||||
uniqueKeys = append(uniqueKeys, z.Message.UniqueKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
got, err := r.DeleteAllArchivedTasks(tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err)
|
||||
}
|
||||
if got != tc.want {
|
||||
t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
|
||||
}
|
||||
for qname, want := range tc.wantArchived {
|
||||
gotArchived := h.GetArchivedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
|
||||
for _, uniqueKey := range uniqueKeys {
|
||||
if r.client.Exists(uniqueKey).Val() != 0 {
|
||||
t.Errorf("Uniqueness lock %q still exists", uniqueKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteAllRetryTasks(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
@@ -37,12 +37,12 @@ func init() {
|
||||
flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses")
|
||||
}
|
||||
|
||||
func setup(t *testing.T) (r *RDB) {
|
||||
t.Helper()
|
||||
func setup(tb testing.TB) (r *RDB) {
|
||||
tb.Helper()
|
||||
if useRedisCluster {
|
||||
addrs := strings.Split(redisClusterAddrs, ",")
|
||||
if len(addrs) == 0 {
|
||||
t.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
|
||||
tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
|
||||
}
|
||||
r = NewRDB(redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: addrs,
|
||||
@@ -54,7 +54,7 @@ func setup(t *testing.T) (r *RDB) {
|
||||
}))
|
||||
}
|
||||
// Start each test with a clean slate.
|
||||
h.FlushDB(t, r.client)
|
||||
h.FlushDB(tb, r.client)
|
||||
return r
|
||||
}
|
||||
|
||||
|
||||
20
scheduler.go
20
scheduler.go
@@ -30,6 +30,10 @@ type Scheduler struct {
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
errHandler func(task *Task, opts []Option, err error)
|
||||
// idmap maps Scheduler's entry ID to cron.EntryID
|
||||
// to avoid using cron.EntryID as the public API of
|
||||
// the Scheduler.
|
||||
idmap map[string]cron.EntryID
|
||||
}
|
||||
|
||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||
@@ -65,6 +69,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||
location: loc,
|
||||
done: make(chan struct{}),
|
||||
errHandler: opts.EnqueueErrorHandler,
|
||||
idmap: make(map[string]cron.EntryID),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,12 +150,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
|
||||
logger: s.logger,
|
||||
errHandler: s.errHandler,
|
||||
}
|
||||
if _, err = s.cron.AddJob(cronspec, job); err != nil {
|
||||
cronID, err := s.cron.AddJob(cronspec, job)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
s.idmap[job.id.String()] = cronID
|
||||
return job.id.String(), nil
|
||||
}
|
||||
|
||||
// Unregister removes a registered entry by entry ID.
|
||||
// Unregister returns a non-nil error if no entries were found for the given entryID.
|
||||
func (s *Scheduler) Unregister(entryID string) error {
|
||||
cronID, ok := s.idmap[entryID]
|
||||
if !ok {
|
||||
return fmt.Errorf("asynq: no scheduler entry found")
|
||||
}
|
||||
s.cron.Remove(cronID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run starts the scheduler until an os signal to exit the program is received.
|
||||
// It returns an error if scheduler is already running or has been stopped.
|
||||
func (s *Scheduler) Run() error {
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
)
|
||||
|
||||
func TestScheduler(t *testing.T) {
|
||||
func TestSchedulerRegister(t *testing.T) {
|
||||
tests := []struct {
|
||||
cronspec string
|
||||
task *Task
|
||||
@@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func TestSchedulerUnregister(t *testing.T) {
|
||||
tests := []struct {
|
||||
cronspec string
|
||||
task *Task
|
||||
opts []Option
|
||||
wait time.Duration
|
||||
queue string
|
||||
}{
|
||||
{
|
||||
cronspec: "@every 3s",
|
||||
task: NewTask("task1", nil),
|
||||
opts: []Option{MaxRetry(10)},
|
||||
wait: 10 * time.Second,
|
||||
queue: "default",
|
||||
},
|
||||
}
|
||||
|
||||
r := setup(t)
|
||||
|
||||
for _, tc := range tests {
|
||||
scheduler := NewScheduler(getRedisConnOpt(t), nil)
|
||||
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := scheduler.Unregister(entryID); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := scheduler.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(tc.wait)
|
||||
if err := scheduler.Stop(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := asynqtest.GetPendingMessages(t, r, tc.queue)
|
||||
if len(got) != 0 {
|
||||
t.Errorf("%d tasks were enqueued, want zero", len(got))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func TestServerRun(t *testing.T) {
|
||||
go func() {
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("server did not stop after receiving TERM signal")
|
||||
panic("server did not stop after receiving TERM signal")
|
||||
case <-done:
|
||||
}
|
||||
}()
|
||||
|
||||
Reference in New Issue
Block a user