Skip to content

Commit

Permalink
Merge pull request #43 from suborbital/connor/grav-update-nil-msg
Browse files Browse the repository at this point in the history
Update to send MsgTypeHiveNilResult when a Job returns a nil result
  • Loading branch information
cohix committed Dec 31, 2020
2 parents 7e4fa44 + c001e06 commit 1d5f297
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 22 deletions.
9 changes: 6 additions & 3 deletions go.mod
Expand Up @@ -3,9 +3,12 @@ module github.com/suborbital/hive
go 1.14

require (
github.com/google/uuid v1.1.2
github.com/google/uuid v1.1.3
github.com/pkg/errors v0.9.1
github.com/suborbital/grav v0.1.0
github.com/schollz/progressbar/v2 v2.15.0 // indirect
github.com/suborbital/grav v0.3.0
github.com/suborbital/vektor v0.2.2
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
)
23 changes: 23 additions & 0 deletions go.sum
Expand Up @@ -6,6 +6,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.3 h1:twObb+9XcuH5B9V1TBCvvvZoO6iEdILi2a76PYn5rJI=
github.com/google/uuid v1.1.3/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
Expand All @@ -14,19 +16,23 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/schollz/peerdiscovery v1.6.0/go.mod h1:hSU7N/NkfNH6AZwU/WBcDZtMABVbTfAWk/XD3XKxN+s=
github.com/schollz/peerdiscovery v1.6.1/go.mod h1:bq5/NB9o9/jyEwiW4ubehfToBa2LwdQQMoNiy/vSdYg=
github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI=
github.com/sethvargo/go-envconfig v0.3.0 h1:9xW3N/jvX6TkJzY99pW4WPq8tMYQElwWZinf0P9fpXY=
github.com/sethvargo/go-envconfig v0.3.0/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0=
github.com/sethvargo/go-envconfig v0.3.2 h1:277Lb2iTpUZjUZu1qLoLa/aetwvtZbKh8wNWXmc6dSk=
github.com/sethvargo/go-envconfig v0.3.2/go.mod h1:XZ2JRR7vhlBEO5zMmOpLgUhgYltqYqq4d4tKagtPUv0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/suborbital/grav v0.0.10 h1:guf0PEBwqnwFCUOReStx+RddqSN1j81x+78c1bX/MI4=
github.com/suborbital/grav v0.0.10/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw=
github.com/suborbital/grav v0.0.11 h1:5S63w/Z/2ZsiekIDhg+CQxiVcxLp0vM0UaREOdul3I0=
github.com/suborbital/grav v0.0.11/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw=
github.com/suborbital/grav v0.1.0 h1:rTQV8TsEYf3f7xKZtEBdvKlj4MWWDuRlSN8v/kpQ/44=
github.com/suborbital/grav v0.1.0/go.mod h1:KhweoYFOVx408ZyPttmDuKghbglfRnnzHHByFOGzbHE=
github.com/suborbital/grav v0.3.0 h1:dxe3YCKIblSlZ0Pl+uy0qW/xtXrmRD2gQNLSY4ErgvY=
github.com/suborbital/grav v0.3.0/go.mod h1:PapJ62PtT9dPmW37WaCD+UMhoZiNPp0N9E3nUfEujC4=
github.com/suborbital/vektor v0.1.1 h1:F3n9rS1F3nc+1Q2HZxeVNinvVkCRliVQ01+jRASctH4=
github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw=
github.com/suborbital/vektor v0.1.2 h1:d4BvshbMl4wRVYPKO21vka7r89nlRrrZXidYQz07N9Q=
Expand All @@ -41,19 +47,34 @@ golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E=
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201217014255-9d1352758620/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d h1:dOiJ2n2cMwGLce/74I/QHMbnpk5GfY7InR8rczoMqRM=
golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201113135734-0a15ea8d9b02/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand All @@ -62,3 +83,5 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
36 changes: 21 additions & 15 deletions hive/hive.go
Expand Up @@ -9,9 +9,11 @@ import (
"github.com/suborbital/vektor/vlog"
)

// MsgTypeHiveJobErr and others are Grav message types used for Hive job
const (
msgTypeHiveJobErr = "hive.joberr"
msgTypeHiveResult = "hive.result"
MsgTypeHiveJobErr = "hive.joberr"
MsgTypeHiveResult = "hive.result"
MsgTypeHiveNilResult = "hive.nil"
)

