diff --git a/.gitignore b/.gitignore index 00268614..3c31f54c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ _cgo_export.* _testmain.go *.exe +.vscode diff --git a/constantdelay_test.go b/constantdelay_test.go index f43a58ad..24a11731 100644 --- a/constantdelay_test.go +++ b/constantdelay_test.go @@ -26,7 +26,7 @@ func TestConstantDelayNext(t *testing.T) { {"Mon Jul 9 23:35:51 2012", 25*time.Hour + 44*time.Minute + 24*time.Second, "Thu Jul 11 01:20:15 2012"}, // Wrap around months - {"Mon Jul 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Oct 9 00:00 2012"}, + {"Mon Jun 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Sep 9 00:00 2012"}, // Don't do JUL->OCT otherwise daylight savings breaks the test in some time zones... // Wrap around minute, hour, day, month, and year {"Mon Dec 31 23:59:45 2012", 15 * time.Second, "Tue Jan 1 00:00:00 2013"}, diff --git a/cron.go b/cron.go index f6e451db..d08bd2cc 100644 --- a/cron.go +++ b/cron.go @@ -2,6 +2,8 @@ package cron import ( "context" + "math" + "math/rand" "sort" "sync" "time" @@ -11,19 +13,21 @@ import ( // specified by the schedule. It may be started, stopped, and the entries may // be inspected while running. type Cron struct { - entries []*Entry - chain Chain - stop chan struct{} - add chan *Entry - remove chan EntryID - snapshot chan chan []Entry - running bool - logger Logger - runningMu sync.Mutex - location *time.Location - parser Parser - nextID EntryID - jobWaiter sync.WaitGroup + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + running bool + logger Logger + runningMu sync.Mutex + location *time.Location + parser Parser + nextID EntryID + jobWaiter sync.WaitGroup + timeCallback func() time.Time + recalculateTimer chan struct{} } // Job is an interface for submitted cron jobs. @@ -64,11 +68,32 @@ type Entry struct { // It is kept around so that user code that needs to get at the job later, // e.g. via Entries() can do so. Job Job + + // Add some jitter to the Entry's scheduled time. The actual jitter used + // will be randomly chosen within the range [0, Jitter]. The jitter will + // always be converted to a positive duration. + Jitter time.Duration } // Valid returns true if this is not the zero entry. func (e Entry) Valid() bool { return e.ID != 0 } +// Add a random amount of jitter to the input time using the maximum jitter +// amount. The calculated jitter is on the closed interval [0, jitterMaximum] +// where the only exception to this rule is if the jitter duration is the +// maximum value for an int64 which is on the closed interval [0, Int64Max]. +func calculateJitteredTime(now time.Time, jitterMaximum time.Duration) time.Time { + result := now + if jitterMaximum > 0 { + val := jitterMaximum.Nanoseconds() + if val < math.MinInt64 { + val = jitterMaximum.Nanoseconds() + 1 + } + result = result.Add(time.Duration(rand.Int63n(val))) + } + return result +} + // byTime is a wrapper for sorting the entry array by time // (with zero time at the end). type byTime []*Entry @@ -107,17 +132,19 @@ func (s byTime) Less(i, j int) bool { // See "cron.With*" to modify the default behavior. func New(opts ...Option) *Cron { c := &Cron{ - entries: nil, - chain: NewChain(), - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan chan []Entry), - remove: make(chan EntryID), - running: false, - runningMu: sync.Mutex{}, - logger: DefaultLogger, - location: time.Local, - parser: standardParser, + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, + timeCallback: nil, + recalculateTimer: make(chan struct{}), } for _, opt := range opts { opt(c) @@ -134,23 +161,49 @@ func (f FuncJob) Run() { f() } // The spec is parsed using the time zone of this Cron instance as the default. // An opaque ID is returned that can be used to later remove it. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { - return c.AddJob(spec, FuncJob(cmd)) + return c.AddJobWithJitter(spec, FuncJob(cmd), 0) +} + +// AddFuncWithJitter adds a func to the Cron to be run on the given schedule. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddFuncWithJitter(spec string, cmd func(), jitter time.Duration) (EntryID, error) { + return c.AddJobWithJitter(spec, FuncJob(cmd), jitter) } // AddJob adds a Job to the Cron to be run on the given schedule. // The spec is parsed using the time zone of this Cron instance as the default. // An opaque ID is returned that can be used to later remove it. func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { + return c.AddJobWithJitter(spec, cmd, 0) +} + +// AddJobWithJitter adds a Job to the Cron to be run on the given schedule with +// a specified amount of jitter for each invocation. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddJobWithJitter(spec string, cmd Job, jitter time.Duration) (EntryID, error) { schedule, err := c.parser.Parse(spec) if err != nil { return 0, err } - return c.Schedule(schedule, cmd), nil + return c.ScheduleWithJitter(schedule, cmd, jitter), nil } // Schedule adds a Job to the Cron to be run on the given schedule. // The job is wrapped with the configured Chain. func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { + return c.ScheduleWithJitter(schedule, cmd, 0) +} + +// ScheduleWithJitter adds a Job to the Cron to be run on the given schedule +// with a specified amount of jitter for each invocation. +// The job is wrapped with the configured Chain. +func (c *Cron) ScheduleWithJitter(schedule Schedule, cmd Job, jitter time.Duration) EntryID { + if jitter < 0 { + jitter = -jitter + } + c.runningMu.Lock() defer c.runningMu.Unlock() c.nextID++ @@ -159,6 +212,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { Schedule: schedule, WrappedJob: c.chain.Then(cmd), Job: cmd, + Jitter: jitter, } if !c.running { c.entries = append(c.entries, entry) @@ -229,6 +283,10 @@ func (c *Cron) Run() { c.run() } +func (c *Cron) RecalculateNextEvent() { + c.recalculateTimer <- struct{}{} +} + // run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { @@ -237,7 +295,7 @@ func (c *Cron) run() { // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) + entry.Next = calculateJitteredTime(entry.Schedule.Next(now), entry.Jitter) c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) } @@ -256,8 +314,9 @@ func (c *Cron) run() { for { select { - case now = <-timer.C: - now = now.In(c.location) + case <-timer.C: + // Note: we can't just use the value which comes back from timer.C, because that won't respect the timeCallback, if one is set. + now = c.now() c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now @@ -267,14 +326,14 @@ func (c *Cron) run() { } c.startJob(e.WrappedJob) e.Prev = e.Next - e.Next = e.Schedule.Next(now) + e.Next = calculateJitteredTime(e.Schedule.Next(now), e.Jitter) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } case newEntry := <-c.add: timer.Stop() now = c.now() - newEntry.Next = newEntry.Schedule.Next(now) + newEntry.Next = calculateJitteredTime(newEntry.Schedule.Next(now), newEntry.Jitter) c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) @@ -292,6 +351,9 @@ func (c *Cron) run() { now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) + case <-c.recalculateTimer: + now = c.now() + break } break @@ -310,7 +372,14 @@ func (c *Cron) startJob(j Job) { // now returns current time in c location func (c *Cron) now() time.Time { - return time.Now().In(c.location) + var result time.Time + if c.timeCallback == nil { + result = time.Now() + } else { + result = c.timeCallback() + } + result = result.In(c.location) + return result } // Stop stops the cron scheduler if it is running; otherwise it does nothing. diff --git a/option.go b/option.go index 07638201..ac0f4c4e 100644 --- a/option.go +++ b/option.go @@ -43,3 +43,10 @@ func WithLogger(logger Logger) Option { c.logger = logger } } + +// WithTimeCallback uses the provided time function callback for getting the current time. +func WithTimeCallback(timeCallback func() time.Time) Option { + return func(c *Cron) { + c.timeCallback = timeCallback + } +}