2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-18 00:26:08 +08:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Trịnh Đức Bảo Linh(Kevin)
92973d6add feat(*): update CHANGELOG.md file (#708)
* error panic handling

* updated CHANGELOG.md file

* correct msg panic error (#5)

* fix(typo): delete-all to deleteall (#827)

* typo: delete-all to deleteall

* docs: update tools/asynq/README.md

* fix archiveall runall

---------

Co-authored-by: Mohamed Sohail <sohailsameja@gmail.com>

* Fixed go:build for BSD

* chore: fix function names in comment

Signed-off-by: camcui <cuishua@sina.cn>

---------

Signed-off-by: camcui <cuishua@sina.cn>
Co-authored-by: crazyoptimist <me@crazyoptimist.net>
Co-authored-by: Mohamed Sohail <sohailsameja@gmail.com>
Co-authored-by: mrusme <marius@xn--gckvb8fzb.com>
Co-authored-by: camcui <cuishua@sina.cn>
2024-05-06 14:12:42 +08:00
Mohammed Sohail
4c5202ee13 docs (CHANGELOG): IsPanicError function (closes #708) 2024-01-29 11:33:46 +03:00
36 changed files with 182 additions and 798 deletions

View File

@@ -18,7 +18,4 @@ updates:
interval: "weekly"
labels:
- "pr-deps"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "weekly"

View File

@@ -18,9 +18,9 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
uses: actions/setup-go@v4
with:
go-version: 1.23.x
go-version: 1.21.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a new.txt
- name: Upload Benchmark
@@ -42,9 +42,9 @@ jobs:
with:
ref: master
- name: Set up Go
uses: actions/setup-go@v5
uses: actions/setup-go@v4
with:
go-version: 1.23.x
go-version: 1.21.x
- name: Benchmark
run: go test -run=^$ -bench=. -count=5 -timeout=60m ./... | tee -a old.txt
- name: Upload Benchmark
@@ -60,9 +60,9 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
uses: actions/setup-go@v4
with:
go-version: 1.23.x
go-version: 1.21.x
- name: Install benchstat
run: go get -u golang.org/x/perf/cmd/benchstat
- name: Download Incoming

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x, 1.23.x]
go-version: [1.20.x, 1.21.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -18,7 +18,7 @@ jobs:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
cache: false
@@ -45,7 +45,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.22.x, 1.23.x]
go-version: [1.20.x, 1.21.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -56,7 +56,7 @@ jobs:
- uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go-version }}
cache: false
@@ -67,17 +67,3 @@ jobs:
- name: Test tools module
run: cd tools && go test -race -v ./... && cd ..
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: stable
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.61

View File

@@ -7,10 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.25.0] - 2023-01-02
### Added
- Added configuration for Janitor's Interval and Deletion Batch Size (PR: https://github.com/hibiken/asynq/pull/715)
- `IsPanicError` function is added to support catching of panic errors when processing tasks (PR: https://github.com/hibiken/asynq/pull/491)
## [0.24.1] - 2023-05-01

View File

@@ -4,8 +4,4 @@ proto: internal/proto/asynq.proto
protoc -I=$(ROOT_DIR)/internal/proto \
--go_out=$(ROOT_DIR)/internal/proto \
--go_opt=module=github.com/hibiken/asynq/internal/proto \
$(ROOT_DIR)/internal/proto/asynq.proto
.PHONY: lint
lint:
golangci-lint run
$(ROOT_DIR)/internal/proto/asynq.proto

View File

@@ -316,9 +316,6 @@ type RedisFailoverClientOpt struct {
// https://redis.io/topics/sentinel.
SentinelAddrs []string
// Redis sentinel username.
SentinelUsername string
// Redis sentinel password.
SentinelPassword string
@@ -367,7 +364,6 @@ func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: opt.MasterName,
SentinelAddrs: opt.SentinelAddrs,
SentinelUsername: opt.SentinelUsername,
SentinelPassword: opt.SentinelPassword,
Username: opt.Username,
Password: opt.Password,

View File

@@ -55,7 +55,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
}
b.StartTimer() // end setup
_ = srv.Start(HandlerFunc(handler))
srv.Start(HandlerFunc(handler))
wg.Wait()
b.StopTimer() // begin teardown
@@ -117,7 +117,7 @@ func BenchmarkEndToEnd(b *testing.B) {
}
b.StartTimer() // end setup
_ = srv.Start(HandlerFunc(handler))
srv.Start(HandlerFunc(handler))
wg.Wait()
b.StopTimer() // begin teardown
@@ -174,7 +174,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
}
b.StartTimer() // end setup
_ = srv.Start(HandlerFunc(handler))
srv.Start(HandlerFunc(handler))
wg.Wait()
b.StopTimer() // begin teardown
@@ -215,7 +215,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
handler := func(ctx context.Context, t *Task) error {
return nil
}
_ = srv.Start(HandlerFunc(handler))
srv.Start(HandlerFunc(handler))
b.StartTimer() // end setup

View File

@@ -10,11 +10,11 @@ import (
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)
// A Client is responsible for scheduling tasks.
@@ -25,26 +25,15 @@ import (
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
broker base.Broker
// When a Client has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
// NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
client := NewClientFromRedisClient(redisClient)
client.sharedConnection = false
return client
}
// NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewClientFromRedisClient(c redis.UniversalClient) *Client {
return &Client{broker: rdb.NewRDB(c), sharedConnection: true}
return &Client{broker: rdb.NewRDB(c)}
}
type OptionType int
@@ -161,9 +150,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
@@ -318,9 +307,6 @@ var (
// Close closes the connection with redis.
func (c *Client) Close() error {
if c.sharedConnection {
return fmt.Errorf("redis connection is shared so the Client can't be closed through asynq")
}
return c.broker.Close()
}
@@ -419,11 +405,6 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
return newTaskInfo(msg, state, opt.processAt, nil), nil
}
// Ping performs a ping against the redis connection.
func (c *Client) Ping() error {
return c.broker.Ping()
}
func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
return c.broker.EnqueueUnique(ctx, msg, uniqueTTL)
@@ -433,7 +414,7 @@ func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL t
func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
ttl := time.Until(t.Add(uniqueTTL))
ttl := t.Add(uniqueTTL).Sub(time.Now())
return c.broker.ScheduleUnique(ctx, msg, t, ttl)
}
return c.broker.Schedule(ctx, msg, t)

