2
0
mirror of https://github.com/hibiken/asynq.git synced 2026-04-07 19:15:51 +08:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Ken Hibino
421dc584ff v0.18.4 2021-08-17 17:12:33 -07:00
Ken Hibino
cfd1a1dfe8 Make scheduler methods thread-safe 2021-08-17 17:10:53 -07:00
3 changed files with 17 additions and 1 deletions

View File

@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.18.4] - 2020-08-17
### Fixed
- Scheduler methods are now thread-safe. It's now safe to call `Register` and `Unregister` concurrently.
## [0.18.3] - 2020-08-09
### Changed

View File

@@ -23,7 +23,7 @@ import (
)
// Version of asynq library and CLI.
const Version = "0.18.3"
const Version = "0.18.4"
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"

View File

@@ -19,6 +19,8 @@ import (
)
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
//
// Schedulers are safe for concurrent use by multiple goroutines.
type Scheduler struct {
id string
state *base.ServerState
@@ -30,6 +32,9 @@ type Scheduler struct {
done chan struct{}
wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error)
// guards idmap
mu sync.Mutex
// idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of
// the Scheduler.
@@ -154,17 +159,22 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
if err != nil {
return "", err
}
s.mu.Lock()
s.idmap[job.id.String()] = cronID
s.mu.Unlock()
return job.id.String(), nil
}
// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
s.mu.Lock()
defer s.mu.Unlock()
cronID, ok := s.idmap[entryID]
if !ok {
return fmt.Errorf("asynq: no scheduler entry found")
}
delete(s.idmap, entryID)
s.cron.Remove(cronID)
return nil
}