2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-22 20:49:08 +08:00

Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
d10b8af090 build(deps): bump github.com/prometheus/client_golang in /tools
Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.11.1 to 1.23.2.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](https://github.com/prometheus/client_golang/compare/v1.11.1...v1.23.2)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-version: 1.23.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-13 23:30:44 +00:00
20 changed files with 277 additions and 1081 deletions

View File

@@ -40,7 +40,6 @@ type Task struct {
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Headers() map[string]string { return t.headers }
func (t *Task) Options() []Option { return t.opts }
// ResultWriter returns a pointer to the ResultWriter associated with the task.
//
@@ -472,7 +471,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][/dbnumber][?master=masterName]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
@@ -546,20 +545,11 @@ func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
addrs := strings.Split(u.Host, ",")
master := u.Query().Get("master")
var db int
var err error
if len(u.Path) > 0 {
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
db, err = strconv.Atoi(xs[0])
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis sentinel uri: database number should be the first segment of the path")
}
}
var password string
if v, ok := u.User.Password(); ok {
password = v
}
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password, DB: db}, nil
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil
}
// ResultWriter is a client interface to write result data for a task.

View File

@@ -10,7 +10,6 @@ import (
"sort"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
@@ -149,23 +148,6 @@ func TestParseRedisURI(t *testing.T) {
SentinelPassword: "mypassword",
},
},
{
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002/3?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
DB: 3,
},
},
{
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002/7?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
SentinelPassword: "mypassword",
DB: 7,
},
},
}
for _, tc := range tests {
@@ -206,10 +188,6 @@ func TestParseRedisURIErrors(t *testing.T) {
"non integer for db numbers for socket",
"redis-socket:///some/path/to/redis?db=one",
},
{
"non integer for db number for sentinel",
"redis-sentinel://localhost:5000/abc?master=mymaster",
},
}
for _, tc := range tests {
@@ -220,30 +198,3 @@ func TestParseRedisURIErrors(t *testing.T) {
}
}
}
func TestTaskOptions(t *testing.T) {
opts := []Option{
MaxRetry(3),
Queue("critical"),
Timeout(5 * time.Minute),
}
task := NewTask("mytask", []byte("payload"), opts...)
got := task.Options()
if len(got) != len(opts) {
t.Fatalf("task.Options() returned %d options, want %d", len(got), len(opts))
}
for i, o := range opts {
if got[i].String() != o.String() {
t.Errorf("task.Options()[%d] = %v, want %v", i, got[i], o)
}
}
}
func TestTaskOptionsNil(t *testing.T) {
task := NewTask("mytask", []byte("payload"))
got := task.Options()
if got != nil {
t.Errorf("task.Options() = %v, want nil for task with no options", got)
}
}

174
client.go
View File

