2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-05-17 07:42:50 +08:00

Compare commits

..

4 Commits

Author SHA1 Message Date
dependabot[bot]
5b38bad6e9 build(deps): bump github.com/redis/go-redis/v9 from 9.14.1 to 9.19.0
Bumps [github.com/redis/go-redis/v9](https://github.com/redis/go-redis) from 9.14.1 to 9.19.0.
- [Release notes](https://github.com/redis/go-redis/releases)
- [Changelog](https://github.com/redis/go-redis/blob/master/RELEASE-NOTES.md)
- [Commits](https://github.com/redis/go-redis/compare/v9.14.1...v9.19.0)

---
updated-dependencies:
- dependency-name: github.com/redis/go-redis/v9
  dependency-version: 9.19.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-05 02:28:39 +00:00
Mohamed Sohail
23905a286f Merge pull request #1104 from Bahtya/fix/pubsub-connection-leak
fix: close pubsub connection on Subscribe error in CancelationPubSub
2026-04-13 13:58:23 +03:00
Bahtya
5586efeae7 test: add test for CancelationPubSub error path
Add TestCancelationPubSubReceiveError to verify that when
Receive() fails in CancelationPubSub(), an error is returned
and the pubsub connection is not leaked.

This provides test coverage for the pubsub.Close() fix that
was missing in the previous commit.

Bahtya
2026-04-09 20:11:53 +08:00
bahtya
dd3c923f44 fix: close pubsub connection on Subscribe error in CancelationPubSub
When redis.Subscribe succeeds but Receive() fails, the pubsub
connection was not being closed before returning the error. This caused
the subscriber goroutine to leak a Redis connection on each retry
iteration, eventually exhausting the connection pool.

Fixes #1095
2026-04-09 18:15:48 +08:00
4 changed files with 34 additions and 6 deletions

4
go.mod
View File

@@ -5,7 +5,7 @@ go 1.24.0
require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.14.1
github.com/redis/go-redis/v9 v9.19.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.10.0
go.uber.org/goleak v1.3.0
@@ -16,5 +16,5 @@ require (
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
go.uber.org/atomic v1.11.0 // indirect
)

12
go.sum
View File

@@ -6,22 +6,22 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k=
github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
@@ -30,6 +30,10 @@ github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=

View File

@@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil

View File

@@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock()
}
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) {
r := setup(t)
defer r.Close()