mirror of
https://github.com/hibiken/asynq.git
synced 2026-05-09 09:36:01 +08:00
Compare commits
3 Commits
sohail/v0.
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
23905a286f | ||
|
|
5586efeae7 | ||
|
|
dd3c923f44 |
@@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
|||||||
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
||||||
_, err := pubsub.Receive(ctx)
|
_, err := pubsub.Receive(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
pubsub.Close()
|
||||||
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
||||||
}
|
}
|
||||||
return pubsub, nil
|
return pubsub, nil
|
||||||
|
|||||||
@@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) {
|
|||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCancelationPubSubReceiveError(t *testing.T) {
|
||||||
|
// Use a client connected to a non-existent Redis server to trigger
|
||||||
|
// a Receive() error. This verifies that the pubsub connection is
|
||||||
|
// closed on error, preventing connection leaks.
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:0", // invalid port — connection will fail
|
||||||
|
})
|
||||||
|
r := NewRDB(client)
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
pubsub, err := r.CancelationPubSub()
|
||||||
|
if err == nil {
|
||||||
|
// If no error, we must clean up the pubsub.
|
||||||
|
if pubsub != nil {
|
||||||
|
pubsub.Close()
|
||||||
|
}
|
||||||
|
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
|
||||||
|
}
|
||||||
|
if pubsub != nil {
|
||||||
|
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteResult(t *testing.T) {
|
func TestWriteResult(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
|||||||
Reference in New Issue
Block a user