Skip to content

Commit

Permalink
Merge pull request #62 from suborbital/connor/errors
Browse files Browse the repository at this point in the history
Ability for Wasm Runnables to return an error
  • Loading branch information
cohix committed Mar 10, 2021
2 parents e96f02d + 8f17ea4 commit 8366cf1
Show file tree
Hide file tree
Showing 37 changed files with 316 additions and 52 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -27,4 +27,6 @@ Suborbital.wasm

.swiftpm

bundlewritetester
bundlewritetester
runnable.wasm
runnables.wasm.zip
2 changes: 1 addition & 1 deletion api/rust/suborbital/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/rust/suborbital/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "suborbital"
version = "0.6.3"
version = "0.8.0"
authors = ["cohix <connorjhicks@gmail.com>"]
edition = "2018"
description = "Suborbital Wasm Runnable API"
Expand Down
42 changes: 33 additions & 9 deletions api/rust/suborbital/src/lib.rs
Expand Up @@ -14,8 +14,8 @@ struct State <'a> {
// something to hold down the fort until a real Runnable is set
struct DefaultRunnable {}
impl runnable::Runnable for DefaultRunnable {
fn run(&self, _input: Vec<u8>) -> Option<Vec<u8>> {
return None;
fn run(&self, _input: Vec<u8>) -> Result<Vec<u8>, runnable::RunErr> {
Err(runnable::RunErr::new(500, ""))
}
}

Expand All @@ -28,16 +28,32 @@ static mut STATE: State = State {
pub mod runnable {
use std::mem;
use std::slice;
use super::util;

extern {
fn return_result(result_pointer: *const u8, result_size: i32, ident: i32);
fn return_error(code: i32, result_pointer: *const u8, result_size: i32, ident: i32);
}

pub struct RunErr {
code: i32,
message: String,
}

impl RunErr {
pub fn new(code: i32, msg: &str) -> Self {
RunErr {
code: code,
message: String::from(msg)
}
}
}

pub trait Runnable {
fn run(&self, input: Vec<u8>) -> Option<Vec<u8>>;
fn run(&self, input: Vec<u8>) -> Result<Vec<u8>, RunErr>;
}

pub fn set(runnable: &'static dyn Runnable) {
pub fn use_runnable(runnable: &'static dyn Runnable) {
unsafe {
super::STATE.runnable = runnable;
}
Expand Down Expand Up @@ -70,20 +86,28 @@ pub mod runnable {
};

let in_bytes = Vec::from(in_slice);

let mut code = 0;

// call the runnable and check its result
let result: Vec<u8> = unsafe { match super::STATE.runnable.run(in_bytes) {
Some(val) => val,
None => Vec::from("run returned no data"),
Ok(val) => val,
Err(e) => {
code = e.code;
util::to_vec(e.message)
}
} };

let result_slice = result.as_slice();
let result_size = result_slice.len();


// call back to reactr to return the result
// call back to reactr to return the result or error
unsafe {
return_result(result_slice.as_ptr() as *const u8, result_size as i32, ident);
if code != 0 {
return_error(code, result_slice.as_ptr() as *const u8, result_size as i32, ident);
} else {
return_result(result_slice.as_ptr() as *const u8, result_size as i32, ident);
}
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions bundle/bundlewritetester/main.go
Expand Up @@ -69,6 +69,12 @@ func main() {
},
{
Fn: "log",
OnErr: &directive.FnOnErr{
Code: map[int]string{
404: "continue",
},
Other: "return",
},
},
},
},
Expand All @@ -78,6 +84,9 @@ func main() {
With: []string{
"data: ghData",
},
OnErr: &directive.FnOnErr{
Any: "return",
},
},
},
},
Expand Down
36 changes: 35 additions & 1 deletion directive/directive.go
Expand Up @@ -44,7 +44,7 @@ type Handler struct {
type Schedule struct {
Name string `yaml:"name"`
Every ScheduleEvery `yaml:"every"`
State map[string]string `yaml:"state"`
State map[string]string `yaml:"state,omitempty"`
Steps []Executable `yaml:"steps"`
}

Expand Down Expand Up @@ -74,9 +74,17 @@ type CallableFn struct {
Fn string `yaml:"fn,omitempty"`
As string `yaml:"as,omitempty"`
With []string `yaml:"with,omitempty"`
OnErr *FnOnErr `yaml:"onErr,omitempty"`
DesiredState []Alias `yaml:"-"`
}

// FnOnErr describes how to handle an error from a function call
type FnOnErr struct {
Code map[int]string `yaml:"code,omitempty"`
Any string `yaml:"any,omitempty"`
Other string `yaml:"other,omitempty"`
}

// Alias is the parsed version of an entry in the `With` array from a CallableFn
// If you do user: activeUser, then activeUser is the state key and user
// is the key that gets put into the function's state (i.e. the alias)
Expand Down Expand Up @@ -253,6 +261,32 @@ func validateSteps(exType executableType, name string, steps []Executable, initi
}
}

if fn.OnErr != nil {
// if codes are specificed, 'other' should be used, not 'any'
if len(fn.OnErr.Code) > 0 && fn.OnErr.Any != "" {
problems.add(fmt.Errorf("%s for %s has 'onErr.any' value at step %d while specific codes are specified, use 'other' instead", exType, name, j))
} else if fn.OnErr.Any != "" {
if fn.OnErr.Any != "continue" && fn.OnErr.Any != "return" {
problems.add(fmt.Errorf("%s for %s has 'onErr.any' value at step %d with an invalid error directive: %s", exType, name, j, fn.OnErr.Any))
}
}

// if codes are NOT specificed, 'any' should be used, not 'other'
if len(fn.OnErr.Code) == 0 && fn.OnErr.Other != "" {
problems.add(fmt.Errorf("%s for %s has 'onErr.other' value at step %d while specific codes are not specified, use 'any' instead", exType, name, j))
} else if fn.OnErr.Other != "" {
if fn.OnErr.Other != "continue" && fn.OnErr.Other != "return" {
problems.add(fmt.Errorf("%s for %s has 'onErr.any' value at step %d with an invalid error directive: %s", exType, name, j, fn.OnErr.Other))
}
}

for code, val := range fn.OnErr.Code {
if val != "return" && val != "continue" {
problems.add(fmt.Errorf("%s for %s has 'onErr.code' value at step %d with an invalid error directive for code %d: %s", exType, name, j, code, val))
}
}
}

key := fn.Fn
if fn.As != "" {
key = fn.As
Expand Down
58 changes: 58 additions & 0 deletions directive/directive_test.go
Expand Up @@ -133,6 +133,64 @@ func TestDirectiveValidatorGroupLast(t *testing.T) {
}
}

func TestDirectiveValidatorInvalidOnErr(t *testing.T) {
dir := Directive{
Identifier: "dev.suborbital.appname",
AppVersion: "v0.1.1",
AtmoVersion: "v0.0.6",
Runnables: []Runnable{
{
Name: "getUser",
Namespace: "db",
},
{
Name: "getUserDetails",
Namespace: "db",
},
{
Name: "returnUser",
Namespace: "api",
},
},
Handlers: []Handler{
{
Input: Input{
Type: "request",
Method: "GET",
Resource: "/api/v1/user",
},
Steps: []Executable{
{
CallableFn: CallableFn{
Fn: "api#returnUser",
OnErr: &FnOnErr{
Code: map[int]string{
400: "continue",
},
Any: "return",
},
},
},
{
CallableFn: CallableFn{
Fn: "api#returnUser",
OnErr: &FnOnErr{
Other: "continue",
},
},
},
},
},
},
}

if err := dir.Validate(); err == nil {
t.Error("directive validation should have failed")
} else {
fmt.Println("directive validation properly failed:", err)
}
}

func TestDirectiveValidatorMissingFns(t *testing.T) {
dir := Directive{
Identifier: "dev.suborbital.appname",
Expand Down
12 changes: 10 additions & 2 deletions rt/reactr.go
Expand Up @@ -10,7 +10,8 @@ import (

// MsgTypeReactrJobErr and others are Grav message types used for Reactr job
const (
MsgTypeReactrJobErr = "reactr.joberr"
MsgTypeReactrJobErr = "reactr.joberr" // any kind of error from a job run
MsgTypeReactrRunErr = "reactr.runerr" // specifically a RunErr returned from a Wasm Runnable
MsgTypeReactrResult = "reactr.result"
MsgTypeReactrNilResult = "reactr.nil"
)
Expand Down Expand Up @@ -86,7 +87,14 @@ func (h *Reactr) Listen(pod *grav.Pod, msgType string) {
result, err := helper(msg.Data()).Then()
if err != nil {
h.log.Error(errors.Wrapf(err, "job from message %s returned error result", msg.UUID()))
replyMsg = grav.NewMsg(MsgTypeReactrJobErr, []byte(err.Error()))

runErr := &RunErr{}
if errors.As(err, runErr) {
// if a Wasm Runnable returned a RunErr, let's be sure to handle that
replyMsg = grav.NewMsg(MsgTypeReactrRunErr, []byte(runErr.Error()))
} else {
replyMsg = grav.NewMsg(MsgTypeReactrJobErr, []byte(err.Error()))
}
} else {
if result == nil {
// if the job returned no result
Expand Down
25 changes: 25 additions & 0 deletions rt/run_error.go
@@ -0,0 +1,25 @@
package rt

import (
"encoding/json"

"github.com/suborbital/vektor/vk"
)

// RunErr represents an error returned from a Wasm Runnable
// it lives in the rt package to avoid import cycles
type RunErr struct {
Code int `json:"code"`
Message string `json:"message"`
}

// Error returns the stringified JSON representation of the error
func (r RunErr) Error() string {
bytes, _ := json.Marshal(r)
return string(bytes)
}

// ToVKErr converts a RunErr to a VKError
func (r RunErr) ToVKErr() vk.Error {
return vk.Err(r.Code, r.Message)
}
33 changes: 33 additions & 0 deletions rwasm/api_result.go
Expand Up @@ -2,6 +2,7 @@ package rwasm

import (
"github.com/pkg/errors"
"github.com/suborbital/reactr/rt"
"github.com/wasmerio/wasmer-go/wasmer"
)

Expand Down Expand Up @@ -33,3 +34,35 @@ func return_result(pointer int32, size int32, identifier int32) {

inst.resultChan <- result
}

func returnError() *HostFn {
fn := func(args ...wasmer.Value) (interface{}, error) {
code := args[0].I32()
pointer := args[1].I32()
size := args[2].I32()
ident := args[3].I32()

return_error(code, pointer, size, ident)

return nil, nil
}

return newHostFn("return_error", 4, false, fn)
}

func return_error(code int32, pointer int32, size int32, identifier int32) {
envLock.RLock()
defer envLock.RUnlock()

inst, err := instanceForIdentifier(identifier)
if err != nil {
logger.Error(errors.Wrap(err, "[rwasm] alert: invalid identifier used, potential malicious activity"))
return
}

result := inst.readMemory(pointer, size)

runErr := rt.RunErr{Code: int(code), Message: string(result)}

inst.errChan <- runErr
}
3 changes: 3 additions & 0 deletions rwasm/ffi_environment.go
Expand Up @@ -67,6 +67,7 @@ type wasmInstance struct {
staticFileFunc bundle.FileFunc

resultChan chan []byte
errChan chan rt.RunErr
lock sync.Mutex
}

Expand Down Expand Up @@ -124,6 +125,7 @@ func (w *wasmEnvironment) addInstance() error {
wasmerInst: inst,
staticFileFunc: w.staticFileFunc,
resultChan: make(chan []byte, 1),
errChan: make(chan rt.RunErr, 1),
lock: sync.Mutex{},
}

Expand Down Expand Up @@ -198,6 +200,7 @@ func (w *wasmEnvironment) internals() (*wasmer.Module, *wasmer.Store, *wasmer.Im
// mount the Runnable API host functions to the module's imports
addHostFns(imports, store,
returnResult(),
returnError(),
fetchURL(),
cacheSet(),
cacheGet(),
Expand Down
2 changes: 1 addition & 1 deletion rwasm/testdata/fetch/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified rwasm/testdata/fetch/fetch.wasm
Binary file not shown.

0 comments on commit 8366cf1

Please sign in to comment.