diff --git a/.github/workflows/sanity.yml b/.github/workflows/sanity.yml new file mode 100644 index 00000000..331679db --- /dev/null +++ b/.github/workflows/sanity.yml @@ -0,0 +1,33 @@ +name: Testapalooza + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + + build: + name: Test + runs-on: ubuntu-latest + steps: + + - name: Set up Go 1.15 + uses: actions/setup-go@v1 + with: + go-version: 1.15 + id: go + + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + + - name: Get dependencies + run: | + # have to do this nonsense to ensure that the correct arch gets downloaded for libwasmer + go get -v -t -d ./... + go mod vendor + + - name: Run test + run: | + make build diff --git a/Dockerfile b/Dockerfile index d1801495..d307b627 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,8 +2,9 @@ FROM golang:1.15 as builder RUN mkdir -p /go/src/github.com/suborbital/atmo COPY . /go/src/github.com/suborbital/atmo/ +WORKDIR /go/src/github.com/suborbital/atmo/ -RUN go install github.com/suborbital/atmo +RUN go install FROM debian:buster-slim diff --git a/Makefile b/Makefile index afe37035..5e541737 100644 --- a/Makefile +++ b/Makefile @@ -11,9 +11,6 @@ atmo: build atmo/docker: build/docker docker run -v ${PWD}/$(dir):/home/atmo -e ATMO_HTTP_PORT=8080 -p 8080:8080 atmo:dev atmo -test/run: - go run ./main.go - test/go: go test -v --count=1 -p=1 ./... diff --git a/atmo/coordinator/coordinator.go b/atmo/coordinator/coordinator.go index 953455e9..055b2b2e 100644 --- a/atmo/coordinator/coordinator.go +++ b/atmo/coordinator/coordinator.go @@ -18,11 +18,6 @@ import ( "github.com/suborbital/vektor/vlog" ) -const ( - msgTypeHiveJobErr = "hive.joberr" - msgTypeHiveResult = "hive.result" -) - // Coordinator is a type that is responsible for covnerting the directive into // usable Vektor handles by coordinating Hive jobs and meshing when needed. type Coordinator struct { @@ -39,9 +34,12 @@ type Coordinator struct { // CoordinatedRequest represents a request being coordinated type CoordinatedRequest struct { - URL string `json:"url"` - Body string `json:"body"` - State map[string]interface{} `json:"state"` + Method string `json:"method"` + URL string `json:"url"` + Body string `json:"body"` + Headers map[string]string `json:"headers"` + Params map[string]string `json:"params"` + State map[string]interface{} `json:"state"` } type requestScope struct { @@ -78,7 +76,7 @@ func (c *Coordinator) UseBundle(bundle *bundle.Bundle) *vk.RouteGroup { group := vk.Group("").Before(scopeMiddleware) // connect a Grav pod to each function - for _, fn := range bundle.Directive.Functions { + for _, fn := range bundle.Directive.Runnables { fqfn, err := bundle.Directive.FQFN(fn.Name) if err != nil { c.log.Error(errors.Wrapf(err, "failed to derive FQFN for Directive function %s, function will not be available", fn.Name)) @@ -111,26 +109,44 @@ func (c *Coordinator) vkHandlerForDirectiveHandler(handler directive.Handler) vk defer r.Body.Close() + flatHeaders := map[string]string{} + for k, v := range r.Header { + flatHeaders[k] = v[0] + } + + flatParams := map[string]string{} + for _, p := range ctx.Params { + flatParams[p.Key] = p.Value + } + req := CoordinatedRequest{ - URL: r.URL.String(), - Body: string(reqBody), - State: map[string]interface{}{}, + Method: r.Method, + URL: r.URL.String(), + Body: string(reqBody), + Headers: flatHeaders, + Params: flatParams, + State: map[string]interface{}{}, } + // run through the handler's steps, updating the coordinated state after each for _, step := range handler.Steps { - stateJSON, err := req.Marshal() + stateJSON, err := stateJSONForStep(req, step) if err != nil { - return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to Marshal Request State")) + ctx.Log.Error(errors.Wrap(err, "failed to stateJSONForStep")) + return nil, err } if step.IsFn() { - entry, err := c.runSingleFn(step.Fn, stateJSON, ctx) + entry, err := c.runSingleFn(step.CallableFn, stateJSON, ctx) if err != nil { return nil, err } if entry != nil { - req.State[step.Fn] = entry + // hive-wasm issue #45 + key := key(step.CallableFn) + + req.State[key] = entry } } else { // if the step is a group, run them all concurrently and collect the results @@ -149,43 +165,46 @@ func (c *Coordinator) vkHandlerForDirectiveHandler(handler directive.Handler) vk } } -func (c *Coordinator) runSingleFn(name string, body []byte, ctx *vk.Ctx) (interface{}, error) { +func (c *Coordinator) runSingleFn(fn directive.CallableFn, body []byte, ctx *vk.Ctx) (interface{}, error) { start := time.Now() defer func() { duration := time.Since(start) - ctx.Log.Debug("fn", name, fmt.Sprintf("executed in %d ms", duration.Milliseconds())) + ctx.Log.Debug("fn", fn.Fn, fmt.Sprintf("executed in %d ms", duration.Milliseconds())) }() // calculate the FQFN - fqfn, err := c.directive.FQFN(name) + fqfn, err := c.directive.FQFN(fn.Fn) if err != nil { - return nil, errors.Wrapf(err, "failed to FQFN for group fn %s", name) + return nil, errors.Wrapf(err, "failed to FQFN for group fn %s", fn.Fn) } // compose a message containing the serialized request state, and send it via Grav // for the appropriate meshed Hive to handle. It may be handled by self if appropriate. jobMsg := grav.NewMsg(fqfn, body) + var jobResult []byte var jobErr error pod := c.bus.Connect() defer pod.Disconnect() - podErr := pod.SendAndWaitOnReply(jobMsg, func(msg grav.Message) error { + podErr := pod.Send(jobMsg).WaitUntil(grav.Timeout(3), func(msg grav.Message) error { switch msg.Type() { - case msgTypeHiveResult: + case hive.MsgTypeHiveResult: jobResult = msg.Data() - case msgTypeHiveJobErr: + case hive.MsgTypeHiveJobErr: jobErr = errors.New(string(msg.Data())) + case hive.MsgTypeHiveNilResult: + // do nothing } return nil }) // check for errors and results, convert to something useful, and return + // this should probably be refactored as it looks pretty goofy if podErr != nil { - // Hive needs to be updated to reply with a message when a job returns no result if podErr == grav.ErrWaitTimeout { // do nothing } else { @@ -194,11 +213,11 @@ func (c *Coordinator) runSingleFn(name string, body []byte, ctx *vk.Ctx) (interf } if jobErr != nil { - return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrapf(jobErr, "group fn %s failed", name)) + return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrapf(jobErr, "group fn %s failed", fn.Fn)) } if jobResult == nil { - ctx.Log.Debug("fn", name, "returned a nil result") + ctx.Log.Debug("fn", fn.Fn, "returned a nil result") return nil, nil } @@ -211,11 +230,12 @@ type fnResult struct { err error } -func (c *Coordinator) runGroup(fns []string, body []byte, ctx *vk.Ctx) (map[string]interface{}, error) { +// runGroup runs a group of functions +// this is all more complicated than it needs to be, Grav should be doing more of the work for us here +func (c *Coordinator) runGroup(fns []directive.CallableFn, body []byte, ctx *vk.Ctx) (map[string]interface{}, error) { start := time.Now() defer func() { - duration := time.Since(start) - ctx.Log.Debug("group", fmt.Sprintf("executed in %d ms", duration.Milliseconds())) + ctx.Log.Debug("group", fmt.Sprintf("executed in %d ms", time.Since(start).Milliseconds())) }() resultChan := make(chan fnResult, len(fns)) @@ -225,13 +245,15 @@ func (c *Coordinator) runGroup(fns []string, body []byte, ctx *vk.Ctx) (map[stri // functionality to collect all the responses, probably using the parent ID. for i := range fns { fn := fns[i] - ctx.Log.Debug("running fn", fn, "from group") + ctx.Log.Debug("running fn", fn.Fn, "from group") + + key := key(fn) go func() { res, err := c.runSingleFn(fn, body, ctx) result := fnResult{ - name: fn, + name: key, result: res, err: err, } @@ -279,6 +301,36 @@ func scopeMiddleware(r *http.Request, ctx *vk.Ctx) error { return nil } +func stateJSONForStep(req CoordinatedRequest, step directive.Executable) ([]byte, error) { + // the desired state is cached, so after the first call this is very efficient + desired, err := step.ParseWith() + if err != nil { + return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to ParseWith")) + } + + // based on the step's `with` clause, build the state to pass into the function + stepState, err := desiredState(desired, req.State) + if err != nil { + return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to build desiredState")) + } + + stepReq := CoordinatedRequest{ + Method: req.Method, + URL: req.URL, + Body: req.Body, + Headers: req.Headers, + Params: req.Params, + State: stepState, + } + + stateJSON, err := stepReq.Marshal() + if err != nil { + return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to Marshal Request State")) + } + + return stateJSON, nil +} + // resultFromState returns the state value for the last single function that ran in a handler func resultFromState(handler directive.Handler, state map[string]interface{}) interface{} { // if the handler defines a response explicitly, use it (return nil if there is nothing in state) @@ -305,6 +357,25 @@ func resultFromState(handler directive.Handler, state map[string]interface{}) in return nil } +func desiredState(desired []directive.Alias, state map[string]interface{}) (map[string]interface{}, error) { + if desired == nil || len(desired) == 0 { + return state, nil + } + + desiredState := map[string]interface{}{} + + for _, a := range desired { + val, exists := state[a.Key] + if !exists { + return nil, fmt.Errorf("failed to build desired state, %s does not exists in handler state", a.Key) + } + + desiredState[a.Alias] = val + } + + return desiredState, nil +} + // stringOrMap converts bytes to a map if they are JSON, or a string if not func stringOrMap(result []byte) interface{} { resMap := map[string]interface{}{} @@ -314,3 +385,13 @@ func stringOrMap(result []byte) interface{} { return resMap } + +func key(fn directive.CallableFn) string { + key := fn.Fn + + if fn.As != "" { + key = fn.As + } + + return key +} diff --git a/docs/introduction/getstarted.md b/docs/introduction/getstarted.md index 00d7d95b..b4eddbc7 100644 --- a/docs/introduction/getstarted.md +++ b/docs/introduction/getstarted.md @@ -37,9 +37,7 @@ If you prefer using Docker, you can locally build and run Atmo in Docker using: ``` ## How it works - -If you explore the `example-project` directory, you will see several Runnables \(`fetch-test`, `modify-url`, etc.\) and a `Directive.yaml` file. Each folder represents an Atmo function, and the Directive is responsible for describing how those functions should be used. The Directive looks like this: - +If you explore the `example-project` directory, you will see several Runnables (`fetch-test`, `modify-url`, etc.) and a `Directive.yaml` file. Each folder represents an Atmo function, and the Directive is responsible for describing how those functions should be used. The Directive looks like this: ```yaml identifier: com.suborbital.test version: v0.0.1 @@ -50,20 +48,30 @@ handlers: method: POST steps: - group: - - modify-url - - helloworld-rs + - fn: modify-url + - fn: helloworld-rs + as: hello - fn: fetch-test + with: + - "url: modify-url" + - "logme: hello" ``` +This describes the application being constructed. It declares a route (`POST /hello`) and a set of `steps` to handle that request. The `steps` are a set of functions to be **composed** when handling requests to the `/hello` endpoint. -This describes the application being constructed. This declares a route \(`POST /hello`\) and how to handle that request. The `steps` provided contain a set of instructions for how to handle requests to the `/hello` endpoint. - -The first step is a `group`, meaning that all of the functions in that group will be executed **concurrently**. +The first step is a `group`, meaning that all of the functions in that group will be executed **concurrently**. The second step is a single function that uses the [Runnable API](https://github.com/suborbital/hive-wasm) to make an HTTP request. The API is continually evolving to include more capabilities. In addition to making HTTP requests, it includes logging abilities and more. -For each function executed, its result gets stored in the request handler's `state`. The `state` is used to pass values between functions, since they are completely isolated and independent from one another. The `modify-url` function takes the request body and modifies it \(in this case, adding `/suborbital` to it\). The second step \(`fetch-test`\) takes that modified URL and makes an HTTP request to fetch it. The final function's output is used as the response data for the request. +## State +After each step, its function results gets stored in the request handler's `state`. The `state` is an ephemeral set of key/value pairs created for each request. State is used to pass values between functions, since they are completely isolated and unaware of one another. -## Coming soon +The `modify-url` function for example takes the request body (in this case, a URL), and modifies it (by adding `/suborbital` to it). The second step (`fetch-test`) takes that modified URL and makes an HTTP request to fetch it. The final function's output is used as the response data for the request. -Further functionality is incoming along with improved docs, more examples, and an improved Directive format. Visit [the Suborbital website](https://suborbital.dev) to sign up for email updates related to new versions of Atmo. +There are two clauses, `as` and `with` that make working with request state easier. `as` will assign the value returned by a function to a particular name in state. In the above example, `helloworld-rs` is saved into state as `hello`. You can think of this just like storing a value into a variable! +`with` allows the developer to pass a "desired state" into a given function. Rather than passing the entire state with the existing keys, the developer can optionally define a custom state by choosing aliases for some or all of the keys available in request state. This is just like choosing parameter values to pass into function arguments! + +`subo` will validate your directive to help ensure that your Directive is correct, including validating that you're not accessing keys that don't exist in state. + +## Coming soon +Further functionality is incoming along with improved docs, more examples, and an improved Directive format. Visit [the Suborbital website](https://suborbital.dev) to sign up for email updates related to new versions of Atmo. \ No newline at end of file diff --git a/docs/introduction/getstarted/README.md b/docs/introduction/getstarted/README.md deleted file mode 100644 index 88224403..00000000 --- a/docs/introduction/getstarted/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Get started with Atmo 🚀 - -**NOTE:** These docs are far from complete, but are being actively worked on. - -Atmo is a self-hosted that uses a _Runnable bundle_ to run your described application. The bundle includes two things: a Directive, and a set of Runnable WebAssembly modules (functions compiled from various languages such as Rust and Swift). - -## Building a bundle -Bundles are built using `subo`, which is the Suborbital CLI tool. You'll need to install `subo` to build a bundle. To install the tool, [visit the subo repository](https://github.com/suborbital/subo). - -Once you've installed `subo`, you can use it to build the example project included with this repository. Clone this project, and then run: -``` -> subo build ./example-project --bundle -``` -The end of this command should read `✅ DONE: bundle was created -> example-project/runnables.wasm.zip` - -## Running Atmo -Once you have your runnable bundle, you can run Atmo: -``` -> ATMO_HTTP_PORT=8080 make atmo bundle=./example-project/runnables.wasm.zip -``` -Atmo will start up and you will begin to see its structured logs in yor terminal. Make a request to `POST localhost:8080/hello` with a request body of `https://github.com`. You will recieve the HTML fetched from `https://github.com/suborbital`. - -## Using Docker -If you prefer using Docker, you can locally build and run Atmo in Docker using: -``` -> make atmo/docker dir=example-project -``` - -## How it works -If you explore the `example-project` directory, you will see several Runnables (`fetch-test`, `modify-url`, etc.) and a `Directive.yaml` file. Each folder represents an Atmo function, and the Directive is responsible for describing how those functions should be used. The Directive looks like this: -```yaml -identifier: com.suborbital.test -version: v0.0.1 - -handlers: - - type: request - resource: /hello - method: POST - steps: - - group: - - modify-url - - helloworld-rs - - fn: fetch-test -``` -This describes the application being constructed. This declares a route (`POST /hello`) and how to handle that request. The `steps` provided contain a set of instructions on how to handle requests to the `/hello` endpoint. The first step is a `group`, meaning that all of the functions in that group will be executed **concurrently**. The second step is a single function that uses the [Runnable API](https://github.com/suborbital/hive-wasm) to make an HTTP request. The API is continually evolving to include more capabilities. In addition to making HTTP requests, it includes logging abilities and more. - -For each function executed, its result gets stored in the request handler's `state`. The `state` is used to pass values between functions, since they are completely isolated and independent from one another. The `modify-url` function takes the request body (in this case, a URL), and modifies it (in this case, adding `/suborbital` to it). The second step (`fetch-test`) takes that modified URL and makes an HTTP request to fetch it. The final function's output is used as the response data for the request. - -## Coming soon -Further functionality is incoming along with improved docs, more examples, and an improved Directive format. Visit [the Suborbital website](https://suborbital.dev) to sign up for email updates related to new versions of Atmo. \ No newline at end of file diff --git a/example-project/Directive.yaml b/example-project/Directive.yaml index a9f54db9..703015f3 100644 --- a/example-project/Directive.yaml +++ b/example-project/Directive.yaml @@ -7,6 +7,10 @@ handlers: method: POST steps: - group: - - modify-url - - helloworld-rs - - fn: fetch-test \ No newline at end of file + - fn: modify-url + - fn: helloworld-rs + as: hello + - fn: fetch-test + with: + - "url: modify-url" + - "logme: hello" \ No newline at end of file diff --git a/example-project/fetch-test/.hive.yml b/example-project/fetch-test/.runnable.yml similarity index 100% rename from example-project/fetch-test/.hive.yml rename to example-project/fetch-test/.runnable.yml diff --git a/example-project/fetch-test/src/lib.rs b/example-project/fetch-test/src/lib.rs index fe13297a..ea45aab7 100755 --- a/example-project/fetch-test/src/lib.rs +++ b/example-project/fetch-test/src/lib.rs @@ -1,6 +1,7 @@ use suborbital::runnable; use suborbital::request; use suborbital::net; +use suborbital::log; struct FetchTest{} @@ -11,7 +12,10 @@ impl runnable::Runnable for FetchTest { None => return Some(String::from("failed").as_bytes().to_vec()) }; - let url = req.state["modify-url"].as_str().unwrap(); + let msg = req.state["logme"].as_str().unwrap(); + log::info(msg); + + let url = req.state["url"].as_str().unwrap(); let data = net::fetch(url); diff --git a/example-project/helloworld-rs/.hive.yml b/example-project/helloworld-rs/.runnable.yml similarity index 100% rename from example-project/helloworld-rs/.hive.yml rename to example-project/helloworld-rs/.runnable.yml diff --git a/example-project/helloworld-rs/src/lib.rs b/example-project/helloworld-rs/src/lib.rs index bdd73367..8a7804f7 100755 --- a/example-project/helloworld-rs/src/lib.rs +++ b/example-project/helloworld-rs/src/lib.rs @@ -1,12 +1,16 @@ use suborbital::runnable; +use suborbital::request; struct HelloworldRs{} impl runnable::Runnable for HelloworldRs { fn run(&self, input: Vec) -> Option> { - let in_string = String::from_utf8(input).unwrap(); + let req = match request::from_json(input) { + Some(r) => r, + None => return Some(String::from("failed").as_bytes().to_vec()) + }; - Some(String::from(format!("hello {}", in_string)).as_bytes().to_vec()) + Some(String::from(format!("hello {}", req.body)).as_bytes().to_vec()) } } diff --git a/example-project/modify-url/.hive.yml b/example-project/modify-url/.runnable.yml similarity index 100% rename from example-project/modify-url/.hive.yml rename to example-project/modify-url/.runnable.yml diff --git a/go.mod b/go.mod index b183e003..6f70c861 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,16 @@ module github.com/suborbital/atmo go 1.15 require ( + github.com/google/uuid v1.1.3 // indirect github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.1.1 - github.com/suborbital/grav v0.1.1 - github.com/suborbital/hive v0.1.4 - github.com/suborbital/hive-wasm v0.2.5 + github.com/suborbital/grav v0.3.0 + github.com/suborbital/hive v0.1.5 + github.com/suborbital/hive-wasm v0.2.6 github.com/suborbital/vektor v0.2.2 - golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9 // indirect + golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect golang.org/x/mod v0.4.0 // indirect - golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 // indirect + golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect golang.org/x/sync v0.0.0-20201207232520-09787c993a3a // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 8400f153..0155f093 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.3 h1:twObb+9XcuH5B9V1TBCvvvZoO6iEdILi2a76PYn5rJI= +github.com/google/uuid v1.1.3/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= @@ -152,6 +154,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/schollz/peerdiscovery v1.6.0/go.mod h1:hSU7N/NkfNH6AZwU/WBcDZtMABVbTfAWk/XD3XKxN+s= +github.com/schollz/peerdiscovery v1.6.1/go.mod h1:bq5/NB9o9/jyEwiW4ubehfToBa2LwdQQMoNiy/vSdYg= github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sethvargo/go-envconfig v0.3.0 h1:9xW3N/jvX6TkJzY99pW4WPq8tMYQElwWZinf0P9fpXY= @@ -177,18 +180,21 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/suborbital/grav v0.0.11 h1:5S63w/Z/2ZsiekIDhg+CQxiVcxLp0vM0UaREOdul3I0= github.com/suborbital/grav v0.0.11/go.mod h1:fN837ibcYZILUd/nKoSaEbo+oTSGRtTbbm/MiwmM3Pw= github.com/suborbital/grav v0.1.0 h1:rTQV8TsEYf3f7xKZtEBdvKlj4MWWDuRlSN8v/kpQ/44= github.com/suborbital/grav v0.1.0/go.mod h1:KhweoYFOVx408ZyPttmDuKghbglfRnnzHHByFOGzbHE= -github.com/suborbital/grav v0.1.1 h1:Hn3WWj3JLfw1VQKn9+h3Yxy8XrlFM7Q55Cv3XpAj5Ks= -github.com/suborbital/grav v0.1.1/go.mod h1:KhweoYFOVx408ZyPttmDuKghbglfRnnzHHByFOGzbHE= +github.com/suborbital/grav v0.3.0 h1:dxe3YCKIblSlZ0Pl+uy0qW/xtXrmRD2gQNLSY4ErgvY= +github.com/suborbital/grav v0.3.0/go.mod h1:PapJ62PtT9dPmW37WaCD+UMhoZiNPp0N9E3nUfEujC4= github.com/suborbital/hive v0.1.3 h1:b29wRkR6Vg1LiPF3H3bfJq+N2xY2AQl0H+fkwC+MJtk= github.com/suborbital/hive v0.1.3/go.mod h1:CxEZVgJfnwOzUt/67YYnuw5uunb1t5oAoGpCrtIcPiE= github.com/suborbital/hive v0.1.4 h1:+ywO+vh/b/VdBDeGp6033zhcN5bQ8cPwVA8duiuWWoA= github.com/suborbital/hive v0.1.4/go.mod h1:HqkRK2LgA10WuPCTqNtokO5hukn9Hgab3gjLECL15v0= -github.com/suborbital/hive-wasm v0.2.5 h1:kd5uWaZ3QVzAbIF8TTfQ9t9XHY8kVbhb1/ZFJarsJTo= -github.com/suborbital/hive-wasm v0.2.5/go.mod h1:D3xySADEKHXXaySSUCdM4eqxnIb44X78JQb7OFh+XK8= +github.com/suborbital/hive v0.1.5 h1:pFWcyH8W/lchs6UfZZGYCG1ajIUz9LzGilt6emiRVUM= +github.com/suborbital/hive v0.1.5/go.mod h1:7EhT+wD3Wk31YeUwbw/3PvLTq4QF5emJYAu23sDeEo0= +github.com/suborbital/hive-wasm v0.2.6 h1:wBvimdaNDFu5fja35uzAi6yXKba4DHNVV0OtaQqsdkk= +github.com/suborbital/hive-wasm v0.2.6/go.mod h1:D3xySADEKHXXaySSUCdM4eqxnIb44X78JQb7OFh+XK8= github.com/suborbital/vektor v0.1.1/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= github.com/suborbital/vektor v0.1.3/go.mod h1:tJ4gA2P8NWC4Pdu0TgpSBWZHuZ1Qhp+r5RlsqyIqdyw= github.com/suborbital/vektor v0.2.2 h1:x3yit9RMXcP8LirkKb1f/psNev/G/iTIHo6eL/f1OBI= @@ -216,8 +222,9 @@ golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9 h1:sYNJzB4J8toYPQTM6pAkcmBRgw9SnQKP9oXCHfgy604= -golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20201217014255-9d1352758620/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -259,8 +266,10 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201026091529-146b70c837a4 h1:awiuzyrRjJDb+OXi9ceHO3SDxVoN3JER57mhtqkdQBs= golang.org/x/net v0.0.0-20201026091529-146b70c837a4/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201029221708-28c70e62bb1d/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 h1:lwlPPsmjDKK0J6eG6xDWd5XPehI0R024zxjDnw3esPA= -golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201216054612-986b41b23924/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -293,7 +302,9 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201113135734-0a15ea8d9b02/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -364,6 +375,7 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/vendor/github.com/google/uuid/uuid.go b/vendor/github.com/google/uuid/uuid.go index 524404cc..daf3639c 100644 --- a/vendor/github.com/google/uuid/uuid.go +++ b/vendor/github.com/google/uuid/uuid.go @@ -35,6 +35,12 @@ const ( var rander = rand.Reader // random function +type invalidLengthError struct{ len int } + +func (err *invalidLengthError) Error() string { + return fmt.Sprintf("invalid UUID length: %d", err.len) +} + // Parse decodes s into a UUID or returns an error. Both the standard UUID // forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and // urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the @@ -68,7 +74,7 @@ func Parse(s string) (UUID, error) { } return uuid, nil default: - return uuid, fmt.Errorf("invalid UUID length: %d", len(s)) + return uuid, &invalidLengthError{len(s)} } // s is now at least 36 bytes long // it must be of the form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx @@ -112,7 +118,7 @@ func ParseBytes(b []byte) (UUID, error) { } return uuid, nil default: - return uuid, fmt.Errorf("invalid UUID length: %d", len(b)) + return uuid, &invalidLengthError{len(b)} } // s is now at least 36 bytes long // it must be of the form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx diff --git a/vendor/github.com/suborbital/grav/grav/discovery.go b/vendor/github.com/suborbital/grav/grav/discovery.go index 2f096975..2d1503bd 100644 --- a/vendor/github.com/suborbital/grav/grav/discovery.go +++ b/vendor/github.com/suborbital/grav/grav/discovery.go @@ -2,15 +2,20 @@ package grav import "github.com/suborbital/vektor/vlog" +// DiscoveryFunc is a function that allows a plugin to report a newly discovered node +type DiscoveryFunc func(endpoint string, uuid string) + // Discovery represents a discovery plugin type Discovery interface { - Start(*DiscoveryOpts, Transport, ConnectFunc) error + // Start is called to start the Discovery plugin + Start(*DiscoveryOpts, DiscoveryFunc) error } // DiscoveryOpts is a set of options for transports type DiscoveryOpts struct { NodeUUID string TransportPort string + TransportURI string Logger *vlog.Logger Custom interface{} } diff --git a/vendor/github.com/suborbital/grav/grav/grav.go b/vendor/github.com/suborbital/grav/grav/grav.go index 80f8e0dd..f26e8449 100644 --- a/vendor/github.com/suborbital/grav/grav/grav.go +++ b/vendor/github.com/suborbital/grav/grav/grav.go @@ -14,11 +14,10 @@ var ( // Grav represents a Grav message bus instance type Grav struct { - NodeUUID string - bus *messageBus - logger *vlog.Logger - transport Transport - discovery Discovery + NodeUUID string + bus *messageBus + logger *vlog.Logger + hub *hub } // New creates a new Grav with the provided options @@ -28,39 +27,13 @@ func New(opts ...OptionsModifier) *Grav { options := newOptionsWithModifiers(opts...) g := &Grav{ - NodeUUID: nodeUUID, - bus: newMessageBus(), - logger: options.Logger, - transport: options.Transport, - discovery: options.Discovery, + NodeUUID: nodeUUID, + bus: newMessageBus(), + logger: options.Logger, } - // start transport, then discovery if each have been configured (can have transport but no discovery) - if g.transport != nil { - transportOpts := &TransportOpts{ - NodeUUID: nodeUUID, - Port: options.Port, - Logger: options.Logger, - } - - go func() { - if err := g.transport.Serve(transportOpts, g.Connect); err != nil { - options.Logger.Error(errors.Wrap(err, "failed to Serve transport")) - } - - if g.discovery != nil { - discoveryOpts := &DiscoveryOpts{ - NodeUUID: nodeUUID, - TransportPort: transportOpts.Port, - Logger: options.Logger, - } - - if err := g.discovery.Start(discoveryOpts, g.transport, g.Connect); err != nil { - options.Logger.Error(errors.Wrap(err, "failed to Start discovery")) - } - } - }() - } + // the hub handles coordinating the transport and discovery plugins + g.hub = initHub(nodeUUID, options, options.Transport, options.Discovery, g.Connect()) return g } @@ -82,22 +55,14 @@ func (g *Grav) ConnectWithReplay() *Pod { // ConnectEndpoint uses the configured transport to connect the bus to an external endpoint func (g *Grav) ConnectEndpoint(endpoint string) error { - if g.transport == nil { - return ErrTransportNotConfigured - } - - return g.transport.ConnectEndpoint(endpoint, g.Connect) + return g.hub.connectEndpoint(endpoint, "") } // ConnectEndpointWithReplay uses the configured transport to connect the bus to an external endpoint // and replays recent messages to the endpoint when the pod registers its onFunc -func (g *Grav) ConnectEndpointWithReplay(endpoint string) error { - if g.transport == nil { - return ErrTransportNotConfigured - } +// func (g *Grav) ConnectEndpointWithReplay(endpoint string) error { - return g.transport.ConnectEndpoint(endpoint, g.ConnectWithReplay) -} +// } func (g *Grav) connectWithOpts(opts *podOpts) *Pod { pod := newPod(g.bus.busChan, opts) diff --git a/vendor/github.com/suborbital/grav/grav/hub.go b/vendor/github.com/suborbital/grav/grav/hub.go new file mode 100644 index 00000000..c6d600d3 --- /dev/null +++ b/vendor/github.com/suborbital/grav/grav/hub.go @@ -0,0 +1,249 @@ +package grav + +import ( + "sync" + + "github.com/pkg/errors" + "github.com/suborbital/vektor/vlog" +) + +// hub is responsible for coordinating the transport and discovery plugins +type hub struct { + nodeUUID string + transport Transport + discovery Discovery + pod *Pod + log *vlog.Logger + + connections map[string]Connection + + lock sync.RWMutex +} + +func initHub(nodeUUID string, options *Options, tspt Transport, dscv Discovery, pod *Pod) *hub { + h := &hub{ + nodeUUID: nodeUUID, + transport: tspt, + discovery: dscv, + pod: pod, + log: options.Logger, + connections: map[string]Connection{}, + lock: sync.RWMutex{}, + } + + // start transport, then discovery if each have been configured (can have transport but no discovery) + if h.transport != nil { + transportOpts := &TransportOpts{ + NodeUUID: nodeUUID, + Port: options.Port, + URI: options.URI, + Logger: options.Logger, + } + + // setup messages to be sent to all active connections + pod.On(h.outgoingMessageHandler()) + + go func() { + if err := h.transport.Setup(transportOpts, h.handleIncomingConnection, h.findConnection); err != nil { + options.Logger.Error(errors.Wrap(err, "failed to Setup transport")) + } + }() + + if h.discovery != nil { + discoveryOpts := &DiscoveryOpts{ + NodeUUID: nodeUUID, + TransportPort: transportOpts.Port, + TransportURI: transportOpts.URI, + Logger: options.Logger, + } + + go func() { + if err := h.discovery.Start(discoveryOpts, h.discoveryHandler()); err != nil { + options.Logger.Error(errors.Wrap(err, "failed to Start discovery")) + } + }() + } + } + + return h +} + +func (h *hub) discoveryHandler() func(endpoint string, uuid string) { + return func(endpoint string, uuid string) { + if uuid == h.nodeUUID { + h.log.Debug("discovered self, discarding") + return + } + + // connectEndpoint does this check as well, but it's better to do it here as well + // as it reduces the number of extraneous outgoing handshakes that get attempted. + if existing, exists := h.findConnection(uuid); exists { + if !existing.CanReplace() { + h.log.Debug("encountered duplicate connection, discarding") + return + } + } + + if err := h.connectEndpoint(endpoint, uuid); err != nil { + h.log.Error(errors.Wrap(err, "failed to connectEndpoint for discovered peer")) + } + } +} + +// connectEndpoint creates a new outgoing connection +func (h *hub) connectEndpoint(endpoint, uuid string) error { + if h.transport == nil { + return ErrTransportNotConfigured + } + + h.log.Debug("connecting to endpoint", endpoint) + + conn, err := h.transport.CreateConnection(endpoint) + if err != nil { + return errors.Wrap(err, "failed to transport.CreateConnection") + } + + h.setupOutgoingConnection(conn, uuid) + + return nil +} + +func (h *hub) setupOutgoingConnection(connection Connection, uuid string) { + handshake := &TransportHandshake{h.nodeUUID} + + ack, err := connection.DoOutgoingHandshake(handshake) + if err != nil { + h.log.Error(errors.Wrap(err, "failed to connection.DoOutgoingHandshake")) + connection.Close() + return + } + + if uuid == "" { + if ack.UUID == "" { + h.log.ErrorString("connection handshake returned empty UUID, terminating connection") + connection.Close() + return + } + + uuid = ack.UUID + } else if ack.UUID != uuid { + h.log.ErrorString("connection handshake Ack did not match Discovery Ack, terminating connection") + connection.Close() + return + } + + h.setupNewConnection(connection, uuid) +} + +func (h *hub) handleIncomingConnection(connection Connection) { + ack := &TransportHandshakeAck{h.nodeUUID} + + handshake, err := connection.DoIncomingHandshake(ack) + if err != nil { + h.log.Error(errors.Wrap(err, "failed to connection.DoIncomingHandshake")) + connection.Close() + return + } + + if handshake.UUID == "" { + h.log.ErrorString("connection handshake returned empty UUID, terminating connection") + connection.Close() + return + } + + h.setupNewConnection(connection, handshake.UUID) +} + +func (h *hub) setupNewConnection(connection Connection, uuid string) { + // if an existing connection is found, check if it can be replaced and do so if possible + if existing, exists := h.findConnection(uuid); exists { + if !existing.CanReplace() { + connection.Close() + h.log.Debug("encountered duplicate connection, discarding") + } else { + existing.Close() + h.replaceConnection(connection, uuid) + } + } else { + h.addConnection(connection, uuid) + } +} + +func (h *hub) outgoingMessageHandler() MsgFunc { + return func(msg Message) error { + // read-lock while dispatching all of the goroutines to prevent concurrent read/write + h.lock.RLock() + defer h.lock.RUnlock() + + for u := range h.connections { + uuid := u + conn := h.connections[uuid] + + go func() { + h.log.Debug("sending message", msg.UUID(), "to", uuid) + + if err := conn.Send(msg); err != nil { + if errors.Is(err, ErrConnectionClosed) { + h.log.Debug("attempted to send on closed connection, will remove") + } else { + h.log.Warn("error sending to connection", uuid, ":", err.Error()) + } + + h.removeConnection(uuid) + } + }() + } + + return nil + } +} + +func (h *hub) incomingMessageHandler(uuid string) ReceiveFunc { + return func(msg Message) { + h.log.Debug("received message ", msg.UUID(), "from node", uuid) + + h.pod.Send(msg) + } +} + +func (h *hub) addConnection(connection Connection, uuid string) { + h.lock.Lock() + defer h.lock.Unlock() + + h.log.Debug("adding connection for", uuid) + + connection.Start(h.incomingMessageHandler(uuid)) + + h.connections[uuid] = connection +} + +func (h *hub) replaceConnection(newConnection Connection, uuid string) { + h.lock.Lock() + defer h.lock.Unlock() + + h.log.Debug("replacing connection for", uuid) + + delete(h.connections, uuid) + + newConnection.Start(h.incomingMessageHandler(uuid)) + + h.connections[uuid] = newConnection +} + +func (h *hub) removeConnection(uuid string) { + h.lock.Lock() + defer h.lock.Unlock() + + h.log.Debug("removing connection for", uuid) + + delete(h.connections, uuid) +} + +func (h *hub) findConnection(uuid string) (Connection, bool) { + h.lock.RLock() + defer h.lock.RUnlock() + + conn, exists := h.connections[uuid] + + return conn, exists +} diff --git a/vendor/github.com/suborbital/grav/grav/message.go b/vendor/github.com/suborbital/grav/grav/message.go index e7ef9670..872ecbb8 100644 --- a/vendor/github.com/suborbital/grav/grav/message.go +++ b/vendor/github.com/suborbital/grav/grav/message.go @@ -31,8 +31,6 @@ type Message interface { ReplyTo() string // Allow setting a message UUID that this message is a response to SetReplyTo(string) - // Get a MessageTicket that references this message - Ticket() MessageTicket // Type of message (application-specific) Type() string // Time the message was sent @@ -47,11 +45,6 @@ type Message interface { Unmarshal([]byte) error } -// MessageTicket represents a "ticket" that references a message that was sent with the hopes of getting a response -type MessageTicket struct { - UUID string -} - // NewMsg creates a new Message with the built-in `_message` type func NewMsg(msgType string, data []byte) Message { return new(msgType, "", data) @@ -63,7 +56,7 @@ func NewMsgWithParentID(msgType, parentID string, data []byte) Message { } // NewMsgReplyTo creates a new message in response to a previous message -func NewMsgReplyTo(ticket MessageTicket, msgType string, data []byte) Message { +func NewMsgReplyTo(ticket MsgReceipt, msgType string, data []byte) Message { m := new(msgType, "", data) m.SetReplyTo(ticket.UUID) @@ -147,14 +140,6 @@ func (m *_message) SetReplyTo(uuid string) { m.Meta.ReplyTo = uuid } -func (m *_message) Ticket() MessageTicket { - t := MessageTicket{ - UUID: m.Meta.UUID, - } - - return t -} - func (m *_message) Type() string { return m.Meta.MsgType } diff --git a/vendor/github.com/suborbital/grav/grav/options.go b/vendor/github.com/suborbital/grav/grav/options.go index bd9a34d2..c5b4ebe0 100644 --- a/vendor/github.com/suborbital/grav/grav/options.go +++ b/vendor/github.com/suborbital/grav/grav/options.go @@ -5,9 +5,10 @@ import "github.com/suborbital/vektor/vlog" // Options represent Grav options type Options struct { Logger *vlog.Logger - Port string Transport Transport Discovery Discovery + Port string + URI string } // OptionsModifier is function that modifies an option @@ -30,17 +31,24 @@ func UseLogger(logger *vlog.Logger) OptionsModifier { } } -// UsePort sets the port that will be advertised by discovery -func UsePort(port string) OptionsModifier { +// UseTransport sets the transport plugin to be used. +func UseTransport(transport Transport) OptionsModifier { return func(o *Options) { - o.Port = port + o.Transport = transport } } -// UseTransport sets the transport plugin to be used -func UseTransport(transport Transport) OptionsModifier { +// UseEndpoint sets the endpoint settings for the instance to broadcast for discovery +// Pass empty strings for either if you would like to keep the defaults (8080 and /meta/message) +func UseEndpoint(port, uri string) OptionsModifier { return func(o *Options) { - o.Transport = transport + if port != "" { + o.Port = port + } + + if uri != "" { + o.URI = uri + } } } @@ -55,6 +63,7 @@ func defaultOptions() *Options { o := &Options{ Logger: vlog.Default(), Port: "8080", + URI: "/meta/message", Transport: nil, Discovery: nil, } diff --git a/vendor/github.com/suborbital/grav/grav/pod.go b/vendor/github.com/suborbital/grav/grav/pod.go index 71eef223..59723feb 100644 --- a/vendor/github.com/suborbital/grav/grav/pod.go +++ b/vendor/github.com/suborbital/grav/grav/pod.go @@ -4,7 +4,6 @@ import ( "errors" "sync" "sync/atomic" - "time" ) const ( @@ -79,69 +78,65 @@ func newPod(busChan MsgChan, opts *podOpts) *Pod { return p } -// Disconnect indicates to the bus that this pod is no longer needed and should be disconnected. -// Sending will immediately become unavailable, and the pod will soon stop recieving messages. -func (p *Pod) Disconnect() { - // stop future messages from being sent and then indicate to the bus that disconnection is desired - // The bus will close the busChan, which will cause the onFunc listener to quit. - p.dead.Store(false) - p.feedbackChan <- podFeedbackMsgDisconnect -} - // Send emits a message to be routed to the bus -func (p *Pod) Send(msg Message) { +// If the returned ticket is nil, it means the pod was unable to send +// It is safe to call methods on a nil ticket, they will error with ErrNoTicket +// This means error checking can be done on a chained call such as err := p.Send(msg).Wait(...) +func (p *Pod) Send(msg Message) *MsgReceipt { // check to see if the pod has died (aka disconnected) if p.dead.Load().(bool) == true { - return + return nil } p.FilterUUID(msg.UUID(), false) // don't allow the same message to bounce back through this pod p.busChan <- msg -} -// ReplyTo sends a response to a message -func (p *Pod) ReplyTo(ticket MessageTicket, msg Message) { - msg.SetReplyTo(ticket.UUID) + t := &MsgReceipt{ + UUID: msg.UUID(), + pod: p, + } - p.Send(msg) + return t } -// SendAndWaitOnReply sends a message and then blocks until a message is recieved in ReplyTo that message -func (p *Pod) SendAndWaitOnReply(msg Message, onFunc MsgFunc, timeoutSeconds ...int) error { - ticket := msg.Ticket() - - p.Send(msg) +// ReplyTo sends a response to a message. The reply message's ticket is returned. +func (p *Pod) ReplyTo(inReplyTo Message, msg Message) *MsgReceipt { + msg.SetReplyTo(inReplyTo.UUID()) - return p.WaitOnReply(ticket, onFunc, timeoutSeconds...) + return p.Send(msg) } // On sets the function to be called whenever this pod recieves a message from the bus. If nil is passed, the pod will ignore all messages. // Calling On multiple times causes the function to be overwritten. To recieve using two different functions, create two pods. +// Errors returned from the onFunc are interpreted as problems handling messages. Too many errors will result in the pod being disconnected. +// Failed messages will be replayed when messages begin to succeed. Returning an error is inadvisable unless there is a real problem handling messages. func (p *Pod) On(onFunc MsgFunc) { p.onFuncLock.Lock() defer p.onFuncLock.Unlock() - // reset the message filter when the onFunc is changed - p.messageFilter = newMessageFilter() - p.setOnFunc(onFunc) } -// OnType sets the function to be called whenever this pod recieves a message and sets the pod's filter to only include certain message types -func (p *Pod) OnType(onFunc MsgFunc, msgTypes ...string) { +// OnType sets the function to be called whenever this pod recieves a message and sets the pod's filter to only receive certain message types. +// The same rules as `On` about error handling apply to OnType. +func (p *Pod) OnType(msgType string, onFunc MsgFunc) { p.onFuncLock.Lock() defer p.onFuncLock.Unlock() - // reset the message filter when the onFunc is changed - p.messageFilter = newMessageFilter() - p.TypeInclusive = false // only allow the listed types + p.setOnFunc(onFunc) - for _, t := range msgTypes { - p.FilterType(t, true) - } + p.FilterType(msgType, true) + p.TypeInclusive = false // only allow the listed types +} - p.setOnFunc(onFunc) +// Disconnect indicates to the bus that this pod is no longer needed and should be disconnected. +// Sending will immediately become unavailable, and the pod will soon stop recieving messages. +func (p *Pod) Disconnect() { + // stop future messages from being sent and then indicate to the bus that disconnection is desired + // The bus will close the busChan, which will cause the onFunc listener to quit. + p.dead.Store(true) + p.feedbackChan <- podFeedbackMsgDisconnect } // ErrMsgNotWanted is used by WaitOn to determine if the current message is what's being waited on @@ -154,14 +149,29 @@ var ErrWaitTimeout = errors.New("waited past timeout") // something other than ErrMsgNotWanted. WaitOn should be used if there is a need to wait for a particular message. // When the onFunc returns something other than ErrMsgNotWanted (such as nil or a different error), WaitOn will return and set // the onFunc to nil. If an error other than ErrMsgNotWanted is returned from the onFunc, it will be propogated to the caller. -// An optional timeout (default 10s) can be provided (only the first value will be used). If the timeout is exceeded, ErrWaitTimeout is returned. -func (p *Pod) WaitOn(onFunc MsgFunc, timeoutSeconds ...int) error { +// WaitOn will block forever if the desired message is never found. Use WaitUntil if a timeout is desired. +func (p *Pod) WaitOn(onFunc MsgFunc) error { + return p.WaitUntil(nil, onFunc) +} + +// WaitUntil takes a function to be called whenever this pod recieves a message and blocks until that function returns +// something other than ErrMsgNotWanted. WaitOn should be used if there is a need to wait for a particular message. +// When the onFunc returns something other than ErrMsgNotWanted (such as nil or a different error), WaitUntil will return and set +// the onFunc to nil. If an error other than ErrMsgNotWanted is returned from the onFunc, it will be propogated to the caller. +// A timeout can be provided. If the timeout is non-nil and greater than 0, ErrWaitTimeout is returned if the time is exceeded. +func (p *Pod) WaitUntil(timeout TimeoutFunc, onFunc MsgFunc) error { p.onFuncLock.Lock() errChan := make(chan error) p.setOnFunc(func(msg Message) error { - if err := onFunc(msg); err != ErrMsgNotWanted { + if err := onFunc(msg); err != nil { + if err == ErrMsgNotWanted { + return nil // don't do anything + } + errChan <- err + } else { + errChan <- nil } return nil @@ -169,17 +179,15 @@ func (p *Pod) WaitOn(onFunc MsgFunc, timeoutSeconds ...int) error { p.onFuncLock.Unlock() // can't stay locked here or the onFunc will never be called - timeout := 3 - if timeoutSeconds != nil { - timeout = timeoutSeconds[0] - } - var onFuncErr error + if timeout == nil { + timeout = Timeout(-1) + } select { case err := <-errChan: onFuncErr = err - case <-time.After(time.Second * time.Duration(timeout)): + case <-timeout(): onFuncErr = ErrWaitTimeout } @@ -191,13 +199,13 @@ func (p *Pod) WaitOn(onFunc MsgFunc, timeoutSeconds ...int) error { return onFuncErr } -// WaitOnReply waits on a reply message to arrive at the pod and then calls onFunc with that message. +// waitOnReply waits on a reply message to arrive at the pod and then calls onFunc with that message. // If the onFunc produces an error, it will be propogated to the caller. -// an optionsl timrout can be provided (only the first value will be used) -func (p *Pod) WaitOnReply(ticket MessageTicket, onFunc MsgFunc, timeoutSeconds ...int) error { +// If a non-nil timeout greater than 0 is passed, the function will return ErrWaitTimeout if the timeout elapses. +func (p *Pod) waitOnReply(ticket *MsgReceipt, timeout TimeoutFunc, onFunc MsgFunc) error { var reply Message - if err := p.WaitOn(func(msg Message) error { + if err := p.WaitUntil(timeout, func(msg Message) error { if msg.ReplyTo() != ticket.UUID { return ErrMsgNotWanted } @@ -205,7 +213,7 @@ func (p *Pod) WaitOnReply(ticket MessageTicket, onFunc MsgFunc, timeoutSeconds . reply = msg return nil - }, timeoutSeconds...); err != nil { + }); err != nil { return err } @@ -214,8 +222,12 @@ func (p *Pod) WaitOnReply(ticket MessageTicket, onFunc MsgFunc, timeoutSeconds . // setOnFunc sets the OnFunc. THIS DOES NOT LOCK. THE CALLER MUST LOCK. func (p *Pod) setOnFunc(on MsgFunc) { + // reset the message filter when the onFunc is changed + p.messageFilter = newMessageFilter() + p.onFunc = on + // request replay from the bus if needed if on != nil { p.opts.replayOnce.Do(func() { if p.opts.WantsReplay { diff --git a/vendor/github.com/suborbital/grav/grav/receipt.go b/vendor/github.com/suborbital/grav/grav/receipt.go new file mode 100644 index 00000000..25bfa719 --- /dev/null +++ b/vendor/github.com/suborbital/grav/grav/receipt.go @@ -0,0 +1,44 @@ +package grav + +import "github.com/pkg/errors" + +// ErrNoReceipt is returned when a method is called on a nil ticket +var ErrNoReceipt = errors.New("message receipt is nil") + +// MsgReceipt represents a "ticket" that references a message that was sent with the hopes of getting a response +// The embedded pod is a pointer to the pod that sent the original message, and therefore any ticket methods used +// will replace the OnFunc of the pod. +type MsgReceipt struct { + UUID string + pod *Pod +} + +// WaitOn will block until a response to the message is recieved and passes it to the provided onFunc. +// onFunc errors are propogated to the caller. +func (m *MsgReceipt) WaitOn(onFunc MsgFunc) error { + return m.WaitUntil(nil, onFunc) +} + +// WaitUntil will block until a response to the message is recieved and passes it to the provided onFunc. +// ErrWaitTimeout is returned if the timeout elapses, onFunc errors are propogated to the caller. +func (m *MsgReceipt) WaitUntil(timeout TimeoutFunc, onFunc MsgFunc) error { + if m == nil { + return ErrNoReceipt + } + + return m.pod.waitOnReply(m, timeout, onFunc) +} + +// OnReply will set the pod's OnFunc to the provided MsgFunc and set it to run asynchronously when a reply is received +// onFunc errors are discarded. +func (m *MsgReceipt) OnReply(mfn MsgFunc) error { + if m == nil { + return ErrNoReceipt + } + + go func() { + m.pod.waitOnReply(m, nil, mfn) + }() + + return nil +} diff --git a/vendor/github.com/suborbital/grav/grav/timeout.go b/vendor/github.com/suborbital/grav/grav/timeout.go new file mode 100644 index 00000000..329d42fd --- /dev/null +++ b/vendor/github.com/suborbital/grav/grav/timeout.go @@ -0,0 +1,28 @@ +package grav + +import "time" + +// TimeoutFunc is a function that takes a value (a number of seconds) and returns a channel that fires after that given amount of time +type TimeoutFunc func() chan time.Time + +// Timeout returns a function that returns a channel that fires after the provided number of seconds have elapsed +// if the value passed is less than or equal to 0, the timeout will never fire +func Timeout(seconds int) TimeoutFunc { + return func() chan time.Time { + tChan := make(chan time.Time) + + if seconds > 0 { + go func() { + duration := time.Second * time.Duration(seconds) + tChan <- <-time.After(duration) + }() + } + + return tChan + } +} + +// TO is a shorthand for Timeout +func TO(seconds int) TimeoutFunc { + return Timeout(seconds) +} diff --git a/vendor/github.com/suborbital/grav/grav/transport.go b/vendor/github.com/suborbital/grav/grav/transport.go index deb9b126..4328cb6b 100644 --- a/vendor/github.com/suborbital/grav/grav/transport.go +++ b/vendor/github.com/suborbital/grav/grav/transport.go @@ -1,6 +1,9 @@ package grav -import "github.com/suborbital/vektor/vlog" +import ( + "github.com/pkg/errors" + "github.com/suborbital/vektor/vlog" +) // TransportMsgTypeHandshake and others represent internal Transport message types used for handshakes and metadata transfer const ( @@ -8,25 +11,53 @@ const ( TransportMsgTypeUser = 2 ) -// ConnectFunc represents a function that returns a pod conntected to Grav -type ConnectFunc func() *Pod +// ErrConnectionClosed and others are transport and connection related errors +var ( + ErrConnectionClosed = errors.New("connection was closed") + ErrNodeUUIDMismatch = errors.New("handshake UUID did not match node UUID") +) + +type ( + // ReceiveFunc is a function that allows passing along a received message + ReceiveFunc func(msg Message) + // ConnectFunc is a function that provides a new Connection + ConnectFunc func(Connection) + // FindFunc allows a Transport to query Grav for an active connection for the given UUID + FindFunc func(uuid string) (Connection, bool) +) // TransportOpts is a set of options for transports type TransportOpts struct { NodeUUID string Port string + URI string Logger *vlog.Logger Custom interface{} } // Transport represents a Grav transport plugin type Transport interface { - // Serve is a transport-specific function that exposes a connection point - Serve(*TransportOpts, ConnectFunc) error - // ConnectEndpoint indicates to the Transport that a connection to a remote endpoint is needed - ConnectEndpoint(string, ConnectFunc) error - // ConnectEndpointWithUUID connects to an endpoint with a known identifier - ConnectEndpointWithUUID(string, string, ConnectFunc) error + // Setup is a transport-specific function that allows bootstrapping + // Setup can block forever if needed; for example if a webserver is bring run + Setup(opts *TransportOpts, connFunc ConnectFunc, findFunc FindFunc) error + // CreateConnection connects to an endpoint and returns + CreateConnection(endpoint string) (Connection, error) +} + +// Connection represents a connection to another node +type Connection interface { + // Called when the connection handshake is complete and the connection can actively start exchanging messages + Start(recvFunc ReceiveFunc) + // Send a message from the local instance to the connected node + Send(msg Message) error + // CanReplace returns true if the connection can be replaced (i.e. is not a persistent connection like a websocket) + CanReplace() bool + // Initiate a handshake for an outgoing connection and return the remote Ack + DoOutgoingHandshake(handshake *TransportHandshake) (*TransportHandshakeAck, error) + // Wait for an incoming handshake and return the provided Ack to the remote connection + DoIncomingHandshake(handshakeAck *TransportHandshakeAck) (*TransportHandshake, error) + // Close requests that the Connection close itself + Close() } // TransportHandshake represents a handshake sent to a node that you're trying to connect to diff --git a/vendor/github.com/suborbital/hive-wasm/directive/directive.go b/vendor/github.com/suborbital/hive-wasm/directive/directive.go index eb4fc6aa..6a8246b1 100644 --- a/vendor/github.com/suborbital/hive-wasm/directive/directive.go +++ b/vendor/github.com/suborbital/hive-wasm/directive/directive.go @@ -3,6 +3,7 @@ package directive import ( "errors" "fmt" + "strings" "golang.org/x/mod/semver" "gopkg.in/yaml.v2" @@ -23,19 +24,13 @@ const ( type Directive struct { Identifier string `yaml:"identifier"` Version string `yaml:"version"` - Functions []Function `yaml:"functions"` + Runnables []Runnable `yaml:"runnables"` Handlers []Handler `yaml:"handlers,omitempty"` // "fully qualified function names" fqfns map[string]string `yaml:"-"` } -// Function describes a function present inside of a bundle -type Function struct { - Name string - NameSpace string -} - // Handler represents the mapping between an input and a composition of functions type Handler struct { Input Input `yaml:"input,inline"` @@ -52,8 +47,24 @@ type Input struct { // Executable represents an executable step in a handler type Executable struct { - Group []string `yaml:"group,omitempty"` - Fn string `yaml:"fn,omitempty"` + CallableFn `yaml:"callableFn,inline"` + Group []CallableFn `yaml:"group,omitempty"` +} + +// CallableFn is a fn along with its "variable name" and "args" +type CallableFn struct { + Fn string `yaml:"fn,omitempty"` + As string `yaml:"as,omitempty"` + With []string `yaml:"with,omitempty"` + DesiredState []Alias `yaml:"-"` +} + +// Alias is the parsed version of an entry in the `With` array from a CallableFn +// If you do user: activeUser, then activeUser is the state key and user +// is the key that gets put into the function's state (i.e. the alias) +type Alias struct { + Key string + Alias string } // Marshal outputs the YAML bytes of the Directive @@ -93,14 +104,14 @@ func (d *Directive) Validate() error { problems.add(errors.New("version is not a valid semver")) } - if len(d.Functions) < 1 { + if len(d.Runnables) < 1 { problems.add(errors.New("no functions listed")) } fns := map[string]bool{} - for i, f := range d.Functions { - namespaced := fmt.Sprintf("%s#%s", f.NameSpace, f.Name) + for i, f := range d.Runnables { + namespaced := fmt.Sprintf("%s#%s", f.Namespace, f.Name) if _, exists := fns[namespaced]; exists { problems.add(fmt.Errorf("duplicate fn %s found", namespaced)) @@ -116,12 +127,12 @@ func (d *Directive) Validate() error { problems.add(fmt.Errorf("function at position %d missing name", i)) continue } - if f.NameSpace == "" { + if f.Namespace == "" { problems.add(fmt.Errorf("function at position %d missing namespace", i)) } // if the fn is in the default namespace, let it exist "naked" and namespaced - if f.NameSpace == NamespaceDefault { + if f.Namespace == NamespaceDefault { fns[f.Name] = true fns[namespaced] = true } else { @@ -147,21 +158,49 @@ func (d *Directive) Validate() error { continue } + // keep track of the functions that have run so far at each step + fullState := map[string]bool{} + for j, s := range h.Steps { + fnsToAdd := []string{} + if !s.IsFn() && !s.IsGroup() { problems.add(fmt.Errorf("step at position %d for handler handler at position %d has neither Fn or Group", j, i)) } - if s.IsFn() { - if _, exists := fns[s.Fn]; !exists { + validateFn := func(fn CallableFn) { + if _, exists := fns[fn.Fn]; !exists { problems.add(fmt.Errorf("handler at positiion %d lists fn at step %d that does not exist: %s (did you forget a namespace?)", i, j, s.Fn)) } - } else if s.IsGroup() { - for k, gfn := range s.Group { - if _, exists := fns[gfn]; !exists { - problems.add(fmt.Errorf("handler at positiion %d lists fn at position %d in group at step %d that does not exist: %s (did you forget a namespace?)", i, k, j, gfn)) + + if _, err := fn.ParseWith(); err != nil { + problems.add(fmt.Errorf("handler at position %d has invalid 'with' value at step %d: %s", i, j, err.Error())) + } + + for _, d := range fn.DesiredState { + if _, exists := fullState[d.Key]; !exists { + problems.add(fmt.Errorf("handler at position %d has 'with' value at step %d referencing a key that is not yet available in the handler's state: %s", i, j, d.Key)) } } + + key := fn.Fn + if fn.As != "" { + key = fn.As + } + + fnsToAdd = append(fnsToAdd, key) + } + + if s.IsFn() { + validateFn(s.CallableFn) + } else { + for _, gfn := range s.Group { + validateFn(gfn) + } + } + + for _, newFn := range fnsToAdd { + fullState[newFn] = true } } @@ -169,8 +208,8 @@ func (d *Directive) Validate() error { if h.Response == "" && lastStep.IsGroup() { problems.add(fmt.Errorf("handler at position %d has group as last step but does not include 'response' field", i)) } else if h.Response != "" { - if _, exists := fns[h.Response]; !exists { - problems.add(fmt.Errorf("handler at positiion %d lists response fn name that does not exist: %s", i, h.Response)) + if _, exists := fullState[h.Response]; !exists { + problems.add(fmt.Errorf("handler at positiion %d lists response state key that does not exist: %s", i, h.Response)) } } } @@ -181,15 +220,15 @@ func (d *Directive) Validate() error { func (d *Directive) calculateFQFNs() { d.fqfns = map[string]string{} - for _, fn := range d.Functions { - namespaced := fmt.Sprintf("%s#%s", fn.NameSpace, fn.Name) + for _, fn := range d.Runnables { + namespaced := fmt.Sprintf("%s#%s", fn.Namespace, fn.Name) // if the function is in the default namespace, add it to the map both namespaced and not - if fn.NameSpace == NamespaceDefault { - d.fqfns[fn.Name] = d.fqfnForFunc(fn.NameSpace, fn.Name) - d.fqfns[namespaced] = d.fqfnForFunc(fn.NameSpace, fn.Name) + if fn.Namespace == NamespaceDefault { + d.fqfns[fn.Name] = d.fqfnForFunc(fn.Namespace, fn.Name) + d.fqfns[namespaced] = d.fqfnForFunc(fn.Namespace, fn.Name) } else { - d.fqfns[namespaced] = d.fqfnForFunc(fn.NameSpace, fn.Name) + d.fqfns[namespaced] = d.fqfnForFunc(fn.Namespace, fn.Name) } } } @@ -208,6 +247,26 @@ func (e *Executable) IsFn() bool { return e.Fn != "" && e.Group == nil } +// ParseWith parses the fn's 'with' clause and returns the desired state +func (c *CallableFn) ParseWith() ([]Alias, error) { + if c.DesiredState != nil && len(c.DesiredState) > 0 { + return c.DesiredState, nil + } + + c.DesiredState = make([]Alias, len(c.With)) + + for i, w := range c.With { + parts := strings.Split(w, ": ") + if len(parts) != 2 { + return nil, fmt.Errorf("with value has wrong format: parsed %d parts seperated by : , expected 2", len(parts)) + } + + c.DesiredState[i] = Alias{Alias: parts[0], Key: parts[1]} + } + + return c.DesiredState, nil +} + type problems []error func (p *problems) add(err error) { diff --git a/vendor/github.com/suborbital/hive-wasm/directive/runnable.go b/vendor/github.com/suborbital/hive-wasm/directive/runnable.go new file mode 100644 index 00000000..ade22341 --- /dev/null +++ b/vendor/github.com/suborbital/hive-wasm/directive/runnable.go @@ -0,0 +1,9 @@ +package directive + +// Runnable is the structure of a .runnable.yaml file +type Runnable struct { + Name string `yaml:"name"` + Namespace string `yaml:"namespace"` + Lang string `yaml:"lang"` + APIVersion string `yaml:"apiVersion,omitempty"` +} diff --git a/vendor/github.com/suborbital/hive/hive/hive.go b/vendor/github.com/suborbital/hive/hive/hive.go index 119abe5e..45808d8a 100644 --- a/vendor/github.com/suborbital/hive/hive/hive.go +++ b/vendor/github.com/suborbital/hive/hive/hive.go @@ -9,9 +9,11 @@ import ( "github.com/suborbital/vektor/vlog" ) +// MsgTypeHiveJobErr and others are Grav message types used for Hive job const ( - msgTypeHiveJobErr = "hive.joberr" - msgTypeHiveResult = "hive.result" + MsgTypeHiveJobErr = "hive.joberr" + MsgTypeHiveResult = "hive.result" + MsgTypeHiveNilResult = "hive.nil" ) // JobFunc is a function that runs a job of a predetermined type @@ -71,39 +73,43 @@ func (h *Hive) Listen(pod *grav.Pod, msgType string) { return h.Do(job) } - pod.OnType(func(msg grav.Message) error { + pod.OnType(msgType, func(msg grav.Message) error { var replyMsg grav.Message result, err := helper(msg.Data()).Then() if err != nil { - h.log.Error(errors.Wrap(err, "job returned error result")) - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(err.Error())) + h.log.Error(errors.Wrapf(err, "job from message %s returned error result", msg.UUID())) + replyMsg = grav.NewMsg(MsgTypeHiveJobErr, []byte(err.Error())) } else { if result == nil { - return nil - } - - if resultMsg, isMsg := result.(grav.Message); isMsg { + // if the job returned no result + replyMsg = grav.NewMsg(MsgTypeHiveNilResult, []byte{}) + } else if resultMsg, isMsg := result.(grav.Message); isMsg { + // if the job returned a Grav message resultMsg.SetReplyTo(msg.UUID()) replyMsg = resultMsg } else if bytes, isBytes := result.([]byte); isBytes { - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, bytes) + // if the job returned bytes + replyMsg = grav.NewMsg(MsgTypeHiveResult, bytes) } else if resultString, isString := result.(string); isString { - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, []byte(resultString)) + // if the job returned a string + replyMsg = grav.NewMsg(MsgTypeHiveResult, []byte(resultString)) } else { + // if the job returned something else like a struct resultJSON, err := json.Marshal(result) if err != nil { - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error())) + h.log.Error(errors.Wrapf(err, "job from message %s returned result that could not be JSON marshalled", msg.UUID())) + replyMsg = grav.NewMsg(MsgTypeHiveJobErr, []byte(errors.Wrap(err, "failed to Marshal job result").Error())) } - replyMsg = grav.NewMsgReplyTo(msg.Ticket(), msgTypeHiveResult, resultJSON) + replyMsg = grav.NewMsg(MsgTypeHiveResult, resultJSON) } } - pod.Send(replyMsg) + pod.ReplyTo(msg, replyMsg) return nil - }, msgType) + }) } // Job is a shorter alias for NewJob diff --git a/vendor/golang.org/x/crypto/acme/acme.go b/vendor/golang.org/x/crypto/acme/acme.go index 6e6c9d13..174cfe8b 100644 --- a/vendor/golang.org/x/crypto/acme/acme.go +++ b/vendor/golang.org/x/crypto/acme/acme.go @@ -363,6 +363,10 @@ func AcceptTOS(tosURL string) bool { return true } // Also see Error's Instance field for when a CA requires already registered accounts to agree // to an updated Terms of Service. func (c *Client) Register(ctx context.Context, acct *Account, prompt func(tosURL string) bool) (*Account, error) { + if c.Key == nil { + return nil, errors.New("acme: client.Key must be set to Register") + } + dir, err := c.Discover(ctx) if err != nil { return nil, err diff --git a/vendor/golang.org/x/crypto/acme/jws.go b/vendor/golang.org/x/crypto/acme/jws.go index 76e3fdac..8c3eccec 100644 --- a/vendor/golang.org/x/crypto/acme/jws.go +++ b/vendor/golang.org/x/crypto/acme/jws.go @@ -7,6 +7,7 @@ package acme import ( "crypto" "crypto/ecdsa" + "crypto/hmac" "crypto/rand" "crypto/rsa" "crypto/sha256" @@ -14,6 +15,7 @@ import ( "encoding/asn1" "encoding/base64" "encoding/json" + "errors" "fmt" "math/big" ) @@ -31,6 +33,14 @@ const noKeyID = keyID("") // See https://tools.ietf.org/html/rfc8555#section-6.3 for more details. const noPayload = "" +// jsonWebSignature can be easily serialized into a JWS following +// https://tools.ietf.org/html/rfc7515#section-3.2. +type jsonWebSignature struct { + Protected string `json:"protected"` + Payload string `json:"payload"` + Sig string `json:"signature"` +} + // jwsEncodeJSON signs claimset using provided key and a nonce. // The result is serialized in JSON format containing either kid or jwk // fields based on the provided keyID value. @@ -71,12 +81,7 @@ func jwsEncodeJSON(claimset interface{}, key crypto.Signer, kid keyID, nonce, ur if err != nil { return nil, err } - - enc := struct { - Protected string `json:"protected"` - Payload string `json:"payload"` - Sig string `json:"signature"` - }{ + enc := jsonWebSignature{ Protected: phead, Payload: payload, Sig: base64.RawURLEncoding.EncodeToString(sig), @@ -84,6 +89,43 @@ func jwsEncodeJSON(claimset interface{}, key crypto.Signer, kid keyID, nonce, ur return json.Marshal(&enc) } +// jwsWithMAC creates and signs a JWS using the given key and the HS256 +// algorithm. kid and url are included in the protected header. rawPayload +// should not be base64-URL-encoded. +func jwsWithMAC(key []byte, kid, url string, rawPayload []byte) (*jsonWebSignature, error) { + if len(key) == 0 { + return nil, errors.New("acme: cannot sign JWS with an empty MAC key") + } + header := struct { + Algorithm string `json:"alg"` + KID string `json:"kid"` + URL string `json:"url,omitempty"` + }{ + // Only HMAC-SHA256 is supported. + Algorithm: "HS256", + KID: kid, + URL: url, + } + rawProtected, err := json.Marshal(header) + if err != nil { + return nil, err + } + protected := base64.RawURLEncoding.EncodeToString(rawProtected) + payload := base64.RawURLEncoding.EncodeToString(rawPayload) + + h := hmac.New(sha256.New, key) + if _, err := h.Write([]byte(protected + "." + payload)); err != nil { + return nil, err + } + mac := h.Sum(nil) + + return &jsonWebSignature{ + Protected: protected, + Payload: payload, + Sig: base64.RawURLEncoding.EncodeToString(mac), + }, nil +} + // jwkEncode encodes public part of an RSA or ECDSA key into a JWK. // The result is also suitable for creating a JWK thumbprint. // https://tools.ietf.org/html/rfc7517 diff --git a/vendor/golang.org/x/crypto/acme/rfc8555.go b/vendor/golang.org/x/crypto/acme/rfc8555.go index dfb57a66..073cee58 100644 --- a/vendor/golang.org/x/crypto/acme/rfc8555.go +++ b/vendor/golang.org/x/crypto/acme/rfc8555.go @@ -37,22 +37,32 @@ func (c *Client) DeactivateReg(ctx context.Context) error { return nil } -// registerRFC is quivalent to c.Register but for CAs implementing RFC 8555. +// registerRFC is equivalent to c.Register but for CAs implementing RFC 8555. // It expects c.Discover to have already been called. -// TODO: Implement externalAccountBinding. func (c *Client) registerRFC(ctx context.Context, acct *Account, prompt func(tosURL string) bool) (*Account, error) { c.cacheMu.Lock() // guard c.kid access defer c.cacheMu.Unlock() req := struct { - TermsAgreed bool `json:"termsOfServiceAgreed,omitempty"` - Contact []string `json:"contact,omitempty"` + TermsAgreed bool `json:"termsOfServiceAgreed,omitempty"` + Contact []string `json:"contact,omitempty"` + ExternalAccountBinding *jsonWebSignature `json:"externalAccountBinding,omitempty"` }{ Contact: acct.Contact, } if c.dir.Terms != "" { req.TermsAgreed = prompt(c.dir.Terms) } + + // set 'externalAccountBinding' field if requested + if acct.ExternalAccountBinding != nil { + eabJWS, err := c.encodeExternalAccountBinding(acct.ExternalAccountBinding) + if err != nil { + return nil, fmt.Errorf("acme: failed to encode external account binding: %v", err) + } + req.ExternalAccountBinding = eabJWS + } + res, err := c.post(ctx, c.Key, c.dir.RegURL, req, wantStatus( http.StatusOK, // account with this key already registered http.StatusCreated, // new account created @@ -75,7 +85,17 @@ func (c *Client) registerRFC(ctx context.Context, acct *Account, prompt func(tos return a, nil } -// updateGegRFC is equivalent to c.UpdateReg but for CAs implementing RFC 8555. +// encodeExternalAccountBinding will encode an external account binding stanza +// as described in https://tools.ietf.org/html/rfc8555#section-7.3.4. +func (c *Client) encodeExternalAccountBinding(eab *ExternalAccountBinding) (*jsonWebSignature, error) { + jwk, err := jwkEncode(c.Key.Public()) + if err != nil { + return nil, err + } + return jwsWithMAC(eab.Key, eab.KID, c.dir.RegURL, []byte(jwk)) +} + +// updateRegRFC is equivalent to c.UpdateReg but for CAs implementing RFC 8555. // It expects c.Discover to have already been called. func (c *Client) updateRegRFC(ctx context.Context, a *Account) (*Account, error) { url := string(c.accountKID(ctx)) diff --git a/vendor/golang.org/x/crypto/acme/types.go b/vendor/golang.org/x/crypto/acme/types.go index e959cafc..e751bf52 100644 --- a/vendor/golang.org/x/crypto/acme/types.go +++ b/vendor/golang.org/x/crypto/acme/types.go @@ -199,6 +199,28 @@ type Account struct { // // It is non-RFC 8555 compliant and is obsoleted by OrdersURL. Certificates string + + // ExternalAccountBinding represents an arbitrary binding to an account of + // the CA which the ACME server is tied to. + // See https://tools.ietf.org/html/rfc8555#section-7.3.4 for more details. + ExternalAccountBinding *ExternalAccountBinding +} + +// ExternalAccountBinding contains the data needed to form a request with +// an external account binding. +// See https://tools.ietf.org/html/rfc8555#section-7.3.4 for more details. +type ExternalAccountBinding struct { + // KID is the Key ID of the symmetric MAC key that the CA provides to + // identify an external account from ACME. + KID string + + // Key is the bytes of the symmetric key that the CA provides to identify + // the account. Key must correspond to the KID. + Key []byte +} + +func (e *ExternalAccountBinding) String() string { + return fmt.Sprintf("&{KID: %q, Key: redacted}", e.KID) } // Directory is ACME server discovery data. diff --git a/vendor/modules.txt b/vendor/modules.txt index c8d34acb..a319fa4e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,5 @@ -# github.com/google/uuid v1.1.2 +# github.com/google/uuid v1.1.3 +## explicit github.com/google/uuid # github.com/inconshreveable/mousetrap v1.0.0 github.com/inconshreveable/mousetrap @@ -14,13 +15,13 @@ github.com/sethvargo/go-envconfig github.com/spf13/cobra # github.com/spf13/pflag v1.0.5 github.com/spf13/pflag -# github.com/suborbital/grav v0.1.1 +# github.com/suborbital/grav v0.3.0 ## explicit github.com/suborbital/grav/grav -# github.com/suborbital/hive v0.1.4 +# github.com/suborbital/hive v0.1.5 ## explicit github.com/suborbital/hive/hive -# github.com/suborbital/hive-wasm v0.2.5 +# github.com/suborbital/hive-wasm v0.2.6 ## explicit github.com/suborbital/hive-wasm/bundle github.com/suborbital/hive-wasm/directive @@ -31,14 +32,14 @@ github.com/suborbital/vektor/vk github.com/suborbital/vektor/vlog # github.com/wasmerio/wasmer-go v0.3.2-0.20200903143934-299b1d478cde github.com/wasmerio/wasmer-go/wasmer -# golang.org/x/crypto v0.0.0-20201208171446-5f87f3452ae9 +# golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad ## explicit golang.org/x/crypto/acme golang.org/x/crypto/acme/autocert # golang.org/x/mod v0.4.0 ## explicit golang.org/x/mod/semver -# golang.org/x/net v0.0.0-20201209123823-ac852fbbde11 +# golang.org/x/net v0.0.0-20201224014010-6772e930b67b ## explicit golang.org/x/net/idna # golang.org/x/sync v0.0.0-20201207232520-09787c993a3a