Skip to content

Commit

Permalink
Merge pull request #50 from suborbital/connor/pre-warm
Browse files Browse the repository at this point in the history
Prewarm
  • Loading branch information
cohix committed Jan 21, 2021
2 parents 3e847e0 + 8a2a8dd commit 336ea76
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -36,7 +36,7 @@ func (g generic) Run(job hive.Job, ctx *hive.Ctx) (interface{}, error) {

// OnChange is called when Hive starts or stops a worker to handle jobs,
// and allows the Runnable to set up before receiving jobs or tear down if needed.
func (g generic) OnChange(change ChangeEvent) error {
func (g generic) OnChange(change hive.ChangeEvent) error {
return nil
}
```
Expand Down
8 changes: 7 additions & 1 deletion docs/getstarted.md
Expand Up @@ -21,7 +21,7 @@ func (r recursive) Run(job hive.Job, ctx *hive.Ctx) (interface{}, error) {
return fmt.Sprintf("finished %s", job.String()), nil
}

func (r recursive) OnChange(change ChangeEvent) error {
func (r recursive) OnChange(change hive.ChangeEvent) error {
return nil
}
```
Expand Down Expand Up @@ -129,6 +129,12 @@ doBad := h.Handle("badRunner", badRunner{}, hive.RetrySeconds(1), hive.MaxRetrie
```
Any error from a failed worker will be returned to the first job that is attempted for that Runnable.

### Pre-warming
When a Runnable is mounted, it is simply registered as available to receive work. The Runnable is not actually invoked until the first job of the given type is received. For basic Runnables, this is normally fine, but for Runnables that use the `OnChange` method to provision resources, this can cause the first job to be slow. The `PreWarm` option is available to allow Runnables to be started as soon as they are mounted, rather than waiting for the first job. This mitigates cold starts when anything expensive is needed at startup.
```
doExpensive := h.Handle("expensive", expensiveRunnable{}, hive.PreWarm())
```

### Shortcuts

There are also some shortcuts to make working with Hive a bit easier:
Expand Down
34 changes: 34 additions & 0 deletions hive/hive_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/suborbital/grav/testutil"
)

type generic struct{}
Expand Down Expand Up @@ -124,3 +125,36 @@ func TestHiveResultThenDo(t *testing.T) {
<-wait
<-wait
}

// prewarmRunnable is a test Runnable that calls Count on its counter
// whenever it is notified of a start event, so that a test can observe
// OnChange being invoked.
type prewarmRunnable struct {
	counter *testutil.AsyncCounter
}

// Run performs no work; every job yields a nil result and a nil error.
func (r *prewarmRunnable) Run(job Job, ctx *Ctx) (interface{}, error) {
	return nil, nil
}

// OnChange counts start events and ignores every other kind of change.
// It never returns an error.
func (r *prewarmRunnable) OnChange(change ChangeEvent) error {
	// anything other than a start event is of no interest to the test
	if change != ChangeTypeStart {
		return nil
	}

	r.counter.Count()

	return nil
}

func TestPreWarmWorker(t *testing.T) {
counter := testutil.NewAsyncCounter(10)

runnable := &prewarmRunnable{
counter: counter,
}

h := New()
h.Handle("prewarm", runnable, PoolSize(3), PreWarm())

// checking to see if the prewarmRunnable's OnChange function is called
// without ever sending it a job (see Runnable above)
if err := counter.Wait(3, 1); err != nil {
t.Error(err)
}
}
9 changes: 9 additions & 0 deletions hive/opts.go
Expand Up @@ -34,3 +34,12 @@ func MaxRetries(count int) Option {
return opts
}
}

// PreWarm returns an Option that turns on pre-warming for a worker in order
// to minimize cold start time. When the option is not supplied, the worker
// "warms up" when it receives its first job.
func PreWarm() Option {
	return func(o workerOpts) workerOpts {
		// o is received by value, so the caller's copy is left untouched
		o.preWarm = true

		return o
	}
}
13 changes: 9 additions & 4 deletions hive/scheduler.go
Expand Up @@ -71,10 +71,15 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option)
}

w := newWorker(runnable, s.store, s.cache, opts)
if s.workers == nil {
s.workers = map[string]*worker{jobType: w}
} else {
s.workers[jobType] = w

s.workers[jobType] = w

if opts.preWarm {
go func() {
if err := w.start(s.schedule); err != nil {
// should log something here
}
}()
}
}

Expand Down
12 changes: 11 additions & 1 deletion hive/worker.go
Expand Up @@ -57,7 +57,7 @@ func (w *worker) schedule(job JobReference) {
}

func (w *worker) start(doFunc DoFunc) error {
// this should only be run once per worker
// this should only be run once per worker, unless startup fails the first time
if isStarted := w.started.Load().(bool); isStarted {
return nil
}
Expand Down Expand Up @@ -89,6 +89,12 @@ func (w *worker) start(doFunc DoFunc) error {
break
} else {
if attempts >= w.options.numRetries {
if started == 0 {
// if no threads were able to start, ensure that
// the next job causes another attempt
w.started.Store(false)
}

return fmt.Errorf("attempted to start worker %d times, Runnable returned error each time", w.options.numRetries)
}

Expand Down Expand Up @@ -141,6 +147,8 @@ func (wt *workThread) run(doFunc DoFunc) {
// wait for the next job
jobRef := <-wt.workChan

// TODO: check to see if the workThread pool is sufficient, and attempt to fill it if not

// fetch the full job from storage
job, err := wt.store.Get(jobRef.uuid)
if err != nil {
Expand Down Expand Up @@ -203,6 +211,7 @@ type workerOpts struct {
jobTimeoutSeconds int
numRetries int
retrySecs int
preWarm bool
}

func defaultOpts(jobType string) workerOpts {
Expand All @@ -212,6 +221,7 @@ func defaultOpts(jobType string) workerOpts {
jobTimeoutSeconds: 0,
retrySecs: 3,
numRetries: 5,
preWarm: false,
}

return o
Expand Down

0 comments on commit 336ea76

Please sign in to comment.