Skip to content

Commit

Permalink
Merge pull request #60 from suborbital/connor/schedule
Browse files Browse the repository at this point in the history
Scheduled jobs
  • Loading branch information
cohix committed Feb 24, 2021
2 parents ee788c9 + 2d86930 commit 636d99d
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -70,7 +70,7 @@ When you `Do` some work, you get a `Result`. A result is like a Rust future or a

Calling `Then()` will block until the job is complete, and then give you the return value from the Runnable's `Run`. Cool, right?

## Reactr has some very powerful capabilities, visit the [get started guide](./docs/getstarted.md) to learn more.
## Reactr has some very powerful capabilities, visit the [Reactr guide](./docs/guide.md) to learn more.

Reactr is being actively developed and has planned improvements, including optimized memory usage, library stability, data persistence, and more. Cheers!

Expand Down
25 changes: 25 additions & 0 deletions docs/getstarted.md → docs/guide.md
Expand Up @@ -119,6 +119,31 @@ doTimeout := r.Handle("timeout", timeoutRunner{}, rt.TimeoutSeconds(3))
```
When `TimeoutSeconds` is set and a job executes for longer than the provided number of seconds, the worker will move on to the next job and `ErrJobTimeout` will be returned to the Result. The failed job will continue to execute in the background, but its result will be discarded.

### Schedules
The `r.Do` method will run your job immediately, but if you need to run a job at a later time, at a regular interval, or on some other schedule, then the `Schedule` interface will help. The `Schedule` interface allows for an object to choose when to execute a job. Any object that conforms to the interface can be used as a Schedule:
```golang
// Schedule is a type that returns an *optional* job if there is something that should be scheduled.
// Reactr will poll the Check() method at regular intervals to see if work is available.
type Schedule interface {
Check() *Job
Done() bool
}
```
The `r.Schedule` method will allow you to register a Schedule, and there are two built-in schedules(`Every` and `After`) to help:
```golang
r := rt.New()

r.Handle("worker", &workerRunner{})

