Skip to content

Commit

Permalink
Merge pull request #40 from suborbital/connor/listen
Browse files Browse the repository at this point in the history
Split HandleMsg up and added Listen
  • Loading branch information
cohix committed Nov 13, 2020
2 parents 89e2ab2 + a684e40 commit 7e4fa44
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 22 deletions.
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -5,7 +5,7 @@ go 1.14
require (
github.com/google/uuid v1.1.2
github.com/pkg/errors v0.9.1
github.com/suborbital/grav v0.0.11
github.com/suborbital/vektor v0.1.3
github.com/suborbital/grav v0.1.0
github.com/suborbital/vektor v0.2.2
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
)
24 changes: 24 additions & 0 deletions go.sum
@@ -1,40 +1,64 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.4.1 h1:/exdXoGamhu5ONeUJH0deniYLWYvQwW66yvlfiiKTu0=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/schollz/peerdiscovery v1.6.0/go.mod h1:hSU7N/NkfNH6AZwU/WBcDZtMABVbTfAWk/XD3XKxN+s=
github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI=
github.com/sethvargo/go-envconfig v0.3.0 h1:9xW3N/jvX6TkJzY99pW4WPq8tMYQElwWZinf0P9fpXY=
github.com/sethvargo/go-envconfig v0.3.0/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0=
github.com/sethvargo/go-envconfig v0.3.2 h1:277Lb2iTpUZjUZu1qLoLa/aetwvtZbKh8wNWXmc6dSk=
github.com/sethvargo/go-envconfig v0.3.2/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/suborbital/grav v0.0.10 h1:guf0PEBwqnwFCUOReStx+RddqSN1j81x+78c1bX/MI4=
github.com/suborbital/grav v0.0.10/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw=
github.com/suborbital/grav v0.0.11 h1:5S63w/Z/2ZsiekIDhg+CQxiVcxLp0vM0UaREOdul3I0=
github.com/suborbital/grav v0.0.11/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw=
github.com/suborbital/grav v0.1.0 h1:rTQV8TsEYf3f7xKZtEBdvKlj4MWWDuRlSN8v/kpQ/44=
github.com/suborbital/grav v0.1.0/go.mod h1:KhweoYFOVx408ZyPttmDuKghbglfRnnzHHByFOGzbHE=
github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASctH4=
github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q=
github.com/suborbital/vektor v0.1.2/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.1.3 h1:rC5ic4FnjmcbizmV/WAQt67QkF6eJ7jHSsuy8IFC2bc=
github.com/suborbital/vektor v0.1.3/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.2.2 h1:x3yit9RMXcP8LirkKb1f/psNev/G/iTIHo6eL/f1OBI=
github.com/suborbital/vektor v0.2.2/go.mod h1:6YQE7r6t1JcVs3twpqjXDftsLUaTNUk5YorRKHcDamI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d h1:dOiJ2n2cMwGLce/74I/QHMbnpk5GfY7InR8rczoMqRM=
golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
45 changes: 31 additions & 14 deletions hive/hive.go
@@ -1,15 +1,17 @@
package hive

import (
"encoding/json"

"github.com/pkg/errors"
"github.com/suborbital/grav/grav"
"github.com/suborbital/vektor/vk"
"github.com/suborbital/vektor/vlog"
)

const (
msgTypeHiveJobErr = "hive.joberr"
msgTypeHiveTypeErr = "hive.typeerr"
msgTypeHiveJobErr = "hive.joberr"
msgTypeHiveResult = "hive.result"
)

// JobFunc is a function that runs a job of a predetermined type
Expand Down Expand Up @@ -51,39 +53,54 @@ func (h *Hive) Handle(jobType string, runner Runnable, options ...Option) JobFun
}

// HandleMsg registers a Runnable with the Hive and triggers that job whenever the provided Grav pod
// receives a message of a particular type. The message is passed to the runnable as the job data.
// The job's result is then emitted as a message. If the result cannot be cast to type grav.Message,
// or if an error occurs, it is logged and an error is sent. If the result is nil, nothing is sent.
// receives a message of a particular type.
func (h *Hive) HandleMsg(pod *grav.Pod, msgType string, runner Runnable, options ...Option) {
h.handle(msgType, runner, options...)

h.Listen(pod, msgType)
}

// Listen causes Hive to listen for messages of the given type and trigger the job of the same type.
// The message's data is passed to the runnable as the job data.
// The job's result is then emitted as a message. If an error occurs, it is logged and an error is sent.
// If the result is nil, nothing is sent.
func (h *Hive) Listen(pod *grav.Pod, msgType string) {
helper := func(data interface{}) *Result {
job := NewJob(msgType, data)

return h.Do(job)
}

pod.OnType(func(msg grav.Message) error {
var resultMsg grav.Message
var replyMsg grav.Message

result, err := helper(msg).Then()
result, err := helper(msg.Data()).Then()
if err != nil {
h.log.Error(errors.Wrap(err, "job returned error result"))
resultMsg = grav.NewMsg(msgTypeHiveJobErr, []byte(err.Error()))
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(err.Error()))
} else {
if result == nil {
return nil
}

var ok bool
resultMsg, ok = result.(grav.Message)
if !ok {
h.log.Error(errors.Wrap(err, "job result is not a grav.Message, discarding"))
resultMsg = grav.NewMsg(msgTypeHiveTypeErr, []byte("failed to convert job result to grav.Message type"))
if resultMsg, isMsg := result.(grav.Message); isMsg {
resultMsg.SetReplyTo(msg.UUID())
replyMsg = resultMsg
} else if bytes, isBytes := result.([]byte); isBytes {
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, bytes)
} else if resultString, isString := result.(string); isString {
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, []byte(resultString))
} else {
resultJSON, err := json.Marshal(result)
if err != nil {
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error()))
}

replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, resultJSON)
}
}

pod.Send(resultMsg)
pod.Send(replyMsg)

return nil
}, msgType)
Expand Down
7 changes: 1 addition & 6 deletions hive/message_test.go
Expand Up @@ -14,12 +14,7 @@ const msgTypeTester = "hive.test"
type msgRunner struct{}

func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) {
msg := job.Msg()
if msg == nil {
return nil, errors.New("not a message")
}

name := string(msg.Data())
name := string(job.Bytes())

reply := grav.NewMsg(msgTypeTester, []byte(fmt.Sprintf("hello, %s", name)))

Expand Down

0 comments on commit 7e4fa44

Please sign in to comment.