View File

@@ -14,7 +14,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/base"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)
func TestClientEnqueueWithProcessAtOption(t *testing.T) {
@@ -144,7 +143,11 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
}
}
func testClientEnqueue(t *testing.T, client *Client, r redis.UniversalClient) {
func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
now := time.Now()
@@ -475,24 +478,6 @@ func testClientEnqueue(t *testing.T, client *Client, r redis.UniversalClient) {
}
}
func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
testClientEnqueue(t, client, r)
}
func TestClientFromRedisClientEnqueue(t *testing.T) {
r := setup(t)
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
client := NewClientFromRedisClient(redisClient)
testClientEnqueue(t, client, r)
err := client.Close()
if err == nil {
t.Error("client.Close() should have failed because of a shared client but it didn't")
}
}
func TestClientEnqueueWithGroupOption(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
@@ -556,11 +541,11 @@ func TestClientEnqueueWithGroupOption(t *testing.T) {
},
},
{
desc: "With Group and ProcessAt options",
desc: "With Group and ProcessIn options",
task: task,
opts: []Option{
Group("mygroup"),
ProcessAt(now.Add(30 * time.Minute)),
ProcessIn(30 * time.Minute),
},
wantInfo: &TaskInfo{
Queue: "default",
@@ -1173,7 +1158,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
gotTTL := r.TTL(context.Background(), base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val()
wantTTL := time.Until(tc.at.Add(tc.ttl))
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
continue

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/hibiken/asynq
go 1.22
go 1.20
require (
github.com/golang/protobuf v1.5.3

10
go.sum
View File

@@ -1,15 +1,11 @@
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
@@ -19,21 +15,16 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk=
github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
@@ -46,4 +37,3 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -120,9 +120,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
for {
select {
case <-h.done:
if err := h.broker.ClearServerState(h.host, h.pid, h.serverID); err != nil {
h.logger.Errorf("Failed to clear server state: %v", err)
}
h.broker.ClearServerState(h.host, h.pid, h.serverID)
h.logger.Debug("Heartbeater done")
timer.Stop()
return

View File

@@ -10,19 +10,16 @@ import (
"strings"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
)
// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
type Inspector struct {
rdb *rdb.RDB
// When an Inspector has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
// New returns a new instance of Inspector.
@@ -31,25 +28,13 @@ func NewInspector(r RedisConnOpt) *Inspector {
if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
}
inspector := NewInspectorFromRedisClient(c)
inspector.sharedConnection = false
return inspector
}
// NewInspectorFromRedisClient returns a new instance of Inspector given a redis.UniversalClient
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewInspectorFromRedisClient(c redis.UniversalClient) *Inspector {
return &Inspector{
rdb: rdb.NewRDB(c),
sharedConnection: true,
rdb: rdb.NewRDB(c),
}
}
// Close closes the connection with redis.
func (i *Inspector) Close() error {
if i.sharedConnection {
return fmt.Errorf("redis connection is shared so the Inspector can't be closed through asynq")
}
return i.rdb.Close()
}

View File

@@ -22,7 +22,11 @@ import (
"github.com/redis/go-redis/v9"
)
func testInspectorQueues(t *testing.T, inspector *Inspector, r redis.UniversalClient) {
func TestInspectorQueues(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
queues []string
}{
@@ -48,21 +52,7 @@ func testInspectorQueues(t *testing.T, inspector *Inspector, r redis.UniversalCl
t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
}
}
}
func TestInspectorQueues(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
testInspectorQueues(t, inspector, r)
}
func TestInspectorFromRedisClientQueues(t *testing.T) {
r := setup(t)
defer r.Close()
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
inspector := NewInspectorFromRedisClient(redisClient)
testInspectorQueues(t, inspector, r)
}
func TestInspectorDeleteQueue(t *testing.T) {

View File

@@ -14,7 +14,7 @@ import (
"sync"
"time"
"github.com/golang/protobuf/ptypes" //nolint: staticcheck
"github.com/golang/protobuf/ptypes"
"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
@@ -379,7 +379,7 @@ func EncodeServerInfo(info *ServerInfo) ([]byte, error) {
for q, p := range info.Queues {
queues[q] = int32(p)
}
started, err := ptypes.TimestampProto(info.Started) //nolint: staticcheck
started, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
}
@@ -406,7 +406,7 @@ func DecodeServerInfo(b []byte) (*ServerInfo, error) {
for q, p := range pbmsg.GetQueues() {
queues[q] = int(p)
}
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) //nolint: staticcheck
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil {
return nil, err
}
@@ -441,11 +441,11 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil worker info")
}
startTime, err := ptypes.TimestampProto(info.Started) //nolint: staticcheck
startTime, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
}
deadline, err := ptypes.TimestampProto(info.Deadline) //nolint: staticcheck
deadline, err := ptypes.TimestampProto(info.Deadline)
if err != nil {
return nil, err
}
@@ -468,11 +468,11 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) //nolint: staticcheck
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil {
return nil, err
}
deadline, err := ptypes.Timestamp(pbmsg.GetDeadline()) //nolint: staticcheck
deadline, err := ptypes.Timestamp(pbmsg.GetDeadline())
if err != nil {
return nil, err
}
@@ -519,11 +519,11 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
if entry == nil {
return nil, fmt.Errorf("cannot encode nil scheduler entry")
}
next, err := ptypes.TimestampProto(entry.Next) //nolint: staticcheck
next, err := ptypes.TimestampProto(entry.Next)
if err != nil {
return nil, err
}
prev, err := ptypes.TimestampProto(entry.Prev) //nolint: staticcheck
prev, err := ptypes.TimestampProto(entry.Prev)
if err != nil {
return nil, err
}
@@ -544,11 +544,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) //nolint: staticcheck
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
if err != nil {
return nil, err
}
prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime()) //nolint: staticcheck
prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime())
if err != nil {
return nil, err
}
@@ -578,7 +578,7 @@ func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) {
if event == nil {
return nil, fmt.Errorf("cannot encode nil enqueue event")
}
enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) //nolint: staticcheck
enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt)
if err != nil {
return nil, err
}
@@ -595,7 +595,7 @@ func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) {
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime()) //nolint: staticcheck
enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime())
if err != nil {
return nil, err
}
@@ -737,7 +737,7 @@ type Broker interface {
ReclaimStaleAggregationSets(qname string) error
// Task retention related method
DeleteExpiredCompletedTasks(qname string, batchSize int) error
DeleteExpiredCompletedTasks(qname string) error
// Lease related methods
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)

