2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-14 22:55:58 +08:00

Compare commits

..

4 Commits

Author SHA1 Message Date
dependabot[bot]
5ae1ae1299 build(deps): bump golangci/golangci-lint-action from 6 to 9
Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 6 to 9.
- [Release notes](https://github.com/golangci/golangci-lint-action/releases)
- [Commits](https://github.com/golangci/golangci-lint-action/compare/v6...v9)

---
updated-dependencies:
- dependency-name: golangci/golangci-lint-action
  dependency-version: '9'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-13 23:45:24 +00:00
Mohamed Sohail
23905a286f Merge pull request #1104 from Bahtya/fix/pubsub-connection-leak
fix: close pubsub connection on Subscribe error in CancelationPubSub
2026-04-13 13:58:23 +03:00
Bahtya
5586efeae7 test: add test for CancelationPubSub error path
Add TestCancelationPubSubReceiveError to verify that when
Receive() fails in CancelationPubSub(), an error is returned
and the pubsub connection is not leaked.

This provides test coverage for the pubsub.Close() fix that
was missing in the previous commit.

Bahtya
2026-04-09 20:11:53 +08:00
bahtya
dd3c923f44 fix: close pubsub connection on Subscribe error in CancelationPubSub
When redis.Subscribe succeeds but Receive() fails, the pubsub
connection was not being closed before returning the error. This caused
the subscriber goroutine to leak a Redis connection on each retry
iteration, eventually exhausting the connection pool.

Fixes #1095
2026-04-09 18:15:48 +08:00
8 changed files with 41 additions and 38 deletions

View File

@@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.25.x, 1.26.x]
go-version: [1.24.x, 1.25.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -45,7 +45,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
go-version: [1.25.x, 1.26.x]
go-version: [1.24.x, 1.25.x]
runs-on: ${{ matrix.os }}
services:
redis:
@@ -79,6 +79,6 @@ jobs:
go-version: stable
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
uses: golangci/golangci-lint-action@v9
with:
version: v1.61

View File

@@ -64,12 +64,6 @@ Initialize your project by creating a folder and then running `go mod init githu
go get -u github.com/hibiken/asynq
```
You may use the latest features not available in the last tagged release by installing with:
```sh
go get -u github.com/hibiken/asynq@master
```
Make sure you're running a Redis server locally or from a [Docker](https://hub.docker.com/_/redis) container. Version `4.0` or higher is required.
Next, write a package that encapsulates task creation and task handling.

View File

@@ -1,8 +0,0 @@
coverage:
status:
project:
default:
informational: true
patch:
default:
informational: true

11
go.mod
View File

@@ -1,21 +1,20 @@
module github.com/hibiken/asynq
go 1.25.0
go 1.24.0
require (
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.18.0
github.com/redis/go-redis/v9 v9.14.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.10.0
go.uber.org/goleak v1.3.0
golang.org/x/sys v0.43.0
golang.org/x/time v0.15.0
google.golang.org/protobuf v1.36.11
golang.org/x/sys v0.37.0
golang.org/x/time v0.14.0
google.golang.org/protobuf v1.36.10
)
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
go.uber.org/atomic v1.11.0 // indirect
)

22
go.sum
View File

@@ -14,16 +14,14 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
@@ -32,17 +30,13 @@ github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -23,7 +23,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.27.0"
const Version = "0.26.0"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"

View File

@@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
_, err := pubsub.Receive(ctx)
if err != nil {
pubsub.Close()
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil

View File

@@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) {
mu.Unlock()
}
func TestCancelationPubSubReceiveError(t *testing.T) {
// Use a client connected to a non-existent Redis server to trigger
// a Receive() error. This verifies that the pubsub connection is
// closed on error, preventing connection leaks.
client := redis.NewClient(&redis.Options{
Addr: "localhost:0", // invalid port — connection will fail
})
r := NewRDB(client)
defer r.Close()
pubsub, err := r.CancelationPubSub()
if err == nil {
// If no error, we must clean up the pubsub.
if pubsub != nil {
pubsub.Close()
}
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
}
if pubsub != nil {
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
}
}
func TestWriteResult(t *testing.T) {
r := setup(t)
defer r.Close()