Skip to content

Commit

Permalink
Merge pull request #36 from suborbital/connor/storage
Browse files Browse the repository at this point in the history
Create centralized storage for Hive jobs
  • Loading branch information
cohix committed Sep 23, 2020
2 parents 9f0c7db + 63019af commit ffb9177
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 65 deletions.
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -3,8 +3,9 @@ module github.com/suborbital/hive
go 1.14

require (
github.com/google/uuid v1.1.2
github.com/pkg/errors v0.9.1
github.com/suborbital/grav v0.0.11
github.com/suborbital/vektor v0.1.2
github.com/suborbital/vektor v0.1.3
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
)
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -2,6 +2,8 @@ github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -16,6 +18,8 @@ github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASct
github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q=
github.com/suborbital/vektor v0.1.2/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.1.3 h1:rC5ic4FnjmcbizmV/WAQt67QkF6eJ7jHSsuy8IFC2bc=
github.com/suborbital/vektor v0.1.3/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
Expand Down
15 changes: 5 additions & 10 deletions hive/hive.go
Expand Up @@ -23,9 +23,10 @@ type Hive struct {

// New returns a Hive ready to accept Jobs
func New() *Hive {
logger := vlog.Default()
h := &Hive{
scheduler: newScheduler(),
log: vlog.Default(),
scheduler: newScheduler(logger),
log: logger,
}

return h
Expand All @@ -41,10 +42,7 @@ func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) JobFun
h.handle(jobType, runner, options...)

helper := func(data interface{}) *Result {
job := Job{
jobType: jobType,
data: data,
}
job := NewJob(jobType, data)

return h.Do(job)
}
Expand All @@ -60,10 +58,7 @@ func (h *Hive) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options
h.handle(msgType, runner, options...)

helper := func(data interface{}) *Result {
job := Job{
jobType: msgType,
data: data,
}
job := NewJob(msgType, data)

return h.Do(job)
}
Expand Down
2 changes: 1 addition & 1 deletion hive/hive_test.go
Expand Up @@ -34,7 +34,7 @@ func TestHiveJob(t *testing.T) {

r := h.Do(h.Job("generic", "first"))

if r.ID == "" {
if r.UUID() == "" {
t.Error("result ID is empty")
}

Expand Down
55 changes: 43 additions & 12 deletions hive/job.go
Expand Up @@ -4,45 +4,70 @@ import (
"encoding/json"
"errors"

"github.com/google/uuid"
"github.com/suborbital/grav/grav"
)

// Job describes a job to be done
type Job struct {
// JobReference is a lightweight reference to a Job
type JobReference struct {
uuid string
jobType string
data interface{}
result *Result
}

// Job describes a job to be done
type Job struct {
JobReference
data interface{}
resultData interface{}
resultErr error
}

// NewJob creates a new job
func NewJob(jobType string, data interface{}) Job {
j := Job{
jobType: jobType,
data: data,
JobReference: JobReference{
uuid: uuid.New().String(),
jobType: jobType,
},
data: data,
}

return j
}

// UUID returns the Job's UUID
func (j JobReference) UUID() string {
return j.uuid
}

// Reference returns a reference to the Job
func (j Job) Reference() JobReference {
return j.JobReference
}

// Unmarshal unmarshals the job's data into a struct
func (j *Job) Unmarshal(target interface{}) error {
func (j Job) Unmarshal(target interface{}) error {
if bytes, ok := j.data.([]byte); ok {
return json.Unmarshal(bytes, target)
}

return errors.New("failed to Unmarshal, job data is not []byte")
}

func (j *Job) String() string {
if s, ok := j.data.(string); ok {
// String returns the string value of a job's data
func (j Job) String() string {
if s, isString := j.data.(string); isString {
return s
} else if b, isBytes := j.data.([]byte); isBytes {
return string(b)
}

return ""
}

// Bytes returns the []byte value of the job's data
func (j *Job) Bytes() []byte {
func (j Job) Bytes() []byte {
if v, ok := j.data.([]byte); ok {
return v
}
Expand All @@ -51,7 +76,7 @@ func (j *Job) Bytes() []byte {
}

// Int returns the int value of the job's data
func (j *Job) Int() int {
func (j Job) Int() int {
if v, ok := j.data.(int); ok {
return v
}
Expand All @@ -60,16 +85,22 @@ func (j *Job) Int() int {
}

// Data returns the "raw" data for the job
func (j *Job) Data() interface{} {
func (j Job) Data() interface{} {
return j.data
}

// Msg returns a grav.Message stored in the Job, if any
func (j *Job) Msg() grav.Message {
func (j Job) Msg() grav.Message {
msg, ok := j.data.(grav.Message)
if !ok {
return nil
}

return msg
}

// loadResult has a pointer reciever such that it actually modifies the object it's being called on
func (j *Job) loadResult(resultData interface{}, errString string) {
j.resultData = resultData
j.resultErr = errors.New(errString)
}
33 changes: 20 additions & 13 deletions hive/result.go
Expand Up @@ -3,26 +3,43 @@ package hive
import (
"encoding/json"

"github.com/suborbital/hive/util"

"github.com/pkg/errors"
)

// Result describes a result
type Result struct {
ID string
uuid string
data interface{}
err error

resultChan chan bool
errChan chan bool
removeFunc removeFunc
}

// ResultFunc is a result callback function.
type ResultFunc func(interface{}, error)

func newResult(uuid string, remove removeFunc) *Result {
r := &Result{
uuid: uuid,
resultChan: make(chan bool, 1), // buffered, so the result can be written and related goroutines can end before Then() is called
errChan: make(chan bool, 1),
removeFunc: remove,
}

return r
}

// UUID returns the result/job's UUID
func (r *Result) UUID() string {
return r.uuid
}

// Then returns the result or error from a Result
func (r *Result) Then() (interface{}, error) {
defer r.removeFunc(r.uuid)

select {
case <-r.resultChan:
return r.data, nil
Expand Down Expand Up @@ -80,16 +97,6 @@ func (r *Result) Discard() {
}()
}

func newResult() *Result {
r := &Result{
ID: util.GenerateResultID(),
resultChan: make(chan bool, 1), // buffered, so the result can be written and related goroutines can end before Then() is called
errChan: make(chan bool, 1),
}

return r
}

func (r *Result) sendResult(data interface{}) {
// if the result is another Result,
// wait for its result and recursively send it
Expand Down
23 changes: 13 additions & 10 deletions hive/scheduler.go
Expand Up @@ -5,33 +5,34 @@ import (
"sync"

"github.com/pkg/errors"
"github.com/suborbital/vektor/vlog"
)

type scheduler struct {
workers map[string]*worker

starter sync.Once
store Storage
logger *vlog.Logger
sync.Mutex
}

func newScheduler() *scheduler {
func newScheduler(logger *vlog.Logger) *scheduler {
s := &scheduler{
workers: map[string]*worker{},
store: newMemoryStorage(),
logger: logger,
Mutex: sync.Mutex{},
}

return s
}

func (s *scheduler) schedule(job Job) *Result {
s.starter.Do(func() {
if s.workers == nil {
s.workers = map[string]*worker{}
result := newResult(job.UUID(), func(uuid string) {
if err := s.store.Remove(uuid); err != nil {
s.logger.Error(errors.Wrap(err, "scheduler failed to Remove Job from storage"))
}
})

result := newResult()

worker := s.getWorker(job.jobType)
if worker == nil {
result.sendErr(fmt.Errorf("failed to getRunnable for jobType %q", job.jobType))
Expand All @@ -48,7 +49,9 @@ func (s *scheduler) schedule(job Job) *Result {
}

job.result = result
worker.schedule(job)
s.store.Add(job)

worker.schedule(job.Reference())
}()

return result
Expand All @@ -65,7 +68,7 @@ func (s *scheduler) handle(jobType string, runnable Runnable, options ...Option)
opts = o(opts)
}

w := newWorker(runnable, opts)
w := newWorker(runnable, s.store, opts)
if s.workers == nil {
s.workers = map[string]*worker{jobType: w}
} else {
Expand Down
4 changes: 2 additions & 2 deletions hive/server.go
Expand Up @@ -89,7 +89,7 @@ func (s *Server) scheduleHandler() vk.HandlerFunc {
s.addInFlight(res)

resp := doResponse{
ResultID: res.ID,
ResultID: res.UUID(),
}

return resp, nil
Expand Down Expand Up @@ -155,7 +155,7 @@ func (s *Server) addInFlight(r *Result) {
s.Lock()
defer s.Unlock()

s.inFlight[r.ID] = r
s.inFlight[r.UUID()] = r
}

func (s *Server) getInFlight(id string) *Result {
Expand Down

0 comments on commit ffb9177

Please sign in to comment.