Skip to content

Commit

Permalink
Fix sequence break (#648)
Browse files Browse the repository at this point in the history
* added test for sequence break

* fix sequence break

* wait for logs

* Changelog entry

* marked fixed error as regression
  • Loading branch information
johakoch committed Dec 29, 2022
1 parent 5dd027b commit 51b9640
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,9 @@ Unreleased changes are available as `avenga/couper:edge` container.
* **Changed**
* Use nested `jwt_signing_profile` block in [`oauth2` block](https://docs.couper.io/configuration/block/oauth2) for `grant_type` `"urn:ietf:params:oauth:grant-type:jwt-bearer"` in absence of `assertion` attribute ([#619](https://github.com/avenga/couper/pull/619))

* **Fixed**
* [Endpoint sequences](https://docs.couper.io/configuration/block/endpoint#endpoint-sequence) not being terminated by errors (e.g. `unexpected_status`) (regression; since v1.11.0) ([#648](https://github.com/avenga/couper/pull/648))

---

## [1.11.0](https://github.com/avenga/couper/releases/tag/v1.11.0)
Expand Down
14 changes: 11 additions & 3 deletions handler/producer/sequence.go
Expand Up @@ -91,11 +91,15 @@ func pipe(req *http.Request, rt []Roundtrip, kind string, additionalSync *sync.M
}
additionalSync.Store(k, []chan *Result{})

toBreak := false
switch kind {
case "parallel": // execute each sequence branch in parallel
go produceAndPipeResults(ctx, req, rch, srt, additionalSync)
case "sequence": // one by one
produceAndPipeResults(ctx, req, rch, srt, additionalSync)
toBreak = produceAndPipeResults(ctx, req, rch, srt, additionalSync)
}
if toBreak {
break
}
}

Expand Down Expand Up @@ -145,7 +149,7 @@ func pipeResults(target, src chan *Result) {
}
}

func produceAndPipeResults(ctx context.Context, req *http.Request, results chan *Result, rt Roundtrip, additionalSync *sync.Map) {
func produceAndPipeResults(ctx context.Context, req *http.Request, results chan *Result, rt Roundtrip, additionalSync *sync.Map) bool {
outreq := req.WithContext(ctx)
defer close(results)
rs := rt.Produce(outreq, additionalSync)
Expand All @@ -167,11 +171,15 @@ func produceAndPipeResults(ctx context.Context, req *http.Request, results chan
for _, ach := range additionalChs {
ach <- e
}
return
return true
case results <- r:
for _, ach := range additionalChs {
ach <- r
}
if r.Err != nil {
return true
}
}
}
return false
}
67 changes: 67 additions & 0 deletions server/http_endpoints_test.go
Expand Up @@ -939,6 +939,73 @@ func TestEndpointErrorHandler(t *testing.T) {
}
}

func TestEndpointSequenceBreak(t *testing.T) {
client := test.NewHTTPClient()
helper := test.New(t)

shutdown, hook := newCouper(filepath.Join(testdataPath, "14_couper.hcl"), helper)
defer shutdown()
defer func() {
if !t.Failed() {
return
}
for _, e := range hook.AllEntries() {
t.Logf("%#v", e.Data)
}
}()

type testcase struct {
name string
path string
expectedErrorType string
expBERNames []string
}

for _, tc := range []testcase{
{"sequence break unexpected_status", "/sequence-break-unexpected_status", "unexpected_status", []string{"resolve"}},
{"sequence break backend_timeout", "/sequence-break-backend_timeout", "backend_timeout", []string{"resolve"}},
{"break only one sequence", "/break-only-one-sequence", "unexpected_status", []string{"resolve2", "resolve1", "refl"}},
} {
t.Run(tc.name, func(st *testing.T) {
hook.Reset()
h := test.New(st)

req, err := http.NewRequest(http.MethodGet, "http://domain.local:8080"+tc.path, nil)
h.Must(err)

res, err := client.Do(req)
h.Must(err)

if res.StatusCode != http.StatusBadGateway {
st.Fatalf("want: %d, got: %d", http.StatusBadGateway, res.StatusCode)
}

time.Sleep(time.Millisecond * 200)

berNames := make(map[string]struct{})
for _, e := range hook.AllEntries() {
if e.Data["type"] == "couper_backend" {
request := e.Data["request"].(logging.Fields)
berNames[fmt.Sprintf("%s", request["name"])] = struct{}{}
} else if e.Data["type"] == "couper_access" {
if e.Data["error_type"] != tc.expectedErrorType {
st.Errorf("want: %q, got: %q", tc.expectedErrorType, e.Data["error_type"])
}
}
}
if len(berNames) != len(tc.expBERNames) {
st.Errorf("number of BE request names want: %d, got: %d", len(tc.expBERNames), len(berNames))
} else {
for _, n := range tc.expBERNames {
if _, ok := berNames[n]; !ok {
st.Errorf("missing BE request %q", n)
}
}
}
})
}
}

func TestEndpointACBufferOptions(t *testing.T) {
client := test.NewHTTPClient()
helper := test.New(t)
Expand Down
66 changes: 66 additions & 0 deletions server/testdata/endpoints/14_couper.hcl
Expand Up @@ -92,6 +92,72 @@ server { # error_handler
}
}

endpoint "/sequence-break-unexpected_status" {
request "resolve" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"

expected_status = [418] # break
}

proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve.headers.content-type
}
expected_status = [200]
}
}

endpoint "/sequence-break-backend_timeout" {
request "resolve" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"
backend {
timeout = "1ns" # break
}
}

proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve.headers.content-type
}
expected_status = [200]
}
}

endpoint "/break-only-one-sequence" {
request "resolve1" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"

expected_status = [418] # break
}

proxy {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve1.headers.content-type
}
expected_status = [200]
}

request "resolve2" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/anything"
expected_status = [200]
}

proxy "refl" {
url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect"
set_request_headers = {
x = backend_responses.resolve2.headers.content-type
}
expected_status = [200]
}

response {
status = 200
}
}

api {
endpoint "/1.1" {
request "r1" {
Expand Down

0 comments on commit 51b9640

Please sign in to comment.