2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-02-04 17:39:37 +00:00

Implement reusing redis client

This commit is contained in:
Jeroen Bobbeldijk
2023-09-19 10:16:51 +02:00
parent 6a7bf2ceff
commit 9e548fc097
8 changed files with 184 additions and 50 deletions

View File

@@ -10,11 +10,11 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
)
@@ -43,15 +43,27 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
return scheduler
}
// NewSchedulerFromRedisClient returns a new Scheduler instance given a redis client.
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: the redis client will not be closed by Asynq, you are responsible for closing.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
@@ -72,7 +84,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
id: generateSchedulerID(),
state: &serverState{value: srvStateNew},
logger: logger,
client: NewClient(r),
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
@@ -262,7 +274,9 @@ func (s *Scheduler) Shutdown() {
s.clearHistory()
s.client.Close()
s.rdb.Close()
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped")
}