2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-27 09:10:18 +08:00

Compare commits

..

24 Commits

Author SHA1 Message Date
Ken Hibino
04702ddfd2 Change ErrorHandler function signature 2020-07-04 05:53:50 -07:00
Ken Hibino
6705f7c27a Return Result struct to caller of Enqueue 2020-07-03 21:49:53 -07:00
Ken Hibino
e27ae0d33a Replace github.com/rs/xid with github.com/google/uuid 2020-07-02 06:38:13 -07:00
Ken Hibino
6cd0ab65a3 Add version command to CLI 2020-06-29 20:59:15 -07:00
Ken Hibino
83c9d5ae94 Add migrate command to CLI
The command converts all messages in redis to be compatible for asynq
v0.10.0
2020-06-29 06:11:47 -07:00
Ken Hibino
7eebbf181e Update docs 2020-06-29 06:11:47 -07:00
Ken Hibino
7b1770da96 Minor code cleanup 2020-06-29 06:11:47 -07:00
Ken Hibino
e2c5882368 Use int64 type for Timeout and Deadline in TaskMessage 2020-06-29 06:11:47 -07:00
Ken Hibino
50df107ace Clean up processor test 2020-06-29 06:11:47 -07:00
Ken Hibino
9699d196e5 Add recoverer 2020-06-29 06:11:47 -07:00
Ken Hibino
1c5f7a791b Add RDB.ListDeadlineExceeded 2020-06-29 06:11:47 -07:00
Ken Hibino
232efe8279 Fix processor 2020-06-29 06:11:47 -07:00
Ken Hibino
ef4a4a8334 Add deadline to syncRequest
- syncer will drop a request if its deadline has been exceeded
2020-06-29 06:11:47 -07:00
Ken Hibino
65e17a3469 Update processor to adapt for deadlines set change
- Processor dequeues tasks only when it's available to process
- Processor retries a task when its context's Done channel is closed
2020-06-29 06:11:47 -07:00
Ken Hibino
88d94a2a9d Update RDB.Requeue to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
7433b94aac Update RDB.Dequeue to return deadline as time.Time 2020-06-29 06:11:47 -07:00
Ken Hibino
08ac7793ab Update RDB.Kill to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
02b653df72 Update RDB.Retry to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
bee784c052 Update RDB.Done to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
4ea58052f8 Update RDB.Dequeue to return message and deadline 2020-06-29 06:11:47 -07:00
Ken Hibino
5afb4861a5 Add task message to deadlines set on dequeue
Updated dequeueCmd to decode the message and compute its deadline and add
the message to the Deadline set.
2020-06-29 06:11:47 -07:00
Ken Hibino
68e6b379fc Use default timeout of 30mins if both timeout and deadline are not
provided
2020-06-29 06:11:47 -07:00
Ken Hibino
0e70a14899 Change TaskMessage Timeout and Deadline to int
* This change breaks existing tasks in Redis
2020-06-29 06:11:47 -07:00
Ken Hibino
f01c7b8e66 Add redis key for deadlines in base package 2020-06-29 06:11:47 -07:00
4 changed files with 8 additions and 20 deletions

View File

@@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.10.0] - 2020-07-06
### Changed
- All tasks now requires timeout or deadline. By default, timeout is set to 30 mins.

View File

@@ -34,7 +34,6 @@ A system can consist of multiple worker servers and brokers, giving way to high
- Scheduling of tasks
- Durability since tasks are written to Redis
- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
- Low latency to add a task since writes are fast in Redis

View File

@@ -191,19 +191,9 @@ func (p *processor) exec() {
p.cancelations.Delete(msg.ID.String())
}()
// check context before starting a worker goroutine.
select {
case <-ctx.Done():
// already canceled (e.g. deadline exceeded).
p.retryOrKill(ctx, msg, ctx.Err())
return
default:
}
resCh := make(chan error, 1)
go func() {
resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
}()
task := NewTask(msg.Type, msg.Payload)
go func() { resCh <- perform(ctx, task, p.handler) }()
select {
case <-p.abort:
@@ -212,6 +202,7 @@ func (p *processor) exec() {
p.requeue(msg)
return
case <-ctx.Done():
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
p.retryOrKill(ctx, msg, ctx.Err())
return
case resErr := <-resCh:
@@ -220,6 +211,9 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, task, resErr)
}
p.retryOrKill(ctx, msg, resErr)
return
}
@@ -258,11 +252,7 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
}
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
}
if msg.Retried >= msg.Retry {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.kill(ctx, msg, err)
} else {
p.retry(ctx, msg, err)
@@ -291,6 +281,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
}
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
err := p.broker.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)

View File

@@ -125,7 +125,7 @@ type Config struct {
ShutdownTimeout time.Duration
}
// An ErrorHandler handles an error occured during task processing.
// An ErrorHandler handles errors returned by the task handler.
type ErrorHandler interface {
HandleError(ctx context.Context, task *Task, err error)
}