2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-06 08:05:59 +08:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Mohammed Sohail
0a9dd34460 tests: add explicit test to demo default to 0 db on redis-sentinel without db opt 2026-04-09 11:35:22 +03:00
Jiale Lin
07898eade0 fix: parse DB number from redis-sentinel URI path (#669)
Previously, parseRedisSentinelURI ignored the /dbnumber path segment,
making it impossible to connect to any DB other than 0 via sentinel URIs.

This adds DB extraction from the URI path, consistent with how
parseRedisURI already handles it for redis:// and rediss:// schemes.

Closes #669
2026-03-21 11:33:56 -07:00
7 changed files with 42 additions and 50 deletions

View File

@@ -471,7 +471,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][/dbnumber][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
@@ -545,11 +545,20 @@ func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
addrs := strings.Split(u.Host, ",")
master := u.Query().Get("master")
var db int
var err error
if len(u.Path) > 0 {
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
db, err = strconv.Atoi(xs[0])
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis sentinel uri: database number should be the first segment of the path")
}
}
var password string
if v, ok := u.User.Password(); ok {
password = v
}
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password, DB: db}, nil
}
// ResultWriter is a client interface to write result data for a task.

View File

@@ -146,6 +146,24 @@ func TestParseRedisURI(t *testing.T) {
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
SentinelPassword: "mypassword",
DB: 0,
},
},
{
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002/3?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
DB: 3,
},
},
{
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002/7?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
SentinelPassword: "mypassword",
DB: 7,
},
},
}
@@ -188,6 +206,10 @@ func TestParseRedisURIErrors(t *testing.T) {
"non integer for db numbers for socket",
"redis-socket:///some/path/to/redis?db=one",
},
{
"non integer for db number for sentinel",
"redis-sentinel://localhost:5000/abc?master=mymaster",
},
}
for _, tc := range tests {

3
go.mod
View File

@@ -5,7 +5,7 @@ go 1.24.0
require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.18.0
github.com/redis/go-redis/v9 v9.14.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.10.0
go.uber.org/goleak v1.3.0
@@ -17,5 +17,4 @@ require (
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
go.uber.org/atomic v1.11.0 // indirect
)

10
go.sum
View File

@@ -14,16 +14,14 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
@@ -32,10 +30,6 @@ github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=

View File

@@ -264,9 +264,7 @@ for i=1,2 do
if (table.getn(ids) > 0) then
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
if bytes then
sample_total = sample_total + bytes
end
sample_total = sample_total + bytes
end
local n = redis.call("LLEN", KEYS[i])
local avg = sample_total / table.getn(ids)
@@ -283,9 +281,7 @@ for i=3,6 do
if (table.getn(ids) > 0) then
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
if bytes then
sample_total = sample_total + bytes
end
sample_total = sample_total + bytes
end
local n = redis.call("ZCARD", KEYS[i])
local avg = sample_total / table.getn(ids)
@@ -308,17 +304,13 @@ if table.getn(groups) > 0 then
local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
if bytes then
agg_task_sample_total = agg_task_sample_total + bytes
agg_task_sample_size = agg_task_sample_size + 1
end
agg_task_sample_total = agg_task_sample_total + bytes
agg_task_sample_size = agg_task_sample_size + 1
end
end
end
if agg_task_sample_size > 0 then
local avg = agg_task_sample_total / agg_task_sample_size
memusg = memusg + (avg * agg_task_count)
end
local avg = agg_task_sample_total / agg_task_sample_size
memusg = memusg + (avg * agg_task_count)
end
return memusg
`)

View File

@@ -1488,7 +1488,6 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil

View File

@@ -3274,29 +3274,6 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock()
}
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) {
r := setup(t)
defer r.Close()