@@ -6,9 +6,7 @@ package asynq
import (
"context"
"encoding/json"
"fmt"
"maps"
"strings"
"time"
@@ -62,7 +60,6 @@ const (
TaskIDOpt
RetentionOpt
GroupOpt
HeaderOpt
)
// Option specifies the task processing behavior.
@@ -89,7 +86,6 @@ type (
processInOption time.Duration
retentionOption time.Duration
groupOption string
headerOption [2]string
)
// MaxRetry returns an option to specify the max number of times
@@ -221,27 +217,6 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) }
// Header returns an option to associate the key-value header to the task.
//
// This option is composable with other Client options and can be used together
// with other options like MaxRetry, Queue, etc. For use cases where headers
// need to be combined with other options, using Header option is recommended.
//
// Alternatively, NewTaskWithHeaders can be used to create a task with headers
// directly, which may be preferable when headers are an intrinsic part of the
// task definition rather than enqueue-time configuration.
func Header(key, value string) Option {
return headerOption{key, value}
}
func (h headerOption) String() string {
var bytes []byte
bytes, _ = json.Marshal(h)
return fmt.Sprintf("Header(%s)", bytes)
}
func (h headerOption) Type() OptionType { return HeaderOpt }
func (h headerOption) Value() interface{} { return [2]string{h[0], h[1]} }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
@@ -262,7 +237,6 @@ type option struct {
processAt time.Time
retention time.Duration
group string
headers map[string]string
}
// composeOptions merges user provided options into the default options
@@ -277,7 +251,6 @@ func composeOptions(opts ...Option) (option, error) {
timeout: 0, // do not set to defaultTimeout here
deadline: time.Time{},
processAt: time.Now(),
headers: make(map[string]string),
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -317,9 +290,6 @@ func composeOptions(opts ...Option) (option, error) {
return option{}, errors.New("group key cannot be empty")
}
res.group = key
case headerOption:
key, value := opt[0], opt[1]
res.headers[key] = value
default:
// ignore unexpected option
}
@@ -415,7 +385,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: maps.Clone(task.Headers()),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
@@ -424,12 +394,6 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()),
}
if len(opt.headers) > 0 {
if msg.Headers == nil {
msg.Headers = make(map[string]string)
}
maps.Copy(msg.Headers, opt.headers)
}
now := time.Now()
var state base.TaskState
if opt.processAt.After(now) {
@@ -456,142 +420,6 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
return newTaskInfo(msg, state, opt.processAt, nil), nil
}
// BatchEnqueueResult holds the result of enqueuing a single task within a batch.
type BatchEnqueueResult struct {
TaskInfo *TaskInfo
Err error
}
// BatchEnqueueContext enqueues multiple tasks in a single Redis pipeline round-trip,
// returning a per-task result slice aligned with the input tasks slice.
//
// # Atomicity Guarantees
//
// There is no all-or-nothing guarantee across the batch. Each task is executed as
// an independent Lua script inside a Redis pipeline. Individual scripts are atomic
// (the existence check, hash write, and list/sorted-set push for one task cannot
// be partially applied), but the pipeline as a whole is not wrapped in a
// MULTI/EXEC transaction. This means:
//
// - Partial success is possible: some tasks may be enqueued while others are not.
// - A task whose ID already exists in Redis is silently skipped (treated as a
// no-op by the Lua script), and its result will still show success.
// - If the Redis pipeline call itself fails (e.g. connection lost, context
// cancelled), every task that passed client-side validation receives that
// error — none of them can be assumed to have been enqueued.
//
// # Validation Errors (pre-pipeline)
//
// The following are caught before any Redis call and rejected in the
// corresponding BatchEnqueueResult.Err without affecting other tasks:
//
// - nil task
// - empty task type name
// - invalid options
// - group tasks (not supported in batch mode)
// - unique tasks (not supported in batch mode)
//
// # Supported Task Types
//
// Immediate and scheduled (via [ProcessAt] or [ProcessIn]) tasks are supported.
// Group and unique tasks are rejected as described above.
func (c *Client) BatchEnqueueContext(ctx context.Context, tasks []*Task, opts ...Option) []BatchEnqueueResult {
results := make([]BatchEnqueueResult, len(tasks))
if len(tasks) == 0 {
return results
}
type itemMeta struct {
state base.TaskState
processAt time.Time
}
items := make([]base.BatchEnqueueItem, 0, len(tasks))
itemIndexes := make([]int, 0, len(tasks))
itemMetas := make([]itemMeta, 0, len(tasks))
for i, task := range tasks {
if task == nil {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task cannot be nil")}
continue
}
if strings.TrimSpace(task.Type()) == "" {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("task typename cannot be empty")}
continue
}
merged := append(task.opts, opts...)
opt, err := composeOptions(merged...)
if err != nil {
results[i] = BatchEnqueueResult{Err: err}
continue
}
if opt.group != "" {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support group tasks")}
continue
}
if opt.uniqueTTL > 0 {
results[i] = BatchEnqueueResult{Err: fmt.Errorf("batch enqueue does not support unique tasks")}
continue
}
deadline := noDeadline
if !opt.deadline.IsZero() {
deadline = opt.deadline
}
timeout := noTimeout
if opt.timeout != 0 {
timeout = opt.timeout
}
if deadline.Equal(noDeadline) && timeout == noTimeout {
timeout = defaultTimeout
}
msg := &base.TaskMessage{
ID: opt.taskID,
Type: task.Type(),
Payload: task.Payload(),
Headers: task.Headers(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()),
Retention: int64(opt.retention.Seconds()),
}
now := time.Now()
scheduled := opt.processAt.After(now)
item := base.BatchEnqueueItem{Msg: msg}
var meta itemMeta
if scheduled {
item.ProcessAt = opt.processAt
meta = itemMeta{state: base.TaskStateScheduled, processAt: opt.processAt}
} else {
meta = itemMeta{state: base.TaskStatePending, processAt: now}
}
items = append(items, item)
itemIndexes = append(itemIndexes, i)
itemMetas = append(itemMetas, meta)
}
if len(items) == 0 {
return results
}
_, err := c.broker.BatchEnqueue(ctx, items)
if err != nil {
for _, idx := range itemIndexes {
results[idx] = BatchEnqueueResult{Err: err}
}
return results
}
for j, idx := range itemIndexes {
info := newTaskInfo(items[j].Msg, itemMetas[j].state, itemMetas[j].processAt, nil)
results[idx] = BatchEnqueueResult{TaskInfo: info}
}
return results
}
// Ping performs a ping against the redis connection.
func (c *Client) Ping() error {
return c.broker.Ping()

View File

@@ -13,8 +13,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
)
@@ -1341,70 +1339,6 @@ func TestClientEnqueueWithHeaders(t *testing.T) {
},
},
},
{
desc: "Task with header option",
task: NewTask("store_data", []byte("data"), Header("channel", "email"), Header("user-id", "bob1234")),
opts: []Option{},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
State: TaskStatePending,
MaxRetry: 25,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
Retry: 25,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
{
desc: "Enqueue task with header option",
task: NewTask("store_data", []byte("data")),
opts: []Option{Header("channel", "email"), Header("user-id", "bob1234"), MaxRetry(5)},
wantInfo: &TaskInfo{
Queue: "default",
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
State: TaskStatePending,
MaxRetry: 5,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: "store_data",
Payload: []byte("data"),
Headers: map[string]string{"channel": "email", "user-id": "bob1234"},
Retry: 5,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
},
},
},
}
for _, tc := range tests {
@@ -1727,209 +1661,3 @@ func TestClientEnqueueWithHeadersAndGroup(t *testing.T) {
}
}
}
func TestBatchEnqueueContext_ImmediateTasks(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
tasks := []*Task{
NewTask("task1", []byte("payload1")),
NewTask("task2", []byte("payload2")),
NewTask("task3", []byte("payload3")),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 3 {
t.Fatalf("BatchEnqueueContext returned %d results, want 3", len(results))
}
for i, res := range results {
if res.Err != nil {
t.Errorf("results[%d].Err = %v, want nil", i, res.Err)
}
if res.TaskInfo == nil {
t.Errorf("results[%d].TaskInfo is nil, want non-nil", i)
continue
}
if res.TaskInfo.Queue != "default" {
t.Errorf("results[%d].TaskInfo.Queue = %q, want %q", i, res.TaskInfo.Queue, "default")
}
if res.TaskInfo.State != TaskStatePending {
t.Errorf("results[%d].TaskInfo.State = %v, want %v", i, res.TaskInfo.State, TaskStatePending)
}
}
gotPending := h.GetPendingMessages(t, r, "default")
if len(gotPending) != 3 {
t.Errorf("len(pending) = %d, want 3", len(gotPending))
}
}
func TestBatchEnqueueContext_ScheduledTask(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
future := time.Now().Add(1 * time.Hour)
tasks := []*Task{
NewTask("scheduled_task", []byte("payload"), ProcessAt(future)),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 1 {
t.Fatalf("BatchEnqueueContext returned %d results, want 1", len(results))
}
if results[0].Err != nil {
t.Fatalf("results[0].Err = %v, want nil", results[0].Err)
}
if results[0].TaskInfo == nil {
t.Fatal("results[0].TaskInfo is nil, want non-nil")
}
if results[0].TaskInfo.State != TaskStateScheduled {
t.Errorf("results[0].TaskInfo.State = %v, want %v", results[0].TaskInfo.State, TaskStateScheduled)
}
gotScheduled := h.GetScheduledMessages(t, r, "default")
if len(gotScheduled) != 1 {
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
}
}
func TestBatchEnqueueContext_MixedBatch(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
future := time.Now().Add(1 * time.Hour)
tasks := []*Task{
NewTask("immediate1", []byte("p1")),
NewTask("scheduled1", []byte("p2"), ProcessAt(future)),
NewTask("immediate2", []byte("p3")),
NewTask("grouped1", []byte("p4"), Group("mygroup")),
NewTask("immediate3", []byte("p5")),
}
results := client.BatchEnqueueContext(context.Background(), tasks)
if len(results) != 5 {
t.Fatalf("BatchEnqueueContext returned %d results, want 5", len(results))
}
// Immediate tasks (indices 0, 2, 4) should succeed with Pending state.
for _, idx := range []int{0, 2, 4} {
if results[idx].Err != nil {
t.Errorf("results[%d].Err = %v, want nil (immediate task)", idx, results[idx].Err)
}
if results[idx].TaskInfo == nil {
t.Errorf("results[%d].TaskInfo is nil, want non-nil", idx)
continue
}
if results[idx].TaskInfo.State != TaskStatePending {
t.Errorf("results[%d].TaskInfo.State = %v, want %v", idx, results[idx].TaskInfo.State, TaskStatePending)
}
}
// Scheduled task (index 1) should succeed with Scheduled state.
if results[1].Err != nil {
t.Errorf("results[1].Err = %v, want nil (scheduled task)", results[1].Err)
}
if results[1].TaskInfo != nil && results[1].TaskInfo.State != TaskStateScheduled {
t.Errorf("results[1].TaskInfo.State = %v, want %v", results[1].TaskInfo.State, TaskStateScheduled)
}
// Grouped task (index 3) should be rejected.
if results[3].Err == nil {
t.Error("results[3].Err is nil, want error for group task")
}
gotPending := h.GetPendingMessages(t, r, "default")
if len(gotPending) != 3 {
t.Errorf("len(pending) = %d, want 3", len(gotPending))
}
gotScheduled := h.GetScheduledMessages(t, r, "default")
if len(gotScheduled) != 1 {
t.Errorf("len(scheduled) = %d, want 1", len(gotScheduled))
}
}
func TestBatchEnqueueContext_ValidationErrors(t *testing.T) {
setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
tests := []struct {
desc string
tasks []*Task
opts []Option
}{
{
desc: "nil task",
tasks: []*Task{nil},
},
{
desc: "empty task typename",
tasks: []*Task{NewTask("", []byte("payload"))},
},
{
desc: "blank task typename",
tasks: []*Task{NewTask(" ", []byte("payload"))},
},
{
desc: "invalid option: unique TTL less than 1s",
tasks: []*Task{NewTask("foo", nil)},
opts: []Option{Unique(300 * time.Millisecond)},
},
{
desc: "group task rejected",
tasks: []*Task{NewTask("foo", nil, Group("mygroup"))},
},
{
desc: "unique task rejected",
tasks: []*Task{NewTask("foo", nil, Unique(time.Hour))},
},
}
for _, tc := range tests {
results := client.BatchEnqueueContext(context.Background(), tc.tasks, tc.opts...)
if len(results) != len(tc.tasks) {
t.Errorf("%s: got %d results, want %d", tc.desc, len(results), len(tc.tasks))
continue
}
for i, res := range results {
if res.Err == nil {
t.Errorf("%s: results[%d].Err = nil, want non-nil error", tc.desc, i)
}
if res.TaskInfo != nil {
t.Errorf("%s: results[%d].TaskInfo = %v, want nil", tc.desc, i, res.TaskInfo)
}
}
}
}
func TestBatchEnqueueContext_BrokerError(t *testing.T) {
r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r)
client := &Client{broker: testBroker, sharedConnection: true}
tasks := []*Task{
NewTask("task1", []byte("p1")),
NewTask("task2", []byte("p2")),
}
testBroker.Sleep()
results := client.BatchEnqueueContext(context.Background(), tasks)
testBroker.Wakeup()
if len(results) != 2 {
t.Fatalf("BatchEnqueueContext returned %d results, want 2", len(results))
}
for i, res := range results {
if res.Err == nil {
t.Errorf("results[%d].Err = nil, want non-nil error when broker is down", i)
}
if res.TaskInfo != nil {
t.Errorf("results[%d].TaskInfo = %v, want nil on broker error", i, res.TaskInfo)
}
}
}

View File

@@ -5,7 +5,6 @@
package asynq
import (
"encoding/json"
"fmt"
"strconv"
"strings"
@@ -1003,13 +1002,6 @@ func parseOption(s string) (Option, error) {
return nil, err
}
return Retention(d), nil
case "Header":
var h [2]string
err := json.Unmarshal([]byte(arg), &h)
if err != nil {
return nil, err
}
return Header(h[0], h[1]), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}

View File

@@ -3526,7 +3526,6 @@ func TestParseOption(t *testing.T) {
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
{`Retention(24h)`, RetentionOpt, 24 * time.Hour},
{`Header(["email", "hello@example.com"])`, HeaderOpt, [2]string{"email", "hello@example.com"}},
}
for _, tc := range tests {
@@ -3574,14 +3573,6 @@ func TestParseOption(t *testing.T) {
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case HeaderOpt:
gotVal, ok := got.Value().([2]string)
if !ok {
t.Fatal("returned Option with non array value")
}
if gotVal != tc.wantVal.([2]string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
default:
t.Fatalf("returned Option with unexpected type: %v", got.Type())
}

View File

@@ -684,14 +684,6 @@ func (l *Lease) IsValid() bool {
return l.expireAt.After(now) || l.expireAt.Equal(now)
}
// BatchEnqueueItem pairs a task message with optional scheduling metadata for
// batch enqueue operations. If ProcessAt is zero, the task is enqueued for
// immediate processing; otherwise it is added to the scheduled set.
type BatchEnqueueItem struct {
Msg *TaskMessage
ProcessAt time.Time // zero value → immediate
}
// Broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
@@ -700,12 +692,6 @@ type Broker interface {
Close() error
Enqueue(ctx context.Context, msg *TaskMessage) error
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
// BatchEnqueue enqueues multiple tasks in a single round-trip. It returns the
// count of newly enqueued tasks; duplicate IDs are silently skipped. The error
// is non-nil only on infrastructure failure (e.g. lost connection), in which
// case the count is meaningless. Individual task scripts are atomic but the
// batch as a whole is not transactional — partial success is possible.
BatchEnqueue(ctx context.Context, items []BatchEnqueueItem) (int, error)
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
Done(ctx context.Context, msg *TaskMessage) error
MarkAsComplete(ctx context.Context, msg *TaskMessage) error

View File

@@ -139,118 +139,6 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
return nil
}
// BatchEnqueue adds all given tasks to Redis using a single pipeline round-trip.
// Each item is either enqueued immediately (ProcessAt is zero) or added to the
// scheduled sorted set.
//
// WARNING: tasks whose IDs already exist in Redis are silently skipped.
//
// The pipeline executes independent Lua scripts per task — there is no
// MULTI/EXEC wrapping the batch, so individual tasks may succeed or fail
// independently. The returned int is the number of tasks actually written;
// skipped duplicates do not count. The returned error is non-nil only when the
// pipeline call itself fails (network error, context cancellation, etc.), in
// which case no individual result should be trusted.
//
// Message encoding errors cause an immediate return before any Redis I/O.
func (r *RDB) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
var op errors.Op = "rdb.BatchEnqueue"
if len(items) == 0 {
return 0, nil
}
// Preload Lua scripts so that EVALSHA inside the pipeline does not fail with
// NOSCRIPT. Script.Run on a pipeline only sends EVALSHA (unlike non-pipeline
// Run which retries with EVAL on NOSCRIPT).
needsEnqueue, needsSchedule := false, false
for _, item := range items {
if item.ProcessAt.IsZero() {
needsEnqueue = true
} else {
needsSchedule = true
}
if needsEnqueue && needsSchedule {
break
}
}
if needsEnqueue {
if err := enqueueCmd.Load(ctx, r.client).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load enqueue script: %v", err))
}
}
if needsSchedule {
if err := scheduleCmd.Load(ctx, r.client).Err(); err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("failed to load schedule script: %v", err))
}
}
pipe := r.client.Pipeline()
// Track which pipeline slot holds each item's script result.
scriptIdxs := make([]int, 0, len(items))
pipeLen := 0
// Track queues we add to AllQueues in this pipeline so we can roll back the
// in-memory cache on failure.
var newQueues []string
now := r.clock.Now().UnixNano()
for _, item := range items {
encoded, err := base.EncodeMessage(item.Msg)
if err != nil {
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
}
if _, found := r.queuesPublished.Load(item.Msg.Queue); !found {
pipe.SAdd(ctx, base.AllQueues, item.Msg.Queue)
r.queuesPublished.Store(item.Msg.Queue, true)
newQueues = append(newQueues, item.Msg.Queue)
pipeLen++
}
if item.ProcessAt.IsZero() {
keys := []string{
base.TaskKey(item.Msg.Queue, item.Msg.ID),
base.PendingKey(item.Msg.Queue),
}
argv := []interface{}{encoded, item.Msg.ID, now}
enqueueCmd.Run(ctx, pipe, keys, argv...)
} else {
keys := []string{
base.TaskKey(item.Msg.Queue, item.Msg.ID),
base.ScheduledKey(item.Msg.Queue),
}
argv := []interface{}{encoded, item.ProcessAt.Unix(), item.Msg.ID}
scheduleCmd.Run(ctx, pipe, keys, argv...)
}
scriptIdxs = append(scriptIdxs, pipeLen)
pipeLen++
}
cmds, err := pipe.Exec(ctx)
if err != nil && err != redis.Nil {
for _, q := range newQueues {
r.queuesPublished.Delete(q)
}
return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis pipeline error: %v", err))
}
enqueued := 0
for _, idx := range scriptIdxs {
if idx >= len(cmds) {
continue
}
res, err := cmds[idx].(*redis.Cmd).Result()
if err != nil {
continue
}
if n, ok := res.(int64); ok && n == 1 {
enqueued++
}
}
return enqueued, nil
}
// enqueueUniqueCmd enqueues the task message if the task is unique.
//
// KEYS[1] -> unique key