// runs every hour
r.Schedule(rt.Every(60*60, func() Job {
return NewJob("worker", nil)
}))
```
Reactr will poll all registered Schedules at a 1 second interval to `Check` for new jobs. Schedules can end their own execution by returning `false` from the `Done` method. You can use the Schedules provided with Reactr or develop your own.

Scheduled jobs' results are discarded automatically using `Discard()`

### Advanced Runnables

The `Runnable` interface defines an `OnChange` function which gives the Runnable a chance to prepare itself for changes to the worker running it. For example, when a Runnable is registered with a pool size greater than 1, the Runnable may need to provision resources for itself to enable handling jobs concurrently, and `OnChange` will be called once each time a new worker starts up. Our [Wasm implementation](https://github.com/suborbital/reactr/blob/master/rwasm/wasmrunnable.go) is a good example of this.
Expand Down
16 changes: 11 additions & 5 deletions rt/reactr.go
Expand Up @@ -20,8 +20,8 @@ type JobFunc func(interface{}) *Result

// Reactr represents the main control object
type Reactr struct {
*scheduler
log *vlog.Logger
scheduler *scheduler
log *vlog.Logger
}

// New returns a Reactr ready to accept Jobs
Expand All @@ -39,12 +39,18 @@ func New() *Reactr {

// Do schedules a job to be worked on and returns a result object
func (h *Reactr) Do(job Job) *Result {
return h.schedule(job)
return h.scheduler.schedule(job)
}

// Schedule adds a new Schedule to the instance, Reactr will 'watch' the Schedule
// and Do any jobs when the Schedule indicates it's needed
func (h *Reactr) Schedule(s Schedule) {
h.scheduler.watch(s)
}

// Handle registers a Runnable with the Reactr and returns a shortcut function to run those jobs
func (h *Reactr) Handle(jobType string, runner Runnable, options ...Option) JobFunc {
h.handle(jobType, runner, options...)
h.scheduler.handle(jobType, runner, options...)

helper := func(data interface{}) *Result {
job := NewJob(jobType, data)
Expand All @@ -58,7 +64,7 @@ func (h *Reactr) Handle(jobType string, runner Runnable, options ...Option) JobF
// HandleMsg registers a Runnable with the Reactr and triggers that job whenever the provided Grav pod
// receives a message of a particular type.
func (h *Reactr) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options ...Option) {
h.handle(msgType, runner, options...)
h.scheduler.handle(msgType, runner, options...)

h.Listen(pod, msgType)
}
Expand Down
84 changes: 84 additions & 0 deletions rt/schedule.go
@@ -0,0 +1,84 @@
package rt

import "time"

/**
This is not to be confused with the `scheduler` type, which is internal to the Reactr instance and actually schedules
jobs for the registered workers. `Schedule` is an external type that allows the caller to define a literal schedule
for jobs to run.
**/

// Schedule is a type that returns an *optional* job if there is something that should be scheduled.
// Reactr will poll the Check() method at regular intervals to see if work is available.
type Schedule interface {
Check() *Job
Done() bool
}

type everySchedule struct {
jobFunc func() Job
seconds int
last *time.Time
}

// Every returns a Schedule that will schedule the job provided by jobFunc every x seconds
func Every(seconds int, jobFunc func() Job) Schedule {
e := &everySchedule{
jobFunc: jobFunc,
seconds: seconds,
}

return e
}

func (e *everySchedule) Check() *Job {
now := time.Now()

// return a job if this schedule has never been checked OR the 'last' job was more than x seconds ago
if e.last == nil || time.Since(*e.last).Seconds() >= float64(e.seconds) {
e.last = &now

job := e.jobFunc()
return &job
}

return nil
}

func (e *everySchedule) Done() bool {
return false
}

type afterSchedule struct {
jobFunc func() Job
seconds int
created time.Time
done bool
}

// After returns a schedule that will schedule the job provided by jobFunc one time x seconds after creation
func After(seconds int, jobFunc func() Job) Schedule {
a := &afterSchedule{
jobFunc: jobFunc,
seconds: seconds,
created: time.Now(),
done: false,
}

return a
}

func (a *afterSchedule) Check() *Job {
if time.Since(a.created).Seconds() >= float64(a.seconds) {
a.done = true
job := a.jobFunc()

return &job
}

return nil
}

func (a *afterSchedule) Done() bool {
return a.done
}
35 changes: 35 additions & 0 deletions rt/schedule_test.go
@@ -0,0 +1,35 @@
package rt

import (
"testing"

"github.com/suborbital/grav/testutil"
)

type counterRunner struct {
counter *testutil.AsyncCounter
}

func (c *counterRunner) Run(job Job, ctx *Ctx) (interface{}, error) {
c.counter.Count()

return nil, nil
}

func (c *counterRunner) OnChange(change ChangeEvent) error { return nil }

func TestScheduleAfter(t *testing.T) {
r := New()

counter := testutil.NewAsyncCounter(10)

r.Handle("counter", &counterRunner{counter})

r.Schedule(After(2, func() Job {
return NewJob("counter", nil)
}))

if err := counter.Wait(1, 3); err != nil {
t.Error(err)
}
}
25 changes: 16 additions & 9 deletions rt/scheduler.go
Expand Up @@ -10,10 +10,11 @@ import (

type scheduler struct {
workers map[string]*worker
watcher *watcher
store Storage
cache Cache
logger *vlog.Logger
sync.Mutex
lock sync.Mutex
}

func newScheduler(logger *vlog.Logger, cache Cache) *scheduler {
Expand All @@ -22,22 +23,24 @@ func newScheduler(logger *vlog.Logger, cache Cache) *scheduler {
store: newMemoryStorage(),
cache: cache,
logger: logger,
Mutex: sync.Mutex{},
lock: sync.Mutex{},
}

s.watcher = newWatcher(s.schedule)

return s
}

func (s *scheduler) schedule(job Job) *Result {
result := newResult(job.UUID(), func(uuid string) {
if err := s.store.Remove(uuid); err != nil {
s.logger.Error(errors.Wrap(err, "scheduler failed to Remove Job from storage"))
s.logger.Error(errors.Wrapf(err, "scheduler failed to Remove Job %s from storage", uuid))
}
})

worker := s.getWorker(job.jobType)
if worker == nil {
result.sendErr(fmt.Errorf("failed to getRunnable for jobType %q", job.jobType))
result.sendErr(fmt.Errorf("failed to getWorker for jobType %q", job.jobType))
return result
}

Expand All @@ -61,8 +64,8 @@ func (s *scheduler) schedule(job Job) *Result {

// handle adds a handler
func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option) {
s.Lock()
defer s.Unlock()
s.lock.Lock()
defer s.lock.Unlock()

// apply the provided options
opts := defaultOpts(jobType)
Expand All @@ -77,15 +80,19 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option)
if opts.preWarm {
go func() {
if err := w.start(s.schedule); err != nil {
// should log something here
s.logger.Error(errors.Wrapf(err, "failed to preWarm %s worker", jobType))
}
}()
}
}

func (s *scheduler) watch(sched Schedule) {
s.watcher.watch(sched)
}

func (s *scheduler) getWorker(jobType string) *worker {
s.Lock()
defer s.Unlock()
s.lock.Lock()
defer s.lock.Unlock()

if s.workers == nil {
return nil
Expand Down
35 changes: 35 additions & 0 deletions rt/tester/main.go
@@ -0,0 +1,35 @@
package main

import (
"fmt"
"time"

"github.com/suborbital/reactr/rt"
)

func main() {
r := rt.New()

r.Handle("print", &printJob{})

r.Do(rt.NewJob("print", "start")).Discard()

r.Schedule(rt.Every(5, func() rt.Job {
return rt.NewJob("print", "every 5")
}))

r.Schedule(rt.After(7, func() rt.Job {
return rt.NewJob("print", "after 7")
}))

time.Sleep(time.Second * 25)
}

type printJob struct{}

func (p *printJob) Run(job rt.Job, ctx *rt.Ctx) (interface{}, error) {
fmt.Println(job.String())
return nil, nil
}

func (p *printJob) OnChange(c rt.ChangeEvent) error { return nil }
72 changes: 72 additions & 0 deletions rt/watcher.go
@@ -0,0 +1,72 @@
package rt

import (
"sync"
"time"

"github.com/google/uuid"
)

// watcher holds a set of schedules and "watches"
// them for new jobs to send to the scheduler
type watcher struct {
schedules map[string]Schedule
scheduleFunc func(Job) *Result

lock sync.RWMutex
startOnce sync.Once
}

func newWatcher(scheduleFunc func(Job) *Result) *watcher {
w := &watcher{
schedules: map[string]Schedule{},
scheduleFunc: scheduleFunc,
lock: sync.RWMutex{},
startOnce: sync.Once{},
}

return w
}

func (w *watcher) watch(sched Schedule) {
w.lock.Lock()
defer w.lock.Unlock()

w.schedules[uuid.New().String()] = sched

// we only want to start the ticker if something is actually set up
// to be scheduled, so we put it behind a sync.Once
w.startOnce.Do(func() {
go func() {
ticker := time.Tick(time.Second)

// loop forever and check each schedule for new jobs
// repeating every second
for {
remove := []string{}

w.lock.RLock()
for uuid, s := range w.schedules {
if s.Done() {
// set the schedule to be removed if it's done
remove = append(remove, uuid)
} else {
if job := s.Check(); job != nil {
// schedule the job and discard the result
w.scheduleFunc(*job).Discard()
}
}
}
w.lock.RUnlock()

w.lock.Lock()
for _, uuid := range remove {
delete(w.schedules, uuid)
}
w.lock.Unlock()

<-ticker
}
}()
})
}

0 comments on commit 636d99d

Please sign in to comment.