Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libgit2: add contextual logging to subtransports #778

Merged
merged 2 commits into from Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion controllers/bucket_controller.go
Expand Up @@ -34,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -246,7 +247,13 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc

func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the Bucket
obj := &sourcev1.Bucket{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/gitrepository_controller.go
Expand Up @@ -32,6 +32,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -159,7 +160,13 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o

func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the GitRepository
obj := &sourcev1.GitRepository{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/helmchart_controller.go
Expand Up @@ -35,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -180,7 +181,13 @@ func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts

func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the HelmChart
obj := &sourcev1.HelmChart{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/helmrepository_controller.go
Expand Up @@ -29,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -142,7 +143,13 @@ func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager,

func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
hiddeco marked this conversation as resolved.
Show resolved Hide resolved
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the HelmRepository
obj := &sourcev1.HelmRepository{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/helmrepository_controller_oci.go
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -107,7 +108,13 @@ func (r *HelmRepositoryOCIReconciler) SetupWithManagerAndOptions(mgr ctrl.Manage

func (r *HelmRepositoryOCIReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the HelmRepository
obj := &sourcev1.HelmRepository{}
Expand Down
3 changes: 1 addition & 2 deletions controllers/suite_test.go
Expand Up @@ -40,7 +40,6 @@ import (
feathelper "github.com/fluxcd/pkg/runtime/features"
"github.com/fluxcd/pkg/runtime/testenv"
"github.com/fluxcd/pkg/testserver"
"github.com/go-logr/logr"
"github.com/phayes/freeport"

"github.com/distribution/distribution/v3/configuration"
Expand Down Expand Up @@ -209,7 +208,7 @@ func TestMain(m *testing.M) {

fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard())
managed.InitManagedTransport()

if err := (&GitRepositoryReconciler{
Client: testEnv,
Expand Down
2 changes: 1 addition & 1 deletion main.go
Expand Up @@ -311,7 +311,7 @@ func main() {
}()

if enabled, _ := features.Enabled(features.GitManagedTransport); enabled {
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
managed.InitManagedTransport()
} else {
if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize {
features.Disable(features.OptimizedGitClones)
Expand Down
52 changes: 42 additions & 10 deletions pkg/git/libgit2/managed/http.go
Expand Up @@ -45,6 +45,7 @@ package managed

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -55,9 +56,12 @@ import (
"strings"
"sync"

"github.com/fluxcd/pkg/runtime/logger"
pool "github.com/fluxcd/source-controller/internal/transport"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
git2go "github.com/libgit2/git2go/v33"
ctrl "sigs.k8s.io/controller-runtime"
)

var actionSuffixes = []string{
Expand All @@ -81,10 +85,11 @@ func registerManagedHTTP() error {
}

func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{
transport: transport,
httpTransport: pool.NewOrIdle(nil),
ctx: context.Background(),
logger: logr.Discard(),
}

return sst, nil
Expand All @@ -93,6 +98,21 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans
type httpSmartSubtransport struct {
transport *git2go.Transport
httpTransport *http.Transport

// once is used to ensure that logger and ctx is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a Logger instance for logging. This was preferred
// due to the need to have a correlation ID and URL set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger
}

func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
Expand Down Expand Up @@ -133,6 +153,15 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
}
t.httpTransport.DisableCompression = false

t.once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = ctrl.LoggerFrom(t.ctx,
"transportType", "http",
"url", opts.TargetURL)
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine how it is, but I'd expect such one time configurations to be done at the very beginning of the function when we have all the things we need to do it.


client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -176,8 +205,10 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
opts.TargetURL = trimActionSuffix(newURL.String())
AddTransportOptions(transportOptionsURL, *opts)

debugLog.Info("[http]: server responded with redirect",
"newURL", opts.TargetURL, "StatusCode", req.Response.StatusCode)
// show as info, as this should be visible regardless of the
// chosen log-level.
t.logger.Info("server responded with redirect",
"newUrl", opts.TargetURL, "StatusCode", req.Response.StatusCode)
}
}
}
Expand Down Expand Up @@ -270,15 +301,16 @@ func createClientRequest(targetURL string, action git2go.SmartServiceAction,
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")
t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Free()")

if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool")
t.logger.V(logger.TraceLevel).Info("release http transport back to pool")

pool.Release(t.httpTransport)
t.httpTransport = nil
}
Expand Down Expand Up @@ -345,18 +377,18 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")
self.owner.logger.V(logger.TraceLevel).Info("httpSmartSubtransportStream.Free()")

if self.resp.Body != nil {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
if _, err := io.Copy(io.Discard, self.resp.Body); err != nil {
traceLog.Error(err, "[http]: cannot discard response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "cannot discard response body")
}

if err := self.resp.Body.Close(); err != nil {
traceLog.Error(err, "[http]: cannot close response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "cannot close response body")
}
}
}
Expand Down Expand Up @@ -399,7 +431,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
req.ContentLength = -1
}

traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
self.owner.logger.V(logger.TraceLevel).Info("new request", "method", req.Method, "postUrl", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/git/libgit2/managed/http_test.go
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"

git2go "github.com/libgit2/git2go/v33"
Expand Down Expand Up @@ -170,7 +169,7 @@ func TestHTTPManagedTransport_E2E(t *testing.T) {
defer server.StopHTTP()

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

repoPath := "test.git"
err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath)
Expand Down Expand Up @@ -253,7 +252,7 @@ func TestHTTPManagedTransport_HandleRedirect(t *testing.T) {
}

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
13 changes: 2 additions & 11 deletions pkg/git/libgit2/managed/init.go
Expand Up @@ -19,9 +19,6 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -38,9 +35,7 @@ var (
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
enabled bool
enabled bool
)

// Enabled defines whether the use of Managed Transport is enabled which
Expand All @@ -63,14 +58,10 @@ func Enabled() bool {
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport(log logr.Logger) error {
func InitManagedTransport() error {
var err error

once.Do(func() {
log.Info("Initializing managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down