mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-05 06:05:52 +08:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6df2c3ae2b | ||
|
|
37554fd23c | ||
|
|
77f5a38453 | ||
|
|
8d2b9d6be7 | ||
|
|
1b7d557c66 | ||
|
|
30b68728d4 | ||
|
|
310d38620d | ||
|
|
1a53bbf21b | ||
|
|
9c79a7d507 | ||
|
|
516f95edff |
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.6.2] - 2020-03-15
|
||||
|
||||
### Added
|
||||
|
||||
- `Use` method was added to `ServeMux` to apply middlewares to all handlers.
|
||||
|
||||
## [0.6.1] - 2020-03-12
|
||||
|
||||
### Added
|
||||
|
||||
157
README.md
157
README.md
@@ -22,61 +22,133 @@ First, make sure you are running a Redis server locally.
|
||||
$ redis-server
|
||||
```
|
||||
|
||||
To create and schedule tasks, use `Client` and provide a task and when to enqueue the task.
|
||||
Next, write a package that encapslates task creation and task handling.
|
||||
|
||||
```go
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
|
||||
// A list of background task types.
|
||||
const (
|
||||
EmailDelivery = "email:deliver"
|
||||
ImageProcessing = "image:process"
|
||||
)
|
||||
|
||||
// Write function NewXXXTask to create a task.
|
||||
|
||||
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
|
||||
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
|
||||
return asynq.NewTask(EmailDelivery, payload)
|
||||
}
|
||||
|
||||
func NewImageProcessingTask(src, dst string) *asynq.Task {
|
||||
payload := map[string]interface{}{"src": src, "dst": dst}
|
||||
return asynq.NewTask(ImageProcessing, payload)
|
||||
}
|
||||
|
||||
// Write function HandleXXXTask to handle the given task.
|
||||
// NOTE: It satisfies the asynq.HandlerFunc interface.
|
||||
|
||||
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
|
||||
userID, err := t.Payload.GetInt("user_id")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmplID, err := t.Payload.GetString("template_id")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
|
||||
// Email delivery logic ...
|
||||
return nil
|
||||
}
|
||||
|
||||
func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error {
|
||||
src, err := t.Payload.GetString("src")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dst, err := t.Payload.GetString("dst")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Process image: src = %s, dst = %s\n", src, dst)
|
||||
// Image processing logic ...
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to enqueue tasks to the task queue.
|
||||
A task will be processed by a background worker as soon as the task gets enqueued.
|
||||
Scheduled tasks will be stored in Redis and will be enqueued at the specified time.
|
||||
Scheduled tasks will be stored in Redis and will be enqueued at the specified time.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"your/app/package/tasks"
|
||||
)
|
||||
|
||||
const redisAddr = "127.0.0.1:6379"
|
||||
|
||||
func main() {
|
||||
r := &asynq.RedisClientOpt{
|
||||
Addr: "127.0.0.1:6379",
|
||||
r := &asynq.RedisClientOpt{Addr: redisAddr}
|
||||
c := asynq.NewClient(r)
|
||||
|
||||
// Example 1: Enqueue task to be processed immediately.
|
||||
|
||||
t := tasks.NewEmailDeliveryTask(42, "some:template:id")
|
||||
err := c.Enqueue(t)
|
||||
if err != nil {
|
||||
log.Fatal("could not enqueue task: %v", err)
|
||||
}
|
||||
|
||||
client := asynq.NewClient(r)
|
||||
|
||||
// Create a task with task type and payload.
|
||||
t1 := asynq.NewTask("email:signup", map[string]interface{}{"user_id": 42})
|
||||
// Example 2: Schedule task to be processed in the future.
|
||||
|
||||
t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42})
|
||||
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
|
||||
err = c.EnqueueIn(24*time.Hour, t)
|
||||
if err != nil {
|
||||
log.Fatal("could not schedule task: %v", err)
|
||||
}
|
||||
|
||||
// Enqueue immediately.
|
||||
err := client.Enqueue(t1)
|
||||
|
||||
// Enqueue 24 hrs later.
|
||||
err = client.EnqueueIn(24*time.Hour, t2)
|
||||
// Example 3: Pass options to tune task processing behavior.
|
||||
// Options include MaxRetry, Queue, Timeout, Deadline, etc.
|
||||
|
||||
// Enqueue at specific time.
|
||||
err = client.EnqueueAt(time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC), t2)
|
||||
|
||||
// Pass vararg options to specify processing behavior for the given task.
|
||||
//
|
||||
// MaxRetry specifies the max number of retry if the task fails (Default is 25).
|
||||
// Queue specifies which queue to enqueue this task to (Default is "default" queue).
|
||||
// Timeout specifies the the task timeout (Default is no timeout).
|
||||
err = client.Enqueue(t1, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
|
||||
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
||||
err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
|
||||
if err != nil {
|
||||
log.Fatal("could not enqueue task: %v", err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
|
||||
Next, create a binary to process these tasks in the background.
|
||||
To start the background workers, use [`Background`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Background) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
||||
|
||||
`Handler` is an interface with one method `ProcessTask` with the following signature.
|
||||
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
|
||||
|
||||
```go
|
||||
// ProcessTask should return nil if the processing of a task is successful.
|
||||
//
|
||||
// If ProcessTask return a non-nil error or panics, the task will be retried after delay.
|
||||
type Handler interface {
|
||||
ProcessTask(context.Context, *asynq.Task) error
|
||||
}
|
||||
```
|
||||
package main
|
||||
|
||||
You can optionally use `ServeMux` to create a handler, just as you would with `"net/http"` Handler.
|
||||
import (
|
||||
"github.com/hibiken/asynq"
|
||||
"your/app/package/tasks"
|
||||
)
|
||||
|
||||
const redisAddr = "127.0.0.1:6379"
|
||||
|
||||
```go
|
||||
func main() {
|
||||
r := &asynq.RedisClientOpt{
|
||||
Addr: "127.0.0.1:6379",
|
||||
}
|
||||
r := &asynq.RedisClientOpt{Addr: redisAddr}
|
||||
|
||||
bg := asynq.NewBackground(r, &asynq.Config{
|
||||
// Specify how many concurrent workers to use
|
||||
@@ -92,23 +164,12 @@ func main() {
|
||||
|
||||
// mux maps a type to a handler
|
||||
mux := asynq.NewServeMux()
|
||||
mux.HandleFunc("email:signup", signupEmailHandler)
|
||||
mux.HandleFunc("email:reminder", reminderEmailHandler)
|
||||
mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask)
|
||||
mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask)
|
||||
// ...register other handlers...
|
||||
|
||||
bg.Run(mux)
|
||||
}
|
||||
|
||||
// function with the same signature as the ProcessTask method for the Handler interface.
|
||||
func signupEmailHandler(ctx context.Context, t *asynq.Task) error {
|
||||
id, err := t.Payload.GetInt("user_id")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Send welcome email to user %d\n", id)
|
||||
// ...your email sending logic...
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started).
|
||||
|
||||
@@ -44,8 +44,8 @@ func TestClientEnqueueAt(t *testing.T) {
|
||||
processAt: now,
|
||||
opts: []Option{},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -129,8 +129,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
MaxRetry(3),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: 3,
|
||||
@@ -148,8 +148,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
MaxRetry(-2),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: 0, // Retry count should be set to zero
|
||||
@@ -168,8 +168,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
MaxRetry(10),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: 10, // Last option takes precedence
|
||||
@@ -187,8 +187,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Queue("custom"),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"custom": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"custom": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -206,8 +206,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Queue("HIGH"),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"high": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"high": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -225,8 +225,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout(20 * time.Second),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -244,8 +244,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
@@ -324,8 +324,8 @@ func TestClientEnqueueIn(t *testing.T) {
|
||||
delay: 0,
|
||||
opts: []Option{},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": []*base.TaskMessage{
|
||||
&base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
Payload: task.Payload.data,
|
||||
Retry: defaultMaxRetry,
|
||||
|
||||
@@ -110,9 +110,9 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
started := time.Now()
|
||||
msgs := []*TaskMessage{
|
||||
&TaskMessage{ID: xid.New(), Type: "type1", Payload: map[string]interface{}{"user_id": 42}},
|
||||
&TaskMessage{ID: xid.New(), Type: "type2"},
|
||||
&TaskMessage{ID: xid.New(), Type: "type3"},
|
||||
{ID: xid.New(), Type: "type1", Payload: map[string]interface{}{"user_id": 42}},
|
||||
{ID: xid.New(), Type: "type2"},
|
||||
{ID: xid.New(), Type: "type3"},
|
||||
}
|
||||
|
||||
// Simulate hearbeater calling SetStatus and SetStarted.
|
||||
|
||||
@@ -11,36 +11,45 @@ import (
|
||||
"os"
|
||||
)
|
||||
|
||||
// NewLogger creates and returns a new instance of Logger.
|
||||
func NewLogger(out io.Writer) *Logger {
|
||||
return &Logger{
|
||||
stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC),
|
||||
}
|
||||
}
|
||||
|
||||
// Logger is a wrapper object around log.Logger from the standard library.
|
||||
// It supports logging at various log levels.
|
||||
type Logger struct {
|
||||
*stdlog.Logger
|
||||
}
|
||||
|
||||
// Debug logs a message at Debug level.
|
||||
func (l *Logger) Debug(format string, args ...interface{}) {
|
||||
format = "DEBUG: " + format
|
||||
l.Printf(format, args...)
|
||||
}
|
||||
|
||||
// Info logs a message at Info level.
|
||||
func (l *Logger) Info(format string, args ...interface{}) {
|
||||
format = "INFO: " + format
|
||||
l.Printf(format, args...)
|
||||
}
|
||||
|
||||
// Warn logs a message at Warning level.
|
||||
func (l *Logger) Warn(format string, args ...interface{}) {
|
||||
format = "WARN: " + format
|
||||
l.Printf(format, args...)
|
||||
}
|
||||
|
||||
// Error logs a message at Error level.
|
||||
func (l *Logger) Error(format string, args ...interface{}) {
|
||||
format = "ERROR: " + format
|
||||
l.Printf(format, args...)
|
||||
}
|
||||
|
||||
// Fatal logs a message at Fatal level
|
||||
// and process will exit with status set to 1.
|
||||
func (l *Logger) Fatal(format string, args ...interface{}) {
|
||||
format = "FATAL: " + format
|
||||
l.Printf(format, args...)
|
||||
|
||||
@@ -884,7 +884,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
||||
gotWorkers[key] = &w
|
||||
}
|
||||
wantWorkers := map[string]*base.WorkerInfo{
|
||||
msg1.ID.String(): &base.WorkerInfo{
|
||||
msg1.ID.String(): {
|
||||
Host: host,
|
||||
PID: pid,
|
||||
ID: msg1.ID,
|
||||
@@ -893,7 +893,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
||||
Payload: msg1.Payload,
|
||||
Started: w1Started,
|
||||
},
|
||||
msg2.ID.String(): &base.WorkerInfo{
|
||||
msg2.ID.String(): {
|
||||
Host: host,
|
||||
PID: pid,
|
||||
ID: msg2.ID,
|
||||
|
||||
883
payload_test.go
883
payload_test.go
@@ -14,333 +14,626 @@ import (
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
)
|
||||
|
||||
func TestPayloadGet(t *testing.T) {
|
||||
names := []string{"luke", "anakin", "rey"}
|
||||
primes := []int{2, 3, 5, 7, 11, 13, 17}
|
||||
user := map[string]interface{}{"name": "Ken", "score": 3.14}
|
||||
location := map[string]string{"address": "123 Main St.", "state": "NY", "zipcode": "10002"}
|
||||
favs := map[string][]string{
|
||||
"movies": []string{"forrest gump", "star wars"},
|
||||
"tv_shows": []string{"game of thrones", "HIMYM", "breaking bad"},
|
||||
type payloadTest struct {
|
||||
data map[string]interface{}
|
||||
key string
|
||||
nonkey string
|
||||
}
|
||||
|
||||
func TestPayloadString(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"name": "gopher"},
|
||||
key: "name",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetString(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetString(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("With Marshaling: Payload.GetString(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetString(tc.nonkey)
|
||||
if err == nil || got != "" {
|
||||
t.Errorf("Payload.GetString(%q) = %v, %v; want '', error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadInt(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"user_id": 42},
|
||||
key: "user_id",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetInt(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("Payload.GetInt(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetInt(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("With Marshaling: Payload.GetInt(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetInt(tc.nonkey)
|
||||
if err == nil || got != 0 {
|
||||
t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadFloat64(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"pi": 3.14},
|
||||
key: "pi",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetFloat64(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("Payload.GetFloat64(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetFloat64(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("With Marshaling: Payload.GetFloat64(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetFloat64(tc.nonkey)
|
||||
if err == nil || got != 0 {
|
||||
t.Errorf("Payload.GetFloat64(%q) = %v, %v; want 0, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadBool(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"enabled": true},
|
||||
key: "enabled",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetBool(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("Payload.GetBool(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetBool(tc.key)
|
||||
if err != nil || got != tc.data[tc.key] {
|
||||
t.Errorf("With Marshaling: Payload.GetBool(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetBool(tc.nonkey)
|
||||
if err == nil || got != false {
|
||||
t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadStringSlice(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"names": []string{"luke", "rey", "anakin"}},
|
||||
key: "names",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetStringSlice(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetStringSlice(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetStringSlice(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetStringSlice(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadIntSlice(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"nums": []int{9, 8, 7}},
|
||||
key: "nums",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetIntSlice(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetIntSlice(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetIntSlice(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetIntSlice(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadStringMap(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"user": map[string]interface{}{"name": "Jon Doe", "score": 2.2}},
|
||||
key: "user",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetStringMap(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetStringMap(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetStringMap(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetStringMap(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadStringMapString(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"address": map[string]string{"line": "123 Main St", "city": "San Francisco", "state": "CA"}},
|
||||
key: "address",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetStringMapString(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetStringMapString(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetStringMapString(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetStringMapString(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadStringMapStringSlice(t *testing.T) {
|
||||
favs := map[string][]string{
|
||||
"movies": {"forrest gump", "star wars"},
|
||||
"tv_shows": {"game of thrones", "HIMYM", "breaking bad"},
|
||||
}
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"favorites": favs},
|
||||
key: "favorites",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetStringMapStringSlice(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetStringMapStringSlice(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetStringMapStringSlice(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadStringMapInt(t *testing.T) {
|
||||
counter := map[string]int{
|
||||
"a": 1,
|
||||
"b": 101,
|
||||
"c": 42,
|
||||
}
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"counts": counter},
|
||||
key: "counts",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
got, err := payload.GetStringMapInt(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetStringMapInt(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetStringMapInt(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
// access non-existent key.
|
||||
got, err = payload.GetStringMapInt(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadStringMapBool(t *testing.T) {
|
||||
features := map[string]bool{
|
||||
"A": false,
|
||||
"B": true,
|
||||
"C": true,
|
||||
}
|
||||
now := time.Now()
|
||||
duration := 15 * time.Minute
|
||||
|
||||
data := map[string]interface{}{
|
||||
"greeting": "Hello",
|
||||
"user_id": 9876,
|
||||
"pi": 3.1415,
|
||||
"enabled": false,
|
||||
"names": names,
|
||||
"primes": primes,
|
||||
"user": user,
|
||||
"location": location,
|
||||
"favs": favs,
|
||||
"counter": counter,
|
||||
"features": features,
|
||||
"timestamp": now,
|
||||
"duration": duration,
|
||||
}
|
||||
payload := Payload{data}
|
||||
|
||||
gotStr, err := payload.GetString("greeting")
|
||||
if gotStr != "Hello" || err != nil {
|
||||
t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil",
|
||||
"greeting", gotStr, err, "Hello")
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"features": features},
|
||||
key: "features",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
gotInt, err := payload.GetInt("user_id")
|
||||
if gotInt != 9876 || err != nil {
|
||||
t.Errorf("Payload.GetInt(%q) = %v, %v, want, %v, nil",
|
||||
"user_id", gotInt, err, 9876)
|
||||
}
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
gotFloat, err := payload.GetFloat64("pi")
|
||||
if gotFloat != 3.1415 || err != nil {
|
||||
t.Errorf("Payload.GetFloat64(%q) = %v, %v, want, %v, nil",
|
||||
"pi", gotFloat, err, 3.141592)
|
||||
}
|
||||
got, err := payload.GetStringMapBool(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
gotBool, err := payload.GetBool("enabled")
|
||||
if gotBool != false || err != nil {
|
||||
t.Errorf("Payload.GetBool(%q) = %v, %v, want, %v, nil",
|
||||
"enabled", gotBool, err, false)
|
||||
}
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetStringMapBool(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetStringMapBool(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
gotStrSlice, err := payload.GetStringSlice("names")
|
||||
if diff := cmp.Diff(gotStrSlice, names); diff != "" {
|
||||
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"names", gotStrSlice, err, names, diff)
|
||||
}
|
||||
|
||||
gotIntSlice, err := payload.GetIntSlice("primes")
|
||||
if diff := cmp.Diff(gotIntSlice, primes); diff != "" {
|
||||
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"primes", gotIntSlice, err, primes, diff)
|
||||
}
|
||||
|
||||
gotStrMap, err := payload.GetStringMap("user")
|
||||
if diff := cmp.Diff(gotStrMap, user); diff != "" {
|
||||
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"user", gotStrMap, err, user, diff)
|
||||
}
|
||||
|
||||
gotStrMapStr, err := payload.GetStringMapString("location")
|
||||
if diff := cmp.Diff(gotStrMapStr, location); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"location", gotStrMapStr, err, location, diff)
|
||||
}
|
||||
|
||||
gotStrMapStrSlice, err := payload.GetStringMapStringSlice("favs")
|
||||
if diff := cmp.Diff(gotStrMapStrSlice, favs); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"favs", gotStrMapStrSlice, err, favs, diff)
|
||||
}
|
||||
|
||||
gotStrMapInt, err := payload.GetStringMapInt("counter")
|
||||
if diff := cmp.Diff(gotStrMapInt, counter); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"counter", gotStrMapInt, err, counter, diff)
|
||||
}
|
||||
|
||||
gotStrMapBool, err := payload.GetStringMapBool("features")
|
||||
if diff := cmp.Diff(gotStrMapBool, features); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"features", gotStrMapBool, err, features, diff)
|
||||
}
|
||||
|
||||
gotTime, err := payload.GetTime("timestamp")
|
||||
if !gotTime.Equal(now) {
|
||||
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
|
||||
"timestamp", gotTime, err, now)
|
||||
}
|
||||
|
||||
gotDuration, err := payload.GetDuration("duration")
|
||||
if gotDuration != duration {
|
||||
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
|
||||
"duration", gotDuration, err, duration)
|
||||
// access non-existent key.
|
||||
got, err = payload.GetStringMapBool(tc.nonkey)
|
||||
if err == nil || got != nil {
|
||||
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v; want nil, error",
|
||||
tc.key, got, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadGetWithMarshaling(t *testing.T) {
|
||||
names := []string{"luke", "anakin", "rey"}
|
||||
primes := []int{2, 3, 5, 7, 11, 13, 17}
|
||||
user := map[string]interface{}{"name": "Ken", "score": 3.14}
|
||||
location := map[string]string{"address": "123 Main St.", "state": "NY", "zipcode": "10002"}
|
||||
favs := map[string][]string{
|
||||
"movies": []string{"forrest gump", "star wars"},
|
||||
"tv_shows": []string{"game of throwns", "HIMYM", "breaking bad"},
|
||||
}
|
||||
counter := map[string]int{
|
||||
"a": 1,
|
||||
"b": 101,
|
||||
"c": 42,
|
||||
}
|
||||
features := map[string]bool{
|
||||
"A": false,
|
||||
"B": true,
|
||||
"C": true,
|
||||
}
|
||||
now := time.Now()
|
||||
duration := 15 * time.Minute
|
||||
|
||||
in := Payload{map[string]interface{}{
|
||||
"subject": "Hello",
|
||||
"recipient_id": 9876,
|
||||
"pi": 3.14,
|
||||
"enabled": true,
|
||||
"names": names,
|
||||
"primes": primes,
|
||||
"user": user,
|
||||
"location": location,
|
||||
"favs": favs,
|
||||
"counter": counter,
|
||||
"features": features,
|
||||
"timestamp": now,
|
||||
"duration": duration,
|
||||
}}
|
||||
// encode and then decode task messsage
|
||||
inMsg := h.NewTaskMessage("testing", in.data)
|
||||
data, err := json.Marshal(inMsg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var outMsg base.TaskMessage
|
||||
err = json.Unmarshal(data, &outMsg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
out := Payload{outMsg.Payload}
|
||||
|
||||
gotStr, err := out.GetString("subject")
|
||||
if gotStr != "Hello" || err != nil {
|
||||
t.Errorf("Payload.GetString(%q) = %v, %v; want %q, nil",
|
||||
"subject", gotStr, err, "Hello")
|
||||
func TestPayloadTime(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"current": time.Now()},
|
||||
key: "current",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
gotInt, err := out.GetInt("recipient_id")
|
||||
if gotInt != 9876 || err != nil {
|
||||
t.Errorf("Payload.GetInt(%q) = %v, %v; want %v, nil",
|
||||
"recipient_id", gotInt, err, 9876)
|
||||
}
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
gotFloat, err := out.GetFloat64("pi")
|
||||
if gotFloat != 3.14 || err != nil {
|
||||
t.Errorf("Payload.GetFloat64(%q) = %v, %v; want %v, nil",
|
||||
"pi", gotFloat, err, 3.14)
|
||||
}
|
||||
got, err := payload.GetTime(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
gotBool, err := out.GetBool("enabled")
|
||||
if gotBool != true || err != nil {
|
||||
t.Errorf("Payload.GetBool(%q) = %v, %v; want %v, nil",
|
||||
"enabled", gotBool, err, true)
|
||||
}
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetTime(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetTime(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
gotStrSlice, err := out.GetStringSlice("names")
|
||||
if diff := cmp.Diff(gotStrSlice, names); diff != "" {
|
||||
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"names", gotStrSlice, err, names, diff)
|
||||
}
|
||||
|
||||
gotIntSlice, err := out.GetIntSlice("primes")
|
||||
if diff := cmp.Diff(gotIntSlice, primes); diff != "" {
|
||||
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"primes", gotIntSlice, err, primes, diff)
|
||||
}
|
||||
|
||||
gotStrMap, err := out.GetStringMap("user")
|
||||
if diff := cmp.Diff(gotStrMap, user); diff != "" {
|
||||
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"user", gotStrMap, err, user, diff)
|
||||
}
|
||||
|
||||
gotStrMapStr, err := out.GetStringMapString("location")
|
||||
if diff := cmp.Diff(gotStrMapStr, location); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"location", gotStrMapStr, err, location, diff)
|
||||
}
|
||||
|
||||
gotStrMapStrSlice, err := out.GetStringMapStringSlice("favs")
|
||||
if diff := cmp.Diff(gotStrMapStrSlice, favs); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"favs", gotStrMapStrSlice, err, favs, diff)
|
||||
}
|
||||
|
||||
gotStrMapInt, err := out.GetStringMapInt("counter")
|
||||
if diff := cmp.Diff(gotStrMapInt, counter); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"counter", gotStrMapInt, err, counter, diff)
|
||||
}
|
||||
|
||||
gotStrMapBool, err := out.GetStringMapBool("features")
|
||||
if diff := cmp.Diff(gotStrMapBool, features); diff != "" {
|
||||
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
|
||||
"features", gotStrMapBool, err, features, diff)
|
||||
}
|
||||
|
||||
gotTime, err := out.GetTime("timestamp")
|
||||
if !gotTime.Equal(now) {
|
||||
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
|
||||
"timestamp", gotTime, err, now)
|
||||
}
|
||||
|
||||
gotDuration, err := out.GetDuration("duration")
|
||||
if gotDuration != duration {
|
||||
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
|
||||
"duration", gotDuration, err, duration)
|
||||
// access non-existent key.
|
||||
got, err = payload.GetTime(tc.nonkey)
|
||||
if err == nil || !got.IsZero() {
|
||||
t.Errorf("Payload.GetTime(%q) = %v, %v; want %v, error",
|
||||
tc.key, got, err, time.Time{})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPayloadKeyNotFound(t *testing.T) {
|
||||
payload := Payload{nil}
|
||||
|
||||
key := "something"
|
||||
gotStr, err := payload.GetString(key)
|
||||
if err == nil || gotStr != "" {
|
||||
t.Errorf("Payload.GetString(%q) = %v, %v; want '', error",
|
||||
key, gotStr, err)
|
||||
func TestPayloadDuration(t *testing.T) {
|
||||
tests := []payloadTest{
|
||||
{
|
||||
data: map[string]interface{}{"duration": 15 * time.Minute},
|
||||
key: "duration",
|
||||
nonkey: "unknown",
|
||||
},
|
||||
}
|
||||
|
||||
gotInt, err := payload.GetInt(key)
|
||||
if err == nil || gotInt != 0 {
|
||||
t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error",
|
||||
key, gotInt, err)
|
||||
}
|
||||
for _, tc := range tests {
|
||||
payload := Payload{tc.data}
|
||||
|
||||
gotFloat, err := payload.GetFloat64(key)
|
||||
if err == nil || gotFloat != 0 {
|
||||
t.Errorf("Payload.GetFloat64(%q = %v, %v; want 0, error",
|
||||
key, gotFloat, err)
|
||||
}
|
||||
got, err := payload.GetDuration(tc.key)
|
||||
diff := cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
gotBool, err := payload.GetBool(key)
|
||||
if err == nil || gotBool != false {
|
||||
t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error",
|
||||
key, gotBool, err)
|
||||
}
|
||||
// encode and then decode task messsage.
|
||||
in := h.NewTaskMessage("testing", tc.data)
|
||||
b, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out base.TaskMessage
|
||||
err = json.Unmarshal(b, &out)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
payload = Payload{out.Payload}
|
||||
got, err = payload.GetDuration(tc.key)
|
||||
diff = cmp.Diff(got, tc.data[tc.key])
|
||||
if err != nil || diff != "" {
|
||||
t.Errorf("With Marshaling: Payload.GetDuration(%q) = %v, %v, want %v, nil",
|
||||
tc.key, got, err, tc.data[tc.key])
|
||||
}
|
||||
|
||||
gotStrSlice, err := payload.GetStringSlice(key)
|
||||
if err == nil || gotStrSlice != nil {
|
||||
t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error",
|
||||
key, gotStrSlice, err)
|
||||
}
|
||||
|
||||
gotIntSlice, err := payload.GetIntSlice(key)
|
||||
if err == nil || gotIntSlice != nil {
|
||||
t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error",
|
||||
key, gotIntSlice, err)
|
||||
}
|
||||
|
||||
gotStrMap, err := payload.GetStringMap(key)
|
||||
if err == nil || gotStrMap != nil {
|
||||
t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error",
|
||||
key, gotStrMap, err)
|
||||
}
|
||||
|
||||
gotStrMapStr, err := payload.GetStringMapString(key)
|
||||
if err == nil || gotStrMapStr != nil {
|
||||
t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error",
|
||||
key, gotStrMapStr, err)
|
||||
}
|
||||
|
||||
gotStrMapStrSlice, err := payload.GetStringMapStringSlice(key)
|
||||
if err == nil || gotStrMapStrSlice != nil {
|
||||
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error",
|
||||
key, gotStrMapStrSlice, err)
|
||||
}
|
||||
|
||||
gotStrMapInt, err := payload.GetStringMapInt(key)
|
||||
if err == nil || gotStrMapInt != nil {
|
||||
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want nil, error",
|
||||
key, gotStrMapInt, err)
|
||||
}
|
||||
|
||||
gotStrMapBool, err := payload.GetStringMapBool(key)
|
||||
if err == nil || gotStrMapBool != nil {
|
||||
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want nil, error",
|
||||
key, gotStrMapBool, err)
|
||||
}
|
||||
|
||||
gotTime, err := payload.GetTime(key)
|
||||
if err == nil || !gotTime.IsZero() {
|
||||
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, error",
|
||||
key, gotTime, err, time.Time{})
|
||||
}
|
||||
|
||||
gotDuration, err := payload.GetDuration(key)
|
||||
if err == nil || gotDuration != 0 {
|
||||
t.Errorf("Payload.GetDuration(%q) = %v, %v, want 0, error",
|
||||
key, gotDuration, err)
|
||||
// access non-existent key.
|
||||
got, err = payload.GetDuration(tc.nonkey)
|
||||
if err == nil || got != 0 {
|
||||
t.Errorf("Payload.GetDuration(%q) = %v, %v; want %v, error",
|
||||
tc.key, got, err, time.Duration(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
25
servemux.go
25
servemux.go
@@ -23,9 +23,10 @@ import (
|
||||
// "images:thumbnails" and the former will receive tasks with type name beginning
|
||||
// with "images".
|
||||
type ServeMux struct {
|
||||
mu sync.RWMutex
|
||||
m map[string]muxEntry
|
||||
es []muxEntry // slice of entries sorted from longest to shortest.
|
||||
mu sync.RWMutex
|
||||
m map[string]muxEntry
|
||||
es []muxEntry // slice of entries sorted from longest to shortest.
|
||||
mws []MiddlewareFunc
|
||||
}
|
||||
|
||||
type muxEntry struct {
|
||||
@@ -33,6 +34,11 @@ type muxEntry struct {
|
||||
pattern string
|
||||
}
|
||||
|
||||
// MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler.
|
||||
// Typically, the returned handler is a closure which does something with the context and task passed
|
||||
// to it, and then calls the handler passed as parameter to the MiddlewareFunc.
|
||||
type MiddlewareFunc func(Handler) Handler
|
||||
|
||||
// NewServeMux allocates and returns a new ServeMux.
|
||||
func NewServeMux() *ServeMux {
|
||||
return new(ServeMux)
|
||||
@@ -60,6 +66,9 @@ func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
|
||||
if h == nil {
|
||||
h, pattern = NotFoundHandler(), ""
|
||||
}
|
||||
for i := len(mux.mws) - 1; i >= 0; i-- {
|
||||
h = mux.mws[i](h)
|
||||
}
|
||||
return h, pattern
|
||||
}
|
||||
|
||||
@@ -130,6 +139,16 @@ func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *T
|
||||
mux.Handle(pattern, HandlerFunc(handler))
|
||||
}
|
||||
|
||||
// Use appends a MiddlewareFunc to the chain.
|
||||
// Middlewares are executed in the order that they are applied to the ServeMux.
|
||||
func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
|
||||
mux.mu.Lock()
|
||||
defer mux.mu.Unlock()
|
||||
for _, fn := range mws {
|
||||
mux.mws = append(mux.mws, fn)
|
||||
}
|
||||
}
|
||||
|
||||
// NotFound returns an error indicating that the handler was not found for the given task.
|
||||
func NotFound(ctx context.Context, task *Task) error {
|
||||
return fmt.Errorf("handler not found for task %q", task.Type)
|
||||
|
||||
@@ -7,9 +7,12 @@ package asynq
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
var called string
|
||||
var called string // identity of the handler that was called.
|
||||
var invoked []string // list of middlewares in the order they were invoked.
|
||||
|
||||
// makeFakeHandler returns a handler that updates the global called variable
|
||||
// to the given identity.
|
||||
@@ -20,6 +23,17 @@ func makeFakeHandler(identity string) Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// makeFakeMiddleware returns a middleware function that appends the given identity
|
||||
//to the global invoked slice.
|
||||
func makeFakeMiddleware(identity string) MiddlewareFunc {
|
||||
return func(next Handler) Handler {
|
||||
return HandlerFunc(func(ctx context.Context, t *Task) error {
|
||||
invoked = append(invoked, identity)
|
||||
return next.ProcessTask(ctx, t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// A list of pattern, handler pair that is registered with mux.
|
||||
var serveMuxRegister = []struct {
|
||||
pattern string
|
||||
@@ -114,3 +128,43 @@ func TestServeMuxNotFound(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var middlewareTests = []struct {
|
||||
typename string // task's type name
|
||||
middlewares []string // middlewares to use. They should be called in this order.
|
||||
want string // identifier of the handler that should be called
|
||||
}{
|
||||
{"email:signup", []string{"logging", "expiration"}, "signup email handler"},
|
||||
{"csv:export", []string{}, "csv export handler"},
|
||||
{"email:daily", []string{"expiration", "logging"}, "default email handler"},
|
||||
}
|
||||
|
||||
func TestServeMuxMiddlewares(t *testing.T) {
|
||||
for _, tc := range middlewareTests {
|
||||
mux := NewServeMux()
|
||||
for _, e := range serveMuxRegister {
|
||||
mux.Handle(e.pattern, e.h)
|
||||
}
|
||||
var mws []MiddlewareFunc
|
||||
for _, s := range tc.middlewares {
|
||||
mws = append(mws, makeFakeMiddleware(s))
|
||||
}
|
||||
mux.Use(mws...)
|
||||
|
||||
invoked = []string{} // reset to empty slice
|
||||
called = "" // reset to zero value
|
||||
|
||||
task := NewTask(tc.typename, nil)
|
||||
if err := mux.ProcessTask(context.Background(), task); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(invoked, tc.middlewares); diff != "" {
|
||||
t.Errorf("invoked middlewares were %v, want %v", invoked, tc.middlewares)
|
||||
}
|
||||
|
||||
if called != tc.want {
|
||||
t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ var workersCmd = &cobra.Command{
|
||||
Short: "Shows all running workers information",
|
||||
Long: `Workers (asynqmon workers) will show all running workers information.
|
||||
|
||||
The command shows the follwoing for each worker:
|
||||
The command shows the following for each worker:
|
||||
* Process in which the worker is running
|
||||
* ID of the task worker is processing
|
||||
* Type of the task worker is processing
|
||||
|
||||
Reference in New Issue
Block a user