View File

@@ -160,156 +160,6 @@ func TestEnqueueTaskIdConflictError(t *testing.T) {
}
}
func TestBatchEnqueue(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "user@example.com"}))
t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv")
t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
enqueueTime := time.Now()
r.SetClock(timeutil.NewSimulatedClock(enqueueTime))
t.Run("enqueue multiple tasks", func(t *testing.T) {
h.FlushDB(t, r.client)
items := []base.BatchEnqueueItem{
{Msg: t1},
{Msg: t2},
{Msg: t3},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 3 {
t.Errorf("BatchEnqueue returned %d, want 3", n)
}
for _, item := range items {
msg := item.Msg
pendingKey := base.PendingKey(msg.Queue)
pendingIDs := r.client.LRange(context.Background(), pendingKey, 0, -1).Val()
found := false
for _, id := range pendingIDs {
if id == msg.ID {
found = true
break
}
}
if !found {
t.Errorf("task %s not found in pending list %s", msg.ID, pendingKey)
}
taskKey := base.TaskKey(msg.Queue, msg.ID)
state := r.client.HGet(context.Background(), taskKey, "state").Val()
if state != "pending" {
t.Errorf("state for task %s = %q, want %q", msg.ID, state, "pending")
}
}
})
t.Run("empty batch", func(t *testing.T) {
h.FlushDB(t, r.client)
n, err := r.BatchEnqueue(context.Background(), nil)
if err != nil {
t.Fatalf("BatchEnqueue(nil) returned error: %v", err)
}
if n != 0 {
t.Errorf("BatchEnqueue(nil) returned %d, want 0", n)
}
})
t.Run("duplicate IDs skipped", func(t *testing.T) {
h.FlushDB(t, r.client)
if err := r.Enqueue(context.Background(), t1); err != nil {
t.Fatalf("pre-enqueue failed: %v", err)
}
dup := *t1
newMsg := h.NewTaskMessage("new_task", nil)
items := []base.BatchEnqueueItem{
{Msg: &dup},
{Msg: newMsg},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 1 {
t.Errorf("BatchEnqueue returned %d, want 1 (duplicate should be skipped)", n)
}
})
t.Run("scheduled tasks", func(t *testing.T) {
h.FlushDB(t, r.client)
future := time.Now().Add(1 * time.Hour)
s1 := h.NewTaskMessage("deferred_email", nil)
items := []base.BatchEnqueueItem{
{Msg: t1},
{Msg: s1, ProcessAt: future},
}
n, err := r.BatchEnqueue(context.Background(), items)
if err != nil {
t.Fatalf("BatchEnqueue returned error: %v", err)
}
if n != 2 {
t.Errorf("BatchEnqueue returned %d, want 2", n)
}
// Immediate task should be in pending.
pendingIDs := r.client.LRange(context.Background(), base.PendingKey(t1.Queue), 0, -1).Val()
foundPending := false
for _, id := range pendingIDs {
if id == t1.ID {
foundPending = true
}
}
if !foundPending {
t.Errorf("immediate task %s not found in pending list", t1.ID)
}
// Scheduled task should be in scheduled set.
scheduledIDs := r.client.ZRange(context.Background(), base.ScheduledKey(s1.Queue), 0, -1).Val()
foundScheduled := false
for _, id := range scheduledIDs {
if id == s1.ID {
foundScheduled = true
}
}
if !foundScheduled {
t.Errorf("scheduled task %s not found in scheduled set", s1.ID)
}
taskKey := base.TaskKey(s1.Queue, s1.ID)
state := r.client.HGet(context.Background(), taskKey, "state").Val()
if state != "scheduled" {
t.Errorf("state for scheduled task %s = %q, want %q", s1.ID, state, "scheduled")
}
})
t.Run("pipeline error from cancelled context", func(t *testing.T) {
h.FlushDB(t, r.client)
msg := h.NewTaskMessage("pipeline_error_task", nil)
items := []base.BatchEnqueueItem{{Msg: msg}}
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := r.BatchEnqueue(ctx, items)
if err == nil {
t.Error("BatchEnqueue with cancelled context returned nil error, want non-nil")
}
})
}
func TestEnqueueQueueCache(t *testing.T) {
r := setup(t)
defer r.Close()

View File

@@ -64,15 +64,6 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage,
return tb.real.EnqueueUnique(ctx, msg, ttl)
}
func (tb *TestBroker) BatchEnqueue(ctx context.Context, items []base.BatchEnqueueItem) (int, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return 0, errRedisDown
}
return tb.real.BatchEnqueue(ctx, items)
}
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
tb.mu.Lock()
defer tb.mu.Unlock()

