Skip to content

Commit

Permalink
fix(e2core): Various fixes to make e2core more stable (#422)
Browse files Browse the repository at this point in the history
* Update systemspec version to lates commit on main

* Fix import paths

* Build tenant name -> env -> ref mapping

* Update systemspec to sha a1181442 with the request id pr

* Use an http newrequestwithcontext in auth path

* Remove http.CanonicalheaderKey because header.Set does it for us either way

* Remove unused non-admin codepath

* Add comments to clarify what is happening

* Fix an import path for a module

* Remove unused tenantmappings

* Rename unused func params to underscores

* Change the way an error is reported with data

* Move golang.org/x/exp to indirect in go.mod

* Remove unneeded fmt.Printf
  • Loading branch information
javorszky committed Apr 25, 2023
1 parent 2b957b1 commit fb66c18
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 160 deletions.
8 changes: 6 additions & 2 deletions e2core/auth/access.go
@@ -1,6 +1,7 @@
package auth

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -72,14 +73,17 @@ func (client *AuthzClient) Authorize(token system.Credential, identifier, namesp

func (client *AuthzClient) loadAuth(token system.Credential, identifier string) func() (*TenantInfo, error) {
return func() (*TenantInfo, error) {
authzReq, err := http.NewRequest(http.MethodGet, client.location+identifier, nil)
ctx, cxl := context.WithTimeout(context.Background(), 10*time.Second)
defer cxl()

authzReq, err := http.NewRequestWithContext(ctx, http.MethodGet, client.location+identifier, nil)
if err != nil {
return nil, common.Error(err, "post authorization request")
}

// pass token along
headerVal := fmt.Sprintf("%s %s", token.Scheme(), token.Value())
authzReq.Header.Set(http.CanonicalHeaderKey("Authorization"), headerVal)
authzReq.Header.Set("Authorization", headerVal)

resp, err := client.httpClient.Do(authzReq)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion e2core/command/start.go
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/suborbital/e2core/e2core/server"
"github.com/suborbital/e2core/e2core/sourceserver"
"github.com/suborbital/e2core/e2core/syncer"
"github.com/suborbital/systemspec/bundle"
"github.com/suborbital/systemspec/system/bundle"
"github.com/suborbital/systemspec/system/client"
)

Expand Down
5 changes: 0 additions & 5 deletions e2core/options/options.go
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/pkg/errors"
"github.com/sethvargo/go-envconfig"
"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -158,7 +157,3 @@ func (o *Options) finalize() error {

return nil
}

func (o *Options) AdminEnabled() bool {
return slices.Contains(o.Features, FeatureMultiTenant)
}
145 changes: 9 additions & 136 deletions e2core/server/handlers.go
Expand Up @@ -5,8 +5,6 @@ import (
"net/http"

"github.com/labstack/echo/v4"
"github.com/pkg/errors"
"github.com/rs/zerolog"

"github.com/suborbital/e2core/e2core/sequence"
"github.com/suborbital/systemspec/request"
Expand All @@ -15,8 +13,14 @@ import (

func (s *Server) executePluginByNameHandler() echo.HandlerFunc {
return func(c echo.Context) error {
// with the authorization middleware, this is going to be the uuid of the tenant specified by the path name in
// the environment specified by the authorization token.
ident := ReadParam(c, "ident")

// this is coming from the path.
namespace := ReadParam(c, "namespace")

// this is coming from the path.
name := ReadParam(c, "name")

ll := s.logger.With().
Expand Down Expand Up @@ -72,140 +76,7 @@ func (s *Server) executePluginByNameHandler() echo.HandlerFunc {

responseData := seq.Request().State[mod.FQMN]

return c.Blob(http.StatusOK, "application/octet-stream", responseData)
}
}

func (s *Server) executePluginByRefHandler(l zerolog.Logger) echo.HandlerFunc {
ll := l.With().Str("handler", "executePluginByRefHandler").Logger()

return func(c echo.Context) error {
ref := c.Param("ref")

mod := s.syncer.GetModuleByRef(ref)
if mod == nil {
return echo.NewHTTPError(http.StatusNotFound, "module not found").SetInternal(fmt.Errorf("no module by ref %s", ref))
}

ll.Debug().Str("fqmn", mod.FQMN).Msg("found module by ref")

req, err := request.FromEchoContext(c)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError).SetInternal(err)
}

err = req.UseSuborbitalHeaders(c)
if err != nil {
return err
}

steps := []tenant.WorkflowStep{{FQMN: mod.FQMN}}

// a sequence executes the handler's steps and manages its state.
seq, err := sequence.New(steps, req)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "failed to handle request").SetInternal(err)
}

if err := s.dispatcher.Execute(seq); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "failed to execute plugin").SetInternal(err)
}

// handle any response headers that were set by the Runnables.
if req.RespHeaders != nil {
for head, val := range req.RespHeaders {
// need to directly assign because .Add and .Set will filter out non-standard
// header names, which ours are.
if c.Response().Header()[head] == nil {
c.Response().Header()[head] = make([]string, 0)
}

c.Response().Header()[head] = append(c.Response().Header()[head], val)
}
}

responseData := seq.Request().State[mod.FQMN]

return c.Blob(http.StatusOK, "application/octet-stream", responseData)
}
}

