Skip to content

Commit

Permalink
Merge pull request #6 from suborbital/connor/as-with
Browse files Browse the repository at this point in the history
Support `as` and `with`
  • Loading branch information
cohix committed Jan 1, 2021
2 parents 20689e2 + 8a4d645 commit d8704a5
Show file tree
Hide file tree
Showing 32 changed files with 898 additions and 306 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/sanity.yml
@@ -0,0 +1,33 @@
name: Testapalooza

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

jobs:

build:
name: Test
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.15
uses: actions/setup-go@v1
with:
go-version: 1.15
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
run: |
# have to do this nonsense to ensure that the correct arch gets downloaded for libwasmer
go get -v -t -d ./...
go mod vendor
- name: Run test
run: |
make build
3 changes: 2 additions & 1 deletion Dockerfile
Expand Up @@ -2,8 +2,9 @@ FROM golang:1.15 as builder

RUN mkdir -p /go/src/github.com/suborbital/atmo
COPY . /go/src/github.com/suborbital/atmo/
WORKDIR /go/src/github.com/suborbital/atmo/

RUN go install github.com/suborbital/atmo
RUN go install

FROM debian:buster-slim

Expand Down
3 changes: 0 additions & 3 deletions Makefile
Expand Up @@ -11,9 +11,6 @@ atmo: build
atmo/docker: build/docker
docker run -v ${PWD}/$(dir):/home/atmo -e ATMO_HTTP_PORT=8080 -p 8080:8080 atmo:dev atmo

test/run:
go run ./main.go

test/go:
go test -v --count=1 -p=1 ./...

Expand Down
143 changes: 112 additions & 31 deletions atmo/coordinator/coordinator.go
Expand Up @@ -18,11 +18,6 @@ import (
"github.com/suborbital/vektor/vlog"
)

const (
msgTypeHiveJobErr = "hive.joberr"
msgTypeHiveResult = "hive.result"
)

