2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-02-02 15:47:50 +00:00

wrap all fmt.Errorf errors (#1047)

Users need to be able to match with `errors.Is()` also on external
errors, for example `context.Canceled`.
This commit is contained in:
Benjamin Grosse
2025-11-04 15:25:47 +00:00
committed by GitHub
parent a889ef0b08
commit d64f0b7ed0
6 changed files with 31 additions and 31 deletions

View File

@@ -449,7 +449,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
return nil, fmt.Errorf("asynq: could not parse redis uri: %w", err)
}
switch u.Scheme {
case "redis", "rediss":
@@ -539,7 +539,7 @@ type ResultWriter struct {
func (w *ResultWriter) Write(data []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err())
return 0, fmt.Errorf("failed to result task result: %w", w.ctx.Err())
default:
}
return w.broker.WriteResult(w.qname, w.id, data)

View File

@@ -123,7 +123,7 @@ func ExampleResultWriter() {
res := []byte("task result data")
n, err := task.ResultWriter().Write(res) // implements io.Writer
if err != nil {
return fmt.Errorf("failed to write task result: %v", err)
return fmt.Errorf("failed to write task result: %w", err)
}
log.Printf(" %d bytes written", n)
return nil

View File

@@ -245,7 +245,7 @@ func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
case errors.IsTaskNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
}
@@ -316,7 +316,7 @@ func Page(n int) ListOption {
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -325,7 +325,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -344,7 +344,7 @@ func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskI
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -353,11 +353,11 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
expiredSet := make(map[string]struct{}) // set of expired message IDs
for _, msg := range expired {
@@ -384,7 +384,7 @@ func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskIn
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -393,7 +393,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -413,7 +413,7 @@ func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -422,7 +422,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -442,7 +442,7 @@ func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*Tas
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -451,7 +451,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -471,7 +471,7 @@ func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInf
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -480,7 +480,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -500,7 +500,7 @@ func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*Task
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
@@ -509,7 +509,7 @@ func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*Tas
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
return nil, fmt.Errorf("asynq: %w", err)
}
var tasks []*TaskInfo
for _, i := range infos {
@@ -616,7 +616,7 @@ func (i *Inspector) UpdateTaskPayload(queue, id string, payload []byte) error {
// If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
err := i.rdb.DeleteTask(queue, id)
switch {
@@ -625,7 +625,7 @@ func (i *Inspector) DeleteTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
@@ -680,7 +680,7 @@ func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
// If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
err := i.rdb.RunTask(queue, id)
switch {
@@ -689,7 +689,7 @@ func (i *Inspector) RunTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
}
@@ -752,7 +752,7 @@ func (i *Inspector) ArchiveTask(queue, id string) error {
case errors.IsTaskNotFound(err):
return fmt.Errorf("asynq: %w", ErrTaskNotFound)
case err != nil:
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
return nil
}

View File

@@ -122,10 +122,10 @@ func (mgr *PeriodicTaskManager) Start() error {
panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize")
}
if err := mgr.initialSync(); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
if err := mgr.s.Start(); err != nil {
return fmt.Errorf("asynq: %v", err)
return fmt.Errorf("asynq: %w", err)
}
mgr.wg.Add(1)
go func() {
@@ -168,11 +168,11 @@ func (mgr *PeriodicTaskManager) Run() error {
func (mgr *PeriodicTaskManager) initialSync() error {
configs, err := mgr.p.GetConfigs()
if err != nil {
return fmt.Errorf("initial call to GetConfigs failed: %v", err)
return fmt.Errorf("initial call to GetConfigs failed: %w", err)
}
for _, c := range configs {
if err := validatePeriodicTaskConfig(c); err != nil {
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err)
return fmt.Errorf("initial call to GetConfigs contained an invalid config: %w", err)
}
}
mgr.add(configs)

View File

@@ -25,13 +25,13 @@ type QueueMetricsCollector struct {
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
qnames, err := qmc.inspector.Queues()
if err != nil {
return nil, fmt.Errorf("failed to get queue names: %v", err)
return nil, fmt.Errorf("failed to get queue names: %w", err)
}
infos := make([]*asynq.QueueInfo, len(qnames))
for i, qname := range qnames {
qinfo, err := qmc.inspector.GetQueueInfo(qname)
if err != nil {
return nil, fmt.Errorf("failed to get queue info: %v", err)
return nil, fmt.Errorf("failed to get queue info: %w", err)
}
infos[i] = qinfo
}

View File

@@ -94,7 +94,7 @@ func (s *Semaphore) Release(ctx context.Context) error {
n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
if err != nil {
return fmt.Errorf("redis command failed: %v", err)
return fmt.Errorf("redis command failed: %w", err)
}
if n == 0 {