func (s *Server) executeWorkflowHandler() echo.HandlerFunc {
return func(c echo.Context) error {
ident := c.Param("ident")
namespace := c.Param("namespace")
name := c.Param("name")

tnt := s.syncer.TenantOverview(ident)
if tnt == nil {
return echo.NewHTTPError(http.StatusNotFound, "not found").SetInternal(fmt.Errorf("no tenant with ident %s", ident))
}

namespaces := []tenant.NamespaceConfig{tnt.Config.DefaultNamespace}
namespaces = append(namespaces, tnt.Config.Namespaces...)

var workflow *tenant.Workflow

// yes, this is a dumb and slow way to do this but we'll optimize later

OUTER:
for i := range namespaces {
ns := namespaces[i]
if ns.Name != namespace {
continue
}

for j := range ns.Workflows {
wfl := ns.Workflows[j]

if wfl.Name != name {
continue
}

workflow = &wfl
break OUTER
}
}

if workflow == nil {
return echo.NewHTTPError(http.StatusNotFound, "not found").SetInternal(errors.New("workflow was nil"))
}

req, err := request.FromEchoContext(c)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "failed to handle request").SetInternal(err)
}

err = req.UseSuborbitalHeaders(c)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError).SetInternal(err)
}

// a sequence executes the handler's steps and manages its state.
seq, err := sequence.New(workflow.Steps, req)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "failed to handle request").SetInternal(err)
}

if err := s.dispatcher.Execute(seq); err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "failed to execute plugin").SetInternal(err)
}

// handle any response headers that were set by the Runnables.
if req.RespHeaders != nil {
for head, val := range req.RespHeaders {
// need to directly assign because .Add and .Set will filter out non-standard
// header names, which ours are.
if c.Response().Header()[head] == nil {
c.Response().Header()[head] = make([]string, 0)
}

c.Response().Header()[head] = append(c.Response().Header()[head], val)
}
}

// this should be smarter eventually (i.e. handle last-step groups properly)
responseData := seq.Request().State[workflow.Steps[len(workflow.Steps)-1].FQMN]
ll.Info().Str("fqmn", mod.FQMN).Msg("finished execution of the module, sending back data")

return c.Blob(http.StatusOK, "application/octet-stream", responseData)
}
Expand All @@ -217,6 +88,8 @@ func (s *Server) healthHandler() echo.HandlerFunc {
}
}

// ReadParam tries to grab the value by name from the echo context first, and if it doesn't find it, then it falls back
// onto the path parameter.
func ReadParam(ctx echo.Context, name string) string {
v := ctx.Get(name)
if v != nil {
Expand Down
8 changes: 1 addition & 7 deletions e2core/server/server.go
Expand Up @@ -71,13 +71,7 @@ func New(l zerolog.Logger, sync *syncer.Syncer, opts *options.Options) (*Server,
logger: ll,
}

if opts.AdminEnabled() {
e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler(), auth.AuthorizationMiddleware(opts))
} else {
e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler())
e.POST("/ref/:ref", server.executePluginByRefHandler(ll))
e.POST("/workflow/:ident/:namespace/:name", server.executeWorkflowHandler())
}
e.POST("/name/:ident/:namespace/:name", server.executePluginByNameHandler(), auth.AuthorizationMiddleware(opts))