// Coordinator is a type that is responsible for covnerting the directive into
// usable Vektor handles by coordinating Hive jobs and meshing when needed.
type Coordinator struct {
Expand All @@ -39,9 +34,12 @@ type Coordinator struct {

// CoordinatedRequest represents a request being coordinated
type CoordinatedRequest struct {
URL string `json:"url"`
Body string `json:"body"`
State map[string]interface{} `json:"state"`
Method string `json:"method"`
URL string `json:"url"`
Body string `json:"body"`
Headers map[string]string `json:"headers"`
Params map[string]string `json:"params"`
State map[string]interface{} `json:"state"`
}

type requestScope struct {
Expand Down Expand Up @@ -78,7 +76,7 @@ func (c *Coordinator) UseBundle(bundle *bundle.Bundle) *vk.RouteGroup {
group := vk.Group("").Before(scopeMiddleware)

// connect a Grav pod to each function
for _, fn := range bundle.Directive.Functions {
for _, fn := range bundle.Directive.Runnables {
fqfn, err := bundle.Directive.FQFN(fn.Name)
if err != nil {
c.log.Error(errors.Wrapf(err, "failed to derive FQFN for Directive function %s, function will not be available", fn.Name))
Expand Down Expand Up @@ -111,26 +109,44 @@ func (c *Coordinator) vkHandlerForDirectiveHandler(handler directive.Handler) vk

defer r.Body.Close()

flatHeaders := map[string]string{}
for k, v := range r.Header {
flatHeaders[k] = v[0]
}

flatParams := map[string]string{}
for _, p := range ctx.Params {
flatParams[p.Key] = p.Value
}

req := CoordinatedRequest{
URL: r.URL.String(),
Body: string(reqBody),
State: map[string]interface{}{},
Method: r.Method,
URL: r.URL.String(),
Body: string(reqBody),
Headers: flatHeaders,
Params: flatParams,
State: map[string]interface{}{},
}

// run through the handler's steps, updating the coordinated state after each
for _, step := range handler.Steps {
stateJSON, err := req.Marshal()
stateJSON, err := stateJSONForStep(req, step)
if err != nil {
return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to Marshal Request State"))
ctx.Log.Error(errors.Wrap(err, "failed to stateJSONForStep"))
return nil, err
}

if step.IsFn() {
entry, err := c.runSingleFn(step.Fn, stateJSON, ctx)
entry, err := c.runSingleFn(step.CallableFn, stateJSON, ctx)
if err != nil {
return nil, err
}

if entry != nil {
req.State[step.Fn] = entry
// hive-wasm issue #45
key := key(step.CallableFn)

req.State[key] = entry
}
} else {
// if the step is a group, run them all concurrently and collect the results
Expand All @@ -149,43 +165,46 @@ func (c *Coordinator) vkHandlerForDirectiveHandler(handler directive.Handler) vk
}
}

func (c *Coordinator) runSingleFn(name string, body []byte, ctx *vk.Ctx) (interface{}, error) {
func (c *Coordinator) runSingleFn(fn directive.CallableFn, body []byte, ctx *vk.Ctx) (interface{}, error) {
start := time.Now()
defer func() {
duration := time.Since(start)
ctx.Log.Debug("fn", name, fmt.Sprintf("executed in %d ms", duration.Milliseconds()))
ctx.Log.Debug("fn", fn.Fn, fmt.Sprintf("executed in %d ms", duration.Milliseconds()))
}()

// calculate the FQFN
fqfn, err := c.directive.FQFN(name)
fqfn, err := c.directive.FQFN(fn.Fn)
if err != nil {
return nil, errors.Wrapf(err, "failed to FQFN for group fn %s", name)
return nil, errors.Wrapf(err, "failed to FQFN for group fn %s", fn.Fn)
}

// compose a message containing the serialized request state, and send it via Grav
// for the appropriate meshed Hive to handle. It may be handled by self if appropriate.
jobMsg := grav.NewMsg(fqfn, body)

var jobResult []byte
var jobErr error

pod := c.bus.Connect()
defer pod.Disconnect()

podErr := pod.SendAndWaitOnReply(jobMsg, func(msg grav.Message) error {
podErr := pod.Send(jobMsg).WaitUntil(grav.Timeout(3), func(msg grav.Message) error {
switch msg.Type() {
case msgTypeHiveResult:
case hive.MsgTypeHiveResult:
jobResult = msg.Data()
case msgTypeHiveJobErr:
case hive.MsgTypeHiveJobErr:
jobErr = errors.New(string(msg.Data()))
case hive.MsgTypeHiveNilResult:
// do nothing
}

return nil
})

// check for errors and results, convert to something useful, and return
// this should probably be refactored as it looks pretty goofy

if podErr != nil {
// Hive needs to be updated to reply with a message when a job returns no result
if podErr == grav.ErrWaitTimeout {
// do nothing
} else {
Expand All @@ -194,11 +213,11 @@ func (c *Coordinator) runSingleFn(name string, body []byte, ctx *vk.Ctx) (interf
}

if jobErr != nil {
return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrapf(jobErr, "group fn %s failed", name))
return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrapf(jobErr, "group fn %s failed", fn.Fn))
}

if jobResult == nil {
ctx.Log.Debug("fn", name, "returned a nil result")
ctx.Log.Debug("fn", fn.Fn, "returned a nil result")
return nil, nil
}

Expand All @@ -211,11 +230,12 @@ type fnResult struct {
err error
}

func (c *Coordinator) runGroup(fns []string, body []byte, ctx *vk.Ctx) (map[string]interface{}, error) {
// runGroup runs a group of functions
// this is all more complicated than it needs to be, Grav should be doing more of the work for us here
func (c *Coordinator) runGroup(fns []directive.CallableFn, body []byte, ctx *vk.Ctx) (map[string]interface{}, error) {
start := time.Now()
defer func() {
duration := time.Since(start)
ctx.Log.Debug("group", fmt.Sprintf("executed in %d ms", duration.Milliseconds()))
ctx.Log.Debug("group", fmt.Sprintf("executed in %d ms", time.Since(start).Milliseconds()))
}()

resultChan := make(chan fnResult, len(fns))
Expand All @@ -225,13 +245,15 @@ func (c *Coordinator) runGroup(fns []string, body []byte, ctx *vk.Ctx) (map[stri
// functionality to collect all the responses, probably using the parent ID.
for i := range fns {
fn := fns[i]
ctx.Log.Debug("running fn", fn, "from group")
ctx.Log.Debug("running fn", fn.Fn, "from group")

key := key(fn)

go func() {
res, err := c.runSingleFn(fn, body, ctx)

result := fnResult{
name: fn,
name: key,
result: res,
err: err,
}
Expand Down Expand Up @@ -279,6 +301,36 @@ func scopeMiddleware(r *http.Request, ctx *vk.Ctx) error {
return nil
}

func stateJSONForStep(req CoordinatedRequest, step directive.Executable) ([]byte, error) {
// the desired state is cached, so after the first call this is very efficient
desired, err := step.ParseWith()
if err != nil {
return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to ParseWith"))
}

// based on the step's `with` clause, build the state to pass into the function
stepState, err := desiredState(desired, req.State)
if err != nil {
return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to build desiredState"))
}

stepReq := CoordinatedRequest{
Method: req.Method,
URL: req.URL,
Body: req.Body,
Headers: req.Headers,
Params: req.Params,
State: stepState,
}

stateJSON, err := stepReq.Marshal()
if err != nil {
return nil, vk.Wrap(http.StatusInternalServerError, errors.Wrap(err, "failed to Marshal Request State"))
}

return stateJSON, nil
}

// resultFromState returns the state value for the last single function that ran in a handler
func resultFromState(handler directive.Handler, state map[string]interface{}) interface{} {
// if the handler defines a response explicitly, use it (return nil if there is nothing in state)
Expand All @@ -305,6 +357,25 @@ func resultFromState(handler directive.Handler, state map[string]interface{}) in
return nil
}

func desiredState(desired []directive.Alias, state map[string]interface{}) (map[string]interface{}, error) {
if desired == nil || len(desired) == 0 {
return state, nil
}

desiredState := map[string]interface{}{}

for _, a := range desired {
val, exists := state[a.Key]
if !exists {
return nil, fmt.Errorf("failed to build desired state, %s does not exists in handler state", a.Key)
}

desiredState[a.Alias] = val
}

return desiredState, nil
}

// stringOrMap converts bytes to a map if they are JSON, or a string if not
func stringOrMap(result []byte) interface{} {
resMap := map[string]interface{}{}
Expand All @@ -314,3 +385,13 @@ func stringOrMap(result []byte) interface{} {

return resMap
}

func key(fn directive.CallableFn) string {
key := fn.Fn

if fn.As != "" {
key = fn.As
}

return key
}
30 changes: 19 additions & 11 deletions docs/introduction/getstarted.md
Expand Up @@ -37,9 +37,7 @@ If you prefer using Docker, you can locally build and run Atmo in Docker using:
```

## How it works

If you explore the `example-project` directory, you will see several Runnables \(`fetch-test`, `modify-url`, etc.\) and a `Directive.yaml` file. Each folder represents an Atmo function, and the Directive is responsible for describing how those functions should be used. The Directive looks like this:

If you explore the `example-project` directory, you will see several Runnables (`fetch-test`, `modify-url`, etc.) and a `Directive.yaml` file. Each folder represents an Atmo function, and the Directive is responsible for describing how those functions should be used. The Directive looks like this:
```yaml
identifier: com.suborbital.test
version: v0.0.1
Expand All @@ -50,20 +48,30 @@ handlers:
method: POST
steps:
- group:
- modify-url
- helloworld-rs
- fn: modify-url
- fn: helloworld-rs
as: hello
- fn: fetch-test
with:
- "url: modify-url"
- "logme: hello"
```
This describes the application being constructed. It declares a route (`POST /hello`) and a set of `steps` to handle that request. The `steps` are a set of functions to be **composed** when handling requests to the `/hello` endpoint.

This describes the application being constructed. This declares a route \(`POST /hello`\) and how to handle that request. The `steps` provided contain a set of instructions for how to handle requests to the `/hello` endpoint.

The first step is a `group`, meaning that all of the functions in that group will be executed **concurrently**.
The first step is a `group`, meaning that all of the functions in that group will be executed **concurrently**.

The second step is a single function that uses the [Runnable API](https://github.com/suborbital/hive-wasm) to make an HTTP request. The API is continually evolving to include more capabilities. In addition to making HTTP requests, it includes logging abilities and more.

For each function executed, its result gets stored in the request handler's `state`. The `state` is used to pass values between functions, since they are completely isolated and independent from one another. The `modify-url` function takes the request body and modifies it \(in this case, adding `/suborbital` to it\). The second step \(`fetch-test`\) takes that modified URL and makes an HTTP request to fetch it. The final function's output is used as the response data for the request.
## State
After each step, its function results gets stored in the request handler's `state`. The `state` is an ephemeral set of key/value pairs created for each request. State is used to pass values between functions, since they are completely isolated and unaware of one another.

## Coming soon
The `modify-url` function for example takes the request body (in this case, a URL), and modifies it (by adding `/suborbital` to it). The second step (`fetch-test`) takes that modified URL and makes an HTTP request to fetch it. The final function's output is used as the response data for the request.

Further functionality is incoming along with improved docs, more examples, and an improved Directive format. Visit [the Suborbital website](https://suborbital.dev) to sign up for email updates related to new versions of Atmo.
There are two clauses, `as` and `with` that make working with request state easier. `as` will assign the value returned by a function to a particular name in state. In the above example, `helloworld-rs` is saved into state as `hello`. You can think of this just like storing a value into a variable!

`with` allows the developer to pass a "desired state" into a given function. Rather than passing the entire state with the existing keys, the developer can optionally define a custom state by choosing aliases for some or all of the keys available in request state. This is just like choosing parameter values to pass into function arguments!

`subo` will validate your directive to help ensure that your Directive is correct, including validating that you're not accessing keys that don't exist in state.

## Coming soon
Further functionality is incoming along with improved docs, more examples, and an improved Directive format. Visit [the Suborbital website](https://suborbital.dev) to sign up for email updates related to new versions of Atmo.

0 comments on commit d8704a5

Please sign in to comment.