From dd3c923f4406085d36a8efb7299202a8133e0fd5 Mon Sep 17 00:00:00 2001 From: bahtya Date: Thu, 9 Apr 2026 18:15:48 +0800 Subject: [PATCH 1/2] fix: close pubsub connection on Subscribe error in CancelationPubSub When redis.Subscribe succeeds but Receive() fails, the pubsub connection was not being closed before returning the error. This caused the subscriber goroutine to leak a Redis connection on each retry iteration, eventually exhausting the connection pool. Fixes #1095 --- internal/rdb/rdb.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df506..ae680ef 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { pubsub := r.client.Subscribe(ctx, base.CancelChannel) _, err := pubsub.Receive(ctx) if err != nil { + pubsub.Close() return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err)) } return pubsub, nil From 5586efeae7939e60ff02b7e7caa22e0de8c15685 Mon Sep 17 00:00:00 2001 From: Bahtya Date: Thu, 9 Apr 2026 20:11:53 +0800 Subject: [PATCH 2/2] test: add test for CancelationPubSub error path Add TestCancelationPubSubReceiveError to verify that when Receive() fails in CancelationPubSub(), an error is returned and the pubsub connection is not leaked. This provides test coverage for the pubsub.Close() fix that was missing in the previous commit. Bahtya --- internal/rdb/rdb_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5249a29..eed4ad3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) { mu.Unlock() } +func TestCancelationPubSubReceiveError(t *testing.T) { + // Use a client connected to a non-existent Redis server to trigger + // a Receive() error. This verifies that the pubsub connection is + // closed on error, preventing connection leaks. + client := redis.NewClient(&redis.Options{ + Addr: "localhost:0", // invalid port — connection will fail + }) + r := NewRDB(client) + defer r.Close() + + pubsub, err := r.CancelationPubSub() + if err == nil { + // If no error, we must clean up the pubsub. + if pubsub != nil { + pubsub.Close() + } + t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable") + } + if pubsub != nil { + t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error") + } +} + func TestWriteResult(t *testing.T) { r := setup(t) defer r.Close()