e.GET("/health", server.healthHandler())

Expand Down
2 changes: 1 addition & 1 deletion e2core/server/server_test.go
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/suborbital/e2core/e2core/backend/satbackend"
"github.com/suborbital/e2core/e2core/options"
"github.com/suborbital/e2core/e2core/syncer"
"github.com/suborbital/systemspec/bundle"
"github.com/suborbital/systemspec/system/bundle"
)

type serverTestSuite struct {
Expand Down
2 changes: 1 addition & 1 deletion e2core/sourceserver/bundle.go
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/rs/zerolog"

"github.com/suborbital/go-kit/web/mid"
"github.com/suborbital/systemspec/bundle"
"github.com/suborbital/systemspec/system/bundle"
)

func FromBundle(bundlePath string) (*echo.Echo, error) {
Expand Down
3 changes: 1 addition & 2 deletions e2core/syncer/syncer.go
Expand Up @@ -72,8 +72,7 @@ func (s *Syncer) Start() error {
}

// Run runs a sync job
func (s *syncJob) Run(job scheduler.Job, ctx *scheduler.Ctx) (interface{}, error) {

func (s *syncJob) Run(_ scheduler.Job, _ *scheduler.Ctx) (interface{}, error) {
state, err := s.systemSource.State()
if err != nil {
return nil, errors.Wrap(err, "failed to systemSource.State")
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -17,15 +17,14 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.2
github.com/suborbital/go-kit v0.0.8
github.com/suborbital/systemspec v0.0.6-0.20230222150219-bd9cbb5436e5
github.com/suborbital/systemspec v0.0.6-0.20230418100216-a1181442e886
github.com/testcontainers/testcontainers-go v0.19.0
github.com/twmb/franz-go v1.13.1
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.37.0
go.opentelemetry.io/otel v1.13.0
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk v1.13.0
go.opentelemetry.io/otel/trace v1.13.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/sync v0.1.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -78,6 +77,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.36.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -279,8 +279,8 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/suborbital/go-kit v0.0.8 h1:yBsxUAlizy46p/eeHM1BfJZgEyx+dbK5oUYKd1N9szQ=
github.com/suborbital/go-kit v0.0.8/go.mod h1:1OWVnUwXri359OyolKOnthe8jetCQJY/daaObfwRaSs=
github.com/suborbital/systemspec v0.0.6-0.20230222150219-bd9cbb5436e5 h1:AnBi48eVHOhU/Mr2O91SuraAqSUM8Yd3YQJRXR5RFwU=
github.com/suborbital/systemspec v0.0.6-0.20230222150219-bd9cbb5436e5/go.mod h1:D+1w98uj6u9MVVAeDRkTt/oVPGRQUfGYM+jdbQd+AaM=
github.com/suborbital/systemspec v0.0.6-0.20230418100216-a1181442e886 h1:WOSewt3Q62xy3Js2mxM7dflvgH/rn4bOaIGccI1GTFI=
github.com/suborbital/systemspec v0.0.6-0.20230418100216-a1181442e886/go.mod h1:s8Y45QIcUfLyODEeA+d8eKuNbpjFU2NcsizUQDx5BVQ=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
github.com/testcontainers/testcontainers-go v0.19.0 h1:3bmFPuQRgVIQwxZJERyzB8AogmJW3Qzh8iDyfJbPhi8=
github.com/testcontainers/testcontainers-go v0.19.0/go.mod h1:3YsSoxK0rGEUzbGD4gUVt1Nm3GJpCIq94GX+2LSf3d4=
Expand Down
2 changes: 1 addition & 1 deletion sat/sat/meshed.go
Expand Up @@ -27,7 +27,7 @@ func (s *Sat) handleFnResult(msg bus.Message, result interface{}, fnErr error) {
// first unmarshal the request and sequence information
req, err := request.FromJSON(msg.Data())
if err != nil {
ll.Err(err).Msg("request.FromJSON")
ll.Err(err).Bytes("message", msg.Data()).Msg("request.FromJSON")
return
}

Expand Down

0 comments on commit fb66c18

Please sign in to comment.