diff --git a/CHANGELOG.md b/CHANGELOG.md index 7716de404..9d0521a1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ Unreleased changes are available as `avenga/couper:edge` container. * **Changed** * Use nested `jwt_signing_profile` block in [`oauth2` block](https://docs.couper.io/configuration/block/oauth2) for `grant_type` `"urn:ietf:params:oauth:grant-type:jwt-bearer"` in absence of `assertion` attribute ([#619](https://github.com/avenga/couper/pull/619)) +* **Fixed** + * [Endpoint sequences](https://docs.couper.io/configuration/block/endpoint#endpoint-sequence) not being terminated by errors (e.g. `unexpected_status`) (regression; since v1.11.0) ([#648](https://github.com/avenga/couper/pull/648)) + --- ## [1.11.0](https://github.com/avenga/couper/releases/tag/v1.11.0) diff --git a/handler/producer/sequence.go b/handler/producer/sequence.go index 8c45e594f..a371a7bea 100644 --- a/handler/producer/sequence.go +++ b/handler/producer/sequence.go @@ -91,11 +91,15 @@ func pipe(req *http.Request, rt []Roundtrip, kind string, additionalSync *sync.M } additionalSync.Store(k, []chan *Result{}) + toBreak := false switch kind { case "parallel": // execute each sequence branch in parallel go produceAndPipeResults(ctx, req, rch, srt, additionalSync) case "sequence": // one by one - produceAndPipeResults(ctx, req, rch, srt, additionalSync) + toBreak = produceAndPipeResults(ctx, req, rch, srt, additionalSync) + } + if toBreak { + break } } @@ -145,7 +149,7 @@ func pipeResults(target, src chan *Result) { } } -func produceAndPipeResults(ctx context.Context, req *http.Request, results chan *Result, rt Roundtrip, additionalSync *sync.Map) { +func produceAndPipeResults(ctx context.Context, req *http.Request, results chan *Result, rt Roundtrip, additionalSync *sync.Map) bool { outreq := req.WithContext(ctx) defer close(results) rs := rt.Produce(outreq, additionalSync) @@ -167,11 +171,15 @@ func produceAndPipeResults(ctx context.Context, req *http.Request, results chan for _, ach := range additionalChs { ach <- e } - return + return true case results <- r: for _, ach := range additionalChs { ach <- r } + if r.Err != nil { + return true + } } } + return false } diff --git a/server/http_endpoints_test.go b/server/http_endpoints_test.go index 041d0051a..009334b75 100644 --- a/server/http_endpoints_test.go +++ b/server/http_endpoints_test.go @@ -939,6 +939,73 @@ func TestEndpointErrorHandler(t *testing.T) { } } +func TestEndpointSequenceBreak(t *testing.T) { + client := test.NewHTTPClient() + helper := test.New(t) + + shutdown, hook := newCouper(filepath.Join(testdataPath, "14_couper.hcl"), helper) + defer shutdown() + defer func() { + if !t.Failed() { + return + } + for _, e := range hook.AllEntries() { + t.Logf("%#v", e.Data) + } + }() + + type testcase struct { + name string + path string + expectedErrorType string + expBERNames []string + } + + for _, tc := range []testcase{ + {"sequence break unexpected_status", "/sequence-break-unexpected_status", "unexpected_status", []string{"resolve"}}, + {"sequence break backend_timeout", "/sequence-break-backend_timeout", "backend_timeout", []string{"resolve"}}, + {"break only one sequence", "/break-only-one-sequence", "unexpected_status", []string{"resolve2", "resolve1", "refl"}}, + } { + t.Run(tc.name, func(st *testing.T) { + hook.Reset() + h := test.New(st) + + req, err := http.NewRequest(http.MethodGet, "http://domain.local:8080"+tc.path, nil) + h.Must(err) + + res, err := client.Do(req) + h.Must(err) + + if res.StatusCode != http.StatusBadGateway { + st.Fatalf("want: %d, got: %d", http.StatusBadGateway, res.StatusCode) + } + + time.Sleep(time.Millisecond * 200) + + berNames := make(map[string]struct{}) + for _, e := range hook.AllEntries() { + if e.Data["type"] == "couper_backend" { + request := e.Data["request"].(logging.Fields) + berNames[fmt.Sprintf("%s", request["name"])] = struct{}{} + } else if e.Data["type"] == "couper_access" { + if e.Data["error_type"] != tc.expectedErrorType { + st.Errorf("want: %q, got: %q", tc.expectedErrorType, e.Data["error_type"]) + } + } + } + if len(berNames) != len(tc.expBERNames) { + st.Errorf("number of BE request names want: %d, got: %d", len(tc.expBERNames), len(berNames)) + } else { + for _, n := range tc.expBERNames { + if _, ok := berNames[n]; !ok { + st.Errorf("missing BE request %q", n) + } + } + } + }) + } +} + func TestEndpointACBufferOptions(t *testing.T) { client := test.NewHTTPClient() helper := test.New(t) diff --git a/server/testdata/endpoints/14_couper.hcl b/server/testdata/endpoints/14_couper.hcl index 8cb03b82a..f1e1d9ee8 100644 --- a/server/testdata/endpoints/14_couper.hcl +++ b/server/testdata/endpoints/14_couper.hcl @@ -92,6 +92,72 @@ server { # error_handler } } + endpoint "/sequence-break-unexpected_status" { + request "resolve" { + url = "${env.COUPER_TEST_BACKEND_ADDR}/anything" + + expected_status = [418] # break + } + + proxy { + url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect" + set_request_headers = { + x = backend_responses.resolve.headers.content-type + } + expected_status = [200] + } + } + + endpoint "/sequence-break-backend_timeout" { + request "resolve" { + url = "${env.COUPER_TEST_BACKEND_ADDR}/anything" + backend { + timeout = "1ns" # break + } + } + + proxy { + url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect" + set_request_headers = { + x = backend_responses.resolve.headers.content-type + } + expected_status = [200] + } + } + + endpoint "/break-only-one-sequence" { + request "resolve1" { + url = "${env.COUPER_TEST_BACKEND_ADDR}/anything" + + expected_status = [418] # break + } + + proxy { + url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect" + set_request_headers = { + x = backend_responses.resolve1.headers.content-type + } + expected_status = [200] + } + + request "resolve2" { + url = "${env.COUPER_TEST_BACKEND_ADDR}/anything" + expected_status = [200] + } + + proxy "refl" { + url = "${env.COUPER_TEST_BACKEND_ADDR}/reflect" + set_request_headers = { + x = backend_responses.resolve2.headers.content-type + } + expected_status = [200] + } + + response { + status = 200 + } + } + api { endpoint "/1.1" { request "r1" {