mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-12 09:35:51 +08:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2516c4baba | ||
|
|
ebe482a65c | ||
|
|
3e9fc2f972 | ||
|
|
63ce9ed0f9 | ||
|
|
32d3f329b9 | ||
|
|
544c301a8b |
12
CHANGELOG.md
12
CHANGELOG.md
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.17.2] - 2021-06-06
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- Free unique lock when task is deleted (https://github.com/hibiken/asynq/issues/275).
|
||||||
|
|
||||||
|
## [0.17.1] - 2021-04-04
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- Fix bug in internal `RDB.memoryUsage` method.
|
||||||
|
|
||||||
## [0.17.0] - 2021-03-24
|
## [0.17.0] - 2021-03-24
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|||||||
93
README.md
93
README.md
@@ -1,14 +1,14 @@
|
|||||||
# Asynq
|
<img src="https://user-images.githubusercontent.com/11155743/114697792-ffbfa580-9d26-11eb-8e5b-33bef69476dc.png" alt="Asynq logo" width="360px" />
|
||||||
|
|
||||||
|
# Simple, reliable & efficient distributed task queue in Go
|
||||||
|
|
||||||

|
|
||||||
[](https://godoc.org/github.com/hibiken/asynq)
|
[](https://godoc.org/github.com/hibiken/asynq)
|
||||||
[](https://goreportcard.com/report/github.com/hibiken/asynq)
|
[](https://goreportcard.com/report/github.com/hibiken/asynq)
|
||||||
|

|
||||||
[](https://opensource.org/licenses/MIT)
|
[](https://opensource.org/licenses/MIT)
|
||||||
[](https://gitter.im/go-asynq/community)
|
[](https://gitter.im/go-asynq/community)
|
||||||
|
|
||||||
## Overview
|
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by [Redis](https://redis.io/) and is designed to be scalable yet easy to get started.
|
||||||
|
|
||||||
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.
|
|
||||||
|
|
||||||
Highlevel overview of how Asynq works:
|
Highlevel overview of how Asynq works:
|
||||||
|
|
||||||
@@ -16,16 +16,11 @@ Highlevel overview of how Asynq works:
|
|||||||
- Server pulls task off queues and starts a worker goroutine for each task
|
- Server pulls task off queues and starts a worker goroutine for each task
|
||||||
- Tasks are processed concurrently by multiple workers
|
- Tasks are processed concurrently by multiple workers
|
||||||
|
|
||||||
Task queues are used as a mechanism to distribute work across multiple machines.
|
Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
|
||||||
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
|
|
||||||
|
|
||||||

|
**Example use case**
|
||||||
|
|
||||||
## Stability and Compatibility
|

|
||||||
|
|
||||||
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.
|
|
||||||
|
|
||||||
**Status**: The library is currently undergoing heavy development with frequent, breaking API changes.
|
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
@@ -47,14 +42,24 @@ A system can consist of multiple worker servers and brokers, giving way to high
|
|||||||
- [Web UI](#web-ui) to inspect and remote-control queues and tasks
|
- [Web UI](#web-ui) to inspect and remote-control queues and tasks
|
||||||
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks
|
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks
|
||||||
|
|
||||||
|
## Stability and Compatibility
|
||||||
|
|
||||||
|
**Status**: The library is currently undergoing **heavy development** with frequent, breaking API changes.
|
||||||
|
|
||||||
|
> ☝️ **Important Note**: Current major version is zero (`v0.x.x`) to accomodate rapid development and fast iteration while getting early feedback from users (_feedback on APIs are appreciated!_). The public API could change without a major version update before `v1.0.0` release.
|
||||||
|
|
||||||
## Quickstart
|
## Quickstart
|
||||||
|
|
||||||
First, make sure you are running a Redis server locally.
|
Make sure you have Go installed ([download](https://golang.org/dl/)). Version `1.13` or higher is required.
|
||||||
|
|
||||||
|
Initialize your project by creating a folder and then running `go mod init github.com/your/repo` ([learn more](https://blog.golang.org/using-go-modules)) inside the folder. Then install Asynq library with the [`go get`](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them) command:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
$ redis-server
|
go get -u github.com/hibiken/asynq
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Make sure you're running a Redis server locally or from a [Docker](https://hub.docker.com/_/redis) container. Version `3.0` or higher is required.
|
||||||
|
|
||||||
Next, write a package that encapsulates task creation and task handling.
|
Next, write a package that encapsulates task creation and task handling.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
@@ -204,10 +209,9 @@ func main() {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Next, start a worker server to process these tasks in the background.
|
Next, start a worker server to process these tasks in the background. To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
||||||
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
|
||||||
|
|
||||||
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
|
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`net/http`](https://golang.org/pkg/net/http/) Handler.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
package main
|
||||||
@@ -248,65 +252,52 @@ func main() {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started).
|
For a more detailed walk-through of the library, see our [Getting Started](https://github.com/hibiken/asynq/wiki/Getting-Started) guide.
|
||||||
|
|
||||||
To Learn more about `asynq` features and APIs, see our [Wiki](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
|
To learn more about `asynq` features and APIs, see the package [godoc](https://godoc.org/github.com/hibiken/asynq).
|
||||||
|
|
||||||
## Web UI
|
## Web UI
|
||||||
|
|
||||||
[Asynqmon](https://github.com/hibiken/asynqmon) is a web based tool for monitoring and administrating Asynq queues and tasks.
|
[Asynqmon](https://github.com/hibiken/asynqmon) is a web based tool for monitoring and administrating Asynq queues and tasks.
|
||||||
Please see the tool's [README](https://github.com/hibiken/asynqmon) for details.
|
|
||||||
|
|
||||||
Here's a few screenshots of the web UI.
|
Here's a few screenshots of the Web UI:
|
||||||
|
|
||||||
**Queues view**
|
**Queues view**
|
||||||

|
|
||||||
|

|
||||||
|
|
||||||
**Tasks view**
|
**Tasks view**
|
||||||

|
|
||||||
|

|
||||||
|
|
||||||
|
**Settings and adaptive dark mode**
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
For details on how to use the tool, refer to the tool's [README](https://github.com/hibiken/asynqmon#readme).
|
||||||
|
|
||||||
## Command Line Tool
|
## Command Line Tool
|
||||||
|
|
||||||
Asynq ships with a command line tool to inspect the state of queues and tasks.
|
Asynq ships with a command line tool to inspect the state of queues and tasks.
|
||||||
|
|
||||||
Here's an example of running the `stats` command.
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
For details on how to use the tool, refer to the tool's [README](/tools/asynq/README.md).
|
|
||||||
|
|
||||||
## Installation
|
|
||||||
|
|
||||||
To install `asynq` library, run the following command:
|
|
||||||
|
|
||||||
```sh
|
|
||||||
go get -u github.com/hibiken/asynq
|
|
||||||
```
|
|
||||||
|
|
||||||
To install the CLI tool, run the following command:
|
To install the CLI tool, run the following command:
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
go get -u github.com/hibiken/asynq/tools/asynq
|
go get -u github.com/hibiken/asynq/tools/asynq
|
||||||
```
|
```
|
||||||
|
|
||||||
## Requirements
|
Here's an example of running the `asynq stats` command:
|
||||||
|
|
||||||
| Dependency | Version |
|

|
||||||
| -------------------------- | ------- |
|
|
||||||
| [Redis](https://redis.io/) | v3.0+ |
|
For details on how to use the tool, refer to the tool's [README](/tools/asynq/README.md).
|
||||||
| [Go](https://golang.org/) | v1.13+ |
|
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community.
|
We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on [Gitter channel](https://gitter.im/go-asynq/community), etc) made by the community.
|
||||||
|
|
||||||
Please see the [Contribution Guide](/CONTRIBUTING.md) before contributing.
|
Please see the [Contribution Guide](/CONTRIBUTING.md) before contributing.
|
||||||
|
|
||||||
## Acknowledgements
|
|
||||||
|
|
||||||
- [Sidekiq](https://github.com/mperham/sidekiq) : Many of the design ideas are taken from sidekiq and its Web UI
|
|
||||||
- [RQ](https://github.com/rq/rq) : Client APIs are inspired by rq library.
|
|
||||||
- [Cobra](https://github.com/spf13/cobra) : Asynq CLI is built with cobra
|
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
||||||
Asynq is released under the MIT license. See [LICENSE](https://github.com/hibiken/asynq/blob/master/LICENSE).
|
Copyright (c) 2019-present [Ken Hibino](https://github.com/hibiken) and [Contributors](https://github.com/hibiken/asynq/graphs/contributors). `Asynq` is free and open-source software licensed under the [MIT License](https://github.com/hibiken/asynq/blob/master/LICENSE). Official logo was created by [Vic Shóstak](https://github.com/koddr) and distributed under [Creative Commons](https://creativecommons.org/publicdomain/zero/1.0/) license (CC0 1.0 Universal).
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Version of asynq library and CLI.
|
// Version of asynq library and CLI.
|
||||||
const Version = "0.17.0"
|
const Version = "0.17.2"
|
||||||
|
|
||||||
// DefaultQueueName is the queue name used if none are specified by user.
|
// DefaultQueueName is the queue name used if none are specified by user.
|
||||||
const DefaultQueueName = "default"
|
const DefaultQueueName = "default"
|
||||||
|
|||||||
@@ -172,10 +172,14 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *RDB) memoryUsage(qname string) (int64, error) {
|
func (r *RDB) memoryUsage(qname string) (int64, error) {
|
||||||
var cursor uint64
|
var (
|
||||||
var keys []string
|
keys []string
|
||||||
|
data []string
|
||||||
|
cursor uint64
|
||||||
|
err error
|
||||||
|
)
|
||||||
for {
|
for {
|
||||||
data, cursor, err := r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
|
data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -731,6 +735,11 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
|
|||||||
if n == 0 {
|
if n == 0 {
|
||||||
return ErrTaskNotFound
|
return ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() {
|
||||||
|
if err := r.client.Del(msg.UniqueKey).Err(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -743,6 +752,9 @@ for _, msg in ipairs(msgs) do
|
|||||||
local decoded = cjson.decode(msg)
|
local decoded = cjson.decode(msg)
|
||||||
if decoded["ID"] == ARGV[2] then
|
if decoded["ID"] == ARGV[2] then
|
||||||
redis.call("ZREM", KEYS[1], msg)
|
redis.call("ZREM", KEYS[1], msg)
|
||||||
|
if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then
|
||||||
|
redis.call("DEL", decoded["UniqueKey"])
|
||||||
|
end
|
||||||
return 1
|
return 1
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@@ -765,9 +777,15 @@ func (r *RDB) deleteTask(key, id string, score float64) error {
|
|||||||
|
|
||||||
// KEYS[1] -> queue to delete
|
// KEYS[1] -> queue to delete
|
||||||
var deleteAllCmd = redis.NewScript(`
|
var deleteAllCmd = redis.NewScript(`
|
||||||
local n = redis.call("ZCARD", KEYS[1])
|
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
|
||||||
|
for _, msg in ipairs(msgs) do
|
||||||
|
local decoded = cjson.decode(msg)
|
||||||
|
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
|
||||||
|
redis.call("DEL", decoded["UniqueKey"])
|
||||||
|
end
|
||||||
|
end
|
||||||
redis.call("DEL", KEYS[1])
|
redis.call("DEL", KEYS[1])
|
||||||
return n`)
|
return table.getn(msgs)`)
|
||||||
|
|
||||||
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
|
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
|
||||||
// and returns the number of tasks deleted.
|
// and returns the number of tasks deleted.
|
||||||
@@ -801,9 +819,15 @@ func (r *RDB) deleteAll(key string) (int64, error) {
|
|||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}
|
// KEYS[1] -> asynq:{<qname>}
|
||||||
var deleteAllPendingCmd = redis.NewScript(`
|
var deleteAllPendingCmd = redis.NewScript(`
|
||||||
local n = redis.call("LLEN", KEYS[1])
|
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
|
||||||
|
for _, msg in ipairs(msgs) do
|
||||||
|
local decoded = cjson.decode(msg)
|
||||||
|
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
|
||||||
|
redis.call("DEL", decoded["UniqueKey"])
|
||||||
|
end
|
||||||
|
end
|
||||||
redis.call("DEL", KEYS[1])
|
redis.call("DEL", KEYS[1])
|
||||||
return n`)
|
return table.getn(msgs)`)
|
||||||
|
|
||||||
// DeleteAllPendingTasks deletes all pending tasks from the given queue
|
// DeleteAllPendingTasks deletes all pending tasks from the given queue
|
||||||
// and returns the number of tasks deleted.
|
// and returns the number of tasks deleted.
|
||||||
|
|||||||
@@ -2445,7 +2445,68 @@ func TestDeleteScheduledTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteAllDeadTasks(t *testing.T) {
|
func TestDeleteUniqueTask(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
m1 := &base.TaskMessage{
|
||||||
|
ID: uuid.New(),
|
||||||
|
Type: "reindex",
|
||||||
|
Payload: nil,
|
||||||
|
Timeout: 1800,
|
||||||
|
Deadline: 0,
|
||||||
|
UniqueKey: "asynq:{default}:unique:reindex:nil",
|
||||||
|
Queue: "default",
|
||||||
|
}
|
||||||
|
t1 := time.Now().Add(5 * time.Minute)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
scheduled map[string][]base.Z
|
||||||
|
qname string
|
||||||
|
id uuid.UUID
|
||||||
|
score int64
|
||||||
|
uniqueKey string
|
||||||
|
wantScheduled map[string][]*base.TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
scheduled: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: m1, Score: t1.Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
id: m1.ID,
|
||||||
|
score: t1.Unix(),
|
||||||
|
uniqueKey: m1.UniqueKey,
|
||||||
|
wantScheduled: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
|
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||||
|
if err := r.client.SetNX(tc.uniqueKey, tc.id.String(), time.Minute).Err(); err != nil {
|
||||||
|
t.Fatalf("Could not set unique lock in redis: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.DeleteScheduledTask(tc.qname, tc.id, tc.score); err != nil {
|
||||||
|
t.Errorf("r.DeleteScheduledTask(%q, %v, %v) returned error: %v", tc.qname, tc.id, tc.score, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for qname, want := range tc.wantScheduled {
|
||||||
|
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
|
||||||
|
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if r.client.Exists(tc.uniqueKey).Val() != 0 {
|
||||||
|
t.Errorf("Uniqueness lock %q still exists", tc.uniqueKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func TestDeleteAllArchivedTasks(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
m1 := h.NewTaskMessage("task1", nil)
|
m1 := h.NewTaskMessage("task1", nil)
|
||||||
@@ -2507,6 +2568,89 @@ func TestDeleteAllDeadTasks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
m1 := &base.TaskMessage{
|
||||||
|
ID: uuid.New(),
|
||||||
|
Type: "task1",
|
||||||
|
Payload: nil,
|
||||||
|
Timeout: 1800,
|
||||||
|
Deadline: 0,
|
||||||
|
UniqueKey: "asynq:{default}:unique:task1:nil",
|
||||||
|
Queue: "default",
|
||||||
|
}
|
||||||
|
m2 := &base.TaskMessage{
|
||||||
|
ID: uuid.New(),
|
||||||
|
Type: "task2",
|
||||||
|
Payload: nil,
|
||||||
|
Timeout: 1800,
|
||||||
|
Deadline: 0,
|
||||||
|
UniqueKey: "asynq:{default}:unique:task2:nil",
|
||||||
|
Queue: "default",
|
||||||
|
}
|
||||||
|
m3 := h.NewTaskMessage("task3", nil)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
archived map[string][]base.Z
|
||||||
|
qname string
|
||||||
|
want int64
|
||||||
|
wantArchived map[string][]*base.TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
archived: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: m1, Score: time.Now().Unix()},
|
||||||
|
{Message: m2, Score: time.Now().Unix()},
|
||||||
|
{Message: m3, Score: time.Now().Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
want: 3,
|
||||||
|
wantArchived: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
|
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||||
|
var uniqueKeys []string // list of unique keys set in redis
|
||||||
|
for _, zs := range tc.archived {
|
||||||
|
for _, z := range zs {
|
||||||
|
if len(z.Message.UniqueKey) > 0 {
|
||||||
|
err := r.client.SetNX(z.Message.UniqueKey, z.Message.ID.String(), time.Minute).Err()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to set unique lock in redis: %v", err)
|
||||||
|
}
|
||||||
|
uniqueKeys = append(uniqueKeys, z.Message.UniqueKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := r.DeleteAllArchivedTasks(tc.qname)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err)
|
||||||
|
}
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
|
||||||
|
}
|
||||||
|
for qname, want := range tc.wantArchived {
|
||||||
|
gotArchived := h.GetArchivedMessages(t, r.client, qname)
|
||||||
|
if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, uniqueKey := range uniqueKeys {
|
||||||
|
if r.client.Exists(uniqueKey).Val() != 0 {
|
||||||
|
t.Errorf("Uniqueness lock %q still exists", uniqueKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDeleteAllRetryTasks(t *testing.T) {
|
func TestDeleteAllRetryTasks(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|||||||
Reference in New Issue
Block a user