2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-10 21:05:54 +08:00

Compare commits

...

5 Commits

Author SHA1 Message Date
dependabot[bot]
56c1117078 build(deps): bump golang.org/x/sys from 0.27.0 to 0.31.0
Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.27.0 to 0.31.0.
- [Commits](https://github.com/golang/sys/compare/v0.27.0...v0.31.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-03-10 23:52:47 +00:00
Mohammed Sohail
489e21920b release: v0.25.1 2024-12-11 09:19:37 +03:00
Mohamed Sohail
043dcfbf56 fix: call Stop on all other signals to correctly set the server state for the shutdown procedure to complete successfully (#982)
* fixes: #979
2024-12-11 09:05:00 +03:00
Robin Joseph
02907551b4 feat(dash): Add --insecure option (#980) 2024-12-09 09:09:12 +03:00
Mohamed Sohail
127fac2e90 fix: NewScheduler incorrectly creates underlying Client, closing broker properly (#977)
* fix: NewScheduler wrongly creates a client whose sharedConnection value is always true

* This is affecting the PeriodicManager as well as the Scheduler

* fix: closing the Client also closes the broker

* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
2024-12-06 08:40:04 +03:00
7 changed files with 49 additions and 16 deletions

View File

@@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.25.1] - 2024-12-11
### Upgrades
* Some packages
### Added
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
### Fixes
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
## [0.25.0] - 2024-10-29
### Upgrades

3
go.mod
View File

@@ -1,6 +1,7 @@
module github.com/hibiken/asynq
go 1.22
toolchain go1.23.7
require (
github.com/google/go-cmp v0.6.0
@@ -9,7 +10,7 @@ require (
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.7.0
go.uber.org/goleak v1.3.0
golang.org/x/sys v0.27.0
golang.org/x/sys v0.31.0
golang.org/x/time v0.8.0
google.golang.org/protobuf v1.35.2
)

4
go.sum
View File

@@ -32,8 +32,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=

View File

@@ -23,7 +23,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.25.0"
const Version = "0.25.1"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"

View File

@@ -44,9 +44,6 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
const defaultHeartbeatInterval = 10 * time.Second
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
rdb := rdb.NewRDB(redisClient)
scheduler.rdb = rdb
scheduler.client = &Client{broker: rdb, sharedConnection: false}
return scheduler
}
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)
scheduler.rdb = rdb.NewRDB(c)
scheduler.client = NewClientFromRedisClient(c)
return scheduler
}
func newScheduler(opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
state: &serverState{value: srvStateNew},
heartbeatInterval: heartbeatInterval,
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
if err := s.client.Close(); err != nil {
s.logger.Errorf("Failed to close redis client connection: %v", err)
}
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped")
}

View File

@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
if sig == unix.SIGTSTP {
srv.Stop()
continue
} else {
srv.Stop()
break
}
break
}
}

View File

@@ -40,6 +40,7 @@ var (
useRedisCluster bool
clusterAddrs string
tlsServerName string
insecure bool
)
// rootCmd represents the base command when called without any subcommands
@@ -314,6 +315,8 @@ func init() {
"List of comma-separated redis server addresses")
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
"", "Server name for TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
false, "Allow insecure TLS connection by skipping cert validation")
// Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
@@ -321,6 +324,7 @@ func init() {
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
}
// initConfig reads in config file and ENV variables if set.
@@ -402,7 +406,7 @@ func getTLSConfig() *tls.Config {
if tlsServer == "" {
return nil
}
return &tls.Config{ServerName: tlsServer}
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
}
// printTable is a helper function to print data in table format.