2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-19 00:46:00 +08:00

Compare commits

..

4 Commits

Author SHA1 Message Date
dependabot[bot]
f4440d9c7f build(deps): bump actions/checkout from 4 to 6
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 6.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-13 23:45:14 +00:00
Mohamed Sohail
23905a286f Merge pull request #1104 from Bahtya/fix/pubsub-connection-leak
fix: close pubsub connection on Subscribe error in CancelationPubSub
2026-04-13 13:58:23 +03:00
Bahtya
5586efeae7 test: add test for CancelationPubSub error path
Add TestCancelationPubSubReceiveError to verify that when
Receive() fails in CancelationPubSub(), an error is returned
and the pubsub connection is not leaked.

This provides test coverage for the pubsub.Close() fix that
was missing in the previous commit.

Bahtya
2026-04-09 20:11:53 +08:00
bahtya
dd3c923f44 fix: close pubsub connection on Subscribe error in CancelationPubSub
When redis.Subscribe succeeds but Receive() fails, the pubsub
connection was not being closed before returning the error. This caused
the subscriber goroutine to leak a Redis connection on each retry
iteration, eventually exhausting the connection pool.

Fixes #1095
2026-04-09 18:15:48 +08:00
4 changed files with 30 additions and 6 deletions

View File

@@ -17,7 +17,7 @@ jobs:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v5
with:
@@ -40,7 +40,7 @@ jobs:
- 6379:6379
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
ref: master
- name: Set up Go
@@ -61,7 +61,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v5
with:

View File

@@ -15,7 +15,7 @@ jobs:
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v5
@@ -53,7 +53,7 @@ jobs:
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v5
@@ -72,7 +72,7 @@ jobs:
runs-on: ubuntu-latest
if: false
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- uses: actions/setup-go@v5
with:

View File

@@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil

View File

@@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock()
}
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) {
r := setup(t)
defer r.Close()