2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-14 22:55:58 +08:00

Compare commits

...

4 Commits

Author SHA1 Message Date
dependabot[bot]
9ff76e2dd0 build(deps): bump github.com/redis/go-redis/v9 in /tools
Bumps [github.com/redis/go-redis/v9](https://github.com/redis/go-redis) from 9.7.0 to 9.18.0.
- [Release notes](https://github.com/redis/go-redis/releases)
- [Changelog](https://github.com/redis/go-redis/blob/master/RELEASE-NOTES.md)
- [Commits](https://github.com/redis/go-redis/compare/v9.7.0...v9.18.0)

---
updated-dependencies:
- dependency-name: github.com/redis/go-redis/v9
  dependency-version: 9.18.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-13 23:30:51 +00:00
Mohamed Sohail
23905a286f Merge pull request #1104 from Bahtya/fix/pubsub-connection-leak
fix: close pubsub connection on Subscribe error in CancelationPubSub
2026-04-13 13:58:23 +03:00
Bahtya
5586efeae7 test: add test for CancelationPubSub error path
Add TestCancelationPubSubReceiveError to verify that when
Receive() fails in CancelationPubSub(), an error is returned
and the pubsub connection is not leaked.

This provides test coverage for the pubsub.Close() fix that
was missing in the previous commit.

Bahtya
2026-04-09 20:11:53 +08:00
bahtya
dd3c923f44 fix: close pubsub connection on Subscribe error in CancelationPubSub
When redis.Subscribe succeeds but Receive() fails, the pubsub
connection was not being closed before returning the error. This caused
the subscriber goroutine to leak a Redis connection on each retry
iteration, eventually exhausting the connection pool.

Fixes #1095
2026-04-09 18:15:48 +08:00
4 changed files with 37 additions and 6 deletions

View File

@@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil

View File

@@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock()
}
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) {
r := setup(t)
defer r.Close()

View File

@@ -12,7 +12,7 @@ require (
github.com/mattn/go-runewidth v0.0.16
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v1.11.1
github.com/redis/go-redis/v9 v9.7.0
github.com/redis/go-redis/v9 v9.18.0
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.0
@@ -21,7 +21,7 @@ require (
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
@@ -45,6 +45,7 @@ require (
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf // indirect
golang.org/x/text v0.3.8 // indirect

View File

@@ -39,8 +39,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@@ -173,6 +173,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
@@ -261,8 +263,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
@@ -312,10 +314,14 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=