Skip to content

Commit

Permalink
Re-enable PingSource TLS E2E test (knative#7082) (#285)
Browse files Browse the repository at this point in the history
Fixes knative#7021

This is the root cause: cloudevents/sdk-go#923

<!-- Please include the 'why' behind your changes if no issue exists -->

<!-- Please categorize your changes:
- 🎁 Add new feature
- 🐛 Fix bug
- 🧹 Update or clean up current behavior
- 🗑️ Remove feature or internal logic
-->

- Re-enable PingSource TLS E2E test
- Remove hacky client injection for PingSource

<!-- If these boxes are not checked, you will be asked to complete these
requirements or explain why they do not apply to your PR. -->

- [ ] **At least 80% unit test coverage**
- [ ] **E2E tests** for any new behavior
- [ ] **Docs PR** for any user-facing impact
- [ ] **Spec PR** for any new API feature
- [ ] **Conformance test** for any change to the spec

**Release Note**

<!--
📄 If this change has user-visible impact, write a release
note in the block
below. Include the string "action required" if additional action is
required of
users switching to the new release, for example in case of a breaking
change.

Write as if you are speaking to users, not other Knative contributors.
If this
change has no user-visible impact, no release note is needed.
-->

```release-note

```

**Docs**

<!--
📖 If this change has user-visible impact, link to an issue or PR in
https://github.com/knative/docs.
-->

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Jul 27, 2023
1 parent bc30bfe commit 6b48d1a
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 62 deletions.
8 changes: 7 additions & 1 deletion pkg/adapter/mtping/runner.go
Expand Up @@ -200,13 +200,19 @@ func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adap

env.Sink = source.Status.SinkURI.String()

a.Logger.Debugw("Creating client",
"namespace", source.Namespace,
"name", source.Name,
"env", env,
"source", source,
)

cfg := adapter.ClientConfig{
Env: &env,
CeOverrides: source.Spec.CloudEventOverrides,
Reporter: a.clientConfig.Reporter,
CrStatusEventClient: a.clientConfig.CrStatusEventClient,
Options: a.clientConfig.Options,
Client: a.clientConfig.Client,
}

return adapter.NewClient(cfg)
Expand Down
100 changes: 55 additions & 45 deletions pkg/adapter/mtping/runner_test.go
Expand Up @@ -20,13 +20,18 @@ import (
"bytes"
"context"
"encoding/base64"
"net/http"
"net/http/httptest"
"reflect"
"sync"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

"github.com/cloudevents/sdk-go/v2/binding"
bindingshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -36,7 +41,6 @@ import (
rectesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/adapter/v2"
adaptertesting "knative.dev/eventing/pkg/adapter/v2/test"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)

Expand Down Expand Up @@ -74,11 +78,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
ContentType: cloudevents.TextPlain,
Data: sampleData,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleData),
wantContentType: cloudevents.TextPlain,
Expand All @@ -98,11 +97,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
ContentType: cloudevents.TextPlain,
Data: sampleData,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleData),
wantContentType: cloudevents.TextPlain,
Expand All @@ -120,11 +114,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
ContentType: cloudevents.TextPlain,
DataBase64: sampleDataBase64,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: decodeBase64(sampleDataBase64),
wantContentType: cloudevents.TextPlain,
Expand All @@ -142,11 +131,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
Data: sampleJSONData,
ContentType: cloudevents.ApplicationJSON,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleJSONData),
wantContentType: cloudevents.ApplicationJSON,
Expand All @@ -164,11 +148,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
DataBase64: sampleJSONDataBase64,
ContentType: cloudevents.ApplicationJSON,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: decodeBase64(sampleJSONDataBase64),
wantContentType: cloudevents.ApplicationJSON,
Expand All @@ -186,11 +165,6 @@ func TestAddRunRemoveSchedules(t *testing.T) {
Data: sampleXmlData,
ContentType: cloudevents.ApplicationXML,
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a sink"},
},
},
},
wantData: []byte(sampleXmlData),
wantContentType: cloudevents.ApplicationXML,
Expand All @@ -200,9 +174,15 @@ func TestAddRunRemoveSchedules(t *testing.T) {
t.Run(n, func(t *testing.T) {
ctx, _ := rectesting.SetupFakeContext(t)
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
h, events := eventsAccumulator()

s := httptest.NewServer(h)
defer s.Close()
url, _ := apis.ParseURL(s.URL)

runner := NewCronJobsRunner(adapter.ClientConfig{}, kubeclient.Get(ctx), logger)
tc.src.Status.SinkURI = url
entryId := runner.AddSchedule(tc.src)

entry := runner.cron.Entry(entryId)
Expand All @@ -212,7 +192,7 @@ func TestAddRunRemoveSchedules(t *testing.T) {

entry.Job.Run()

validateSent(t, ce, tc.wantData, tc.wantContentType, tc.src.Spec.CloudEventOverrides.Extensions)
validateSent(t, *events, tc.wantData, tc.wantContentType, tc.src.Spec.CloudEventOverrides.Extensions)

runner.RemoveSchedule(entryId)

Expand All @@ -227,9 +207,8 @@ func TestAddRunRemoveSchedules(t *testing.T) {
func TestStartStopCron(t *testing.T) {
ctx, _ := rectesting.SetupFakeContext(t)
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
wctx, wcancel := context.WithCancel(context.Background())
Expand All @@ -256,9 +235,17 @@ func TestStartStopCronDelayWait(t *testing.T) {
}
ctx, _ := rectesting.SetupFakeContext(t)
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClientWithDelay(time.Second * 5)

runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
h, events := eventsAccumulator()

s := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
time.Sleep(5 * time.Second)
h.ServeHTTP(writer, request)
}))
defer s.Close()
url, _ := apis.ParseURL(s.URL)

runner := NewCronJobsRunner(adapter.ClientConfig{}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -280,7 +267,7 @@ func TestStartStopCronDelayWait(t *testing.T) {
},
Status: sourcesv1.PingSourceStatus{
SourceStatus: duckv1.SourceStatus{
SinkURI: &apis.URL{Path: "a delayed sink"},
SinkURI: url,
},
},
})
Expand All @@ -294,15 +281,18 @@ func TestStartStopCronDelayWait(t *testing.T) {

runner.Stop() // cron job because of delay is still running.

validateSent(t, ce, []byte("some delayed data"), cloudevents.TextPlain, nil)
validateSent(t, *events, []byte("some delayed data"), cloudevents.TextPlain, nil)
}

func validateSent(t *testing.T, ce *adaptertesting.TestCloudEventsClient, wantData []byte, wantContentType string, extensions map[string]string) {
if got := len(ce.Sent()); got != 1 {
t.Error("Expected 1 event to be sent, got", got)
func validateSent(t *testing.T, events []cloudevents.Event, wantData []byte, wantContentType string, extensions map[string]string) {
err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
return len(events) == 1, nil
})
if err != nil {
t.Fatal("Expected 1 event to be sent, got", len(events))
}

event := ce.Sent()[0]
event := events[0]

if gotContentType := event.DataContentType(); gotContentType != wantContentType {
t.Errorf("Expected event with contentType=%q to be sent, got %q", wantContentType, gotContentType)
Expand Down Expand Up @@ -332,3 +322,23 @@ func validateSent(t *testing.T, ce *adaptertesting.TestCloudEventsClient, wantDa
}
}
}

func eventsAccumulator() (http.Handler, *[]cloudevents.Event) {
var mu sync.Mutex
events := make([]cloudevents.Event, 0, 8)

return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
m := bindingshttp.NewMessageFromHttpRequest(request)
event, err := binding.ToEvent(request.Context(), m)
if err != nil {
writer.WriteHeader(http.StatusBadRequest)
return
}

mu.Lock()
defer mu.Unlock()

events = append(events, *event)
writer.WriteHeader(http.StatusOK)
}), &events
}
43 changes: 31 additions & 12 deletions pkg/adapter/v2/cloudevents.go
Expand Up @@ -24,14 +24,13 @@ import (
"net/url"
"time"

obshttp "github.com/cloudevents/sdk-go/observability/opencensus/v2/http"
cloudevents "github.com/cloudevents/sdk-go/v2"
ceclient "github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/cloudevents/sdk-go/v2/protocol/http"
"go.opencensus.io/plugin/ochttp"

"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

Expand All @@ -53,7 +52,9 @@ type Client interface {
var newClientHTTPObserved = NewClientHTTPObserved

func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
t, err := http.New(append(topt,
http.WithMiddleware(tracecontextMiddleware),
)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -108,8 +109,6 @@ type ClientConfig struct {
Reporter source.StatsReporter
CrStatusEventClient *crstatusevent.CRStatusEventClient
Options []http.Option

Client Client
}

type clientConfigKey struct{}
Expand All @@ -127,16 +126,12 @@ func GetClientConfig(ctx context.Context) ClientConfig {
}

func NewClient(cfg ClientConfig) (Client, error) {
if cfg.Client != nil {
return cfg.Client, nil
}

transport := &ochttp.Transport{
Base: nethttp.DefaultTransport.(*nethttp.Transport),
Propagation: tracecontextb3.TraceContextEgress,
}

pOpts := make([]http.Option, 0)
var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport)

ceOverrides := cfg.CeOverrides
if cfg.Env != nil {
Expand All @@ -157,7 +152,11 @@ func NewClient(cfg ClientConfig) (Client, error) {
pOpts = append(pOpts, http.WithHeader(apis.KnNamespaceHeader, cfg.Env.GetNamespace()))
}

pOpts = append(pOpts, http.WithRoundTripper(transport))
httpClient := nethttp.Client{Transport: roundTripperDecorator(transport)}

// Important: prepend HTTP client option to make sure that other options are applied to this
// client and not to the default client.
pOpts = append([]http.Option{http.WithClient(httpClient)}, pOpts...)

// Make sure that explicitly set options have priority
opts := append(pOpts, cfg.Options...)
Expand All @@ -172,7 +171,7 @@ func NewClient(cfg ClientConfig) (Client, error) {
}
return &client{
ceClient: ceClient,
closeIdler: closeIdler,
closeIdler: transport.Base.(*nethttp.Transport),
ceOverrides: ceOverrides,
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
Expand Down Expand Up @@ -322,3 +321,23 @@ func MetricTagFromContext(ctx context.Context) *MetricTag {
ResourceGroup: "unknown",
}
}

func roundTripperDecorator(roundTripper nethttp.RoundTripper) nethttp.RoundTripper {
return &ochttp.Transport{
Propagation: &tracecontext.HTTPFormat{},
Base: roundTripper,
FormatSpanName: formatSpanName,
}
}

func formatSpanName(r *nethttp.Request) string {
return "cloudevents.http." + r.URL.Path
}

func tracecontextMiddleware(h nethttp.Handler) nethttp.Handler {
return &ochttp.Handler{
Propagation: &tracecontext.HTTPFormat{},
Handler: h,
FormatSpanName: formatSpanName,
}
}
4 changes: 1 addition & 3 deletions test/e2e-common.sh
Expand Up @@ -61,9 +61,7 @@ readonly TMP_DIR
readonly KNATIVE_DEFAULT_NAMESPACE="knative-eventing"

# This the namespace used to install and test Knative Eventing.
export SYSTEM_NAMESPACE
SYSTEM_NAMESPACE="${SYSTEM_NAMESPACE:-"knative-eventing-"$(head -c 128 < \
/dev/urandom | LC_CTYPE=C tr -dc 'a-z0-9' | fold -w 10 | head -n 1)}"
export SYSTEM_NAMESPACE=${SYSTEM_NAMESPACE:-"knative-eventing"}

# Latest release. If user does not supply this as a flag, the latest
# tagged release on the current branch will be used.
Expand Down
3 changes: 2 additions & 1 deletion test/rekt/pingsource_test.go
Expand Up @@ -23,11 +23,12 @@ import (
"testing"
"time"

"knative.dev/eventing/test/rekt/features/pingsource"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"

"knative.dev/eventing/test/rekt/features/pingsource"
)

func TestPingSourceWithSinkRef(t *testing.T) {
Expand Down

0 comments on commit 6b48d1a

Please sign in to comment.