Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal metrics in reconciler run #909

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions controllers/helmchart_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type HelmChartReconciler struct {
Cache *cache.Cache
TTL time.Duration
*cache.CacheRecorder
RepoRecorder *repository.Recorder
}

func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -585,7 +586,7 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
}
}
default:
httpChartRepo, err := repository.NewChartRepository(normalizedURL, r.Storage.LocalPath(*repo.GetArtifact()), r.Getters, tlsConfig, clientOpts,
httpChartRepo, err := repository.NewChartRepository(normalizedURL, r.Storage.LocalPath(*repo.GetArtifact()), r.Getters, tlsConfig, clientOpts, repo.Namespace, r.RepoRecorder,
repository.WithMemoryCache(r.Storage.LocalPath(*repo.GetArtifact()), r.Cache, r.TTL, func(event string) {
r.IncCacheEvents(event, obj.Name, obj.Namespace)
}))
Expand Down Expand Up @@ -1038,7 +1039,7 @@ func (r *HelmChartReconciler) namespacedChartRepositoryCallback(ctx context.Cont

chartRepo = ociChartRepo
} else {
httpChartRepo, err := repository.NewChartRepository(normalizedURL, "", r.Getters, tlsConfig, clientOpts)
httpChartRepo, err := repository.NewChartRepository(normalizedURL, "", r.Getters, tlsConfig, clientOpts, namespace, r.RepoRecorder)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion controllers/helmrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ type HelmRepositoryReconciler struct {
Cache *cache.Cache
TTL time.Duration
*cache.CacheRecorder
RepoRecorder *repository.Recorder
}

type HelmRepositoryReconcilerOptions struct {
Expand Down Expand Up @@ -398,7 +399,7 @@ func (r *HelmRepositoryReconciler) reconcileSource(ctx context.Context, obj *sou
}

// Construct Helm chart repository with options and download index
newChartRepo, err := repository.NewChartRepository(obj.Spec.URL, "", r.Getters, tlsConfig, clientOpts)
newChartRepo, err := repository.NewChartRepository(obj.Spec.URL, "", r.Getters, tlsConfig, clientOpts, obj.Namespace, r.RepoRecorder)
if err != nil {
switch err.(type) {
case *url.Error:
Expand Down
6 changes: 3 additions & 3 deletions controllers/helmrepository_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ func TestHelmRepositoryReconciler_reconcileSource(t *testing.T) {
if serr != nil {
validSecret = false
}
newChartRepo, err = repository.NewChartRepository(obj.Spec.URL, "", testGetters, tOpts, clientOpts)
newChartRepo, err = repository.NewChartRepository(obj.Spec.URL, "", testGetters, tOpts, clientOpts, obj.Namespace, testRepoRecorder)
} else {
newChartRepo, err = repository.NewChartRepository(obj.Spec.URL, "", testGetters, nil, nil)
newChartRepo, err = repository.NewChartRepository(obj.Spec.URL, "", testGetters, nil, nil, obj.Namespace, testRepoRecorder)
}
g.Expect(err).ToNot(HaveOccurred())

Expand Down Expand Up @@ -736,7 +736,7 @@ func TestHelmRepositoryReconciler_reconcileArtifact(t *testing.T) {
g.Expect(err).ToNot(HaveOccurred())
g.Expect(cacheFile.Close()).ToNot(HaveOccurred())

chartRepo, err := repository.NewChartRepository(obj.Spec.URL, "", testGetters, nil, nil)
chartRepo, err := repository.NewChartRepository(obj.Spec.URL, "", testGetters, nil, nil, obj.Namespace, testRepoRecorder)
g.Expect(err).ToNot(HaveOccurred())
chartRepo.CachePath = cachePath

Expand Down
4 changes: 4 additions & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/fluxcd/source-controller/internal/cache"
"github.com/fluxcd/source-controller/internal/features"
"github.com/fluxcd/source-controller/internal/helm/registry"
"github.com/fluxcd/source-controller/internal/helm/repository"
"github.com/fluxcd/source-controller/pkg/git/libgit2/managed"
// +kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -106,6 +107,7 @@ var (
var (
testRegistryServer *registryClientTestServer
testCache *cache.Cache
testRepoRecorder *repository.Recorder
)

func init() {
Expand Down Expand Up @@ -263,6 +265,7 @@ func TestMain(m *testing.M) {

testCache = cache.New(5, 1*time.Second)
cacheRecorder := cache.MustMakeMetrics()
testRepoRecorder = repository.MustMakeMetrics()

if err := (&OCIRepositoryReconciler{
Client: testEnv,
Expand All @@ -282,6 +285,7 @@ func TestMain(m *testing.M) {
Cache: testCache,
TTL: 1 * time.Second,
CacheRecorder: cacheRecorder,
RepoRecorder: testRepoRecorder,
}).SetupWithManager(testEnv); err != nil {
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
}
Expand Down
32 changes: 29 additions & 3 deletions internal/helm/repository/chart_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type ChartRepository struct {
*sync.RWMutex

cacheInfo

// namespace of the ChartRepository of the helm/oci chart repository object
Namespace string

*Recorder
}

type cacheInfo struct {
Expand Down Expand Up @@ -119,7 +124,7 @@ func WithMemoryCache(key string, c *cache.Cache, ttl time.Duration, rec RecordMe
// the ChartRepository.Client configured to the getter.Getter for the
// repository URL scheme. It returns an error on URL parsing failures,
// or if there is no getter available for the scheme.
func NewChartRepository(repositoryURL, cachePath string, providers getter.Providers, tlsConfig *tls.Config, getterOpts []getter.Option, chartRepoOpts ...ChartRepositoryOption) (*ChartRepository, error) {
func NewChartRepository(repositoryURL, cachePath string, providers getter.Providers, tlsConfig *tls.Config, getterOpts []getter.Option, namespace string, repoRecorder *Recorder, chartRepoOpts ...ChartRepositoryOption) (*ChartRepository, error) {
u, err := url.Parse(repositoryURL)
if err != nil {
return nil, err
Expand All @@ -135,6 +140,8 @@ func NewChartRepository(repositoryURL, cachePath string, providers getter.Provid
r.Client = c
r.Options = getterOpts
r.tlsConfig = tlsConfig
r.Namespace = namespace
r.Recorder = repoRecorder

for _, opt := range chartRepoOpts {
if err := opt(r); err != nil {
Expand Down Expand Up @@ -275,8 +282,17 @@ func (r *ChartRepository) DownloadChart(chart *repo.ChartVersion) (*bytes.Buffer
t := transport.NewOrIdle(r.tlsConfig)
clientOpts := append(r.Options, getter.WithTransport(t))
defer transport.Release(t)

return r.Client.Get(u.String(), clientOpts...)
start := time.Now()
buffer, err := r.Client.Get(u.String(), clientOpts...)
if r.Recorder != nil {
r.Recorder.RecordChartRepoEventDuration(
ChartRepoTypeHelm,
ChartRepoEventDownloadChart,
r.Namespace,
r.URL,
start)
}
return buffer, err
}

// LoadIndexFromBytes loads Index from the given bytes.
Expand Down Expand Up @@ -428,6 +444,7 @@ func (r *ChartRepository) LoadFromCache() error {
// the Client and set Options, and writes the index to the given io.Writer.
// It returns an url.Error if the URL failed to parse.
func (r *ChartRepository) DownloadIndex(w io.Writer) (err error) {
start := time.Now()
u, err := url.Parse(r.URL)
if err != nil {
return err
Expand All @@ -444,6 +461,15 @@ func (r *ChartRepository) DownloadIndex(w io.Writer) (err error) {
if err != nil {
return err
}
if r.Recorder != nil {
r.Recorder.RecordChartRepoEventDuration(
ChartRepoTypeHelm,
ChartRepoEventDownloadIndex,
r.Namespace,
r.URL,
start)
}

if _, err = io.Copy(w, res); err != nil {
return err
}
Expand Down
13 changes: 8 additions & 5 deletions internal/helm/repository/chart_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestNewChartRepository(t *testing.T) {
t.Run("should construct chart repository", func(t *testing.T) {
g := NewWithT(t)

r, err := NewChartRepository(repositoryURL, "", providers, nil, options)
r, err := NewChartRepository(repositoryURL, "", providers, nil, options, "fake-namespace", nil)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(r).ToNot(BeNil())
g.Expect(r.URL).To(Equal(repositoryURL))
Expand All @@ -78,7 +78,7 @@ func TestNewChartRepository(t *testing.T) {

t.Run("should error on URL parsing failure", func(t *testing.T) {
g := NewWithT(t)
r, err := NewChartRepository("https://ex ample.com", "", nil, nil, nil)
r, err := NewChartRepository("https://ex ample.com", "", nil, nil, nil, "fake-namespace", nil)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(BeAssignableToTypeOf(&url.Error{}))
g.Expect(r).To(BeNil())
Expand All @@ -88,7 +88,7 @@ func TestNewChartRepository(t *testing.T) {
t.Run("should error on unsupported scheme", func(t *testing.T) {
g := NewWithT(t)

r, err := NewChartRepository("http://example.com", "", providers, nil, nil)
r, err := NewChartRepository("http://example.com", "", providers, nil, nil, "fake-namespace", nil)
g.Expect(err).To(HaveOccurred())
g.Expect(err.Error()).To(Equal("scheme \"http\" not supported"))
g.Expect(r).To(BeNil())
Expand Down Expand Up @@ -235,9 +235,12 @@ func TestChartRepository_DownloadChart(t *testing.T) {
t.Parallel()

mg := mockGetter{}

r := &ChartRepository{
URL: tt.url,
Client: &mg,
URL: tt.url,
Namespace: "dummy",
Client: &mg,
Recorder: nil,
}
res, err := r.DownloadChart(tt.chartVersion)
if tt.wantErr {
Expand Down
90 changes: 90 additions & 0 deletions internal/helm/repository/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
Copyright 2022 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package repository

import (
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"time"
)

const (
ChartRepoTypeHelm = "helm"
//ChartRepoTypeOCI = "oci"
ChartRepoEventDownloadIndex = "chart_repository_download"
ChartRepoEventDownloadChart = "chart_download"
)

// Recorder is a recorder for chart repository events.
type Recorder struct {
// TODO: type up the metrics and talk to aryan9600
// TODO: split this counter??
chartRepoEventsCounter *prometheus.CounterVec
durationHistogram *prometheus.HistogramVec
}

// NewRepositoryRecorder returns a new Recorder.
// The configured labels are: event_type, name, namespace.
// The event_type is one of:
// - "chart_repository_download"
// - "chart_download"
// The url is the url of the helm chart repository
func NewRepositoryRecorder() *Recorder {
return &Recorder{
chartRepoEventsCounter: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gotk_chart_repository_events_total",
Help: "Total number of events for a Helm Chart Repository.",
},
[]string{"name", "repo_type", "namespace", "url", "checksum"},
),
durationHistogram: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "gotk_chart_repository_event_duration_seconds",
Help: "The duration in seconds of an event for a Helm Chart Repository.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
},
[]string{"name", "repo_type", "namespace", "url"},
),
}
}

// ChartRepoCollectors returns the metrics.Collector objects for the Recorder.
func (r *Recorder) ChartRepoCollectors() []prometheus.Collector {
return []prometheus.Collector{
r.chartRepoEventsCounter,
r.durationHistogram,
}
}

// IncChartRepoEvents increment by 1 the chart repo event count for the given event type, url and checksum.
func (r *Recorder) IncChartRepoEvents(event, repoType, url, checksum, namespace string) {
r.chartRepoEventsCounter.WithLabelValues(event, repoType, url, checksum, namespace).Inc()
}

// RecordChartRepoEventDuration records the duration since start for the given ref.
func (r *Recorder) RecordChartRepoEventDuration(event, repoType, namespace, url string, start time.Time) {
r.durationHistogram.WithLabelValues(event, repoType, namespace, url).Observe(time.Since(start).Seconds())
}

// MustMakeMetrics creates a new Recorder, and registers the metrics collectors in the controller-runtime metrics registry.
func MustMakeMetrics() *Recorder {
r := NewRepositoryRecorder()
metrics.Registry.MustRegister(r.ChartRepoCollectors()...)

return r
}
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"fmt"
"github.com/fluxcd/source-controller/internal/helm/repository"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -225,6 +226,8 @@ func main() {
os.Exit(1)
}

repoRecorder := repository.MustMakeMetrics()

if err = (&controllers.HelmRepositoryOCIReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Expand Down Expand Up @@ -270,6 +273,7 @@ func main() {
Cache: c,
TTL: ttl,
CacheRecorder: cacheRecorder,
RepoRecorder: repoRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
Expand Down