View File

@@ -7,6 +7,7 @@ package cmd
import (
"fmt"
"io"
"os"
"sort"
"time"
@@ -35,14 +36,14 @@ var cronListCmd = &cobra.Command{
Use: "list",
Aliases: []string{"ls"},
Short: "List cron entries",
RunE: cronList,
Run: cronList,
}
var cronHistoryCmd = &cobra.Command{
Use: "history <entry_id> [<entry_id>...]",
Short: "Show history of each cron tasks",
Args: cobra.MinimumNArgs(1),
RunE: cronHistory,
Run: cronHistory,
Example: heredoc.Doc(`
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 bf6a8594-cd03-4968-b36a-8572c5e160dd
@@ -50,16 +51,17 @@ var cronHistoryCmd = &cobra.Command{
$ asynq cron history 7837f142-6337-4217-9276-8f27281b67d1 --page=2`),
}
func cronList(cmd *cobra.Command, args []string) error {
func cronList(cmd *cobra.Command, args []string) {
inspector := createInspector()
entries, err := inspector.SchedulerEntries()
if err != nil {
return fmt.Errorf("could not fetch scheduler entries: %v", err)
fmt.Println(err)
os.Exit(1)
}
if len(entries) == 0 {
fmt.Println("No scheduler entries")
return nil
return
}
// Sort entries by spec.
@@ -76,7 +78,6 @@ func cronList(cmd *cobra.Command, args []string) error {
}
}
printTable(cols, printRows)
return nil
}
// Returns a string describing when the next enqueue will happen.
@@ -96,14 +97,16 @@ func prevEnqueue(prevEnqueuedAt time.Time) string {
return fmt.Sprintf("%v ago", time.Since(prevEnqueuedAt).Round(time.Second))
}
func cronHistory(cmd *cobra.Command, args []string) error {
func cronHistory(cmd *cobra.Command, args []string) {
pageNum, err := cmd.Flags().GetInt("page")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
pageSize, err := cmd.Flags().GetInt("size")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
inspector := createInspector()
for i, entryID := range args {
@@ -133,5 +136,4 @@ func cronHistory(cmd *cobra.Command, args []string) error {
}
printTable(cols, printRows)
}
return nil
}

View File

@@ -6,6 +6,7 @@ package cmd
import (
"fmt"
"os"
"time"
"github.com/MakeNowJust/heredoc/v2"
@@ -31,14 +32,14 @@ var dashCmd = &cobra.Command{
Example: heredoc.Doc(`
$ asynq dash
$ asynq dash --refresh=3s`),
RunE: func(cmd *cobra.Command, args []string) error {
Run: func(cmd *cobra.Command, args []string) {
if flagPollInterval < 1*time.Second {
return fmt.Errorf("--refresh cannot be less than 1s")
fmt.Println("error: --refresh cannot be less than 1s")
os.Exit(1)
}
dash.Run(dash.Options{
PollInterval: flagPollInterval,
RedisConnOpt: getRedisConnOpt(),
})
return nil
},
}

View File

@@ -6,6 +6,7 @@ package cmd
import (
"fmt"
"os"
"github.com/MakeNowJust/heredoc/v2"
"github.com/spf13/cobra"
@@ -30,25 +31,22 @@ var groupListCmd = &cobra.Command{
Aliases: []string{"ls"},
Short: "List groups",
Args: cobra.NoArgs,
RunE: groupLists,
Run: groupLists,
}
func groupLists(cmd *cobra.Command, args []string) error {
func groupLists(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
inspector := createInspector()
groups, err := inspector.Groups(qname)
if err != nil {
return fmt.Errorf("could not fetch groups: %v", err)
}
if len(groups) == 0 {
fmt.Printf("No groups found in queue %q\n", qname)
return nil
return
}
for _, g := range groups {
fmt.Println(g.Group)
}
return nil
}

View File

@@ -7,6 +7,7 @@ package cmd
import (
"fmt"
"io"
"os"
"github.com/MakeNowJust/heredoc/v2"
"github.com/fatih/color"
@@ -43,14 +44,16 @@ var queueListCmd = &cobra.Command{
Use: "list",
Short: "List queues",
Aliases: []string{"ls"},
RunE: queueList,
// TODO: Use RunE instead?
Run: queueList,
}
var queueInspectCmd = &cobra.Command{
Use: "inspect <queue> [<queue>...]",
Short: "Display detailed information on one or more queues",
Args: cobra.MinimumNArgs(1),
RunE: queueInspect,
Args: cobra.MinimumNArgs(1),
// TODO: Use RunE instead?
Run: queueInspect,
Example: heredoc.Doc(`
$ asynq queue inspect myqueue
$ asynq queue inspect queue1 queue2 queue3`),
@@ -59,8 +62,8 @@ var queueInspectCmd = &cobra.Command{
var queueHistoryCmd = &cobra.Command{
Use: "history <queue> [<queue>...]",
Short: "Display historical aggregate data from one or more queues",
Args: cobra.MinimumNArgs(1),
RunE: queueHistory,
Args: cobra.MinimumNArgs(1),
Run: queueHistory,
Example: heredoc.Doc(`
$ asynq queue history myqueue
$ asynq queue history queue1 queue2 queue3
@@ -71,7 +74,7 @@ var queuePauseCmd = &cobra.Command{
Use: "pause <queue> [<queue>...]",
Short: "Pause one or more queues",
Args: cobra.MinimumNArgs(1),
RunE: queuePause,
Run: queuePause,
Example: heredoc.Doc(`
$ asynq queue pause myqueue
$ asynq queue pause queue1 queue2 queue3`),
@@ -82,7 +85,7 @@ var queueUnpauseCmd = &cobra.Command{
Short: "Resume (unpause) one or more queues",
Args: cobra.MinimumNArgs(1),
Aliases: []string{"unpause"},
RunE: queueUnpause,
Run: queueUnpause,
Example: heredoc.Doc(`
$ asynq queue resume myqueue
$ asynq queue resume queue1 queue2 queue3`),
@@ -93,14 +96,14 @@ var queueRemoveCmd = &cobra.Command{
Short: "Remove one or more queues",
Aliases: []string{"rm", "delete"},
Args: cobra.MinimumNArgs(1),
RunE: queueRemove,
Run: queueRemove,
Example: heredoc.Doc(`
$ asynq queue rm myqueue
$ asynq queue rm queue1 queue2 queue3
$ asynq queue rm myqueue --force`),
}
func queueList(cmd *cobra.Command, args []string) error {
func queueList(cmd *cobra.Command, args []string) {
type queueInfo struct {
name string
keyslot int64
@@ -109,7 +112,8 @@ func queueList(cmd *cobra.Command, args []string) error {
inspector := createInspector()
queues, err := inspector.Queues()
if err != nil {
return fmt.Errorf("could not fetch list of queues: %v", err)
fmt.Printf("error: Could not fetch list of queues: %v\n", err)
os.Exit(1)
}
var qs []*queueInfo
for _, qname := range queues {
@@ -117,13 +121,13 @@ func queueList(cmd *cobra.Command, args []string) error {
if useRedisCluster {
keyslot, err := inspector.ClusterKeySlot(qname)
if err != nil {
fmt.Printf("error: could not get cluster keyslot for %q\n", qname)
fmt.Errorf("error: Could not get cluster keyslot for %q\n", qname)
continue
}
q.keyslot = keyslot
nodes, err := inspector.ClusterNodes(qname)
if err != nil {
fmt.Printf("error: could not get cluster nodes for %q\n", qname)
fmt.Errorf("error: Could not get cluster nodes for %q\n", qname)
continue
}
q.nodes = nodes
@@ -144,10 +148,9 @@ func queueList(cmd *cobra.Command, args []string) error {
fmt.Println(q.name)
}
}
return nil
}
func queueInspect(cmd *cobra.Command, args []string) error {
func queueInspect(cmd *cobra.Command, args []string) {
inspector := createInspector()
for i, qname := range args {
if i > 0 {
@@ -160,7 +163,6 @@ func queueInspect(cmd *cobra.Command, args []string) error {
}
printQueueInfo(info)
}
return nil
}
func printQueueInfo(info *asynq.QueueInfo) {
@@ -193,10 +195,11 @@ func printQueueInfo(info *asynq.QueueInfo) {
)
}
func queueHistory(cmd *cobra.Command, args []string) error {
func queueHistory(cmd *cobra.Command, args []string) {
days, err := cmd.Flags().GetInt("days")
if err != nil {
return err
fmt.Printf("error: Internal error: %v\n", err)
os.Exit(1)
}
inspector := createInspector()
for i, qname := range args {
@@ -211,7 +214,6 @@ func queueHistory(cmd *cobra.Command, args []string) error {
}
printDailyStats(stats)
}
return nil
}
func printDailyStats(stats []*asynq.DailyStats) {
@@ -231,63 +233,49 @@ func printDailyStats(stats []*asynq.DailyStats) {
)
}
func queuePause(cmd *cobra.Command, args []string) error {
func queuePause(cmd *cobra.Command, args []string) {
inspector := createInspector()
var firstErr error
for _, qname := range args {
err := inspector.PauseQueue(qname)
if err != nil {
fmt.Println(err)
if firstErr == nil {
firstErr = err
}
continue
}
fmt.Printf("Successfully paused queue %q\n", qname)
}
return firstErr
}
func queueUnpause(cmd *cobra.Command, args []string) error {
func queueUnpause(cmd *cobra.Command, args []string) {
inspector := createInspector()
var firstErr error
for _, qname := range args {
err := inspector.UnpauseQueue(qname)
if err != nil {
fmt.Println(err)
if firstErr == nil {
firstErr = err
}
continue
}
fmt.Printf("Successfully unpaused queue %q\n", qname)
}
return firstErr
}
func queueRemove(cmd *cobra.Command, args []string) error {
func queueRemove(cmd *cobra.Command, args []string) {
// TODO: Use inspector once RemoveQueue become public API.
force, err := cmd.Flags().GetBool("force")
if err != nil {
return err
fmt.Printf("error: Internal error: %v\n", err)
os.Exit(1)
}
r := createRDB()
var firstErr error
for _, qname := range args {
err = r.RemoveQueue(qname, force)
if err != nil {
if errors.IsQueueNotEmpty(err) {
fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname)
} else {
fmt.Printf("error: %v\n", err)
}
if firstErr == nil {
firstErr = err
continue
}
fmt.Printf("error: %v\n", err)
continue
}
fmt.Printf("Successfully removed queue %q\n", qname)
}
return firstErr
}

View File

@@ -487,31 +487,35 @@ func isPrintable(data []byte) bool {
}
// Helper to turn a command line flag into a duration
func getDuration(cmd *cobra.Command, arg string) (time.Duration, error) {
func getDuration(cmd *cobra.Command, arg string) time.Duration {
durationStr, err := cmd.Flags().GetString(arg)
if err != nil {
return 0, err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
duration, err := time.ParseDuration(durationStr)
if err != nil {
return 0, err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
return duration, nil
return duration
}
// Helper to turn a command line flag into a time
func getTime(cmd *cobra.Command, arg string) (time.Time, error) {
func getTime(cmd *cobra.Command, arg string) time.Time {
timeStr, err := cmd.Flags().GetString(arg)
if err != nil {
return time.Time{}, err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
timeVal, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
return time.Time{}, err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
return timeVal, nil
return timeVal
}

View File

@@ -7,6 +7,7 @@ package cmd
import (
"fmt"
"io"
"os"
"sort"
"strings"
"time"
@@ -43,19 +44,20 @@ The command shows the following for each server:
A "active" server is pulling tasks from queues and processing them.
A "stopped" server is no longer pulling new tasks from queues`,
RunE: serverList,
Run: serverList,
}
func serverList(cmd *cobra.Command, args []string) error {
func serverList(cmd *cobra.Command, args []string) {
r := createRDB()
servers, err := r.ListServers()
if err != nil {
return fmt.Errorf("could not fetch list of servers: %v", err)
fmt.Println(err)
os.Exit(1)
}
if len(servers) == 0 {
fmt.Println("No running servers")
return nil
return
}
// sort by hostname and pid
@@ -78,7 +80,6 @@ func serverList(cmd *cobra.Command, args []string) error {
}
}
printTable(cols, printRows)
return nil
}
func formatQueues(qmap map[string]int) string {

View File

@@ -35,7 +35,7 @@ var statsCmd = &cobra.Command{
* Aggregate data for the current day
* Basic information about the running redis instance`),
Args: cobra.NoArgs,
RunE: stats,
Run: stats,
}
var jsonFlag bool
@@ -74,12 +74,13 @@ type FullStats struct {
RedisInfo map[string]string `json:"redis"`
}
func stats(cmd *cobra.Command, args []string) error {
func stats(cmd *cobra.Command, args []string) {
r := createRDB()
queues, err := r.AllQueues()
if err != nil {
return fmt.Errorf("could not fetch queues: %v", err)
fmt.Println(err)
os.Exit(1)
}
var aggStats AggregateStats
@@ -87,7 +88,8 @@ func stats(cmd *cobra.Command, args []string) error {
for _, qname := range queues {
s, err := r.CurrentStats(qname)
if err != nil {
return fmt.Errorf("could not fetch stats for queue %q: %v", qname, err)
fmt.Println(err)
os.Exit(1)
}
aggStats.Active += s.Active
aggStats.Pending += s.Pending
@@ -108,7 +110,8 @@ func stats(cmd *cobra.Command, args []string) error {
info, err = r.RedisInfo()
}
if err != nil {
return fmt.Errorf("could not fetch redis info: %v", err)
fmt.Println(err)
os.Exit(1)
}
if jsonFlag {
@@ -119,11 +122,12 @@ func stats(cmd *cobra.Command, args []string) error {
})
if err != nil {
return fmt.Errorf("could not marshal stats to JSON: %v", err)
fmt.Println(err)
os.Exit(1)
}
fmt.Println(string(statsJSON))
return nil
return
}
bold := color.New(color.Bold)
@@ -147,7 +151,6 @@ func stats(cmd *cobra.Command, args []string) error {
printInfo(info)
}
fmt.Println()
return nil
}
func printStatsByState(s *AggregateStats) {

View File

@@ -7,6 +7,7 @@ package cmd
import (
"fmt"
"io"
"os"
"time"
"github.com/MakeNowJust/heredoc/v2"
@@ -119,14 +120,14 @@ var taskListCmd = &cobra.Command{
$ asynq task list --queue=myqueue --state=pending
$ asynq task list --queue=myqueue --state=aggregating --group=mygroup
$ asynq task list --queue=myqueue --state=scheduled --page=2`),
RunE: taskList,
Run: taskList,
}
var taskInspectCmd = &cobra.Command{
Use: "inspect --queue=<queue> --id=<task_id>",
Short: "Display detailed information on the specified task",
Args: cobra.NoArgs,
RunE: taskInspect,
Run: taskInspect,
Example: heredoc.Doc(`
$ asynq task inspect --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -135,7 +136,7 @@ var taskCancelCmd = &cobra.Command{
Use: "cancel <task_id> [<task_id>...]",
Short: "Cancel one or more active tasks",
Args: cobra.MinimumNArgs(1),
RunE: taskCancel,
Run: taskCancel,
Example: heredoc.Doc(`
$ asynq task cancel f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -144,7 +145,7 @@ var taskArchiveCmd = &cobra.Command{
Use: "archive --queue=<queue> --id=<task_id>",
Short: "Archive a task with the given id",
Args: cobra.NoArgs,
RunE: taskArchive,
Run: taskArchive,
Example: heredoc.Doc(`
$ asynq task archive --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -154,7 +155,7 @@ var taskDeleteCmd = &cobra.Command{
Aliases: []string{"remove", "rm"},
Short: "Delete a task with the given id",
Args: cobra.NoArgs,
RunE: taskDelete,
Run: taskDelete,
Example: heredoc.Doc(`
$ asynq task delete --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -163,7 +164,7 @@ var taskRunCmd = &cobra.Command{
Use: "run --queue=<queue> --id=<task_id>",
Short: "Run a task with the given id",
Args: cobra.NoArgs,
RunE: taskRun,
Run: taskRun,
Example: heredoc.Doc(`
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
}
@@ -172,7 +173,7 @@ var taskEnqueueCmd = &cobra.Command{
Use: "enqueue --type_name=footype --payload=barpayload",
Short: "Enqueue a task",
Args: cobra.NoArgs,
RunE: taskEnqueue,
Run: taskEnqueue,
Example: heredoc.Doc(`
$ asynq task enqueue -t footype -l barpayload
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
@@ -182,7 +183,7 @@ var taskArchiveAllCmd = &cobra.Command{
Use: "archiveall --queue=<queue> --state=<state>",
Short: "Archive all tasks in the given state",
Args: cobra.NoArgs,
RunE: taskArchiveAll,
Run: taskArchiveAll,
Example: heredoc.Doc(`
$ asynq task archiveall --queue=myqueue --state=retry
$ asynq task archiveall --queue=myqueue --state=aggregating --group=mygroup`),
@@ -192,7 +193,7 @@ var taskDeleteAllCmd = &cobra.Command{
Use: "deleteall --queue=<queue> --state=<state>",
Short: "Delete all tasks in the given state",
Args: cobra.NoArgs,
RunE: taskDeleteAll,
Run: taskDeleteAll,
Example: heredoc.Doc(`
$ asynq task deleteall --queue=myqueue --state=archived
$ asynq task deleteall --queue=myqueue --state=aggregating --group=mygroup`),
@@ -202,66 +203,74 @@ var taskRunAllCmd = &cobra.Command{
Use: "runall --queue=<queue> --state=<state>",
Short: "Run all tasks in the given state",
Args: cobra.NoArgs,
RunE: taskRunAll,
Run: taskRunAll,
Example: heredoc.Doc(`
$ asynq task runall --queue=myqueue --state=retry
$ asynq task runall --queue=myqueue --state=aggregating --group=mygroup`),
}
func taskList(cmd *cobra.Command, args []string) error {
func taskList(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
state, err := cmd.Flags().GetString("state")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
pageNum, err := cmd.Flags().GetInt("page")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
pageSize, err := cmd.Flags().GetInt("size")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
switch state {
case "active":
return listActiveTasks(qname, pageNum, pageSize)
listActiveTasks(qname, pageNum, pageSize)
case "pending":
return listPendingTasks(qname, pageNum, pageSize)
listPendingTasks(qname, pageNum, pageSize)
case "scheduled":
return listScheduledTasks(qname, pageNum, pageSize)
listScheduledTasks(qname, pageNum, pageSize)
case "retry":
return listRetryTasks(qname, pageNum, pageSize)
listRetryTasks(qname, pageNum, pageSize)
case "archived":
return listArchivedTasks(qname, pageNum, pageSize)
listArchivedTasks(qname, pageNum, pageSize)
case "completed":
return listCompletedTasks(qname, pageNum, pageSize)
listCompletedTasks(qname, pageNum, pageSize)
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if group == "" {
return fmt.Errorf("flag --group is required for listing aggregating tasks")
fmt.Println("Flag --group is required for listing aggregating tasks")
os.Exit(1)
}
return listAggregatingTasks(qname, group, pageNum, pageSize)
listAggregatingTasks(qname, group, pageNum, pageSize)
default:
return fmt.Errorf("state=%q is not supported", state)
fmt.Printf("error: state=%q is not supported\n", state)
os.Exit(1)
}
}
func listActiveTasks(qname string, pageNum, pageSize int) error {
func listActiveTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No active tasks in %q queue\n", qname)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload"},
@@ -271,18 +280,18 @@ func listActiveTasks(qname string, pageNum, pageSize int) error {
}
},
)
return nil
}
func listPendingTasks(qname string, pageNum, pageSize int) error {
func listPendingTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No pending tasks in %q queue\n", qname)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload"},
@@ -292,18 +301,18 @@ func listPendingTasks(qname string, pageNum, pageSize int) error {
}
},
)
return nil
}
func listScheduledTasks(qname string, pageNum, pageSize int) error {
func listScheduledTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No scheduled tasks in %q queue\n", qname)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload", "Process In"},
@@ -313,7 +322,6 @@ func listScheduledTasks(qname string, pageNum, pageSize int) error {
}
},
)
return nil
}
// formatProcessAt formats next process at time to human friendly string.
@@ -327,15 +335,16 @@ func formatProcessAt(processAt time.Time) string {
return fmt.Sprintf("in %v", d.Round(time.Second))
}
func listRetryTasks(qname string, pageNum, pageSize int) error {
func listRetryTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No retry tasks in %q queue\n", qname)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
@@ -346,18 +355,18 @@ func listRetryTasks(qname string, pageNum, pageSize int) error {
}
},
)
return nil
}
func listArchivedTasks(qname string, pageNum, pageSize int) error {
func listArchivedTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No archived tasks in %q queue\n", qname)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
@@ -366,18 +375,18 @@ func listArchivedTasks(qname string, pageNum, pageSize int) error {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr)
}
})
return nil
}
func listCompletedTasks(qname string, pageNum, pageSize int) error {
func listCompletedTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No completed tasks in %q queue\n", qname)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload", "CompletedAt", "Result"},
@@ -386,18 +395,18 @@ func listCompletedTasks(qname string, pageNum, pageSize int) error {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result))
}
})
return nil
}
func listAggregatingTasks(qname, group string, pageNum, pageSize int) error {
func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
return err
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No aggregating tasks in group %q \n", group)
return nil
return
}
printTable(
[]string{"ID", "Type", "Payload", "Group"},
@@ -407,42 +416,38 @@ func listAggregatingTasks(qname, group string, pageNum, pageSize int) error {
}
},
)
return nil
}
func taskCancel(cmd *cobra.Command, args []string) error {
func taskCancel(cmd *cobra.Command, args []string) {
i := createInspector()
var firstErr error
for _, id := range args {
if err := i.CancelProcessing(id); err != nil {
fmt.Printf("error: could not send cancelation signal: %v\n", err)
if firstErr == nil {
firstErr = err
}
continue
}
fmt.Printf("Sent cancelation signal for task %s\n", id)
}
return firstErr
}
func taskInspect(cmd *cobra.Command, args []string) error {
func taskInspect(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
id, err := cmd.Flags().GetString("id")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
info, err := i.GetTaskInfo(qname, id)
if err != nil {
return fmt.Errorf("could not get task info: %v", err)
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
printTaskInfo(info)
return nil
}
func printTaskInfo(info *asynq.TaskInfo) {
@@ -481,72 +486,80 @@ func formatPastTime(t time.Time) string {
return t.Format(time.UnixDate)
}
func taskArchive(cmd *cobra.Command, args []string) error {
func taskArchive(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
id, err := cmd.Flags().GetString("id")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
err = i.ArchiveTask(qname, id)
if err != nil {
return fmt.Errorf("could not archive task: %v", err)
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Println("task archived")
return nil
}
func taskDelete(cmd *cobra.Command, args []string) error {
func taskDelete(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
id, err := cmd.Flags().GetString("id")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
err = i.DeleteTask(qname, id)
if err != nil {
return fmt.Errorf("could not delete task: %v", err)
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Println("task deleted")
return nil
}
func taskRun(cmd *cobra.Command, args []string) error {
func taskRun(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
id, err := cmd.Flags().GetString("id")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
err = i.RunTask(qname, id)
if err != nil {
return fmt.Errorf("could not run task: %v", err)
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Println("task is now pending")
return nil
}
func taskEnqueue(cmd *cobra.Command, args []string) error {
func taskEnqueue(cmd *cobra.Command, args []string) {
typeName, err := cmd.Flags().GetString("type_name")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
payload, err := cmd.Flags().GetString("payload")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
// For all of the optional flags, we need to explicitly check whether they were set or
@@ -556,7 +569,8 @@ func taskEnqueue(cmd *cobra.Command, args []string) error {
if cmd.Flags().Changed("retry") {
retry, err := cmd.Flags().GetInt("retry")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.MaxRetry(retry))
}
@@ -564,7 +578,8 @@ func taskEnqueue(cmd *cobra.Command, args []string) error {
if cmd.Flags().Changed("queue") {
queue, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Queue(queue))
}
@@ -572,63 +587,41 @@ func taskEnqueue(cmd *cobra.Command, args []string) error {
if cmd.Flags().Changed("id") {
id, err := cmd.Flags().GetString("id")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.TaskID(id))
}
if cmd.Flags().Changed("timeout") {
d, err := getDuration(cmd, "timeout")
if err != nil {
return err
}
opts = append(opts, asynq.Timeout(d))
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
}
if cmd.Flags().Changed("deadline") {
t, err := getTime(cmd, "deadline")
if err != nil {
return err
}
opts = append(opts, asynq.Deadline(t))
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
}
if cmd.Flags().Changed("unique") {
d, err := getDuration(cmd, "unique")
if err != nil {
return err
}
opts = append(opts, asynq.Unique(d))
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
}
if cmd.Flags().Changed("process_at") {
t, err := getTime(cmd, "process_at")
if err != nil {
return err
}
opts = append(opts, asynq.ProcessAt(t))
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
}
if cmd.Flags().Changed("process_in") {
d, err := getDuration(cmd, "process_in")
if err != nil {
return err
}
opts = append(opts, asynq.ProcessIn(d))
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
}
if cmd.Flags().Changed("retention") {
d, err := getDuration(cmd, "retention")
if err != nil {
return err
}
opts = append(opts, asynq.Retention(d))
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
}
if cmd.Flags().Changed("group") {
group, err := cmd.Flags().GetString("group")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
opts = append(opts, asynq.Group(group))
}
@@ -638,21 +631,23 @@ func taskEnqueue(cmd *cobra.Command, args []string) error {
taskInfo, err := c.Enqueue(task)
if err != nil {
return fmt.Errorf("could not enqueue task: %v", err)
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
return nil
}
func taskArchiveAll(cmd *cobra.Command, args []string) error {
func taskArchiveAll(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
state, err := cmd.Flags().GetString("state")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
@@ -667,35 +662,35 @@ func taskArchiveAll(cmd *cobra.Command, args []string) error {
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
if group == "" {
return fmt.Errorf("flag --group is required for aggregating tasks")
fmt.Println("error: Flag --group is required for aggregating tasks")
os.Exit(1)
}
n, err = i.ArchiveAllAggregatingTasks(qname, group)
if err != nil {
return err
}
fmt.Printf("%d tasks archived\n", n)
return nil
default:
return fmt.Errorf("unsupported state %q", state)
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
}
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("%d tasks archived\n", n)
return nil
}
func taskDeleteAll(cmd *cobra.Command, args []string) error {
func taskDeleteAll(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
state, err := cmd.Flags().GetString("state")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
@@ -714,35 +709,35 @@ func taskDeleteAll(cmd *cobra.Command, args []string) error {
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
if group == "" {
return fmt.Errorf("flag --group is required for aggregating tasks")
fmt.Println("error: Flag --group is required for aggregating tasks")
os.Exit(1)
}
n, err = i.DeleteAllAggregatingTasks(qname, group)
if err != nil {
return err
}
fmt.Printf("%d tasks deleted\n", n)
return nil
default:
return fmt.Errorf("unsupported state %q", state)
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
}
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("%d tasks deleted\n", n)
return nil
}
func taskRunAll(cmd *cobra.Command, args []string) error {
func taskRunAll(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
state, err := cmd.Flags().GetString("state")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
i := createInspector()
@@ -757,23 +752,21 @@ func taskRunAll(cmd *cobra.Command, args []string) error {
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
if group == "" {
return fmt.Errorf("flag --group is required for aggregating tasks")
fmt.Println("error: Flag --group is required for aggregating tasks")
os.Exit(1)
}
n, err = i.RunAllAggregatingTasks(qname, group)
if err != nil {
return err
}
fmt.Printf("%d tasks are now pending\n", n)
return nil
default:
return fmt.Errorf("unsupported state %q", state)
fmt.Printf("error: unsupported state %q\n", state)
os.Exit(1)
}
if err != nil {
return err
fmt.Printf("error: %v\n", err)
os.Exit(1)
}
fmt.Printf("%d tasks are now pending\n", n)
return nil
}

View File

@@ -1,17 +1,17 @@
module github.com/hibiken/asynq/tools
go 1.22
go 1.23.0
require (
github.com/MakeNowJust/heredoc/v2 v2.0.1
github.com/fatih/color v1.18.0
github.com/gdamore/tcell/v2 v2.5.1
github.com/google/go-cmp v0.6.0
github.com/google/go-cmp v0.7.0
github.com/hibiken/asynq v0.25.0
github.com/hibiken/asynq/x v0.0.0-20220131170841-349f4c50fb1d
github.com/mattn/go-runewidth v0.0.16
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_golang v1.23.2
github.com/redis/go-redis/v9 v9.7.0
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
@@ -21,11 +21,10 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
@@ -33,23 +32,24 @@ require (
github.com/magiconair/properties v1.8.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.1.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/sys v0.26.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf // indirect
golang.org/x/text v0.3.8 // indirect
golang.org/x/text v0.28.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/protobuf v1.35.1 // indirect
google.golang.org/protobuf v1.36.8 // indirect
gopkg.in/ini.v1 v1.51.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

View File

@@ -39,8 +39,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -101,8 +101,6 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -113,8 +111,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -173,6 +171,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -183,6 +183,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
@@ -197,7 +199,6 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
@@ -214,6 +215,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
@@ -241,25 +244,28 @@ github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDf
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
@@ -269,8 +275,8 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
@@ -305,8 +311,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@@ -321,6 +328,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -411,8 +420,8 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220318055525-2edf467146b5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -422,8 +431,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
@@ -487,13 +496,14 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
@@ -509,8 +519,9 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=