From 3df34fe06452d7a5c5369b27d81514497a57485b Mon Sep 17 00:00:00 2001 From: Andrej Ocenas Date: Mon, 4 Jul 2022 11:18:45 +0200 Subject: [PATCH] Prometheus: Remove cache, pass headers in request, simplify client creation for resource calls and custom client (#51436) * Remove cache, pass headers in request, simplify client creation * Add test for http options creation --- pkg/services/featuremgmt/manager.go | 3 +- pkg/tsdb/prometheus/buffered/client.go | 6 +- pkg/tsdb/prometheus/buffered/client_test.go | 27 ++++ pkg/tsdb/prometheus/client/cache.go | 55 ------- pkg/tsdb/prometheus/client/cache_test.go | 135 ---------------- pkg/tsdb/prometheus/client/client.go | 119 ++++++++------ pkg/tsdb/prometheus/client/provider.go | 93 ----------- pkg/tsdb/prometheus/client/provider_azure.go | 113 ------------- .../prometheus/client/provider_azure_test.go | 153 ------------------ pkg/tsdb/prometheus/client/provider_test.go | 125 -------------- pkg/tsdb/prometheus/prometheus.go | 27 ++-- pkg/tsdb/prometheus/prometheus_test.go | 38 ++--- pkg/tsdb/prometheus/querydata/framing_test.go | 5 +- .../querydata/prometeus_bench_test.go | 4 +- pkg/tsdb/prometheus/querydata/request.go | 81 +++++----- pkg/tsdb/prometheus/querydata/request_test.go | 54 ++++--- pkg/tsdb/prometheus/resource/resource.go | 118 +++----------- 17 files changed, 238 insertions(+), 918 deletions(-) create mode 100644 pkg/tsdb/prometheus/buffered/client_test.go delete mode 100644 pkg/tsdb/prometheus/client/cache.go delete mode 100644 pkg/tsdb/prometheus/client/cache_test.go delete mode 100644 pkg/tsdb/prometheus/client/provider.go delete mode 100644 pkg/tsdb/prometheus/client/provider_azure.go delete mode 100644 pkg/tsdb/prometheus/client/provider_azure_test.go delete mode 100644 pkg/tsdb/prometheus/client/provider_test.go diff --git a/pkg/services/featuremgmt/manager.go b/pkg/services/featuremgmt/manager.go index 8593e6635cfb..8c3ff596e068 100644 --- a/pkg/services/featuremgmt/manager.go +++ b/pkg/services/featuremgmt/manager.go @@ -162,7 +162,8 @@ func (fm *FeatureManager) HandleGetSettings(c *models.ReqContext) { } // WithFeatures is used to define feature toggles for testing. -// The arguments are a list of strings that are optionally followed by a boolean value +// The arguments are a list of strings that are optionally followed by a boolean value for example: +// WithFeatures([]interface{}{"my_feature", "other_feature"}) or WithFeatures([]interface{}{"my_feature", true}) func WithFeatures(spec ...interface{}) *FeatureManager { count := len(spec) enabled := make(map[string]bool, count) diff --git a/pkg/tsdb/prometheus/buffered/client.go b/pkg/tsdb/prometheus/buffered/client.go index 4b4e02c18eb0..b5a60633d960 100644 --- a/pkg/tsdb/prometheus/buffered/client.go +++ b/pkg/tsdb/prometheus/buffered/client.go @@ -5,11 +5,11 @@ import ( "net/http" "strings" + "github.com/grafana/grafana-azure-sdk-go/azsettings" "github.com/grafana/grafana-plugin-sdk-go/backend" sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/azureauth" "github.com/grafana/grafana/pkg/tsdb/prometheus/middleware" "github.com/grafana/grafana/pkg/tsdb/prometheus/utils" @@ -20,7 +20,7 @@ import ( // CreateTransportOptions creates options for the http client. Probably should be shared and should not live in the // buffered package. -func CreateTransportOptions(settings backend.DataSourceInstanceSettings, cfg *setting.Cfg, features featuremgmt.FeatureToggles, logger log.Logger) (*sdkhttpclient.Options, error) { +func CreateTransportOptions(settings backend.DataSourceInstanceSettings, azureSettings *azsettings.AzureSettings, features featuremgmt.FeatureToggles, logger log.Logger) (*sdkhttpclient.Options, error) { opts, err := settings.HTTPClientOptions() if err != nil { return nil, err @@ -41,7 +41,7 @@ func CreateTransportOptions(settings backend.DataSourceInstanceSettings, cfg *se // Azure authentication is experimental (#35857) if features.IsEnabled(featuremgmt.FlagPrometheusAzureAuth) { - err = azureauth.ConfigureAzureAuthentication(settings, cfg.Azure, &opts) + err = azureauth.ConfigureAzureAuthentication(settings, azureSettings, &opts) if err != nil { return nil, fmt.Errorf("error configuring Azure auth: %v", err) } diff --git a/pkg/tsdb/prometheus/buffered/client_test.go b/pkg/tsdb/prometheus/buffered/client_test.go new file mode 100644 index 000000000000..5dc96a904a34 --- /dev/null +++ b/pkg/tsdb/prometheus/buffered/client_test.go @@ -0,0 +1,27 @@ +package buffered + +import ( + "testing" + + "github.com/grafana/grafana-azure-sdk-go/azsettings" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana/pkg/infra/log/logtest" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/stretchr/testify/require" +) + +func TestCreateTransportOptions(t *testing.T) { + t.Run("creates correct options object", func(t *testing.T) { + settings := backend.DataSourceInstanceSettings{ + BasicAuthEnabled: false, + BasicAuthUser: "", + JSONData: []byte(`{"httpHeaderName1": "foo"}`), + DecryptedSecureJSONData: map[string]string{ + "httpHeaderValue1": "bar", + }, + } + opts, err := CreateTransportOptions(settings, &azsettings.AzureSettings{}, featuremgmt.WithFeatures(), &logtest.Fake{}) + require.NoError(t, err) + require.Equal(t, map[string]string{"foo": "bar"}, opts.Headers) + }) +} diff --git a/pkg/tsdb/prometheus/client/cache.go b/pkg/tsdb/prometheus/client/cache.go deleted file mode 100644 index 85e447b9b9fc..000000000000 --- a/pkg/tsdb/prometheus/client/cache.go +++ /dev/null @@ -1,55 +0,0 @@ -package client - -import ( - "sort" - "strings" - - lru "github.com/hashicorp/golang-lru" -) - -type ProviderCache struct { - provider promClientProvider - cache *lru.Cache -} - -type promClientProvider interface { - GetClient(map[string]string) (*Client, error) -} - -func NewProviderCache(p promClientProvider) (*ProviderCache, error) { - cache, err := lru.New(500) - if err != nil { - return nil, err - } - - return &ProviderCache{ - provider: p, - cache: cache, - }, nil -} - -func (c *ProviderCache) GetClient(headers map[string]string) (*Client, error) { - key := c.key(headers) - if client, ok := c.cache.Get(key); ok { - return client.(*Client), nil - } - - client, err := c.provider.GetClient(headers) - if err != nil { - return nil, err - } - - c.cache.Add(key, client) - return client, nil -} - -func (c *ProviderCache) key(headers map[string]string) string { - vals := make([]string, len(headers)) - var i int - for _, v := range headers { - vals[i] = v - i++ - } - sort.Strings(vals) - return strings.Join(vals, "") -} diff --git a/pkg/tsdb/prometheus/client/cache_test.go b/pkg/tsdb/prometheus/client/cache_test.go deleted file mode 100644 index 0d73a2216a8e..000000000000 --- a/pkg/tsdb/prometheus/client/cache_test.go +++ /dev/null @@ -1,135 +0,0 @@ -package client_test - -import ( - "errors" - "io/ioutil" - "net/http" - "sort" - "strings" - "testing" - - "github.com/grafana/grafana/pkg/tsdb/prometheus/client" - - "github.com/stretchr/testify/require" -) - -func TestCache_GetClient(t *testing.T) { - t.Run("it caches the client for a set of auth headers", func(t *testing.T) { - tc := setupCacheContext() - - c, err := tc.providerCache.GetClient(headers) - require.Nil(t, err) - - c2, err := tc.providerCache.GetClient(headers) - require.Nil(t, err) - - require.Equal(t, c, c2) - require.Equal(t, 1, tc.clientProvider.numCalls) - }) - - t.Run("it returns different clients when the headers differ", func(t *testing.T) { - tc := setupCacheContext() - h1 := map[string]string{"Authorization": "token", "X-ID-Token": "id-token"} - h2 := map[string]string{"Authorization": "token2", "X-ID-Token": "id-token"} - - c, err := tc.providerCache.GetClient(h1) - require.Nil(t, err) - - c2, err := tc.providerCache.GetClient(h2) - require.Nil(t, err) - - require.NotEqual(t, c, c2) - require.Equal(t, 2, tc.clientProvider.numCalls) - }) - - t.Run("it returns from the cache when headers are the same", func(t *testing.T) { - tc := setupCacheContext() - h1 := map[string]string{"Authorization": "token", "X-ID-Token": "id-token"} - h2 := map[string]string{"Authorization": "token", "X-ID-Token": "id-token"} - - c, err := tc.providerCache.GetClient(h1) - require.Nil(t, err) - - c2, err := tc.providerCache.GetClient(h2) - require.Nil(t, err) - - require.Equal(t, c, c2) - require.Equal(t, 1, tc.clientProvider.numCalls) - }) - - t.Run("it doesn't cache anything when an error occurs", func(t *testing.T) { - tc := setupCacheContext() - tc.clientProvider.errors <- errors.New("something bad") - - _, err := tc.providerCache.GetClient(headers) - require.EqualError(t, err, "something bad") - - c, err := tc.providerCache.GetClient(headers) - require.Nil(t, err) - - require.NotNil(t, c) - require.Equal(t, 2, tc.clientProvider.numCalls) - }) -} - -type cacheTestContext struct { - providerCache *client.ProviderCache - clientProvider *fakeClientProvider -} - -func setupCacheContext() *cacheTestContext { - fp := newFakePromClientProvider() - p, err := client.NewProviderCache(fp) - if err != nil { - panic(err) - } - - return &cacheTestContext{ - providerCache: p, - clientProvider: fp, - } -} - -func newFakePromClientProvider() *fakeClientProvider { - return &fakeClientProvider{ - errors: make(chan error, 1), - } -} - -type fakeClientProvider struct { - headers map[string]string - numCalls int - errors chan error -} - -func (p *fakeClientProvider) GetClient(h map[string]string) (*client.Client, error) { - p.headers = h - p.numCalls++ - - var err error - select { - case err = <-p.errors: - default: - } - - var config []string - for _, v := range h { - config = append(config, v) - } - sort.Strings(config) //because map - res := &http.Response{ - StatusCode: 200, - Header: http.Header{}, - Body: ioutil.NopCloser(strings.NewReader(strings.Join(config, ","))), - } - c := &fakeClient{res: res} - return client.NewClient(c, "GET", "http://localhost:9090/"), err -} - -type fakeClient struct { - res *http.Response -} - -func (c *fakeClient) Do(req *http.Request) (*http.Response, error) { - return c.res, nil -} diff --git a/pkg/tsdb/prometheus/client/client.go b/pkg/tsdb/prometheus/client/client.go index f6df3de71912..73d8d096273c 100644 --- a/pkg/tsdb/prometheus/client/client.go +++ b/pkg/tsdb/prometheus/client/client.go @@ -18,6 +18,9 @@ type doer interface { Do(req *http.Request) (*http.Response, error) } +// Client is a custom Prometheus client. Reason for this is that Prom Go client serializes response into its own +// objects, we have to go through them and then serialize again into DataFrame which isn't very efficient. Using custom +// client we can parse response directly into DataFrame. type Client struct { doer doer method string @@ -28,101 +31,119 @@ func NewClient(d doer, method, baseUrl string) *Client { return &Client{doer: d, method: method, baseUrl: baseUrl} } -func (c *Client) QueryRange(ctx context.Context, q *models.Query) (*http.Response, error) { - u, err := url.ParseRequestURI(c.baseUrl) +func (c *Client) QueryRange(ctx context.Context, q *models.Query, headers http.Header) (*http.Response, error) { + tr := q.TimeRange() + u, err := c.createUrl("api/v1/query_range", map[string]string{ + "query": q.Expr, + "start": formatTime(tr.Start), + "end": formatTime(tr.End), + "step": strconv.FormatFloat(tr.Step.Seconds(), 'f', -1, 64), + }) if err != nil { return nil, err } - - u.Path = path.Join(u.Path, "api/v1/query_range") - - qs := u.Query() - qs.Set("query", q.Expr) - tr := q.TimeRange() - qs.Set("start", formatTime(tr.Start)) - qs.Set("end", formatTime(tr.End)) - qs.Set("step", strconv.FormatFloat(tr.Step.Seconds(), 'f', -1, 64)) - - return c.fetch(ctx, c.method, u, qs, nil) -} - -func (c *Client) QueryInstant(ctx context.Context, q *models.Query) (*http.Response, error) { - u, err := url.ParseRequestURI(c.baseUrl) + req, err := createRequest(ctx, c.method, u, nil, headers) if err != nil { return nil, err } - u.Path = path.Join(u.Path, "api/v1/query") + return c.doer.Do(req) +} - qs := u.Query() - qs.Set("query", q.Expr) +func (c *Client) QueryInstant(ctx context.Context, q *models.Query, headers http.Header) (*http.Response, error) { + qs := map[string]string{"query": q.Expr} tr := q.TimeRange() if !tr.End.IsZero() { - qs.Set("time", formatTime(tr.End)) + qs["time"] = formatTime(tr.End) } - return c.fetch(ctx, c.method, u, qs, nil) -} - -func (c *Client) QueryExemplars(ctx context.Context, q *models.Query) (*http.Response, error) { - u, err := url.ParseRequestURI(c.baseUrl) + u, err := c.createUrl("api/v1/query", qs) + if err != nil { + return nil, err + } + req, err := createRequest(ctx, c.method, u, nil, headers) if err != nil { return nil, err } - u.Path = path.Join(u.Path, "api/v1/query_exemplars") + return c.doer.Do(req) +} - qs := u.Query() +func (c *Client) QueryExemplars(ctx context.Context, q *models.Query, headers http.Header) (*http.Response, error) { tr := q.TimeRange() - qs.Set("query", q.Expr) - qs.Set("start", formatTime(tr.Start)) - qs.Set("end", formatTime(tr.End)) + u, err := c.createUrl("api/v1/query_exemplars", map[string]string{ + "query": q.Expr, + "start": formatTime(tr.Start), + "end": formatTime(tr.End), + }) + if err != nil { + return nil, err + } - return c.fetch(ctx, c.method, u, qs, nil) -} + req, err := createRequest(ctx, c.method, u, nil, headers) + if err != nil { + return nil, err + } -type FetchReq struct { - Method string - Url *url.URL - QueryString url.Values + return c.doer.Do(req) } func (c *Client) QueryResource(ctx context.Context, req *backend.CallResourceRequest) (*http.Response, error) { // The way URL is represented in CallResourceRequest and what we need for the fetch function is different // so here we have to do a bit of parsing, so we can then compose it with the base url in correct way. - baseUrlParsed, err := url.ParseRequestURI(c.baseUrl) + reqUrlParsed, err := url.Parse(req.URL) if err != nil { return nil, err } - reqUrlParsed, err := url.Parse(req.URL) + u, err := c.createUrl(req.Path, nil) if err != nil { return nil, err } + u.RawQuery = reqUrlParsed.RawQuery - baseUrlParsed.Path = path.Join(baseUrlParsed.Path, req.Path) - baseUrlParsed.RawQuery = reqUrlParsed.RawQuery + // We use method from the request, as for resources front end may do a fallback to GET if POST does not work + // nad we want to respect that. + httpRequest, err := createRequest(ctx, req.Method, u, req.Body, req.Headers) + if err != nil { + return nil, err + } - return c.fetch(ctx, req.Method, baseUrlParsed, nil, req.Body) + return c.doer.Do(httpRequest) } -func (c *Client) fetch(ctx context.Context, method string, u *url.URL, qs url.Values, body []byte) (*http.Response, error) { - // The qs arg seems to be used in some callers of this method, but you can already pass them in the URL object - if strings.ToUpper(method) == http.MethodGet && qs != nil { - u.RawQuery = qs.Encode() +func (c *Client) createUrl(endpoint string, qs map[string]string) (*url.URL, error) { + finalUrl, err := url.ParseRequestURI(c.baseUrl) + if err != nil { + return nil, err + } + + finalUrl.Path = path.Join(finalUrl.Path, endpoint) + urlQuery := finalUrl.Query() + + for key, val := range qs { + urlQuery.Set(key, val) } + + finalUrl.RawQuery = urlQuery.Encode() + return finalUrl, nil +} + +func createRequest(ctx context.Context, method string, u *url.URL, body []byte, header http.Header) (*http.Request, error) { bodyReader := bytes.NewReader(body) request, err := http.NewRequestWithContext(ctx, method, u.String(), bodyReader) if err != nil { return nil, err } - + // request.Header is created empty from NewRequestWithContext so we can just replace it + if header != nil { + request.Header = header + } // This may not be true but right now we don't have more information here and seems like we send just this type // of encoding right now if it is a POST if strings.ToUpper(method) == http.MethodPost { request.Header.Set("Content-Type", "application/x-www-form-urlencoded") } - - return c.doer.Do(request) + return request, nil } func formatTime(t time.Time) string { diff --git a/pkg/tsdb/prometheus/client/provider.go b/pkg/tsdb/prometheus/client/provider.go deleted file mode 100644 index 808f16300689..000000000000 --- a/pkg/tsdb/prometheus/client/provider.go +++ /dev/null @@ -1,93 +0,0 @@ -package client - -import ( - "net/http" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" - "github.com/grafana/grafana/pkg/tsdb/prometheus/middleware" - "github.com/grafana/grafana/pkg/util/maputil" - - sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" - "github.com/grafana/grafana/pkg/infra/httpclient" - "github.com/grafana/grafana/pkg/infra/log" -) - -type Provider struct { - settings backend.DataSourceInstanceSettings - jsonData map[string]interface{} - httpMethod string - clientProvider httpclient.Provider - cfg *setting.Cfg - features featuremgmt.FeatureToggles - log log.Logger -} - -func NewProvider( - settings backend.DataSourceInstanceSettings, - jsonData map[string]interface{}, - clientProvider httpclient.Provider, - cfg *setting.Cfg, - features featuremgmt.FeatureToggles, - log log.Logger, -) *Provider { - httpMethod, _ := maputil.GetStringOptional(jsonData, "httpMethod") - if httpMethod == "" { - httpMethod = http.MethodPost - } - return &Provider{ - settings: settings, - jsonData: jsonData, - httpMethod: httpMethod, - clientProvider: clientProvider, - cfg: cfg, - features: features, - log: log, - } -} - -func (p *Provider) GetClient(headers map[string]string) (*Client, error) { - opts, err := p.settings.HTTPClientOptions() - if err != nil { - return nil, err - } - - opts.Middlewares = p.middlewares() - opts.Headers = reqHeaders(headers) - - // Set SigV4 service namespace - if opts.SigV4 != nil { - opts.SigV4.Service = "aps" - } - - // Azure authentication - err = p.configureAzureAuthentication(&opts) - if err != nil { - return nil, err - } - - httpClient, err := p.clientProvider.New(opts) - if err != nil { - return nil, err - } - - return NewClient(httpClient, p.httpMethod, p.settings.URL), nil -} - -func (p *Provider) middlewares() []sdkhttpclient.Middleware { - middlewares := []sdkhttpclient.Middleware{ - middleware.CustomQueryParameters(p.log), - sdkhttpclient.CustomHeadersMiddleware(), - } - return middlewares -} - -func reqHeaders(headers map[string]string) map[string]string { - // copy to avoid changing the original map - h := make(map[string]string, len(headers)) - for k, v := range headers { - h[k] = v - } - return h -} diff --git a/pkg/tsdb/prometheus/client/provider_azure.go b/pkg/tsdb/prometheus/client/provider_azure.go deleted file mode 100644 index b3ee277c7e8f..000000000000 --- a/pkg/tsdb/prometheus/client/provider_azure.go +++ /dev/null @@ -1,113 +0,0 @@ -package client - -import ( - "fmt" - "net/url" - "path" - - "github.com/grafana/grafana-azure-sdk-go/azcredentials" - "github.com/grafana/grafana-azure-sdk-go/azhttpclient" - "github.com/grafana/grafana-azure-sdk-go/azsettings" - sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" - - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/util/maputil" -) - -var ( - azurePrometheusScopes = map[string][]string{ - azsettings.AzurePublic: {"https://prometheus.monitor.azure.com/.default"}, - azsettings.AzureChina: {"https://prometheus.monitor.chinacloudapp.cn/.default"}, - azsettings.AzureUSGovernment: {"https://prometheus.monitor.usgovcloudapi.net/.default"}, - } -) - -func (p *Provider) configureAzureAuthentication(opts *sdkhttpclient.Options) error { - // Azure authentication is experimental (#35857) - if !p.features.IsEnabled(featuremgmt.FlagPrometheusAzureAuth) { - return nil - } - - credentials, err := azcredentials.FromDatasourceData(p.jsonData, p.settings.DecryptedSecureJSONData) - if err != nil { - err = fmt.Errorf("invalid Azure credentials: %w", err) - return err - } - - if credentials != nil { - var scopes []string - - if scopes, err = getOverriddenScopes(p.jsonData); err != nil { - return err - } - - if scopes == nil { - if scopes, err = getPrometheusScopes(p.cfg.Azure, credentials); err != nil { - return err - } - } - - azhttpclient.AddAzureAuthentication(opts, p.cfg.Azure, credentials, scopes) - } - - return nil -} - -func getOverriddenScopes(jsonData map[string]interface{}) ([]string, error) { - resourceIdStr, err := maputil.GetStringOptional(jsonData, "azureEndpointResourceId") - if err != nil { - err = fmt.Errorf("overridden resource ID (audience) invalid") - return nil, err - } else if resourceIdStr == "" { - return nil, nil - } - - resourceId, err := url.Parse(resourceIdStr) - if err != nil || resourceId.Scheme == "" || resourceId.Host == "" { - err = fmt.Errorf("overridden endpoint resource ID (audience) '%s' invalid", resourceIdStr) - return nil, err - } - - resourceId.Path = path.Join(resourceId.Path, ".default") - scopes := []string{resourceId.String()} - return scopes, nil -} - -func getPrometheusScopes(settings *azsettings.AzureSettings, credentials azcredentials.AzureCredentials) ([]string, error) { - // Extract cloud from credentials - azureCloud, err := getAzureCloudFromCredentials(settings, credentials) - if err != nil { - return nil, err - } - - // Get scopes for the given cloud - if scopes, ok := azurePrometheusScopes[azureCloud]; !ok { - err := fmt.Errorf("the Azure cloud '%s' not supported by Prometheus datasource", azureCloud) - return nil, err - } else { - return scopes, nil - } -} - -// To be part of grafana-azure-sdk-go -func getAzureCloudFromCredentials(settings *azsettings.AzureSettings, credentials azcredentials.AzureCredentials) (string, error) { - switch c := credentials.(type) { - case *azcredentials.AzureManagedIdentityCredentials: - // In case of managed identity, the cloud is always same as where Grafana is hosted - return getDefaultAzureCloud(settings), nil - case *azcredentials.AzureClientSecretCredentials: - return c.AzureCloud, nil - default: - err := fmt.Errorf("the Azure credentials of type '%s' not supported by Prometheus datasource", c.AzureAuthType()) - return "", err - } -} - -// To be part of grafana-azure-sdk-go -func getDefaultAzureCloud(settings *azsettings.AzureSettings) string { - cloudName := settings.Cloud - if cloudName == "" { - return azsettings.AzurePublic - } - return cloudName -} diff --git a/pkg/tsdb/prometheus/client/provider_azure_test.go b/pkg/tsdb/prometheus/client/provider_azure_test.go deleted file mode 100644 index faf45cd59ed0..000000000000 --- a/pkg/tsdb/prometheus/client/provider_azure_test.go +++ /dev/null @@ -1,153 +0,0 @@ -package client - -import ( - "testing" - - "github.com/grafana/grafana-azure-sdk-go/azsettings" - "github.com/grafana/grafana-plugin-sdk-go/backend" - sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" - - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestConfigureAzureAuthentication(t *testing.T) { - cfg := &setting.Cfg{ - Azure: &azsettings.AzureSettings{}, - } - settings := backend.DataSourceInstanceSettings{} - - t.Run("given feature flag enabled", func(t *testing.T) { - features := featuremgmt.WithFeatures(featuremgmt.FlagPrometheusAzureAuth) - - t.Run("should set Azure middleware when JsonData contains valid credentials", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - "azureCredentials": map[string]interface{}{ - "authType": "msi", - }, - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - require.NoError(t, err) - - require.NotNil(t, opts.Middlewares) - assert.Len(t, opts.Middlewares, 1) - }) - - t.Run("should not set Azure middleware when JsonData doesn't contain valid credentials", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - require.NoError(t, err) - - assert.NotContains(t, opts.CustomOptions, "_azureCredentials") - }) - - t.Run("should return error when JsonData contains invalid credentials", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - "azureCredentials": "invalid", - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - assert.Error(t, err) - }) - - t.Run("should set Azure middleware when JsonData contains credentials and valid audience", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - "azureCredentials": map[string]interface{}{ - "authType": "msi", - }, - "azureEndpointResourceId": "https://api.example.com/abd5c4ce-ca73-41e9-9cb2-bed39aa2adb5", - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - require.NoError(t, err) - - require.NotNil(t, opts.Middlewares) - assert.Len(t, opts.Middlewares, 1) - }) - - t.Run("should not set Azure middleware when JsonData doesn't contain credentials", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - "azureEndpointResourceId": "https://api.example.com/abd5c4ce-ca73-41e9-9cb2-bed39aa2adb5", - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - require.NoError(t, err) - - if opts.Middlewares != nil { - assert.Len(t, opts.Middlewares, 0) - } - }) - - t.Run("should return error when JsonData contains invalid audience", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - "azureCredentials": map[string]interface{}{ - "authType": "msi", - }, - "azureEndpointResourceId": "invalid", - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - assert.Error(t, err) - }) - }) - - t.Run("given feature flag not enabled", func(t *testing.T) { - features := featuremgmt.WithFeatures() - - t.Run("should not set Azure Credentials even when JsonData contains credentials", func(t *testing.T) { - jsonData := map[string]interface{}{ - "httpMethod": "POST", - "azureCredentials": map[string]interface{}{ - "authType": "msi", - }, - "azureEndpointResourceId": "https://api.example.com/abd5c4ce-ca73-41e9-9cb2-bed39aa2adb5", - } - - var p = NewProvider(settings, jsonData, nil, cfg, features, nil) - - var opts = &sdkhttpclient.Options{CustomOptions: map[string]interface{}{}} - - err := p.configureAzureAuthentication(opts) - require.NoError(t, err) - - if opts.Middlewares != nil { - assert.Len(t, opts.Middlewares, 0) - } - }) - }) -} diff --git a/pkg/tsdb/prometheus/client/provider_test.go b/pkg/tsdb/prometheus/client/provider_test.go deleted file mode 100644 index 400eb3438356..000000000000 --- a/pkg/tsdb/prometheus/client/provider_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package client_test - -import ( - "encoding/json" - "net/http" - "testing" - - "github.com/grafana/grafana-plugin-sdk-go/backend" - sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" - "github.com/grafana/grafana/pkg/infra/httpclient" - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" - "github.com/grafana/grafana/pkg/tsdb/prometheus/client" - - "github.com/stretchr/testify/require" -) - -var headers = map[string]string{"Authorization": "token", "X-ID-Token": "id-token"} - -func TestGetClient(t *testing.T) { - t.Run("it sets the SigV4 service if it exists", func(t *testing.T) { - tc := setup(`{"sigV4Auth":true}`) - - setting.SigV4AuthEnabled = true - defer func() { setting.SigV4AuthEnabled = false }() - - _, err := tc.clientProvider.GetClient(headers) - require.Nil(t, err) - - require.Equal(t, "aps", tc.httpProvider.opts.SigV4.Service) - }) - - t.Run("it always uses the custom params and custom headers middlewares", func(t *testing.T) { - tc := setup() - - _, err := tc.clientProvider.GetClient(headers) - require.Nil(t, err) - - require.Len(t, tc.httpProvider.middlewares(), 2) - require.Contains(t, tc.httpProvider.middlewares(), "prom-custom-query-parameters") - require.Contains(t, tc.httpProvider.middlewares(), "CustomHeaders") - }) - - t.Run("extra headers", func(t *testing.T) { - t.Run("it sets the headers when 'oauthPassThru' is true and auth headers are passed", func(t *testing.T) { - tc := setup(`{"oauthPassThru":true}`) - _, err := tc.clientProvider.GetClient(headers) - require.Nil(t, err) - - require.Equal(t, headers, tc.httpProvider.opts.Headers) - }) - - t.Run("it sets all headers", func(t *testing.T) { - withNonAuth := map[string]string{"X-Not-Auth": "stuff"} - - tc := setup(`{"oauthPassThru":true}`) - _, err := tc.clientProvider.GetClient(withNonAuth) - require.Nil(t, err) - - require.Equal(t, map[string]string{"X-Not-Auth": "stuff"}, tc.httpProvider.opts.Headers) - }) - - t.Run("it does not error when headers are nil", func(t *testing.T) { - tc := setup(`{"oauthPassThru":true}`) - - _, err := tc.clientProvider.GetClient(nil) - require.Nil(t, err) - }) - }) -} - -func setup(jsonData ...string) *testContext { - var rawData []byte - if len(jsonData) > 0 { - rawData = []byte(jsonData[0]) - } - - var jd map[string]interface{} - _ = json.Unmarshal(rawData, &jd) - - cfg := &setting.Cfg{} - settings := backend.DataSourceInstanceSettings{URL: "test-url", JSONData: rawData} - features := featuremgmt.WithFeatures() - hp := &fakeHttpClientProvider{} - p := client.NewProvider(settings, jd, hp, cfg, features, nil) - - return &testContext{ - httpProvider: hp, - clientProvider: p, - } -} - -type testContext struct { - httpProvider *fakeHttpClientProvider - clientProvider *client.Provider -} - -type fakeHttpClientProvider struct { - httpclient.Provider - - opts sdkhttpclient.Options -} - -func (p *fakeHttpClientProvider) New(opts ...sdkhttpclient.Options) (*http.Client, error) { - p.opts = opts[0] - return sdkhttpclient.New(opts[0]) -} - -func (p *fakeHttpClientProvider) GetTransport(opts ...sdkhttpclient.Options) (http.RoundTripper, error) { - p.opts = opts[0] - return http.DefaultTransport, nil -} - -func (p *fakeHttpClientProvider) middlewares() []string { - var middlewareNames []string - for _, m := range p.opts.Middlewares { - mw, ok := m.(sdkhttpclient.MiddlewareName) - if !ok { - panic("unexpected middleware type") - } - - middlewareNames = append(middlewareNames, mw.MiddlewareName()) - } - return middlewareNames -} diff --git a/pkg/tsdb/prometheus/prometheus.go b/pkg/tsdb/prometheus/prometheus.go index 4febc2229250..200d85402ad2 100644 --- a/pkg/tsdb/prometheus/prometheus.go +++ b/pkg/tsdb/prometheus/prometheus.go @@ -43,26 +43,28 @@ func ProvideService(httpClientProvider httpclient.Provider, cfg *setting.Cfg, fe func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) datasource.InstanceFactoryFunc { return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) { // Creates a http roundTripper. Probably should be used for both buffered and streaming/querydata instances. - opts, err := buffered.CreateTransportOptions(settings, cfg, features, plog) + opts, err := buffered.CreateTransportOptions(settings, cfg.Azure, features, plog) if err != nil { return nil, fmt.Errorf("error creating transport options: %v", err) } - roundTripper, err := httpClientProvider.GetTransport(*opts) + httpClient, err := httpClientProvider.New(*opts) if err != nil { return nil, fmt.Errorf("error creating http client: %v", err) } - - b, err := buffered.New(roundTripper, tracer, settings, plog) + // Older version using standard Go Prometheus client + b, err := buffered.New(httpClient.Transport, tracer, settings, plog) if err != nil { return nil, err } - qd, err := querydata.New(httpClientProvider, cfg, features, tracer, settings, plog) + // New version using custom client and better response parsing + qd, err := querydata.New(httpClient, features, tracer, settings, plog) if err != nil { return nil, err } - r, err := resource.New(httpClientProvider, cfg, features, settings, plog) + // Resource call management using new custom client same as querydata + r, err := resource.New(httpClient, settings, plog) if err != nil { return nil, err } @@ -98,19 +100,12 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq return err } - statusCode, bytes, err := i.resource.Execute(ctx, req) - body := bytes + resp, err := i.resource.Execute(ctx, req) if err != nil { - body = []byte(err.Error()) + return err } - return sender.Send(&backend.CallResourceResponse{ - Status: statusCode, - Headers: map[string][]string{ - "content-type": {"application/json"}, - }, - Body: body, - }) + return sender.Send(resp) } func (s *Service) getInstance(pluginCtx backend.PluginContext) (*instance, error) { diff --git a/pkg/tsdb/prometheus/prometheus_test.go b/pkg/tsdb/prometheus/prometheus_test.go index 29a496c01398..94ed8acf208f 100644 --- a/pkg/tsdb/prometheus/prometheus_test.go +++ b/pkg/tsdb/prometheus/prometheus_test.go @@ -2,6 +2,7 @@ package prometheus import ( "context" + "io" "net/http" "testing" "time" @@ -21,9 +22,12 @@ func (sender *fakeSender) Send(resp *backend.CallResourceResponse) error { return nil } -type fakeRoundtripper struct{} +type fakeRoundtripper struct { + Req *http.Request +} func (rt *fakeRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) { + rt.Req = req return &http.Response{ Status: "200", StatusCode: 200, @@ -35,13 +39,13 @@ func (rt *fakeRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) type fakeHTTPClientProvider struct { httpclient.Provider - opts []sdkHttpClient.Options + Roundtripper *fakeRoundtripper } func (provider *fakeHTTPClientProvider) New(opts ...sdkHttpClient.Options) (*http.Client, error) { client := &http.Client{} - client.Transport = &fakeRoundtripper{} - provider.opts = opts + provider.Roundtripper = &fakeRoundtripper{} + client.Transport = provider.Roundtripper return client, nil } @@ -49,15 +53,10 @@ func (provider *fakeHTTPClientProvider) GetTransport(opts ...sdkHttpClient.Optio return &fakeRoundtripper{}, nil } -func TestClient(t *testing.T) { +func TestService(t *testing.T) { t.Run("Service", func(t *testing.T) { t.Run("CallResource", func(t *testing.T) { - t.Run("Adds correct headers", func(t *testing.T) { - // Not a great test as right now we put the headers form request and custom headers into a client during - // it's creation and then relly on sdk middleware to add it to the request later on. This is hard to - // test so for now this just checks if the correct headers are passed to the http client not whether - // they are actually added to the request. - + t.Run("creates correct request", func(t *testing.T) { httpProvider := &fakeHTTPClientProvider{} service := &Service{ im: datasource.NewInstanceManager(newInstanceSettings(httpProvider, &setting.Cfg{}, &featuremgmt.FeatureManager{}, nil)), @@ -79,18 +78,14 @@ func TestClient(t *testing.T) { Database: "", BasicAuthEnabled: true, BasicAuthUser: "admin", - // Custom headers from data source configuration - JSONData: []byte("{\"httpHeaderName1\":\"x-tenant-id\"}"), - DecryptedSecureJSONData: map[string]string{ - "httpHeaderValue1": "0987", - }, - Updated: time.Time{}, + Updated: time.Time{}, + JSONData: []byte("{}"), }, }, Path: "/api/v1/series", Method: http.MethodPost, URL: "/api/v1/series", - // Header in the request itself sent from the browser + // This header should be passed on to the resource request Headers: map[string][]string{ "foo": {"bar"}, }, @@ -100,7 +95,12 @@ func TestClient(t *testing.T) { sender := &fakeSender{} err := service.CallResource(context.Background(), req, sender) require.NoError(t, err) - require.Equal(t, map[string]string{"X-Tenant-Id": "0987", "foo": "bar"}, httpProvider.opts[0].Headers) + require.Equal(t, http.Header{"Content-Type": {"application/x-www-form-urlencoded"}, "foo": {"bar"}}, httpProvider.Roundtripper.Req.Header) + require.Equal(t, http.MethodPost, httpProvider.Roundtripper.Req.Method) + body, err := io.ReadAll(httpProvider.Roundtripper.Req.Body) + require.NoError(t, err) + require.Equal(t, []byte("match%5B%5D: ALERTS\nstart: 1655271408\nend: 1655293008"), body) + require.Equal(t, "http://localhost:9090/api/v1/series", httpProvider.Roundtripper.Req.URL.String()) }) }) }) diff --git a/pkg/tsdb/prometheus/querydata/framing_test.go b/pkg/tsdb/prometheus/querydata/framing_test.go index 66b7afa37470..a551c1f2080f 100644 --- a/pkg/tsdb/prometheus/querydata/framing_test.go +++ b/pkg/tsdb/prometheus/querydata/framing_test.go @@ -118,7 +118,10 @@ func loadStoredQuery(fileName string) (*backend.QueryDataRequest, error) { } func runQuery(response []byte, q *backend.QueryDataRequest, wide bool) (*backend.QueryDataResponse, error) { - tCtx := setup(wide) + tCtx, err := setup(wide) + if err != nil { + return nil, err + } res := &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(response)), diff --git a/pkg/tsdb/prometheus/querydata/prometeus_bench_test.go b/pkg/tsdb/prometheus/querydata/prometeus_bench_test.go index 0f610b4a4d20..564d4ea63089 100644 --- a/pkg/tsdb/prometheus/querydata/prometeus_bench_test.go +++ b/pkg/tsdb/prometheus/querydata/prometeus_bench_test.go @@ -22,7 +22,9 @@ import ( // - go tool pprof -http=localhost:6061 memprofile.out func BenchmarkJson(b *testing.B) { body, q := createJsonTestData(1642000000, 1, 300, 400) - tCtx := setup(true) + tCtx, err := setup(true) + require.NoError(b, err) + b.ResetTimer() for n := 0; n < b.N; n++ { res := http.Response{ diff --git a/pkg/tsdb/prometheus/querydata/request.go b/pkg/tsdb/prometheus/querydata/request.go index 2e7ebac3c459..0269b05c3802 100644 --- a/pkg/tsdb/prometheus/querydata/request.go +++ b/pkg/tsdb/prometheus/querydata/request.go @@ -2,21 +2,19 @@ package querydata import ( "context" - "encoding/json" - "fmt" + "net/http" "regexp" "time" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" - "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb/intervalv2" "github.com/grafana/grafana/pkg/tsdb/prometheus/client" "github.com/grafana/grafana/pkg/tsdb/prometheus/models" + "github.com/grafana/grafana/pkg/tsdb/prometheus/utils" "github.com/grafana/grafana/pkg/util/maputil" "go.opentelemetry.io/otel/attribute" ) @@ -25,18 +23,18 @@ const legendFormatAuto = "__auto" var legendFormatRegexp = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`) -type clientGetter func(map[string]string) (*client.Client, error) - type ExemplarEvent struct { Time time.Time Value float64 Labels map[string]string } +// QueryData handles querying but different from buffered package uses a custom client instead of default Go Prom +// client. type QueryData struct { intervalCalculator intervalv2.Calculator tracer tracing.Tracer - getClient clientGetter + client *client.Client log log.Logger ID int64 URL string @@ -45,34 +43,30 @@ type QueryData struct { } func New( - httpClientProvider httpclient.Provider, - cfg *setting.Cfg, + httpClient *http.Client, features featuremgmt.FeatureToggles, tracer tracing.Tracer, settings backend.DataSourceInstanceSettings, plog log.Logger, ) (*QueryData, error) { - var jsonData map[string]interface{} - if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil { - return nil, fmt.Errorf("error reading settings: %w", err) - } - - timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval") + jsonData, err := utils.GetJsonData(settings) if err != nil { return nil, err } + httpMethod, _ := maputil.GetStringOptional(jsonData, "httpMethod") - p := client.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog) - pc, err := client.NewProviderCache(p) + timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval") if err != nil { return nil, err } + promClient := client.NewClient(httpClient, httpMethod, settings.URL) + return &QueryData{ intervalCalculator: intervalv2.NewCalculator(), tracer: tracer, log: plog, - getClient: pc.GetClient, + client: promClient, TimeInterval: timeInterval, ID: settings.ID, URL: settings.URL, @@ -86,17 +80,12 @@ func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest) Responses: backend.Responses{}, } - client, err := s.getClient(req.Headers) - if err != nil { - return &result, err - } - for _, q := range req.Queries { query, err := models.Parse(q, s.TimeInterval, s.intervalCalculator, fromAlert) if err != nil { return &result, err } - r, err := s.fetch(ctx, client, query) + r, err := s.fetch(ctx, s.client, query, req.Headers) if err != nil { return &result, err } @@ -110,11 +99,11 @@ func (s *QueryData) Execute(ctx context.Context, req *backend.QueryDataRequest) return &result, nil } -func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.Query) (*backend.DataResponse, error) { +func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) { s.log.Debug("Sending query", "start", q.Start, "end", q.End, "step", q.Step, "query", q.Expr) - traceCtx, span := s.trace(ctx, q) - defer span.End() + traceCtx, end := s.trace(ctx, q) + defer end() response := &backend.DataResponse{ Frames: data.Frames{}, @@ -122,7 +111,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models. } if q.RangeQuery { - res, err := s.rangeQuery(traceCtx, client, q) + res, err := s.rangeQuery(traceCtx, client, q, headers) if err != nil { return nil, err } @@ -130,7 +119,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models. } if q.InstantQuery { - res, err := s.instantQuery(traceCtx, client, q) + res, err := s.instantQuery(traceCtx, client, q, headers) if err != nil { return nil, err } @@ -138,7 +127,7 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models. } if q.ExemplarQuery { - res, err := s.exemplarQuery(traceCtx, client, q) + res, err := s.exemplarQuery(traceCtx, client, q, headers) if err != nil { // If exemplar query returns error, we want to only log it and // continue with other results processing @@ -152,34 +141,42 @@ func (s *QueryData) fetch(ctx context.Context, client *client.Client, q *models. return response, nil } -func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) { - res, err := c.QueryRange(ctx, q) +func (s *QueryData) rangeQuery(ctx context.Context, c *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) { + res, err := c.QueryRange(ctx, q, sdkHeaderToHttpHeader(headers)) if err != nil { return nil, err } return s.parseResponse(ctx, q, res) } -func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) { - res, err := c.QueryInstant(ctx, q) +func (s *QueryData) instantQuery(ctx context.Context, c *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) { + res, err := c.QueryInstant(ctx, q, sdkHeaderToHttpHeader(headers)) if err != nil { return nil, err } return s.parseResponse(ctx, q, res) } -func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *models.Query) (*backend.DataResponse, error) { - res, err := c.QueryExemplars(ctx, q) +func (s *QueryData) exemplarQuery(ctx context.Context, c *client.Client, q *models.Query, headers map[string]string) (*backend.DataResponse, error) { + res, err := c.QueryExemplars(ctx, q, sdkHeaderToHttpHeader(headers)) if err != nil { return nil, err } return s.parseResponse(ctx, q, res) } -func (s *QueryData) trace(ctx context.Context, q *models.Query) (context.Context, tracing.Span) { - traceCtx, span := s.tracer.Start(ctx, "datasource.prometheus") - span.SetAttributes("expr", q.Expr, attribute.Key("expr").String(q.Expr)) - span.SetAttributes("start_unixnano", q.Start, attribute.Key("start_unixnano").Int64(q.Start.UnixNano())) - span.SetAttributes("stop_unixnano", q.End, attribute.Key("stop_unixnano").Int64(q.End.UnixNano())) - return traceCtx, span +func (s *QueryData) trace(ctx context.Context, q *models.Query) (context.Context, func()) { + return utils.StartTrace(ctx, s.tracer, "datasource.prometheus", []utils.Attribute{ + {Key: "expr", Value: q.Expr, Kv: attribute.Key("expr").String(q.Expr)}, + {Key: "start_unixnano", Value: q.Start, Kv: attribute.Key("start_unixnano").Int64(q.Start.UnixNano())}, + {Key: "stop_unixnano", Value: q.End, Kv: attribute.Key("stop_unixnano").Int64(q.End.UnixNano())}, + }) +} + +func sdkHeaderToHttpHeader(headers map[string]string) http.Header { + httpHeader := make(http.Header) + for key, val := range headers { + httpHeader[key] = []string{val} + } + return httpHeader } diff --git a/pkg/tsdb/prometheus/querydata/request_test.go b/pkg/tsdb/prometheus/querydata/request_test.go index ce9176c290e1..1092fc8e2e8a 100644 --- a/pkg/tsdb/prometheus/querydata/request_test.go +++ b/pkg/tsdb/prometheus/querydata/request_test.go @@ -10,12 +10,13 @@ import ( "testing" "time" + "github.com/grafana/grafana-azure-sdk-go/azsettings" "github.com/grafana/grafana-plugin-sdk-go/backend" sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/tracing" - "github.com/grafana/grafana/pkg/setting" + "github.com/grafana/grafana/pkg/tsdb/prometheus/buffered" "github.com/grafana/grafana/pkg/tsdb/prometheus/models" "github.com/grafana/grafana/pkg/tsdb/prometheus/querydata" apiv1 "github.com/prometheus/client_golang/api/prometheus/v1" @@ -58,7 +59,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { }, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) qm := models.QueryModel{ LegendFormat: "legend {{app}}", @@ -119,7 +121,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { }, JSON: b, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) res, err := execute(tctx, query, result) require.NoError(t, err) @@ -165,7 +168,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { }, JSON: b, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) res, err := execute(tctx, query, result) require.NoError(t, err) @@ -207,7 +211,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { }, JSON: b, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) res, err := execute(tctx, query, result) require.NoError(t, err) @@ -248,7 +253,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { JSON: b, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) res, err := execute(tctx, query, result) require.NoError(t, err) @@ -277,7 +283,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { query := backend.DataQuery{ JSON: b, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) res, err := execute(tctx, query, qr) require.NoError(t, err) @@ -315,7 +322,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) { query := backend.DataQuery{ JSON: b, } - tctx := setup(true) + tctx, err := setup(true) + require.NoError(t, err) res, err := execute(tctx, query, qr) require.NoError(t, err) @@ -389,7 +397,7 @@ type testContext struct { queryData *querydata.QueryData } -func setup(wideFrames bool) *testContext { +func setup(wideFrames bool) (*testContext, error) { tracer := tracing.InitializeTracerForTest() httpProvider := &fakeHttpClientProvider{ opts: sdkhttpclient.Options{ @@ -400,19 +408,29 @@ func setup(wideFrames bool) *testContext { Body: ioutil.NopCloser(bytes.NewReader([]byte(`{}`))), }, } - queryData, _ := querydata.New( - httpProvider, - setting.NewCfg(), - &fakeFeatureToggles{flags: map[string]bool{"prometheusStreamingJSONParser": true, "prometheusWideSeries": wideFrames}}, - tracer, - backend.DataSourceInstanceSettings{URL: "http://localhost:9090", JSONData: json.RawMessage(`{"timeInterval": "15s"}`)}, - &fakeLogger{}, - ) + settings := backend.DataSourceInstanceSettings{ + URL: "http://localhost:9090", + JSONData: json.RawMessage(`{"timeInterval": "15s"}`), + } + + features := &fakeFeatureToggles{flags: map[string]bool{"prometheusStreamingJSONParser": true, "prometheusWideSeries": wideFrames}} + + opts, err := buffered.CreateTransportOptions(settings, &azsettings.AzureSettings{}, features, &fakeLogger{}) + if err != nil { + return nil, err + } + + httpClient, err := httpProvider.New(*opts) + if err != nil { + return nil, err + } + + queryData, _ := querydata.New(httpClient, features, tracer, settings, &fakeLogger{}) return &testContext{ httpProvider: httpProvider, queryData: queryData, - } + }, nil } type fakeFeatureToggles struct { diff --git a/pkg/tsdb/prometheus/resource/resource.go b/pkg/tsdb/prometheus/resource/resource.go index 4c58be8d5ced..6526e9ca9601 100644 --- a/pkg/tsdb/prometheus/resource/resource.go +++ b/pkg/tsdb/prometheus/resource/resource.go @@ -1,27 +1,21 @@ package resource import ( - "compress/gzip" "context" - "encoding/json" "fmt" - "io" "io/ioutil" "net/http" - "strings" "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" - "github.com/grafana/grafana/pkg/services/featuremgmt" - "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb/prometheus/client" + "github.com/grafana/grafana/pkg/tsdb/prometheus/utils" + "github.com/grafana/grafana/pkg/util/maputil" ) type Resource struct { - provider *client.Provider - log log.Logger - customHeaders map[string]string + promClient *client.Client + log log.Logger } // Hop-by-hop headers. These are removed when sent to the backend. @@ -55,113 +49,49 @@ func delStopHeaders(header http.Header) { } } -func addHeaders(header http.Header, toAdd map[string]string) { - for k, v := range toAdd { - header.Add(k, v) - } -} - -func normalizeReqHeaders(headers map[string][]string) map[string]string { - h := make(map[string]string, len(headers)) - for k, v := range headers { - h[k] = strings.Join(v, ",") - } - return h -} - func New( - httpClientProvider httpclient.Provider, - cfg *setting.Cfg, - features featuremgmt.FeatureToggles, + httpClient *http.Client, settings backend.DataSourceInstanceSettings, plog log.Logger, ) (*Resource, error) { - var jsonData map[string]interface{} - if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil { - return nil, fmt.Errorf("error reading settings: %w", err) - } - - customHeaders := make(map[string]string) - var jsonDataMap map[string]interface{} - - err := json.Unmarshal(settings.JSONData, &jsonDataMap) + jsonData, err := utils.GetJsonData(settings) if err != nil { return nil, err } - - index := 1 - for { - headerNameSuffix := fmt.Sprintf("httpHeaderName%d", index) - headerValueSuffix := fmt.Sprintf("httpHeaderValue%d", index) - - key := jsonDataMap[headerNameSuffix] - if key == nil { - // No (more) header values are available - break - } - - if val, ok := settings.DecryptedSecureJSONData[headerValueSuffix]; ok { - switch k := key.(type) { - case string: - customHeaders[k] = val - } - } - index++ - } + httpMethod, _ := maputil.GetStringOptional(jsonData, "httpMethod") return &Resource{ - log: plog, - provider: client.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog), - customHeaders: customHeaders, + log: plog, + promClient: client.NewClient(httpClient, httpMethod, settings.URL), }, nil } -func (r *Resource) Execute(ctx context.Context, req *backend.CallResourceRequest) (int, []byte, error) { +func (r *Resource) Execute(ctx context.Context, req *backend.CallResourceRequest) (*backend.CallResourceResponse, error) { delHopHeaders(req.Headers) delStopHeaders(req.Headers) - addHeaders(req.Headers, r.customHeaders) - client, err := r.provider.GetClient(normalizeReqHeaders(req.Headers)) - if err != nil { - return 500, nil, err - } - return r.fetch(ctx, client, req) -} - -func (r *Resource) fetch(ctx context.Context, client *client.Client, req *backend.CallResourceRequest) (int, []byte, error) { r.log.Debug("Sending resource query", "URL", req.URL) - resp, err := client.QueryResource(ctx, req) + resp, err := r.promClient.QueryResource(ctx, req) if err != nil { - statusCode := 500 - if resp != nil { - statusCode = resp.StatusCode - } - return statusCode, nil, err + return nil, fmt.Errorf("error querying resource: %v", err) } defer func() { - err = resp.Body.Close() - }() - - // Check that the server actually sent compressed data - var reader io.ReadCloser - switch resp.Header.Get("Content-Encoding") { - case "gzip": - reader, err = gzip.NewReader(resp.Body) - defer func() { - err = reader.Close() - }() - if err != nil { - return 500, nil, err + tmpErr := resp.Body.Close() + if tmpErr != nil && err == nil { + err = tmpErr } - default: - reader = resp.Body - } + }() - data, err := ioutil.ReadAll(reader) + data, err := ioutil.ReadAll(resp.Body) if err != nil { - return 500, nil, err + return nil, err + } + callResponse := &backend.CallResourceResponse{ + Status: resp.StatusCode, + Headers: resp.Header, + Body: data, } - return resp.StatusCode, data, err + return callResponse, err }