mirror of
https://github.com/hibiken/asynq.git
synced 2026-04-14 10:51:50 +08:00
Compare commits
6 Commits
v0.26.0
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2eecc427df | ||
|
|
23905a286f | ||
|
|
5586efeae7 | ||
|
|
dd3c923f44 | ||
|
|
f81c78e68d | ||
|
|
2fd155e31d |
4
.github/workflows/benchstat.yml
vendored
4
.github/workflows/benchstat.yml
vendored
@@ -69,11 +69,11 @@ jobs:
|
||||
- name: Install benchstat
|
||||
run: go get -u golang.org/x/perf/cmd/benchstat
|
||||
- name: Download Incoming
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v8
|
||||
with:
|
||||
name: bench-incoming
|
||||
- name: Download Current
|
||||
uses: actions/download-artifact@v4
|
||||
uses: actions/download-artifact@v8
|
||||
with:
|
||||
name: bench-current
|
||||
- name: Benchstat Results
|
||||
|
||||
@@ -264,7 +264,9 @@ for i=1,2 do
|
||||
if (table.getn(ids) > 0) then
|
||||
for _, id in ipairs(ids) do
|
||||
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||
sample_total = sample_total + bytes
|
||||
if bytes then
|
||||
sample_total = sample_total + bytes
|
||||
end
|
||||
end
|
||||
local n = redis.call("LLEN", KEYS[i])
|
||||
local avg = sample_total / table.getn(ids)
|
||||
@@ -281,7 +283,9 @@ for i=3,6 do
|
||||
if (table.getn(ids) > 0) then
|
||||
for _, id in ipairs(ids) do
|
||||
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||
sample_total = sample_total + bytes
|
||||
if bytes then
|
||||
sample_total = sample_total + bytes
|
||||
end
|
||||
end
|
||||
local n = redis.call("ZCARD", KEYS[i])
|
||||
local avg = sample_total / table.getn(ids)
|
||||
@@ -304,13 +308,17 @@ if table.getn(groups) > 0 then
|
||||
local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
|
||||
for _, id in ipairs(ids) do
|
||||
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||
agg_task_sample_total = agg_task_sample_total + bytes
|
||||
agg_task_sample_size = agg_task_sample_size + 1
|
||||
if bytes then
|
||||
agg_task_sample_total = agg_task_sample_total + bytes
|
||||
agg_task_sample_size = agg_task_sample_size + 1
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
local avg = agg_task_sample_total / agg_task_sample_size
|
||||
memusg = memusg + (avg * agg_task_count)
|
||||
if agg_task_sample_size > 0 then
|
||||
local avg = agg_task_sample_total / agg_task_sample_size
|
||||
memusg = memusg + (avg * agg_task_count)
|
||||
end
|
||||
end
|
||||
return memusg
|
||||
`)
|
||||
|
||||
@@ -1488,6 +1488,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
||||
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
||||
_, err := pubsub.Receive(ctx)
|
||||
if err != nil {
|
||||
pubsub.Close()
|
||||
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
||||
}
|
||||
return pubsub, nil
|
||||
|
||||
@@ -3274,6 +3274,29 @@ func TestCancelationPubSub(t *testing.T) {
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func TestCancelationPubSubReceiveError(t *testing.T) {
|
||||
// Use a client connected to a non-existent Redis server to trigger
|
||||
// a Receive() error. This verifies that the pubsub connection is
|
||||
// closed on error, preventing connection leaks.
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:0", // invalid port — connection will fail
|
||||
})
|
||||
r := NewRDB(client)
|
||||
defer r.Close()
|
||||
|
||||
pubsub, err := r.CancelationPubSub()
|
||||
if err == nil {
|
||||
// If no error, we must clean up the pubsub.
|
||||
if pubsub != nil {
|
||||
pubsub.Close()
|
||||
}
|
||||
t.Fatal("(*RDB).CancelationPubSub() expected to return an error when redis is unreachable")
|
||||
}
|
||||
if pubsub != nil {
|
||||
t.Error("(*RDB).CancelationPubSub() expected nil pubsub on error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteResult(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
Reference in New Issue
Block a user