mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-20 13:15:51 +08:00
Compare commits
2 Commits
dependabot
...
pr-1100
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a9dd34460 | ||
|
|
07898eade0 |
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -79,6 +79,6 @@ jobs:
|
|||||||
go-version: stable
|
go-version: stable
|
||||||
|
|
||||||
- name: golangci-lint
|
- name: golangci-lint
|
||||||
uses: golangci/golangci-lint-action@v9
|
uses: golangci/golangci-lint-action@v6
|
||||||
with:
|
with:
|
||||||
version: v1.61
|
version: v1.61
|
||||||
|
|||||||
13
asynq.go
13
asynq.go
@@ -471,7 +471,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
|
|||||||
// redis://[:password@]host[:port][/dbnumber]
|
// redis://[:password@]host[:port][/dbnumber]
|
||||||
// rediss://[:password@]host[:port][/dbnumber]
|
// rediss://[:password@]host[:port][/dbnumber]
|
||||||
// redis-socket://[:password@]path[?db=dbnumber]
|
// redis-socket://[:password@]path[?db=dbnumber]
|
||||||
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
|
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][/dbnumber][?master=masterName]
|
||||||
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
||||||
u, err := url.Parse(uri)
|
u, err := url.Parse(uri)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -545,11 +545,20 @@ func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
|
|||||||
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
|
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
|
||||||
addrs := strings.Split(u.Host, ",")
|
addrs := strings.Split(u.Host, ",")
|
||||||
master := u.Query().Get("master")
|
master := u.Query().Get("master")
|
||||||
|
var db int
|
||||||
|
var err error
|
||||||
|
if len(u.Path) > 0 {
|
||||||
|
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||||
|
db, err = strconv.Atoi(xs[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("asynq: could not parse redis sentinel uri: database number should be the first segment of the path")
|
||||||
|
}
|
||||||
|
}
|
||||||
var password string
|
var password string
|
||||||
if v, ok := u.User.Password(); ok {
|
if v, ok := u.User.Password(); ok {
|
||||||
password = v
|
password = v
|
||||||
}
|
}
|
||||||
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil
|
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password, DB: db}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResultWriter is a client interface to write result data for a task.
|
// ResultWriter is a client interface to write result data for a task.
|
||||||
|
|||||||
@@ -146,6 +146,24 @@ func TestParseRedisURI(t *testing.T) {
|
|||||||
MasterName: "mymaster",
|
MasterName: "mymaster",
|
||||||
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
|
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
|
||||||
SentinelPassword: "mypassword",
|
SentinelPassword: "mypassword",
|
||||||
|
DB: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002/3?master=mymaster",
|
||||||
|
RedisFailoverClientOpt{
|
||||||
|
MasterName: "mymaster",
|
||||||
|
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
|
||||||
|
DB: 3,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002/7?master=mymaster",
|
||||||
|
RedisFailoverClientOpt{
|
||||||
|
MasterName: "mymaster",
|
||||||
|
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
|
||||||
|
SentinelPassword: "mypassword",
|
||||||
|
DB: 7,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -188,6 +206,10 @@ func TestParseRedisURIErrors(t *testing.T) {
|
|||||||
"non integer for db numbers for socket",
|
"non integer for db numbers for socket",
|
||||||
"redis-socket:///some/path/to/redis?db=one",
|
"redis-socket:///some/path/to/redis?db=one",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"non integer for db number for sentinel",
|
||||||
|
"redis-sentinel://localhost:5000/abc?master=mymaster",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
|||||||
@@ -264,9 +264,7 @@ for i=1,2 do
|
|||||||
if (table.getn(ids) > 0) then
|
if (table.getn(ids) > 0) then
|
||||||
for _, id in ipairs(ids) do
|
for _, id in ipairs(ids) do
|
||||||
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||||
if bytes then
|
sample_total = sample_total + bytes
|
||||||
sample_total = sample_total + bytes
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
local n = redis.call("LLEN", KEYS[i])
|
local n = redis.call("LLEN", KEYS[i])
|
||||||
local avg = sample_total / table.getn(ids)
|
local avg = sample_total / table.getn(ids)
|
||||||
@@ -283,9 +281,7 @@ for i=3,6 do
|
|||||||
if (table.getn(ids) > 0) then
|
if (table.getn(ids) > 0) then
|
||||||
for _, id in ipairs(ids) do
|
for _, id in ipairs(ids) do
|
||||||
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||||
if bytes then
|
sample_total = sample_total + bytes
|
||||||
sample_total = sample_total + bytes
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
local n = redis.call("ZCARD", KEYS[i])
|
local n = redis.call("ZCARD", KEYS[i])
|
||||||
local avg = sample_total / table.getn(ids)
|
local avg = sample_total / table.getn(ids)
|
||||||
@@ -308,17 +304,13 @@ if table.getn(groups) > 0 then
|
|||||||
local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
|
local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
|
||||||
for _, id in ipairs(ids) do
|
for _, id in ipairs(ids) do
|
||||||
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||||
if bytes then
|
agg_task_sample_total = agg_task_sample_total + bytes
|
||||||
agg_task_sample_total = agg_task_sample_total + bytes
|
agg_task_sample_size = agg_task_sample_size + 1
|
||||||
agg_task_sample_size = agg_task_sample_size + 1
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
if agg_task_sample_size > 0 then
|
local avg = agg_task_sample_total / agg_task_sample_size
|
||||||
local avg = agg_task_sample_total / agg_task_sample_size
|
memusg = memusg + (avg * agg_task_count)
|
||||||
memusg = memusg + (avg * agg_task_count)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
return memusg
|
return memusg
|
||||||
`)
|
`)
|
||||||
|
|||||||
@@ -1488,7 +1488,6 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
|||||||
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
||||||
_, err := pubsub.Receive(ctx)
|
_, err := pubsub.Receive(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pubsub.Close()
|
|
||||||
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
||||||
}
|
}
|
||||||
return pubsub, nil
|
return pubsub, nil
|
||||||
|
|||||||
@@ -3274,29 +3274,6 @@ func TestCancelationPubSub(t *testing.T) {
|
|||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCancelationPubSubReceiveError(t *testing.T) {
|
|
||||||
// Use a client connected to a non-existent Redis server to trigger
|
|
||||||
// a Receive() error. This verifies that the pubsub connection is
|
|
||||||
// closed on error, preventing connection leaks.
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:0", // invalid port — connection will fail
|
|
||||||
})
|
|
||||||
r := NewRDB(client)
|
|
||||||
defer r.Close()
|
|
||||||
|
|
||||||
pubsub, err := r.CancelationPubSub()
|
|
||||||
if err == nil {
|
|
||||||
// If no error, we must clean up the pubsub.
|
|
||||||
if pubsub != nil {
|
|
||||||
pubsub.Close()
|
|
||||||
}
|
|
||||||
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
|
|
||||||
}
|
|
||||||
if pubsub != nil {
|
|
||||||
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWriteResult(t *testing.T) {
|
func TestWriteResult(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|||||||
Reference in New Issue
Block a user