View File

@@ -829,7 +829,6 @@ const (
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
// KEYS[9] -> asynq:{<qname>}:t:
// -------
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
@@ -846,22 +845,8 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
local old = redis.call("ZRANGE", KEYS[4], "-inf", ARGV[4], "BYSCORE")
if #old > 0 then
for _, id in ipairs(old) do
redis.call("DEL", KEYS[9] .. id)
end
redis.call("ZREM", KEYS[4], unpack(old))
end
local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5])
if #extra > 0 then
for _, id in ipairs(extra) do
redis.call("DEL", KEYS[9] .. id)
end
redis.call("ZREM", KEYS[4], unpack(extra))
end
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
@@ -904,7 +889,6 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
base.FailedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue),
base.FailedTotalKey(msg.Queue),
base.TaskKeyPrefix(msg.Queue),
}
argv := []interface{}{
msg.ID,
@@ -1233,7 +1217,7 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
return redis.status_reply("OK")
`)
// ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue, and
// ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and
// reclaim tasks in the stale aggregation set by putting them back in the group.
func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
var op errors.Op = "RDB.ReclaimStaleAggregationSets"
@@ -1257,7 +1241,9 @@ return table.getn(ids)`)
// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
// and delete all expired tasks.
func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
// Note: Do this operation in fix batches to prevent long running script.
const batchSize = 100
for {
n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
if err != nil {

View File

@@ -2002,6 +2002,7 @@ func TestArchive(t *testing.T) {
}
errMsg := "SMTP server not responding"
// TODO(hibiken): add test cases for trimming
tests := []struct {
active map[string][]*base.TaskMessage
lease map[string][]base.Z
@@ -2170,163 +2171,6 @@ func TestArchive(t *testing.T) {
}
}
func TestArchiveTrim(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
t1 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "send_email",
Payload: nil,
Queue: "default",
Retry: 25,
Retried: 25,
Timeout: 1800,
}
t2 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "reindex",
Payload: nil,
Queue: "default",
Retry: 25,
Retried: 0,
Timeout: 3000,
}
errMsg := "SMTP server not responding"
maxArchiveSet := make([]base.Z, 0)
for i := 0; i < maxArchiveSize-1; i++ {
maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{
ID: uuid.NewString(),
Type: "generate_csv",
Payload: nil,
Queue: "default",
Retry: 25,
Retried: 0,
Timeout: 60,
}, Score: now.Add(-time.Hour + -time.Second*time.Duration(i)).Unix()})
}
wantMaxArchiveSet := make([]base.Z, 0)
// newly archived task should be at the front
wantMaxArchiveSet = append(wantMaxArchiveSet, base.Z{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()})
// oldest task should be dropped from the set
wantMaxArchiveSet = append(wantMaxArchiveSet, maxArchiveSet[:len(maxArchiveSet)-1]...)
tests := []struct {
toArchive map[string][]*base.TaskMessage
lease map[string][]base.Z
archived map[string][]base.Z
wantArchived map[string][]base.Z
}{
{ // simple, 1 to be archived, 1 already archived, both are in the archive set
toArchive: map[string][]*base.TaskMessage{
"default": {t1},
},
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
"default": {
{Message: t2, Score: now.Add(-time.Hour).Unix()},
},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
{Message: t2, Score: now.Add(-time.Hour).Unix()},
},
},
},
{ // 1 to be archived, 1 already archived but past expiry, only the newly archived task should be left
toArchive: map[string][]*base.TaskMessage{
"default": {t1},
},
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
"default": {
{Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()},
},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
},
},
},
{ // 1 to be archived, maxArchiveSize in archive set, archive set should be trimmed back to maxArchiveSize and newly archived task should be in the set
toArchive: map[string][]*base.TaskMessage{
"default": {t1},
},
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
"default": maxArchiveSet,
},
wantArchived: map[string][]base.Z{
"default": wantMaxArchiveSet,
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllActiveQueues(t, r.client, tc.toArchive)
h.SeedAllLease(t, r.client, tc.lease)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
for _, tasks := range tc.toArchive {
for _, target := range tasks {
err := r.Archive(context.Background(), target, errMsg)
if err != nil {
t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
continue
}
}
}
for queue, want := range tc.wantArchived {
gotArchived := h.GetArchivedEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
}
// check that only keys present in the archived set are in rdb
vals := r.client.Keys(context.Background(), base.TaskKeyPrefix(queue)+"*").Val()
if len(vals) != len(gotArchived) {
t.Errorf("len of keys = %v, want %v", len(vals), len(gotArchived))
return
}
for _, val := range vals {
found := false
for _, entry := range gotArchived {
if strings.Contains(val, entry.Message.ID) {
found = true
break
}
}
if !found {
t.Errorf("key %v not found in archived set (it was orphaned by the archive trim)", val)
}
}
}
}
}
func TestForwardIfReadyWithGroup(t *testing.T) {
r := setup(t)
defer r.Close()
@@ -2698,8 +2542,8 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) {
h.FlushDB(t, r.client)
h.SeedAllCompletedQueues(t, r.client, tc.completed)
if err := r.DeleteExpiredCompletedTasks(tc.qname, 100); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q, 100) failed: %v", tc.qname, err)
if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err)
continue
}
@@ -3206,7 +3050,7 @@ func TestCancelationPubSub(t *testing.T) {
publish := []string{"one", "two", "three"}
for _, msg := range publish {
_ = r.PublishCancelation(msg)
r.PublishCancelation(msg)
}
// allow for message to reach subscribers.

View File

@@ -11,8 +11,8 @@ import (
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
)
var errRedisDown = errors.New("testutil: redis is down")
@@ -145,13 +145,13 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
return tb.real.ForwardIfReady(qnames...)
}
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.DeleteExpiredCompletedTasks(qname, batchSize)
return tb.real.DeleteExpiredCompletedTasks(qname)
}
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {

View File

@@ -27,17 +27,13 @@ type janitor struct {
// average interval between checks.
avgInterval time.Duration
// number of tasks to be deleted when janitor runs to delete the expired completed tasks.
batchSize int
}
type janitorParams struct {
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
batchSize int
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
}
func newJanitor(params janitorParams) *janitor {
@@ -47,7 +43,6 @@ func newJanitor(params janitorParams) *janitor {
done: make(chan struct{}),
queues: params.queues,
avgInterval: params.interval,
batchSize: params.batchSize,
}
}
@@ -78,7 +73,7 @@ func (j *janitor) start(wg *sync.WaitGroup) {
func (j *janitor) exec() {
for _, qname := range j.queues {
if err := j.broker.DeleteExpiredCompletedTasks(qname, j.batchSize); err != nil {
if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil {
j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v",
qname, err)
}

View File

@@ -26,13 +26,11 @@ func TestJanitor(t *testing.T) {
defer r.Close()
rdbClient := rdb.NewRDB(r)
const interval = 1 * time.Second
const batchSize = 100
janitor := newJanitor(janitorParams{
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "custom"},
interval: interval,
batchSize: batchSize,
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "custom"},
interval: interval,
})
now := time.Now()

View File

@@ -7,6 +7,7 @@ package asynq
import (
"crypto/sha256"
"fmt"
"io"
"sort"
"sync"
"time"
@@ -78,13 +79,13 @@ type PeriodicTaskConfig struct {
func (c *PeriodicTaskConfig) hash() string {
h := sha256.New()
_, _ = h.Write([]byte(c.Cronspec))
_, _ = h.Write([]byte(c.Task.Type()))
io.WriteString(h, c.Cronspec)
io.WriteString(h, c.Task.Type())
h.Write(c.Task.Payload())
opts := stringifyOptions(c.Opts)
sort.Strings(opts)
for _, opt := range opts {
_, _ = h.Write([]byte(opt))
io.WriteString(h, opt)
}
return fmt.Sprintf("%x", h.Sum(nil))
}

View File

@@ -32,7 +32,6 @@ func (p *FakeConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error) {
}
func TestNewPeriodicTaskManager(t *testing.T) {
redisConnOpt := getRedisConnOpt(t)
cfgs := []*PeriodicTaskConfig{
{Cronspec: "* * * * *", Task: NewTask("foo", nil)},
{Cronspec: "* * * * *", Task: NewTask("bar", nil)},
@@ -44,14 +43,14 @@ func TestNewPeriodicTaskManager(t *testing.T) {
{
desc: "with provider and redisConnOpt",
opts: PeriodicTaskManagerOpts{
RedisConnOpt: redisConnOpt,
RedisConnOpt: RedisClientOpt{Addr: ":6379"},
PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs},
},
},
{
desc: "with sync option",
opts: PeriodicTaskManagerOpts{
RedisConnOpt: redisConnOpt,
RedisConnOpt: RedisClientOpt{Addr: ":6379"},
PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs},
SyncInterval: 5 * time.Minute,
},
@@ -59,7 +58,7 @@ func TestNewPeriodicTaskManager(t *testing.T) {
{
desc: "with scheduler option",
opts: PeriodicTaskManagerOpts{
RedisConnOpt: redisConnOpt,
RedisConnOpt: RedisClientOpt{Addr: ":6379"},
PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs},
SyncInterval: 5 * time.Minute,
SchedulerOpts: &SchedulerOpts{
@@ -75,33 +74,37 @@ func TestNewPeriodicTaskManager(t *testing.T) {
t.Errorf("%s; NewPeriodicTaskManager returned error: %v", tc.desc, err)
}
}
}
t.Run("error", func(t *testing.T) {
tests := []struct {
desc string
opts PeriodicTaskManagerOpts
}{
{
desc: "without provider",
opts: PeriodicTaskManagerOpts{
RedisConnOpt: redisConnOpt,
},
func TestNewPeriodicTaskManagerError(t *testing.T) {
cfgs := []*PeriodicTaskConfig{
{Cronspec: "* * * * *", Task: NewTask("foo", nil)},
{Cronspec: "* * * * *", Task: NewTask("bar", nil)},
}
tests := []struct {
desc string
opts PeriodicTaskManagerOpts
}{
{
desc: "without provider",
opts: PeriodicTaskManagerOpts{
RedisConnOpt: RedisClientOpt{Addr: ":6379"},
},
{
desc: "without redisConOpt",
opts: PeriodicTaskManagerOpts{
PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs},
},
},
{
desc: "without redisConOpt",
opts: PeriodicTaskManagerOpts{
PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs},
},
}
},
}
for _, tc := range tests {
_, err := NewPeriodicTaskManager(tc.opts)
if err == nil {
t.Errorf("%s; NewPeriodicTaskManager did not return error", tc.desc)
}
for _, tc := range tests {
_, err := NewPeriodicTaskManager(tc.opts)
if err == nil {
t.Errorf("%s; NewPeriodicTaskManager did not return error", tc.desc)
}
})
}
}
func TestPeriodicTaskConfigHash(t *testing.T) {

View File

@@ -8,7 +8,7 @@ import (
"context"
"fmt"
"math"
"math/rand/v2"
"math/rand"
"runtime"
"runtime/debug"
"sort"
@@ -181,8 +181,7 @@ func (p *processor) exec() {
// Sleep to avoid slamming redis and let scheduler move tasks into queues.
// Note: We are not using blocking pop operation and polling queues instead.
// This adds significant load to redis.
jitter := rand.N(p.taskCheckInterval)
time.Sleep(p.taskCheckInterval/2 + jitter)
time.Sleep(p.taskCheckInterval)
<-p.sema // release token
return
case err != nil:
@@ -262,8 +261,7 @@ func (p *processor) requeue(l *base.Lease, msg *base.TaskMessage) {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.Requeue(ctx, msg)
if err != nil {
p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
@@ -285,8 +283,7 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.MarkAsComplete(ctx, msg)
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v",
@@ -307,8 +304,7 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.Done(ctx, msg)
if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
@@ -327,23 +323,20 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
// the task should not be retried and should be archived instead.
var SkipRetry = errors.New("skip retry for the task")
// RevokeTask is used as a return value from Handler.ProcessTask to indicate that
// the task should not be retried or archived.
var RevokeTask = errors.New("revoke task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
}
switch {
case errors.Is(err, RevokeTask):
p.logger.Warnf("revoke task id=%s", msg.ID)
p.markAsDone(l, msg)
case msg.Retried >= msg.Retry || errors.Is(err, SkipRetry):
if !p.isFailureFunc(err) {
// retry the task without marking it as failed
p.retry(l, msg, err, false /*isFailure*/)
return
}
if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.archive(l, msg, err)
default:
p.retry(l, msg, err, p.isFailureFunc(err))
} else {
p.retry(l, msg, err, true /*isFailure*/)
}
}
@@ -352,8 +345,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
@@ -375,8 +367,7 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
defer cancel()
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.Archive(ctx, msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
@@ -413,7 +404,8 @@ func (p *processor) queues() []string {
names = append(names, qname)
}
}
rand.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
return uniq(names, len(p.queueConfig))
}

View File

@@ -295,7 +295,6 @@ func TestProcessorRetry(t *testing.T) {
errMsg := "something went wrong"
wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry)
wrappedRevokeTask := fmt.Errorf("%s:%w", errMsg, RevokeTask)
tests := []struct {
desc string // test description
@@ -313,7 +312,7 @@ func TestProcessorRetry(t *testing.T) {
pending: []*base.TaskMessage{m1, m2, m3, m4},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return errors.New(errMsg)
return fmt.Errorf(errMsg)
}),
wait: 2 * time.Second,
wantErrMsg: errMsg,
@@ -347,32 +346,6 @@ func TestProcessorRetry(t *testing.T) {
wantArchived: []*base.TaskMessage{m1, m2},
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
},
{
desc: "Should revoke task",
pending: []*base.TaskMessage{m1, m2},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return RevokeTask // return RevokeTask without wrapping
}),
wait: 2 * time.Second,
wantErrMsg: RevokeTask.Error(),
wantRetry: []*base.TaskMessage{},
wantArchived: []*base.TaskMessage{},
wantErrCount: 2, // ErrorHandler should still be called with RevokeTask error
},
{
desc: "Should revoke task (with error wrapping)",
pending: []*base.TaskMessage{m1, m2},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return wrappedRevokeTask
}),
wait: 2 * time.Second,
wantErrMsg: wrappedRevokeTask.Error(),
wantRetry: []*base.TaskMessage{},
wantArchived: []*base.TaskMessage{},
wantErrCount: 2, // ErrorHandler should still be called with RevokeTask error
},
}
for _, tc := range tests {

View File

@@ -10,11 +10,11 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
@@ -43,27 +43,15 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
return scheduler
}
// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
@@ -84,7 +72,7 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
id: generateSchedulerID(),
state: &serverState{value: srvStateNew},
logger: logger,
client: NewClientFromRedisClient(c),
client: NewClient(r),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
@@ -273,12 +261,8 @@ func (s *Scheduler) Shutdown() {
s.wg.Wait()
s.clearHistory()
if err := s.client.Close(); err != nil {
s.logger.Errorf("Failed to close redis client connection: %v", err)
}
if !s.sharedConnection {
s.rdb.Close()
}
s.client.Close()
s.rdb.Close()
s.logger.Info("Scheduler stopped")
}
@@ -289,9 +273,7 @@ func (s *Scheduler) runHeartbeater() {
select {
case <-s.done:
s.logger.Debugf("Scheduler heatbeater shutting down")
if err := s.rdb.ClearSchedulerEntries(s.id); err != nil {
s.logger.Errorf("Failed to clear the scheduler entries: %v", err)
}
s.rdb.ClearSchedulerEntries(s.id)
ticker.Stop()
return
case <-ticker.C:
@@ -338,14 +320,3 @@ func (s *Scheduler) clearHistory() {
}
}
}
// Ping performs a ping against the redis connection.
func (s *Scheduler) Ping() error {
s.state.mu.Lock()
defer s.state.mu.Unlock()
if s.state.value == srvStateClosed {
return nil
}
return s.rdb.Ping()
}

View File

@@ -10,7 +10,6 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/testutil"
@@ -59,7 +58,6 @@ func TestSchedulerRegister(t *testing.T) {
r := setup(t)
// Tests for new redis connection.
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
@@ -77,28 +75,6 @@ func TestSchedulerRegister(t *testing.T) {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}
r = setup(t)
// Tests for existing redis connection.
for _, tc := range tests {
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
scheduler := NewSchedulerFromRedisClient(redisClient, nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
scheduler.Shutdown()
got := testutil.GetPendingMessages(t, r, tc.queue)
if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}
}
func TestSchedulerWhenRedisDown(t *testing.T) {
@@ -114,7 +90,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
// Connect to non-existent redis instance to simulate a redis server being down.
scheduler := NewScheduler(
RedisClientOpt{Addr: ":9876"}, // no Redis listening to this port.
RedisClientOpt{Addr: ":9876"},
&SchedulerOpts{EnqueueErrorHandler: errorHandler},
)

107
server.go
View File

@@ -9,7 +9,7 @@ import (
"errors"
"fmt"
"math"
"math/rand/v2"
"math/rand"
"runtime"
"strings"
"sync"
@@ -37,9 +37,6 @@ type Server struct {
logger *log.Logger
broker base.Broker
// When a Server has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
state *serverState
@@ -106,7 +103,7 @@ type Config struct {
// If BaseContext is nil, the default is context.Background().
// If this is defined, then it MUST return a non-nil context
BaseContext func() context.Context
// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
//
// If unset, zero or a negative value, the interval is set to 1 second.
@@ -242,17 +239,6 @@ type Config struct {
//
// If unset or nil, the group aggregation feature will be disabled on the server.
GroupAggregator GroupAggregator
// JanitorInterval specifies the average interval of janitor checks for expired completed tasks.
//
// If unset or zero, default interval of 8 seconds is used.
JanitorInterval time.Duration
// JanitorBatchSize specifies the number of expired completed tasks to be deleted in one run.
//
// If unset or zero, default batch size of 100 is used.
// Make sure to not put a big number as the batch size to prevent a long-running script.
JanitorBatchSize int
}
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
@@ -400,8 +386,9 @@ func toInternalLogLevel(l LogLevel) log.Level {
// DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.
// It uses exponential back-off strategy to calculate the retry delay.
func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Formula taken from https://github.com/mperham/sidekiq.
s := int(math.Pow(float64(n), 4)) + 15 + (rand.IntN(30) * (n + 1))
s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1))
return time.Duration(s) * time.Second
}
@@ -421,28 +408,15 @@ const (
defaultDelayedTaskCheckInterval = 5 * time.Second
defaultGroupGracePeriod = 1 * time.Minute
defaultJanitorInterval = 8 * time.Second
defaultJanitorBatchSize = 100
)
// NewServer returns a new Server given a redis connection option
// and server configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server {
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
server := NewServerFromRedisClient(redisClient, cfg)
server.sharedConnection = false
return server
}
// NewServerFromRedisClient returns a new instance of Server given a redis.UniversalClient
// and server configuration
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
baseCtxFn := cfg.BaseContext
if baseCtxFn == nil {
baseCtxFn = context.Background
@@ -573,26 +547,11 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc,
})
janitorInterval := cfg.JanitorInterval
if janitorInterval == 0 {
janitorInterval = defaultJanitorInterval
}
janitorBatchSize := cfg.JanitorBatchSize
if janitorBatchSize == 0 {
janitorBatchSize = defaultJanitorBatchSize
}
if janitorBatchSize > defaultJanitorBatchSize {
logger.Warnf("Janitor batch size of %d is greater than the recommended batch size of %d. "+
"This might cause a long-running script", janitorBatchSize, defaultJanitorBatchSize)
}
janitor := newJanitor(janitorParams{
logger: logger,
broker: rdb,
queues: qnames,
interval: janitorInterval,
batchSize: janitorBatchSize,
logger: logger,
broker: rdb,
queues: qnames,
interval: 8 * time.Second,
})
aggregator := newAggregator(aggregatorParams{
logger: logger,
@@ -604,19 +563,18 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
groupAggregator: cfg.GroupAggregator,
})
return &Server{
logger: logger,
broker: rdb,
sharedConnection: true,
state: srvState,
forwarder: forwarder,
processor: processor,
syncer: syncer,
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
healthchecker: healthchecker,
janitor: janitor,
aggregator: aggregator,
logger: logger,
broker: rdb,
state: srvState,
forwarder: forwarder,
processor: processor,
syncer: syncer,
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
healthchecker: healthchecker,
janitor: janitor,
aggregator: aggregator,
}
}
@@ -632,10 +590,6 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
// One exception to this rule is when ProcessTask returns a SkipRetry error.
// If the returned error is SkipRetry or an error wraps SkipRetry, retry is
// skipped and the task will be immediately archived instead.
//
// Another exception to this rule is when ProcessTask returns a RevokeTask error.
// If the returned error is RevokeTask or an error wraps RevokeTask, the task
// will not be retried or archived.
type Handler interface {
ProcessTask(context.Context, *Task) error
}
@@ -748,9 +702,7 @@ func (srv *Server) Shutdown() {
srv.heartbeater.shutdown()
srv.wg.Wait()
if !srv.sharedConnection {
srv.broker.Close()
}
srv.broker.Close()
srv.logger.Info("Exiting")
}
@@ -762,7 +714,7 @@ func (srv *Server) Shutdown() {
func (srv *Server) Stop() {
srv.state.mu.Lock()
if srv.state.value != srvStateActive {
// Invalid call to Stop, server can only go from Active state to Stopped state.
// Invalid calll to Stop, server can only go from Active state to Stopped state.
srv.state.mu.Unlock()
return
}
@@ -773,16 +725,3 @@ func (srv *Server) Stop() {
srv.processor.stop()
srv.logger.Info("Processor stopped")
}
// Ping performs a ping against the redis connection.
//
// This is an alternative to the HealthCheckFunc available in the Config object.
func (srv *Server) Ping() error {
srv.state.mu.Lock()
defer srv.state.mu.Unlock()
if srv.state.value == srvStateClosed {
return nil
}
return srv.broker.Ping()
}

View File

@@ -14,12 +14,22 @@ import (
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
"github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
"go.uber.org/goleak"
)
func testServer(t *testing.T, c *Client, srv *Server) {
func TestServer(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
c := NewClient(redisConnOpt)
defer c.Close()
srv := NewServer(redisConnOpt, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
// no-op handler
h := func(ctx context.Context, task *Task) error {
return nil
@@ -43,55 +53,18 @@ func testServer(t *testing.T, c *Client, srv *Server) {
srv.Shutdown()
}
func TestServer(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
c := NewClient(redisConnOpt)
defer c.Close()
srv := NewServer(redisConnOpt, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
testServer(t, c, srv)
}
func TestServerFromRedisClient(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
redisClient := redisConnOpt.MakeRedisClient().(redis.UniversalClient)
c := NewClientFromRedisClient(redisClient)
srv := NewServerFromRedisClient(redisClient, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
testServer(t, c, srv)
err := c.Close()
if err == nil {
t.Error("client.Close() should have failed because of a shared client but it didn't")
}
}
func TestServerRun(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
srv := NewServer(getRedisConnOpt(t), Config{LogLevel: testLogLevel})
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
done := make(chan struct{})
// Make sure server exits when receiving TERM signal.
go func() {
time.Sleep(2 * time.Second)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
done <- struct{}{}
}()
@@ -110,7 +83,7 @@ func TestServerRun(t *testing.T) {
}
func TestServerErrServerClosed(t *testing.T) {
srv := NewServer(getRedisConnOpt(t), Config{LogLevel: testLogLevel})
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
handler := NewServeMux()
if err := srv.Start(handler); err != nil {
t.Fatal(err)
@@ -123,7 +96,7 @@ func TestServerErrServerClosed(t *testing.T) {
}
func TestServerErrNilHandler(t *testing.T) {
srv := NewServer(getRedisConnOpt(t), Config{LogLevel: testLogLevel})
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
err := srv.Start(nil)
if err == nil {
t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error")
@@ -132,7 +105,7 @@ func TestServerErrNilHandler(t *testing.T) {
}
func TestServerErrServerRunning(t *testing.T) {
srv := NewServer(getRedisConnOpt(t), Config{LogLevel: testLogLevel})
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
handler := NewServeMux()
if err := srv.Start(handler); err != nil {
t.Fatal(err)
@@ -153,7 +126,7 @@ func TestServerWithRedisDown(t *testing.T) {
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
srv := NewServer(getRedisConnOpt(t), Config{LogLevel: testLogLevel})
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
srv.broker = testBroker
srv.forwarder.broker = testBroker
srv.heartbeater.broker = testBroker

View File

@@ -1,4 +1,4 @@
//go:build linux || dragonfly || freebsd || netbsd || openbsd || darwin
//go:build linux || bsd || darwin
package asynq

View File

@@ -25,7 +25,7 @@ To view details on any command, use `asynq help <command> <subcommand>`.
- `asynq dash`
- `asynq stats`
- `asynq queue [ls inspect history rm pause unpause]`
- `asynq task [ls cancel delete archive run deleteall archiveall runall]`
- `asynq task [ls cancel delete archive run delete-all archive-all run-all]`
- `asynq server [ls]`
### Global flags

View File

@@ -11,7 +11,6 @@ import (
"os"
"strings"
"text/tabwriter"
"time"
"unicode"
"unicode/utf8"
@@ -370,12 +369,7 @@ func createRDB() *rdb.RDB {
return rdb.NewRDB(c)
}
// createClient creates a Client instance using flag values and returns it.
func createClient() *asynq.Client {
return asynq.NewClient(getRedisConnOpt())
}
// createInspector creates a Inspector instance using flag values and returns it.
// createRDB creates a Inspector instance using flag values and returns it.
func createInspector() *asynq.Inspector {
return asynq.NewInspector(getRedisConnOpt())
}
@@ -462,37 +456,3 @@ func isPrintable(data []byte) bool {
}
return !isAllSpace
}
// Helper to turn a command line flag into a duration
func getDuration(cmd *cobra.Command, arg string) time.Duration {
durationStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
duration, err := time.ParseDuration(durationStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
return duration
}
// Helper to turn a command line flag into a time
func getTime(cmd *cobra.Command, arg string) time.Time {
timeStr, err := cmd.Flags().GetString(arg)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
timeVal, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
return timeVal
}

View File

@@ -53,24 +53,6 @@ func init() {
taskRunCmd.MarkFlagRequired("queue")
taskRunCmd.MarkFlagRequired("id")
taskCmd.AddCommand(taskEnqueueCmd)
taskEnqueueCmd.Flags().StringP("type_name", "t", "", "type name to enqueue the task as (required)")
taskEnqueueCmd.Flags().StringP("payload", "l", "", "payload to enqueue (required)")
// The following are the various OptionTypes; if not specified we won't pass them so that composeOptions()
// can apply its own defaults
taskEnqueueCmd.Flags().Int("retry", 0, "maximum retries")
taskEnqueueCmd.Flags().String("queue", "", "queue to enqueue the task to")
taskEnqueueCmd.Flags().String("id", "", "id to enqueue the task as")
taskEnqueueCmd.Flags().String("timeout", "", "timeout for the task (how long it can run); must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("deadline", "", "deadline for the task; must be in RFC3339 format")
taskEnqueueCmd.Flags().String("unique", "", "unique period for the task (duration within which it is guaranteed to be unique); must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("process_at", "", "process at time for the task; must be in RFC3339 format")
taskEnqueueCmd.Flags().String("process_in", "", "process in window for the task; must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("retention", "", "retention window for the task; must be parseable as a time.Duration")
taskEnqueueCmd.Flags().String("group", "", "group for the task")
taskEnqueueCmd.MarkFlagRequired("type_name")
taskEnqueueCmd.MarkFlagRequired("payload")
taskCmd.AddCommand(taskArchiveAllCmd)
taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong (required)")
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks; one of { pending | aggregating | scheduled | retry } (required)")
@@ -169,16 +151,6 @@ var taskRunCmd = &cobra.Command{
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
var taskEnqueueCmd = &cobra.Command{
Use: "enqueue --type_name=footype --payload=barpayload",
Short: "Enqueue a task",
Args: cobra.NoArgs,
Run: taskEnqueue,
Example: heredoc.Doc(`
$ asynq task enqueue -t footype -l barpayload
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
}
var taskArchiveAllCmd = &cobra.Command{
Use: "archiveall --queue=<queue> --state=<state>",
Short: "Archive all tasks in the given state",
@@ -549,95 +521,6 @@ func taskRun(cmd *cobra.Command, args []string) {
fmt.Println("task is now pending")
}
func taskEnqueue(cmd *cobra.Command, args []string) {
typeName, err := cmd.Flags().GetString("type_name")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
payload, err := cmd.Flags().GetString("payload")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
// For all of the optional flags, we need to explicitly check whether they were set or
// not; for consistency we want to use the defaults set in composeOptions() rather than
// the ones in the flag definitions.
opts := []asynq.Option{}
if cmd.Flags().Changed("retry") {
retry, err := cmd.Flags().GetInt("retry")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.MaxRetry(retry))
}
if cmd.Flags().Changed("queue") {
queue, err := cmd.Flags().GetString("queue")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Queue(queue))
}
if cmd.Flags().Changed("id") {
id, err := cmd.Flags().GetString("id")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.TaskID(id))
}
if cmd.Flags().Changed("timeout") {
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
}
if cmd.Flags().Changed("deadline") {
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
}
if cmd.Flags().Changed("unique") {
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
}
if cmd.Flags().Changed("process_at") {
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
}
if cmd.Flags().Changed("process_in") {
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
}
if cmd.Flags().Changed("retention") {
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
}
if cmd.Flags().Changed("group") {
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Group(group))
}
c := createClient()
task := asynq.NewTask(typeName, []byte(payload), opts...)
taskInfo, err := c.Enqueue(task)
if err != nil {
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
}
func taskArchiveAll(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
@@ -770,4 +653,3 @@ func taskRunAll(cmd *cobra.Command, args []string) {
}
fmt.Printf("%d tasks are now pending\n", n)
}

View File

@@ -1,6 +1,6 @@
module github.com/hibiken/asynq/tools
go 1.22
go 1.20
require (
github.com/MakeNowJust/heredoc/v2 v2.0.1

View File

@@ -61,7 +61,6 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -177,11 +176,9 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
@@ -269,7 +266,6 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=

View File

@@ -1,6 +1,6 @@
module github.com/hibiken/asynq/x
go 1.22
go 1.20
require (
github.com/google/uuid v1.4.0

View File

@@ -10,10 +10,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.26.0/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@@ -22,7 +20,6 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
@@ -52,7 +49,6 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
@@ -70,11 +66,9 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@@ -111,7 +105,6 @@ github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=