2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-06 20:15:57 +08:00

Compare commits

...

16 Commits

Author SHA1 Message Date
Ken Hibino
2516c4baba v0.17.2 2021-06-06 06:51:30 -07:00
Ken Hibino
ebe482a65c Free uniqueness lock when task is deleted 2021-06-06 06:48:59 -07:00
Vic Shóstak
3e9fc2f972 Update README 2021-04-28 10:25:34 -07:00
Vic Shóstak
63ce9ed0f9 Update README with a new logo 2021-04-14 10:21:47 -07:00
Ken Hibino
32d3f329b9 v0.17.1 2021-04-04 12:51:00 -07:00
Ken Hibino
544c301a8b Fix bug in RDB.memoryUsage 2021-04-04 12:49:19 -07:00
Ken Hibino
8b997d2fab v0.17.0 2021-03-24 16:51:59 -07:00
Ken Hibino
901105a8d7 Add dial, read, write timeout options to RedisConnOpt 2021-03-24 16:49:04 -07:00
Ken Hibino
aaa3f1d4fd v0.16.1 2021-03-20 06:27:03 -07:00
disc
4722ca2d3d Replaced blocking KEYS XXX:* command to non-blocking SCAN XXX:*
More details: https://redis.io/commands/KEYS
2021-03-20 06:24:08 -07:00
Ken Hibino
6a9d9fd717 v0.16.0 2021-03-10 20:39:46 -08:00
Ken Hibino
de28c1ea19 Add Unregister method to Scheduler 2021-03-10 20:38:44 -08:00
Ken Hibino
f618f5b1f5 Add benchmark tests for rdb package 2021-03-07 16:27:14 -08:00
Ken Hibino
aa936466b3 Minor fix 2021-03-07 16:27:14 -08:00
Ken Hibino
5d1ec70544 Run CI build with go1.16 2021-02-25 09:31:17 -08:00
Ken Hibino
d1d3be9b00 Add Web UI section in README 2021-02-01 17:01:04 -08:00
15 changed files with 683 additions and 74 deletions

View File

@@ -20,7 +20,7 @@ jobs:
- name: Set up Go - name: Set up Go
uses: actions/setup-go@v2 uses: actions/setup-go@v2
with: with:
go-version: 1.15.x go-version: 1.16.x
- name: Benchmark - name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt
- name: Upload Benchmark - name: Upload Benchmark

View File

@@ -7,7 +7,7 @@ jobs:
strategy: strategy:
matrix: matrix:
os: [ubuntu-latest] os: [ubuntu-latest]
go-version: [1.13.x, 1.14.x, 1.15.x] go-version: [1.13.x, 1.14.x, 1.15.x, 1.16.x]
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
services: services:
redis: redis:

View File

