mirror of
https://github.com/hibiken/asynq.git
synced 2026-05-27 09:10:18 +08:00
Compare commits
24 Commits
v0.10.0
...
v0.10.0.rc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04702ddfd2 | ||
|
|
6705f7c27a | ||
|
|
e27ae0d33a | ||
|
|
6cd0ab65a3 | ||
|
|
83c9d5ae94 | ||
|
|
7eebbf181e | ||
|
|
7b1770da96 | ||
|
|
e2c5882368 | ||
|
|
50df107ace | ||
|
|
9699d196e5 | ||
|
|
1c5f7a791b | ||
|
|
232efe8279 | ||
|
|
ef4a4a8334 | ||
|
|
65e17a3469 | ||
|
|
88d94a2a9d | ||
|
|
7433b94aac | ||
|
|
08ac7793ab | ||
|
|
02b653df72 | ||
|
|
bee784c052 | ||
|
|
4ea58052f8 | ||
|
|
5afb4861a5 | ||
|
|
68e6b379fc | ||
|
|
0e70a14899 | ||
|
|
f01c7b8e66 |
@@ -7,8 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.10.0] - 2020-07-06
|
||||
|
||||
### Changed
|
||||
|
||||
- All tasks now requires timeout or deadline. By default, timeout is set to 30 mins.
|
||||
|
||||
@@ -34,7 +34,6 @@ A system can consist of multiple worker servers and brokers, giving way to high
|
||||
- Scheduling of tasks
|
||||
- Durability since tasks are written to Redis
|
||||
- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
|
||||
- Automatic recovery of tasks in the event of a worker crash
|
||||
- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
|
||||
- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
|
||||
- Low latency to add a task since writes are fast in Redis
|
||||
|
||||
23
processor.go
23
processor.go
@@ -191,19 +191,9 @@ func (p *processor) exec() {
|
||||
p.cancelations.Delete(msg.ID.String())
|
||||
}()
|
||||
|
||||
// check context before starting a worker goroutine.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// already canceled (e.g. deadline exceeded).
|
||||
p.retryOrKill(ctx, msg, ctx.Err())
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
resCh := make(chan error, 1)
|
||||
go func() {
|
||||
resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
|
||||
}()
|
||||
task := NewTask(msg.Type, msg.Payload)
|
||||
go func() { resCh <- perform(ctx, task, p.handler) }()
|
||||
|
||||
select {
|
||||
case <-p.abort:
|
||||
@@ -212,6 +202,7 @@ func (p *processor) exec() {
|
||||
p.requeue(msg)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
|
||||
p.retryOrKill(ctx, msg, ctx.Err())
|
||||
return
|
||||
case resErr := <-resCh:
|
||||
@@ -220,6 +211,9 @@ func (p *processor) exec() {
|
||||
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
|
||||
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
|
||||
if resErr != nil {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(ctx, task, resErr)
|
||||
}
|
||||
p.retryOrKill(ctx, msg, resErr)
|
||||
return
|
||||
}
|
||||
@@ -258,11 +252,7 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
|
||||
}
|
||||
|
||||
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
|
||||
}
|
||||
if msg.Retried >= msg.Retry {
|
||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||
p.kill(ctx, msg, err)
|
||||
} else {
|
||||
p.retry(ctx, msg, err)
|
||||
@@ -291,6 +281,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
}
|
||||
|
||||
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||
err := p.broker.Kill(msg, e.Error())
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
||||
|
||||
@@ -125,7 +125,7 @@ type Config struct {
|
||||
ShutdownTimeout time.Duration
|
||||
}
|
||||
|
||||
// An ErrorHandler handles an error occured during task processing.
|
||||
// An ErrorHandler handles errors returned by the task handler.
|
||||
type ErrorHandler interface {
|
||||
HandleError(ctx context.Context, task *Task, err error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user