Skip to content

Commit

Permalink
Merge pull request #34 from suborbital/connor/handlemsg
Browse files Browse the repository at this point in the history
Added HandleMsg and documentation for Grav integration
  • Loading branch information
cohix committed Sep 8, 2020
2 parents a10159e + 44b348a commit ab9ec6b
Show file tree
Hide file tree
Showing 15 changed files with 216 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -26,7 +26,7 @@ And then get started by defining something `Runnable`:
type generic struct{}

// Run runs a generic job
func (g generic) Run(job hive.Job, run hive.RunFunc) (interface{}, error) {
func (g generic) Run(job hive.Job, do hive.DoFunc) (interface{}, error) {
fmt.Println("doing job:", job.String()) // get the string value of the job's data

// do your work here
Expand Down
30 changes: 17 additions & 13 deletions docs/getstarted.md
Expand Up @@ -9,13 +9,13 @@ There are some more complicated things you can do with Runnables:
type recursive struct{}

// Run runs a recursive job
func (r recursive) Run(job hive.Job, run hive.RunFunc) (interface{}, error) {
func (r recursive) Run(job hive.Job, do hive.DoFunc) (interface{}, error) {
fmt.Println("doing job:", job.String())

if job.String() == "first" {
return run(hive.NewJob("recursive", "second")), nil
return do(hive.NewJob("recursive", "second")), nil
} else if job.String() == "second" {
return run(hive.NewJob("recursive", "last")), nil
return do(hive.NewJob("recursive", "last")), nil
}

return fmt.Sprintf("finished %s", job.String()), nil
Expand All @@ -25,9 +25,9 @@ func (r recursive) OnStart() error {
return nil
}
```
The `hive.RunFunc` that you see there is a way for your Runnable to, well, run more things!
The `hive.DoFunc` that you see there is a way for your Runnable to, well, run more things!

Calling the `RunFunc` will schedule another job to be executed and give you a `Result`. If you return a `Result` from `Run`, then the caller will recursively recieve that `Result` when they call `Then()`!
Calling the `DoFunc` will schedule another job to be executed and give you a `Result`. If you return a `Result` from `Run`, then the caller will recursively recieve that `Result` when they call `Then()`!

For example:
```golang
Expand All @@ -47,7 +47,7 @@ doing job: second
doing job: last
done! finished last
```
Think about that for a minute, and let it sink in, it can be quite powerful!
The ability to chain jobs is quite powerful!

You won't always need or care about a job's output, and in those cases, make sure to call `Discard()` on the result to allow the underlying resources to be deallocated!
```golang
Expand All @@ -72,9 +72,9 @@ A hive `Group` is a set of `Result`s that belong together. If you're familiar wi
```golang
grp := hive.NewGroup()

grp.Add(run(hive.NewJob("recursive", "first")))
grp.Add(run(hive.NewJob("generic", "group work")))
grp.Add(run(hive.NewJob("generic", "group work")))
grp.Add(do(hive.NewJob("recursive", "first")))
grp.Add(do(hive.NewJob("generic", "group work")))
grp.Add(do(hive.NewJob("generic", "group work")))

if err := grp.Wait(); err != nil {
log.Fatal(err)
Expand All @@ -95,7 +95,7 @@ Note that you cannot get result values from result groups, the error returned fr
**TIP** If you return a group from a Runnable's `Run`, calling `Then()` on the result will recursively call `Wait()` on the group and return the error to the original caller! You can easily chain jobs and job groups in various orders.

### Pools
Each `Runnable` that you register is given a worker to process their jobs. By default, each worker has one goroutine processing jobs in sequence. If you want a particular worker to process more than one job concurrently, you can increase its `PoolSize`:
Each `Runnable` that you register is given a worker to process their jobs. By default, each worker has one work thread processing jobs in sequence. If you want a particular worker to process more than one job concurrently, you can increase its `PoolSize`:
```golang
doGeneric := h.Handle("generic", generic{}, hive.PoolSize(3))

Expand All @@ -108,7 +108,7 @@ if err := grp.Wait(); err != nil {
log.Fatal(err)
}
```
Passing `PoolSize(3)` will spawn three goroutines to process `generic` jobs.
Passing `PoolSize(3)` will spawn three work threads to process `generic` jobs.

### Timeouts
By default, if a job becomes stuck and is blocking execution, it will block forever. If you want to have a worker time out after a certain amount of seconds on a stuck job, pass `hive.TimeoutSeconds` to Handle:
Expand All @@ -125,7 +125,7 @@ The `Runnable` interface defines an `OnStart` function which gives the Runnable

Most Runnables can return `nil` from this function, however returning an error will cause the worker start to be paused and retried until the required pool size has been created. The number of seconds between retries (default 3) and the maximum number of retries (default 5) can be configured when registering a Runnable:
```golang
doBad := h.Handle("badRunner", badRunner{}, RetrySeconds(1), MaxRetries(1))
doBad := h.Handle("badRunner", badRunner{}, hive.RetrySeconds(1), hive.MaxRetries(10))
```
Any error from a failed worker will be returned to the first job that is attempted for that Runnable.

Expand All @@ -140,7 +140,7 @@ type input struct {
type math struct{}

// Run runs a math job
func (g math) Run(job hive.Job, run hive.RunFunc) (interface{}, error) {
func (g math) Run(job hive.Job, do hive.DoFunc) (interface{}, error) {
in := job.Data().(input)

return in.First + in.Second, nil
Expand All @@ -156,4 +156,8 @@ for i := 1; i < 10; i++ {
```
The `Handle` function returns an optional helper function. Instead of passing a job name and full `Job` into `h.Do`, you can use the helper function to instead just pass the input data for the job, and you receive a `Result` as normal. `doMath`!

## Additional features

Hive can integrate with [Grav](https://github.com/suborbital/grav), which is the decentralized message bus developed as part of the Suborbital development framework. Read about the integration on [the grav documentation page.](./grav.md)

Hive provides the building blocks for scalable asynchronous systems. This should be everything you need to help you improve the performance of your application. When you are looking to take advantage of Hive's other features, check out its [FaaS](./faas.md) and [WASM](./wasm.md) capabilities!
17 changes: 17 additions & 0 deletions docs/grav.md
@@ -0,0 +1,17 @@
# Hive ➕ Grav

Hive is designed to integrate with the other [Suborbital](https://suborbital.dev) projects such as [Grav](https://github.com/suborbital/grav). Grav is a decentralized message bus which allows for your application code to communicate in a scalable, resilient way.

## Handle Messages
Hive can respond to messages by connecting to a `grav.Pod` using `HandleMsg`:
```golang
hive := hive.New()
g := grav.New()

hive.HandleMsg(g.Connect(), msgTypeLogin, &loginEmailRunner{})
```
Whenever a message with the given type is received from the bus, a `Job` will be queued to be handled by the provided Runnable. The `Job` will contain the message, and `job.Msg()` makes it easy to retreive (with type conversions happening automatically).

The result returned by the Runnable's `Run` function should be a `grav.Message`. If so, it will be sent back out over the message bus. If `Run` returns an error or a result that is not a `grav.Message`, a message with type `hive.joberr` or `hive.typeerr` (respectively) will be sent. If `Run` returns `nil, nil`, then nothing will be sent.

Further integrations with `Grav` are in the works, along with improvements to Hive's [FaaS](./faas.md) capabilities, which is powered by Suborbital's [Vektor](https://github.com/suborbital/vektor) framework.
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/pkg/errors v0.9.1
github.com/suborbital/vektor v0.1.1
github.com/suborbital/grav v0.0.11
github.com/suborbital/vektor v0.1.2
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
)
8 changes: 8 additions & 0 deletions go.sum
@@ -1,13 +1,21 @@
github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/sethvargo/go-envconfig v0.3.0 h1:9xW3N/jvX6TkJzY99pW4WPq8tMYQElwWZinf0P9fpXY=
github.com/sethvargo/go-envconfig v0.3.0/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0=
github.com/suborbital/grav v0.0.10 h1:guf0PEBwqnwFCUOReStx+RddqSN1j81x+78c1bX/MI4=
github.com/suborbital/grav v0.0.10/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw=
github.com/suborbital/grav v0.0.11 h1:5S63w/Z/2ZsiekIDhg+CQxiVcxLp0vM0UaREOdul3I0=
github.com/suborbital/grav v0.0.11/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw=
github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASctH4=
github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q=
github.com/suborbital/vektor v0.1.2/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
Expand Down
12 changes: 6 additions & 6 deletions hive/group_test.go
Expand Up @@ -54,14 +54,14 @@ func TestLargeGroupWithPool(t *testing.T) {
type groupWork struct{}

// Run runs a groupWork job
func (g groupWork) Run(job Job, run RunFunc) (interface{}, error) {
func (g groupWork) Run(job Job, do DoFunc) (interface{}, error) {
grp := NewGroup()

grp.Add(run(NewJob("generic", "first")))
grp.Add(run(NewJob("generic", "group work")))
grp.Add(run(NewJob("generic", "group work")))
grp.Add(run(NewJob("generic", "group work")))
grp.Add(run(NewJob("generic", "group work")))
grp.Add(do(NewJob("generic", "first")))
grp.Add(do(NewJob("generic", "group work")))
grp.Add(do(NewJob("generic", "group work")))
grp.Add(do(NewJob("generic", "group work")))
grp.Add(do(NewJob("generic", "group work")))

return grp, nil
}
Expand Down
62 changes: 58 additions & 4 deletions hive/hive.go
@@ -1,19 +1,31 @@
package hive

import "github.com/suborbital/vektor/vk"
import (
"github.com/pkg/errors"
"github.com/suborbital/grav/grav"
"github.com/suborbital/vektor/vk"
"github.com/suborbital/vektor/vlog"
)

//DoFunc is a function that runs a job of a predetermined type
type DoFunc func(interface{}) *Result
const (
msgTypeHiveJobErr = "hive.joberr"
msgTypeHiveTypeErr = "hive.typeerr"
)

// JobFunc is a function that runs a job of a predetermined type
type JobFunc func(interface{}) *Result

// Hive represents the main control object
type Hive struct {
*scheduler
log *vlog.Logger
}

// New returns a Hive ready to accept Jobs
func New() *Hive {
h := &Hive{
scheduler: newScheduler(),
log: vlog.Default(),
}

return h
Expand All @@ -25,7 +37,7 @@ func (h *Hive) Do(job Job) *Result {
}

// Handle registers a Runnable with the Hive and returns a shortcut function to run those jobs
func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) DoFunc {
func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) JobFunc {
h.handle(jobType, runner, options...)

helper := func(data interface{}) *Result {
Expand All @@ -40,6 +52,48 @@ func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) DoFunc
return helper
}

// HandleMsg registers a Runnable with the Hive and triggers that job whenever the provided Grav pod
// receives a message of a particular type. The message is passed to the runnable as the job data.
// The job's result is then emitted as a message. If the result cannot be cast to type grav.Message,
// or if an error occurs, it is logged and an error is sent. If the result is nil, nothing is sent.
func (h *Hive) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options ...Option) {
h.handle(msgType, runner, options...)

helper := func(data interface{}) *Result {
job := Job{
jobType: msgType,
data: data,
}

return h.Do(job)
}

pod.OnType(func(msg grav.Message) error {
var resultMsg grav.Message

result, err := helper(msg).Then()
if err != nil {
h.log.Error(errors.Wrap(err, "job returned error result"))
resultMsg = grav.NewMsg(msgTypeHiveJobErr, []byte(err.Error()))
} else {
if result == nil {
return nil
}

var ok bool
resultMsg, ok = result.(grav.Message)
if !ok {
h.log.Error(errors.Wrap(err, "job result is not a grav.Message, discarding"))
resultMsg = grav.NewMsg(msgTypeHiveTypeErr, []byte("failed to convert job result to grav.Message type"))
}
}

pod.Send(resultMsg)

return nil
}, msgType)
}

// Job is a shorter alias for NewJob
func (h *Hive) Job(jobType string, data interface{}) Job {
return NewJob(jobType, data)
Expand Down
8 changes: 4 additions & 4 deletions hive/hive_test.go
Expand Up @@ -11,11 +11,11 @@ import (
type generic struct{}

// Run runs a generic job
func (g generic) Run(job Job, run RunFunc) (interface{}, error) {
func (g generic) Run(job Job, do DoFunc) (interface{}, error) {
if job.String() == "first" {
return run(NewJob("generic", "second")), nil
return do(NewJob("generic", "second")), nil
} else if job.String() == "second" {
return run(NewJob("generic", "last")), nil
return do(NewJob("generic", "last")), nil
} else if job.String() == "fail" {
return nil, errors.New("error!")
}
Expand Down Expand Up @@ -55,7 +55,7 @@ type input struct {
type math struct{}

// Run runs a math job
func (g math) Run(job Job, run RunFunc) (interface{}, error) {
func (g math) Run(job Job, do DoFunc) (interface{}, error) {
in := job.Data().(input)

return in.First + in.Second, nil
Expand Down
12 changes: 12 additions & 0 deletions hive/job.go
Expand Up @@ -3,6 +3,8 @@ package hive
import (
"encoding/json"
"errors"

"github.com/suborbital/grav/grav"
)

// Job describes a job to be done
Expand Down Expand Up @@ -61,3 +63,13 @@ func (j *Job) Int() int {
func (j *Job) Data() interface{} {
return j.data
}

// Msg returns a grav.Message stored in the Job, if any
func (j *Job) Msg() grav.Message {
msg, ok := j.data.(grav.Message)
if !ok {
return nil
}

return msg
}
75 changes: 75 additions & 0 deletions hive/message_test.go
@@ -0,0 +1,75 @@
package hive

import (
"fmt"
"testing"

"github.com/pkg/errors"
"github.com/suborbital/grav/grav"
"github.com/suborbital/grav/testutil"
)

const msgTypeTester = "hive.test"

type msgRunner struct{}

func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) {
msg := job.Msg()
if msg == nil {
return nil, errors.New("not a message")
}

name := string(msg.Data())

reply := grav.NewMsg(msgTypeTester, []byte(fmt.Sprintf("hello, %s", name)))

return reply, nil
}

func (m *msgRunner) OnStart() error { return nil }

func TestHandleMessage(t *testing.T) {
hive := New()
g := grav.New()

hive.HandleMsg(g.Connect(), msgTypeTester, &msgRunner{})

counter := testutil.NewAsyncCounter(10)

sender := g.Connect()

sender.OnType(func(msg grav.Message) error {
counter.Count()
return nil
}, msgTypeTester)

sender.Send(grav.NewMsg(msgTypeTester, []byte("charlie brown")))

if err := counter.Wait(1, 1); err != nil {
t.Error(errors.Wrap(err, "failed to counter.Wait"))
}
}

func TestHandleMessagePt2(t *testing.T) {
hive := New()
g := grav.New()

hive.HandleMsg(g.Connect(), msgTypeTester, &msgRunner{})

counter := testutil.NewAsyncCounter(10000)

sender := g.Connect()

sender.OnType(func(msg grav.Message) error {
counter.Count()
return nil
}, msgTypeTester)

for i := 0; i < 9876; i++ {
sender.Send(grav.NewMsg(msgTypeTester, []byte("charlie brown")))
}

if err := counter.Wait(9876, 1); err != nil {
t.Error(errors.Wrap(err, "failed to counter.Wait"))
}
}

0 comments on commit ab9ec6b

Please sign in to comment.