// JobFunc is a function that runs a job of a predetermined type
Expand Down Expand Up @@ -71,39 +73,43 @@ func (h *Hive) Listen(pod *grav.Pod, msgType string) {
return h.Do(job)
}

pod.OnType(func(msg grav.Message) error {
pod.OnType(msgType, func(msg grav.Message) error {
var replyMsg grav.Message

result, err := helper(msg.Data()).Then()
if err != nil {
h.log.Error(errors.Wrap(err, "job returned error result"))
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(err.Error()))
h.log.Error(errors.Wrapf(err, "job from message %s returned error result", msg.UUID()))
replyMsg = grav.NewMsg(MsgTypeHiveJobErr, []byte(err.Error()))
} else {
if result == nil {
return nil
}

if resultMsg, isMsg := result.(grav.Message); isMsg {
// if the job returned no result
replyMsg = grav.NewMsg(MsgTypeHiveNilResult, []byte{})
} else if resultMsg, isMsg := result.(grav.Message); isMsg {
// if the job returned a Grav message
resultMsg.SetReplyTo(msg.UUID())
replyMsg = resultMsg
} else if bytes, isBytes := result.([]byte); isBytes {
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, bytes)
// if the job returned bytes
replyMsg = grav.NewMsg(MsgTypeHiveResult, bytes)
} else if resultString, isString := result.(string); isString {
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, []byte(resultString))
// if the job returned a string
replyMsg = grav.NewMsg(MsgTypeHiveResult, []byte(resultString))
} else {
// if the job returned something else like a struct
resultJSON, err := json.Marshal(result)
if err != nil {
replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error()))
h.log.Error(errors.Wrapf(err, "job from message %s returned result that could not be JSON marshalled", msg.UUID()))
replyMsg = grav.NewMsg(MsgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error()))
}

replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, resultJSON)
replyMsg = grav.NewMsg(MsgTypeHiveResult, resultJSON)
}
}

pod.Send(replyMsg)
pod.ReplyTo(msg, replyMsg)

return nil
}, msgType)
})
}

// Job is a shorter alias for NewJob
Expand Down
43 changes: 39 additions & 4 deletions hive/message_test.go
Expand Up @@ -10,7 +10,9 @@ import (
)

const msgTypeTester = "hive.test"
const msgTypeNil = "hive.testnil"

// to test jobs listening to a Grav message
type msgRunner struct{}

func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) {
Expand All @@ -23,6 +25,15 @@ func (m *msgRunner) Run(job Job, do DoFunc) (interface{}, error) {

func (m *msgRunner) OnStart() error { return nil }

// to test jobs with a nil result
type nilRunner struct{}

func (m *nilRunner) Run(job Job, do DoFunc) (interface{}, error) {
return nil, nil
}

func (m *nilRunner) OnStart() error { return nil }

func TestHandleMessage(t *testing.T) {
hive := New()
g := grav.New()
Expand All @@ -33,10 +44,10 @@ func TestHandleMessage(t *testing.T) {

sender := g.Connect()

sender.OnType(func(msg grav.Message) error {
sender.OnType(msgTypeTester, func(msg grav.Message) error {
counter.Count()
return nil
}, msgTypeTester)
})

sender.Send(grav.NewMsg(msgTypeTester, []byte("charlie brown")))

Expand All @@ -55,10 +66,10 @@ func TestHandleMessagePt2(t *testing.T) {

sender := g.Connect()

sender.OnType(func(msg grav.Message) error {
sender.OnType(msgTypeTester, func(msg grav.Message) error {
counter.Count()
return nil
}, msgTypeTester)
})

for i := 0; i < 9876; i++ {
sender.Send(grav.NewMsg(msgTypeTester, []byte("charlie brown")))
Expand All @@ -68,3 +79,27 @@ func TestHandleMessagePt2(t *testing.T) {
t.Error(errors.Wrap(err, "failed to counter.Wait"))
}
}

func TestHandleMessageNilResult(t *testing.T) {
hive := New()
g := grav.New()

hive.HandleMsg(g.Connect(), msgTypeNil, &nilRunner{})

counter := testutil.NewAsyncCounter(10)

pod := g.Connect()

pod.OnType(MsgTypeHiveNilResult, func(msg grav.Message) error {
counter.Count()
return nil
})

for i := 0; i < 5; i++ {
pod.Send(grav.NewMsg(msgTypeNil, []byte("hi")))
}

if err := counter.Wait(5, 1); err != nil {
t.Error(errors.Wrap(err, "failed to counter.Wait"))
}
}

0 comments on commit 1d5f297

Please sign in to comment.