@@ -7,6 +7,36 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.17.2] - 2021-06-06
### Fixed
- Free unique lock when task is deleted (https://github.com/hibiken/asynq/issues/275).
## [0.17.1] - 2021-04-04
### Fixed
- Fix bug in internal `RDB.memoryUsage` method.
## [0.17.0] - 2021-03-24
### Added
- `DialTimeout`, `ReadTimeout`, and `WriteTimeout` options are added to `RedisConnOpt`.
## [0.16.1] - 2021-03-20
### Fixed
- Replace `KEYS` command with `SCAN` as recommended by [redis doc](https://redis.io/commands/KEYS).
## [0.16.0] - 2021-03-10
### Added
- `Unregister` method is added to `Scheduler` to remove a registered entry.
## [0.15.0] - 2021-01-31 ## [0.15.0] - 2021-01-31
**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq" **IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"

View File

@@ -1,14 +1,14 @@
# Asynq <img src="https://user-images.githubusercontent.com/11155743/114697792-ffbfa580-9d26-11eb-8e5b-33bef69476dc.png" alt="Asynq logo" width="360px" />
# Simple, reliable & efficient distributed task queue in Go
![Build Status](https://github.com/hibiken/asynq/workflows/build/badge.svg)
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq) [![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
[![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq) [![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq)
![Build Status](https://github.com/hibiken/asynq/workflows/build/badge.svg)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community) [![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
## Overview Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by [Redis](https://redis.io/) and is designed to be scalable yet easy to get started.
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.
Highlevel overview of how Asynq works: Highlevel overview of how Asynq works:
@@ -16,16 +16,11 @@ Highlevel overview of how Asynq works:
- Server pulls task off queues and starts a worker goroutine for each task - Server pulls task off queues and starts a worker goroutine for each task
- Tasks are processed concurrently by multiple workers - Tasks are processed concurrently by multiple workers
Task queues are used as a mechanism to distribute work across multiple machines. Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
![Task Queue Diagram](/docs/assets/overview.png) **Example use case**
## Stability and Compatibility ![Task Queue Diagram](https://user-images.githubusercontent.com/11155743/116358505-656f5f80-a806-11eb-9c16-94e49dab0f99.jpg)
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.
**Status**: The library is currently undergoing heavy development with frequent, breaking API changes.
## Features ## Features
@@ -44,16 +39,27 @@ A system can consist of multiple worker servers and brokers, giving way to high
- [Periodic Tasks](https://github.com/hibiken/asynq/wiki/Periodic-Tasks) - [Periodic Tasks](https://github.com/hibiken/asynq/wiki/Periodic-Tasks)
- [Support Redis Cluster](https://github.com/hibiken/asynq/wiki/Redis-Cluster) for automatic sharding and high availability - [Support Redis Cluster](https://github.com/hibiken/asynq/wiki/Redis-Cluster) for automatic sharding and high availability
- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for high availability - [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for high availability
- [Web UI](#web-ui) to inspect and remote-control queues and tasks
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks - [CLI](#command-line-tool) to inspect and remote-control queues and tasks
## Stability and Compatibility
**Status**: The library is currently undergoing **heavy development** with frequent, breaking API changes.
> ☝️ **Important Note**: Current major version is zero (`v0.x.x`) to accomodate rapid development and fast iteration while getting early feedback from users (_feedback on APIs are appreciated!_). The public API could change without a major version update before `v1.0.0` release.
## Quickstart ## Quickstart
First, make sure you are running a Redis server locally. Make sure you have Go installed ([download](https://golang.org/dl/)). Version `1.13` or higher is required.
Initialize your project by creating a folder and then running `go mod init github.com/your/repo` ([learn more](https://blog.golang.org/using-go-modules)) inside the folder. Then install Asynq library with the [`go get`](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) command:
```sh ```sh
$ redis-server go get -u github.com/hibiken/asynq
``` ```
Make sure you're running a Redis server locally or from a [Docker](https://hub.docker.com/_/redis) container. Version `3.0` or higher is required.
Next, write a package that encapsulates task creation and task handling. Next, write a package that encapsulates task creation and task handling.
```go ```go
@@ -203,10 +209,9 @@ func main() {
} }
``` ```
Next, start a worker server to process these tasks in the background. Next, start a worker server to process these tasks in the background. To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler. You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`net/http`](https://golang.org/pkg/net/http/) Handler.
```go ```go
package main package main
@@ -247,52 +252,52 @@ func main() {
} }
``` ```
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started). For a more detailed walk-through of the library, see our [Getting Started](https://github.com/hibiken/asynq/wiki/Getting-Started) guide.
To Learn more about `asynq` features and APIs, see our [Wiki](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq). To learn more about `asynq` features and APIs, see the package [godoc](https://godoc.org/github.com/hibiken/asynq).
## Web UI
[Asynqmon](https://github.com/hibiken/asynqmon) is a web based tool for monitoring and administrating Asynq queues and tasks.
Here's a few screenshots of the Web UI:
**Queues view**
![Web UI Queues View](https://user-images.githubusercontent.com/11155743/114697016-07327f00-9d26-11eb-808c-0ac841dc888e.png)
**Tasks view**
![Web UI TasksView](https://user-images.githubusercontent.com/11155743/114697070-1f0a0300-9d26-11eb-855c-d3ec263865b7.png)
**Settings and adaptive dark mode**
![Web UI Settings and adaptive dark mode](https://user-images.githubusercontent.com/11155743/114697149-3517c380-9d26-11eb-9f7a-ae2dd00aad5b.png)
For details on how to use the tool, refer to the tool's [README](https://github.com/hibiken/asynqmon#readme).
## Command Line Tool ## Command Line Tool
Asynq ships with a command line tool to inspect the state of queues and tasks. Asynq ships with a command line tool to inspect the state of queues and tasks.
Here's an example of running the `stats` command.
![Gif](/docs/assets/demo.gif)
For details on how to use the tool, refer to the tool's [README](/tools/asynq/README.md).
## Installation
To install `asynq` library, run the following command:
```sh
go get -u github.com/hibiken/asynq
```
To install the CLI tool, run the following command: To install the CLI tool, run the following command:
```sh ```sh
go get -u github.com/hibiken/asynq/tools/asynq go get -u github.com/hibiken/asynq/tools/asynq
``` ```
## Requirements Here's an example of running the `asynq stats` command:
| Dependency | Version | ![Gif](/docs/assets/demo.gif)
| -------------------------- | ------- |
| [Redis](https://redis.io/) | v3.0+ | For details on how to use the tool, refer to the tool's [README](/tools/asynq/README.md).
| [Go](https://golang.org/) | v1.13+ |
## Contributing ## Contributing
We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community. We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on [Gitter channel](https://gitter.im/go-asynq/community), etc) made by the community.
Please see the [Contribution Guide](/CONTRIBUTING.md) before contributing. Please see the [Contribution Guide](/CONTRIBUTING.md) before contributing.
## Acknowledgements
- [Sidekiq](https://github.com/mperham/sidekiq) : Many of the design ideas are taken from sidekiq and its Web UI
- [RQ](https://github.com/rq/rq) : Client APIs are inspired by rq library.
- [Cobra](https://github.com/spf13/cobra) : Asynq CLI is built with cobra
## License ## License
Asynq is released under the MIT license. See [LICENSE](https://github.com/hibiken/asynq/blob/master/LICENSE). Copyright (c) 2019-present [Ken Hibino](https://github.com/hibiken) and [Contributors](https://github.com/hibiken/asynq/graphs/contributors). `Asynq` is free and open-source software licensed under the [MIT License](https://github.com/hibiken/asynq/blob/master/LICENSE). Official logo was created by [Vic Shóstak](https://github.com/koddr) and distributed under [Creative Commons](https://creativecommons.org/publicdomain/zero/1.0/) license (CC0 1.0 Universal).

View File

@@ -10,6 +10,7 @@ import (
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
) )
@@ -68,6 +69,26 @@ type RedisClientOpt struct {
// See: https://redis.io/commands/select. // See: https://redis.io/commands/select.
DB int DB int
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimout.
WriteTimeout time.Duration
// Maximum number of socket connections. // Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU. // Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int PoolSize int
@@ -84,6 +105,9 @@ func (opt RedisClientOpt) MakeRedisClient() interface{} {
Username: opt.Username, Username: opt.Username,
Password: opt.Password, Password: opt.Password,
DB: opt.DB, DB: opt.DB,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
}) })
@@ -116,6 +140,26 @@ type RedisFailoverClientOpt struct {
// See: https://redis.io/commands/select. // See: https://redis.io/commands/select.
DB int DB int
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimeout
WriteTimeout time.Duration
// Maximum number of socket connections. // Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU. // Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int PoolSize int
@@ -133,12 +177,15 @@ func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
Username: opt.Username, Username: opt.Username,
Password: opt.Password, Password: opt.Password,
DB: opt.DB, DB: opt.DB,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize, PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
}) })
} }
// RedisFailoverClientOpt is used to creates a redis client that connects to // RedisClusterClientOpt is used to creates a redis client that connects to
// redis cluster. // redis cluster.
type RedisClusterClientOpt struct { type RedisClusterClientOpt struct {
// A seed list of host:port addresses of cluster nodes. // A seed list of host:port addresses of cluster nodes.
@@ -157,6 +204,26 @@ type RedisClusterClientOpt struct {
// See: https://redis.io/commands/auth. // See: https://redis.io/commands/auth.
Password string Password string
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimeout.
WriteTimeout time.Duration
// TLS Config used to connect to a server. // TLS Config used to connect to a server.
// TLS will be negotiated only if this field is set. // TLS will be negotiated only if this field is set.
TLSConfig *tls.Config TLSConfig *tls.Config
@@ -168,6 +235,9 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
MaxRedirects: opt.MaxRedirects, MaxRedirects: opt.MaxRedirects,
Username: opt.Username, Username: opt.Username,
Password: opt.Password, Password: opt.Password,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
TLSConfig: opt.TLSConfig, TLSConfig: opt.TLSConfig,
}) })
} }

Binary file not shown.

After

Width:  |  Height:  |  Size: 279 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 347 KiB

View File

@@ -19,7 +19,7 @@ import (
) )
// Version of asynq library and CLI. // Version of asynq library and CLI.
const Version = "0.15.0" const Version = "0.17.2"
// DefaultQueueName is the queue name used if none are specified by user. // DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default" const DefaultQueueName = "default"

View File

@@ -0,0 +1,266 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package rdb
import (
"fmt"
"testing"
"time"
"github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
func BenchmarkEnqueue(b *testing.B) {
r := setup(b)
msg := asynqtest.NewTaskMessage("task1", nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.Enqueue(msg); err != nil {
b.Fatalf("Enqueue failed: %v", err)
}
}
}
func BenchmarkEnqueueUnique(b *testing.B) {
r := setup(b)
msg := &base.TaskMessage{
Type: "task1",
Payload: nil,
Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey("default", "task1", nil),
}
uniqueTTL := 5 * time.Minute
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.EnqueueUnique(msg, uniqueTTL); err != nil {
b.Fatalf("EnqueueUnique failed: %v", err)
}
}
}
func BenchmarkSchedule(b *testing.B) {
r := setup(b)
msg := asynqtest.NewTaskMessage("task1", nil)
processAt := time.Now().Add(3 * time.Minute)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.Schedule(msg, processAt); err != nil {
b.Fatalf("Schedule failed: %v", err)
}
}
}
func BenchmarkScheduleUnique(b *testing.B) {
r := setup(b)
msg := &base.TaskMessage{
Type: "task1",
Payload: nil,
Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey("default", "task1", nil),
}
processAt := time.Now().Add(3 * time.Minute)
uniqueTTL := 5 * time.Minute
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
b.StartTimer()
if err := r.ScheduleUnique(msg, processAt, uniqueTTL); err != nil {
b.Fatalf("EnqueueUnique failed: %v", err)
}
}
}
func BenchmarkDequeueSingleQueue(b *testing.B) {
r := setup(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
for i := 0; i < 10; i++ {
m := asynqtest.NewTaskMessageWithQueue(
fmt.Sprintf("task%d", i), nil, base.DefaultQueueName)
if err := r.Enqueue(m); err != nil {
b.Fatalf("Enqueue failed: %v", err)
}
}
b.StartTimer()
if _, _, err := r.Dequeue(base.DefaultQueueName); err != nil {
b.Fatalf("Dequeue failed: %v", err)
}
}
}
func BenchmarkDequeueMultipleQueues(b *testing.B) {
qnames := []string{"critical", "default", "low"}
r := setup(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
for i := 0; i < 10; i++ {
for _, qname := range qnames {
m := asynqtest.NewTaskMessageWithQueue(
fmt.Sprintf("%s_task%d", qname, i), nil, qname)
if err := r.Enqueue(m); err != nil {
b.Fatalf("Enqueue failed: %v", err)
}
}
}
b.StartTimer()
if _, _, err := r.Dequeue(qnames...); err != nil {
b.Fatalf("Dequeue failed: %v", err)
}
}
}
func BenchmarkDone(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Done(msgs[0]); err != nil {
b.Fatalf("Done failed: %v", err)
}
}
}
func BenchmarkRetry(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Retry(msgs[0], time.Now().Add(1*time.Minute), "error"); err != nil {
b.Fatalf("Retry failed: %v", err)
}
}
}
func BenchmarkArchive(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Archive(msgs[0], "error"); err != nil {
b.Fatalf("Archive failed: %v", err)
}
}
}
func BenchmarkRequeue(b *testing.B) {
r := setup(b)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
msgs := []*base.TaskMessage{m1, m2, m3}
zs := []base.Z{
{Message: m1, Score: time.Now().Add(10 * time.Second).Unix()},
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedActiveQueue(b, r.client, msgs, base.DefaultQueueName)
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.Requeue(msgs[0]); err != nil {
b.Fatalf("Requeue failed: %v", err)
}
}
}
func BenchmarkCheckAndEnqueue(b *testing.B) {
r := setup(b)
now := time.Now()
var zs []base.Z
for i := -100; i < 100; i++ {
msg := asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)
score := now.Add(time.Duration(i) * time.Second).Unix()
zs = append(zs, base.Z{Message: msg, Score: score})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
asynqtest.FlushDB(b, r.client)
asynqtest.SeedScheduledQueue(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.CheckAndEnqueue(base.DefaultQueueName); err != nil {
b.Fatalf("CheckAndEnqueue failed: %v", err)
}
}
}

View File

@@ -172,10 +172,22 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
} }
func (r *RDB) memoryUsage(qname string) (int64, error) { func (r *RDB) memoryUsage(qname string) (int64, error) {
keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result() var (
keys []string
data []string
cursor uint64
err error
)
for {
data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
keys = append(keys, data...)
if cursor == 0 {
break
}
}
var usg int64 var usg int64
for _, k := range keys { for _, k := range keys {
n, err := r.client.MemoryUsage(k).Result() n, err := r.client.MemoryUsage(k).Result()
@@ -723,6 +735,11 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
if n == 0 { if n == 0 {
return ErrTaskNotFound return ErrTaskNotFound
} }
if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() {
if err := r.client.Del(msg.UniqueKey).Err(); err != nil {
return err
}
}
return nil return nil
} }
} }
@@ -735,6 +752,9 @@ for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg) local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg) redis.call("ZREM", KEYS[1], msg)
if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then
redis.call("DEL", decoded["UniqueKey"])
end
return 1 return 1
end end
end end
@@ -757,9 +777,15 @@ func (r *RDB) deleteTask(key, id string, score float64) error {
// KEYS[1] -> queue to delete // KEYS[1] -> queue to delete
var deleteAllCmd = redis.NewScript(` var deleteAllCmd = redis.NewScript(`
local n = redis.call("ZCARD", KEYS[1]) local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
return n`) return table.getn(msgs)`)
// DeleteAllArchivedTasks deletes all archived tasks from the given queue // DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted. // and returns the number of tasks deleted.
@@ -793,9 +819,15 @@ func (r *RDB) deleteAll(key string) (int64, error) {
// KEYS[1] -> asynq:{<qname>} // KEYS[1] -> asynq:{<qname>}
var deleteAllPendingCmd = redis.NewScript(` var deleteAllPendingCmd = redis.NewScript(`
local n = redis.call("LLEN", KEYS[1]) local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
return n`) return table.getn(msgs)`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue // DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted. // and returns the number of tasks deleted.

View File

@@ -2445,7 +2445,68 @@ func TestDeleteScheduledTask(t *testing.T) {
} }
} }
func TestDeleteAllDeadTasks(t *testing.T) { func TestDeleteUniqueTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{
ID: uuid.New(),
Type: "reindex",
Payload: nil,
Timeout: 1800,
Deadline: 0,
UniqueKey: "asynq:{default}:unique:reindex:nil",
Queue: "default",
}
t1 := time.Now().Add(5 * time.Minute)
tests := []struct {
scheduled map[string][]base.Z
qname string
id uuid.UUID
score int64
uniqueKey string
wantScheduled map[string][]*base.TaskMessage
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
uniqueKey: m1.UniqueKey,
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
if err := r.client.SetNX(tc.uniqueKey, tc.id.String(), time.Minute).Err(); err != nil {
t.Fatalf("Could not set unique lock in redis: %v", err)
}
if err := r.DeleteScheduledTask(tc.qname, tc.id, tc.score); err != nil {
t.Errorf("r.DeleteScheduledTask(%q, %v, %v) returned error: %v", tc.qname, tc.id, tc.score, err)
continue
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
if r.client.Exists(tc.uniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", tc.uniqueKey)
}
}
}
func TestDeleteAllArchivedTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@@ -2507,6 +2568,89 @@ func TestDeleteAllDeadTasks(t *testing.T) {
} }
} }
func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{
ID: uuid.New(),
Type: "task1",
Payload: nil,
Timeout: 1800,
Deadline: 0,
UniqueKey: "asynq:{default}:unique:task1:nil",
Queue: "default",
}
m2 := &base.TaskMessage{
ID: uuid.New(),
Type: "task2",
Payload: nil,
Timeout: 1800,
Deadline: 0,
UniqueKey: "asynq:{default}:unique:task2:nil",
Queue: "default",
}
m3 := h.NewTaskMessage("task3", nil)
tests := []struct {
archived map[string][]base.Z
qname string
want int64
wantArchived map[string][]*base.TaskMessage
}{
{
archived: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
{Message: m3, Score: time.Now().Unix()},
},
},
qname: "default",
want: 3,
wantArchived: map[string][]*base.TaskMessage{
"default": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, tc.archived)
var uniqueKeys []string // list of unique keys set in redis
for _, zs := range tc.archived {
for _, z := range zs {
if len(z.Message.UniqueKey) > 0 {
err := r.client.SetNX(z.Message.UniqueKey, z.Message.ID.String(), time.Minute).Err()
if err != nil {
t.Fatalf("Failed to set unique lock in redis: %v", err)
}
uniqueKeys = append(uniqueKeys, z.Message.UniqueKey)
}
}
}
got, err := r.DeleteAllArchivedTasks(tc.qname)
if err != nil {
t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err)
}
if got != tc.want {
t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
}
for qname, want := range tc.wantArchived {
gotArchived := h.GetArchivedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff)
}
}
for _, uniqueKey := range uniqueKeys {
if r.client.Exists(uniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", uniqueKey)
}
}
}
}
func TestDeleteAllRetryTasks(t *testing.T) { func TestDeleteAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()

View File

@@ -37,12 +37,12 @@ func init() {
flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses") flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses")
} }
func setup(t *testing.T) (r *RDB) { func setup(tb testing.TB) (r *RDB) {
t.Helper() tb.Helper()
if useRedisCluster { if useRedisCluster {
addrs := strings.Split(redisClusterAddrs, ",") addrs := strings.Split(redisClusterAddrs, ",")
if len(addrs) == 0 { if len(addrs) == 0 {
t.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
} }
r = NewRDB(redis.NewClusterClient(&redis.ClusterOptions{ r = NewRDB(redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs, Addrs: addrs,
@@ -54,7 +54,7 @@ func setup(t *testing.T) (r *RDB) {
})) }))
} }
// Start each test with a clean slate. // Start each test with a clean slate.
h.FlushDB(t, r.client) h.FlushDB(tb, r.client)
return r return r
} }

