mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-08 19:35:51 +08:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6486716b4 | ||
|
|
742ed6546f | ||
|
|
897ab4e28b | ||
|
|
a4e4c0b1d5 | ||
|
|
95b7dcaad4 | ||
|
|
8d3248e850 | ||
|
|
e69264dc04 |
@@ -4,7 +4,7 @@ git:
|
||||
depth: 1
|
||||
env:
|
||||
- GO111MODULE=on # go modules are the default
|
||||
go: [1.12.x, 1.13.x]
|
||||
go: [1.12.x, 1.13.x, 1.14.x]
|
||||
script:
|
||||
- go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
|
||||
services:
|
||||
|
||||
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.6.0] - 2020-03-01
|
||||
|
||||
### Added
|
||||
|
||||
- Added `ServeMux` type to make it easy for users to implement Handler interface.
|
||||
- `ErrorHandler` type was added. Allow users to specify error handling function (e.g. Report error to error reporting service such as Honeybadger, Bugsnag, etc)
|
||||
|
||||
## [0.5.0] - 2020-02-23
|
||||
|
||||
### Changed
|
||||
|
||||
52
README.md
52
README.md
@@ -30,7 +30,7 @@ First, make sure you are running a Redis server locally.
|
||||
$ redis-server
|
||||
```
|
||||
|
||||
To create and schedule tasks, use `Client` and provide a task and when to process the task.
|
||||
To create and schedule tasks, use `Client` and provide a task and when to enqueue the task.
|
||||
|
||||
```go
|
||||
func main() {
|
||||
@@ -41,9 +41,9 @@ func main() {
|
||||
client := asynq.NewClient(r)
|
||||
|
||||
// Create a task with task type and payload
|
||||
t1 := asynq.NewTask("send_welcome_email", map[string]interface{}{"user_id": 42})
|
||||
t1 := asynq.NewTask("email:signup", map[string]interface{}{"user_id": 42})
|
||||
|
||||
t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42})
|
||||
t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42})
|
||||
|
||||
// Process immediately
|
||||
err := client.Enqueue(t1)
|
||||
@@ -52,8 +52,8 @@ func main() {
|
||||
err = client.EnqueueIn(24*time.Hour, t2)
|
||||
|
||||
// Process at specified time.
|
||||
t := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC)
|
||||
err = client.EnqueueAt(t, t2)
|
||||
target := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC)
|
||||
err = client.EnqueueAt(target, t2)
|
||||
|
||||
// Pass options to specify processing behavior for a given task.
|
||||
//
|
||||
@@ -66,6 +66,21 @@ func main() {
|
||||
|
||||
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
|
||||
|
||||
`Handler` is an interface with one method `ProcessTask` with the following signature.
|
||||
|
||||
```go
|
||||
// ProcessTask should return nil if the processing of a task
|
||||
// is successful.
|
||||
//
|
||||
// If ProcessTask return a non-nil error or panics, the task
|
||||
// will be retried after delay.
|
||||
type Handler interface {
|
||||
ProcessTask(context.Context, *asynq.Task) error
|
||||
}
|
||||
```
|
||||
|
||||
You can optionally use `ServeMux` to create a handler, just as you would with `"net/http"` Handler.
|
||||
|
||||
```go
|
||||
func main() {
|
||||
r := &asynq.RedisClientOpt{
|
||||
@@ -84,20 +99,23 @@ func main() {
|
||||
// See the godoc for other configuration options
|
||||
})
|
||||
|
||||
bg.Run(handler)
|
||||
mux := asynq.NewServeMux()
|
||||
mux.HandleFunc("email:signup", signupEmailHandler)
|
||||
mux.HandleFunc("email:reminder", reminderEmailHandler)
|
||||
// ...register other handlers...
|
||||
|
||||
bg.Run(mux)
|
||||
}
|
||||
```
|
||||
|
||||
`Handler` is an interface with one method `ProcessTask` with the following signature.
|
||||
|
||||
```go
|
||||
// ProcessTask should return nil if the processing of a task
|
||||
// is successful.
|
||||
//
|
||||
// If ProcessTask return a non-nil error or panics, the task
|
||||
// will be retried after delay.
|
||||
type Handler interface {
|
||||
ProcessTask(context.Context, *asynq.Task) error
|
||||
// function with the same signature as the ProcessTask method for the Handler interface.
|
||||
func signupEmailHandler(ctx context.Context, t *asynq.Task) error {
|
||||
id, err := t.Payload.GetInt("user_id")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Send welcome email to user %d\n", id)
|
||||
// ...your email sending logic...
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
2
asynq.go
2
asynq.go
@@ -138,6 +138,6 @@ func createRedisClient(r RedisConnOpt) *redis.Client {
|
||||
TLSConfig: r.TLSConfig,
|
||||
})
|
||||
default:
|
||||
panic(fmt.Sprintf("unexpected type %T for RedisConnOpt", r))
|
||||
panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,6 +89,35 @@ type Config struct {
|
||||
// The tasks in lower priority queues are processed only when those queues with
|
||||
// higher priorities are empty.
|
||||
StrictPriority bool
|
||||
|
||||
// ErrorHandler handles errors returned by the task handler.
|
||||
//
|
||||
// HandleError is invoked only if the task handler returns a non-nil error.
|
||||
//
|
||||
// Example:
|
||||
// func reportError(task *asynq.Task, err error, retried, maxRetry int) {
|
||||
// if retried >= maxRetry {
|
||||
// err = fmt.Errorf("retry exhausted for task %s: %w", task.Type, err)
|
||||
// }
|
||||
// errorReportingService.Notify(err)
|
||||
// })
|
||||
//
|
||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||
ErrorHandler ErrorHandler
|
||||
}
|
||||
|
||||
// An ErrorHandler handles errors returned by the task handler.
|
||||
type ErrorHandler interface {
|
||||
HandleError(task *Task, err error, retried, maxRetry int)
|
||||
}
|
||||
|
||||
// The ErrorHandlerFunc type is an adapter to allow the use of ordinary functions as a ErrorHandler.
|
||||
// If f is a function with the appropriate signature, ErrorHandlerFunc(f) is a ErrorHandler that calls f.
|
||||
type ErrorHandlerFunc func(task *Task, err error, retried, maxRetry int)
|
||||
|
||||
// HandleError calls fn(task, err, retried, maxRetry)
|
||||
func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry int) {
|
||||
fn(task, err, retried, maxRetry)
|
||||
}
|
||||
|
||||
// Formula taken from https://github.com/mperham/sidekiq.
|
||||
@@ -136,7 +165,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
|
||||
syncer := newSyncer(syncCh, 5*time.Second)
|
||||
heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
|
||||
scheduler := newScheduler(rdb, 5*time.Second, queues)
|
||||
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels)
|
||||
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
||||
subscriber := newSubscriber(rdb, cancels)
|
||||
return &Background{
|
||||
rdb: rdb,
|
||||
@@ -149,7 +178,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
|
||||
}
|
||||
}
|
||||
|
||||
// A Handler processes a task.
|
||||
// A Handler processes tasks.
|
||||
//
|
||||
// ProcessTask should return nil if the processing of a task
|
||||
// is successful.
|
||||
|
||||
@@ -31,6 +31,8 @@ type processor struct {
|
||||
|
||||
retryDelayFunc retryDelayFunc
|
||||
|
||||
errHandler ErrorHandler
|
||||
|
||||
// channel via which to send sync requests to syncer.
|
||||
syncRequestCh chan<- *syncRequest
|
||||
|
||||
@@ -59,7 +61,8 @@ type processor struct {
|
||||
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
||||
|
||||
// newProcessor constructs a new processor.
|
||||
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor {
|
||||
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
|
||||
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
|
||||
info := ps.Get()
|
||||
qcfg := normalizeQueueCfg(info.Queues)
|
||||
orderedQueues := []string(nil)
|
||||
@@ -79,6 +82,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh c
|
||||
done: make(chan struct{}),
|
||||
abort: make(chan struct{}),
|
||||
quit: make(chan struct{}),
|
||||
errHandler: errHandler,
|
||||
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
|
||||
}
|
||||
}
|
||||
@@ -192,6 +196,9 @@ func (p *processor) exec() {
|
||||
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
|
||||
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
|
||||
if resErr != nil {
|
||||
if p.errHandler != nil {
|
||||
p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry)
|
||||
}
|
||||
if msg.Retried >= msg.Retry {
|
||||
p.kill(msg, resErr)
|
||||
} else {
|
||||
|
||||
@@ -66,11 +66,9 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
processed = append(processed, task)
|
||||
return nil
|
||||
}
|
||||
workerCh := make(chan int)
|
||||
go fakeHeartbeater(workerCh)
|
||||
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||
cancelations := base.NewCancelations()
|
||||
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
|
||||
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
||||
p.handler = HandlerFunc(handler)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
@@ -84,7 +82,6 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
}
|
||||
time.Sleep(tc.wait)
|
||||
p.terminate()
|
||||
close(workerCh)
|
||||
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
@@ -123,24 +120,30 @@ func TestProcessorRetry(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
enqueued []*base.TaskMessage // initial default queue state
|
||||
incoming []*base.TaskMessage // tasks to be enqueued during run
|
||||
delay time.Duration // retry delay duration
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantRetry []h.ZSetEntry // tasks in retry queue at the end
|
||||
wantDead []*base.TaskMessage // tasks in dead queue at the end
|
||||
enqueued []*base.TaskMessage // initial default queue state
|
||||
incoming []*base.TaskMessage // tasks to be enqueued during run
|
||||
delay time.Duration // retry delay duration
|
||||
handler Handler // task handler
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantRetry []h.ZSetEntry // tasks in retry queue at the end
|
||||
wantDead []*base.TaskMessage // tasks in dead queue at the end
|
||||
wantErrCount int // number of times error handler should be called
|
||||
}{
|
||||
{
|
||||
enqueued: []*base.TaskMessage{m1, m2},
|
||||
incoming: []*base.TaskMessage{m3, m4},
|
||||
delay: time.Minute,
|
||||
wait: time.Second,
|
||||
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||
return fmt.Errorf(errMsg)
|
||||
}),
|
||||
wait: time.Second,
|
||||
wantRetry: []h.ZSetEntry{
|
||||
{Msg: &r2, Score: float64(now.Add(time.Minute).Unix())},
|
||||
{Msg: &r3, Score: float64(now.Add(time.Minute).Unix())},
|
||||
{Msg: &r4, Score: float64(now.Add(time.Minute).Unix())},
|
||||
},
|
||||
wantDead: []*base.TaskMessage{&r1},
|
||||
wantDead: []*base.TaskMessage{&r1},
|
||||
wantErrCount: 4,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -152,15 +155,19 @@ func TestProcessorRetry(t *testing.T) {
|
||||
delayFunc := func(n int, e error, t *Task) time.Duration {
|
||||
return tc.delay
|
||||
}
|
||||
handler := func(ctx context.Context, task *Task) error {
|
||||
return fmt.Errorf(errMsg)
|
||||
var (
|
||||
mu sync.Mutex // guards n
|
||||
n int // number of times error handler is called
|
||||
)
|
||||
errHandler := func(t *Task, err error, retried, maxRetry int) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
n++
|
||||
}
|
||||
workerCh := make(chan int)
|
||||
go fakeHeartbeater(workerCh)
|
||||
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||
cancelations := base.NewCancelations()
|
||||
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations)
|
||||
p.handler = HandlerFunc(handler)
|
||||
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
|
||||
p.handler = tc.handler
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.start(&wg)
|
||||
@@ -173,7 +180,6 @@ func TestProcessorRetry(t *testing.T) {
|
||||
}
|
||||
time.Sleep(tc.wait)
|
||||
p.terminate()
|
||||
close(workerCh)
|
||||
|
||||
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score
|
||||
gotRetry := h.GetRetryEntries(t, r)
|
||||
@@ -189,6 +195,10 @@ func TestProcessorRetry(t *testing.T) {
|
||||
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
|
||||
}
|
||||
|
||||
if n != tc.wantErrCount {
|
||||
t.Errorf("error handler was called %d times, want %d", n, tc.wantErrCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,7 +232,7 @@ func TestProcessorQueues(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
cancelations := base.NewCancelations()
|
||||
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
|
||||
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations)
|
||||
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil)
|
||||
got := p.queues()
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
||||
@@ -288,18 +298,15 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
"low": 1,
|
||||
}
|
||||
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
||||
workerCh := make(chan int)
|
||||
go fakeHeartbeater(workerCh)
|
||||
cancelations := base.NewCancelations()
|
||||
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
||||
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
|
||||
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
||||
p.handler = HandlerFunc(handler)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.start(&wg)
|
||||
time.Sleep(tc.wait)
|
||||
p.terminate()
|
||||
close(workerCh)
|
||||
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
@@ -356,9 +363,3 @@ func TestPerform(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fake heartbeater to receive sends from the worker channel.
|
||||
func fakeHeartbeater(ch <-chan int) {
|
||||
for range ch {
|
||||
}
|
||||
}
|
||||
|
||||
139
servemux.go
Normal file
139
servemux.go
Normal file
@@ -0,0 +1,139 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// ServeMux is a multiplexer for asynchronous tasks.
|
||||
// It matches the type of each task against a list of registered patterns
|
||||
// and calls the handler for the pattern that most closely matches the
|
||||
// taks's type name.
|
||||
//
|
||||
// Longer patterns take precedence over shorter ones, so that if there are
|
||||
// handlers registered for both "images" and "images:thumbnails",
|
||||
// the latter handler will be called for tasks with a type name beginning with
|
||||
// "images:thumbnails" and the former will receive tasks with type name beginning
|
||||
// with "images".
|
||||
type ServeMux struct {
|
||||
mu sync.RWMutex
|
||||
m map[string]muxEntry
|
||||
es []muxEntry // slice of entries sorted from longest to shortest.
|
||||
}
|
||||
|
||||
type muxEntry struct {
|
||||
h Handler
|
||||
pattern string
|
||||
}
|
||||
|
||||
// NewServeMux allocates and returns a new ServeMux.
|
||||
func NewServeMux() *ServeMux {
|
||||
return new(ServeMux)
|
||||
}
|
||||
|
||||
// ProcessTask dispatches the task to the handler whose
|
||||
// pattern most closely matches the task type.
|
||||
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
|
||||
h, _ := mux.Handler(task)
|
||||
return h.ProcessTask(ctx, task)
|
||||
}
|
||||
|
||||
// Handler returns the handler to use for the given task.
|
||||
// It always return a non-nil handler.
|
||||
//
|
||||
// Handler also returns the registered pattern that matches the task.
|
||||
//
|
||||
// If there is no registered handler that applies to the task,
|
||||
// handler returns a 'not found' handler which returns an error.
|
||||
func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
|
||||
mux.mu.RLock()
|
||||
defer mux.mu.RUnlock()
|
||||
|
||||
h, pattern = mux.match(t.Type)
|
||||
if h == nil {
|
||||
h, pattern = NotFoundHandler(), ""
|
||||
}
|
||||
return h, pattern
|
||||
}
|
||||
|
||||
// Find a handler on a handler map given a typename string.
|
||||
// Most-specific (longest) pattern wins.
|
||||
func (mux *ServeMux) match(typename string) (h Handler, pattern string) {
|
||||
// Check for exact match first.
|
||||
v, ok := mux.m[typename]
|
||||
if ok {
|
||||
return v.h, v.pattern
|
||||
}
|
||||
|
||||
// Check for longest valid match.
|
||||
// mux.es contains all patterns from longest to shortest.
|
||||
for _, e := range mux.es {
|
||||
if strings.HasPrefix(typename, e.pattern) {
|
||||
return e.h, e.pattern
|
||||
}
|
||||
}
|
||||
return nil, ""
|
||||
|
||||
}
|
||||
|
||||
// Handle registers the handler for the given pattern.
|
||||
// If a handler already exists for pattern, Handle panics.
|
||||
func (mux *ServeMux) Handle(pattern string, handler Handler) {
|
||||
mux.mu.Lock()
|
||||
defer mux.mu.Unlock()
|
||||
|
||||
if pattern == "" {
|
||||
panic("asynq: invalid pattern")
|
||||
}
|
||||
if handler == nil {
|
||||
panic("asynq: nil handler")
|
||||
}
|
||||
if _, exist := mux.m[pattern]; exist {
|
||||
panic("asynq: multiple registrations for " + pattern)
|
||||
}
|
||||
|
||||
if mux.m == nil {
|
||||
mux.m = make(map[string]muxEntry)
|
||||
}
|
||||
e := muxEntry{h: handler, pattern: pattern}
|
||||
mux.m[pattern] = e
|
||||
mux.es = appendSorted(mux.es, e)
|
||||
}
|
||||
|
||||
func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
|
||||
n := len(es)
|
||||
i := sort.Search(n, func(i int) bool {
|
||||
return len(es[i].pattern) < len(e.pattern)
|
||||
})
|
||||
if i == n {
|
||||
return append(es, e)
|
||||
}
|
||||
// we now know that i points at where we want to insert.
|
||||
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
|
||||
copy(es[i+1:], es[i:]) // shift shorter entries down.
|
||||
es[i] = e
|
||||
return es
|
||||
}
|
||||
|
||||
// HandleFunc registers the handler function for the given pattern.
|
||||
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) {
|
||||
if handler == nil {
|
||||
panic("asynq: nil handler")
|
||||
}
|
||||
mux.Handle(pattern, HandlerFunc(handler))
|
||||
}
|
||||
|
||||
// NotFound returns an error indicating that the handler was not found for the given task.
|
||||
func NotFound(ctx context.Context, task *Task) error {
|
||||
return fmt.Errorf("handler not found for task %q", task.Type)
|
||||
}
|
||||
|
||||
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
|
||||
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }
|
||||
116
servemux_test.go
Normal file
116
servemux_test.go
Normal file
@@ -0,0 +1,116 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var called string
|
||||
|
||||
// makeFakeHandler returns a handler that updates the global called variable
|
||||
// to the given identity.
|
||||
func makeFakeHandler(identity string) Handler {
|
||||
return HandlerFunc(func(ctx context.Context, t *Task) error {
|
||||
called = identity
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// A list of pattern, handler pair that is registered with mux.
|
||||
var serveMuxRegister = []struct {
|
||||
pattern string
|
||||
h Handler
|
||||
}{
|
||||
{"email:", makeFakeHandler("default email handler")},
|
||||
{"email:signup", makeFakeHandler("signup email handler")},
|
||||
{"csv:export", makeFakeHandler("csv export handler")},
|
||||
}
|
||||
|
||||
var serveMuxTests = []struct {
|
||||
typename string // task's type name
|
||||
want string // identifier of the handler that should be called
|
||||
}{
|
||||
{"email:signup", "signup email handler"},
|
||||
{"csv:export", "csv export handler"},
|
||||
{"email:daily", "default email handler"},
|
||||
}
|
||||
|
||||
func TestServeMux(t *testing.T) {
|
||||
mux := NewServeMux()
|
||||
for _, e := range serveMuxRegister {
|
||||
mux.Handle(e.pattern, e.h)
|
||||
}
|
||||
|
||||
for _, tc := range serveMuxTests {
|
||||
called = "" // reset to zero value
|
||||
|
||||
task := NewTask(tc.typename, nil)
|
||||
if err := mux.ProcessTask(context.Background(), task); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if called != tc.want {
|
||||
t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeMuxRegisterNilHandler(t *testing.T) {
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Error("expected call to mux.HandleFunc to panic")
|
||||
}
|
||||
}()
|
||||
|
||||
mux := NewServeMux()
|
||||
mux.HandleFunc("email:signup", nil)
|
||||
}
|
||||
|
||||
func TestServeMuxRegisterEmptyPattern(t *testing.T) {
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Error("expected call to mux.HandleFunc to panic")
|
||||
}
|
||||
}()
|
||||
|
||||
mux := NewServeMux()
|
||||
mux.Handle("", makeFakeHandler("email"))
|
||||
}
|
||||
|
||||
func TestServeMuxRegisterDuplicatePattern(t *testing.T) {
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Error("expected call to mux.HandleFunc to panic")
|
||||
}
|
||||
}()
|
||||
|
||||
mux := NewServeMux()
|
||||
mux.Handle("email", makeFakeHandler("email"))
|
||||
mux.Handle("email", makeFakeHandler("email:default"))
|
||||
}
|
||||
|
||||
var notFoundTests = []struct {
|
||||
typename string // task's type name
|
||||
}{
|
||||
{"image:minimize"},
|
||||
{"csv:"}, // registered patterns match the task's type prefix, not the other way around.
|
||||
}
|
||||
|
||||
func TestServeMuxNotFound(t *testing.T) {
|
||||
mux := NewServeMux()
|
||||
for _, e := range serveMuxRegister {
|
||||
mux.Handle(e.pattern, e.h)
|
||||
}
|
||||
|
||||
for _, tc := range notFoundTests {
|
||||
task := NewTask(tc.typename, nil)
|
||||
err := mux.ProcessTask(context.Background(), task)
|
||||
if err == nil {
|
||||
t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user