View File

@@ -30,6 +30,10 @@ type Scheduler struct {
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error) errHandler func(task *Task, opts []Option, err error)
// idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
} }
// NewScheduler returns a new Scheduler instance given the redis connection option. // NewScheduler returns a new Scheduler instance given the redis connection option.
@@ -65,6 +69,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
location: loc, location: loc,
done: make(chan struct{}), done: make(chan struct{}),
errHandler: opts.EnqueueErrorHandler, errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
} }
} }
@@ -145,12 +150,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
logger: s.logger, logger: s.logger,
errHandler: s.errHandler, errHandler: s.errHandler,
} }
if _, err = s.cron.AddJob(cronspec, job); err != nil { cronID, err := s.cron.AddJob(cronspec, job)
if err != nil {
return "", err return "", err
} }
s.idmap[job.id.String()] = cronID
return job.id.String(), nil return job.id.String(), nil
} }
// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
cronID, ok := s.idmap[entryID]
if !ok {
return fmt.Errorf("asynq: no scheduler entry found")
}
s.cron.Remove(cronID)
return nil
}
// Run starts the scheduler until an os signal to exit the program is received. // Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been stopped. // It returns an error if scheduler is already running or has been stopped.
func (s *Scheduler) Run() error { func (s *Scheduler) Run() error {

View File

@@ -14,7 +14,7 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
func TestScheduler(t *testing.T) { func TestSchedulerRegister(t *testing.T) {
tests := []struct { tests := []struct {
cronspec string cronspec string
task *Task task *Task
@@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
} }
mu.Unlock() mu.Unlock()
} }
func TestSchedulerUnregister(t *testing.T) {
tests := []struct {
cronspec string
task *Task
opts []Option
wait time.Duration
queue string
}{
{
cronspec: "@every 3s",
task: NewTask("task1", nil),
opts: []Option{MaxRetry(10)},
wait: 10 * time.Second,
queue: "default",
},
}
r := setup(t)
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
if err := scheduler.Unregister(entryID); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil {
t.Fatal(err)
}
got := asynqtest.GetPendingMessages(t, r, tc.queue)
if len(got) != 0 {
t.Errorf("%d tasks were enqueued, want zero", len(got))
}
}
}

View File

@@ -70,7 +70,7 @@ func TestServerRun(t *testing.T) {
go func() { go func() {
select { select {
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Fatal("server did not stop after receiving TERM signal") panic("server did not stop after receiving TERM signal")
case <-done: case <-done:
} }
}() }()