Skip to content

Commit

Permalink
Merge pull request #778 from pjbgf/improved-logging
Browse files Browse the repository at this point in the history
libgit2: add contextual logging to subtransports
  • Loading branch information
Paulo Gomes committed Jun 14, 2022
2 parents 7a797e3 + 42dcb87 commit 19292d4
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 51 deletions.
9 changes: 8 additions & 1 deletion controllers/bucket_controller.go
Expand Up @@ -34,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -246,7 +247,13 @@ func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts Buc

func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the Bucket
obj := &sourcev1.Bucket{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/gitrepository_controller.go
Expand Up @@ -32,6 +32,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -159,7 +160,13 @@ func (r *GitRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, o

func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the GitRepository
obj := &sourcev1.GitRepository{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/helmchart_controller.go
Expand Up @@ -35,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand Down Expand Up @@ -180,7 +181,13 @@ func (r *HelmChartReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts

func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the HelmChart
obj := &sourcev1.HelmChart{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/helmrepository_controller.go
Expand Up @@ -29,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -142,7 +143,13 @@ func (r *HelmRepositoryReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager,

func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the HelmRepository
obj := &sourcev1.HelmRepository{}
Expand Down
9 changes: 8 additions & 1 deletion controllers/helmrepository_controller_oci.go
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/uuid"
kuberecorder "k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -107,7 +108,13 @@ func (r *HelmRepositoryOCIReconciler) SetupWithManagerAndOptions(mgr ctrl.Manage

func (r *HelmRepositoryOCIReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx).
// Sets a reconcile ID to correlate logs from all suboperations.
WithValues("reconcileID", uuid.NewUUID())

// logger will be associated to the new context that is
// returned from ctrl.LoggerInto.
ctx = ctrl.LoggerInto(ctx, log)

// Fetch the HelmRepository
obj := &sourcev1.HelmRepository{}
Expand Down
3 changes: 1 addition & 2 deletions controllers/suite_test.go
Expand Up @@ -40,7 +40,6 @@ import (
feathelper "github.com/fluxcd/pkg/runtime/features"
"github.com/fluxcd/pkg/runtime/testenv"
"github.com/fluxcd/pkg/testserver"
"github.com/go-logr/logr"
"github.com/phayes/freeport"

"github.com/distribution/distribution/v3/configuration"
Expand Down Expand Up @@ -209,7 +208,7 @@ func TestMain(m *testing.M) {

fg := feathelper.FeatureGates{}
fg.SupportedFeatures(features.FeatureGates())
managed.InitManagedTransport(logr.Discard())
managed.InitManagedTransport()

if err := (&GitRepositoryReconciler{
Client: testEnv,
Expand Down
2 changes: 1 addition & 1 deletion main.go
Expand Up @@ -311,7 +311,7 @@ func main() {
}()

if enabled, _ := features.Enabled(features.GitManagedTransport); enabled {
managed.InitManagedTransport(ctrl.Log.WithName("managed-transport"))
managed.InitManagedTransport()
} else {
if optimize, _ := feathelper.Enabled(features.OptimizedGitClones); optimize {
features.Disable(features.OptimizedGitClones)
Expand Down
52 changes: 42 additions & 10 deletions pkg/git/libgit2/managed/http.go
Expand Up @@ -45,6 +45,7 @@ package managed

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"errors"
Expand All @@ -55,9 +56,12 @@ import (
"strings"
"sync"

"github.com/fluxcd/pkg/runtime/logger"
pool "github.com/fluxcd/source-controller/internal/transport"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
git2go "github.com/libgit2/git2go/v33"
ctrl "sigs.k8s.io/controller-runtime"
)

var actionSuffixes = []string{
Expand All @@ -81,10 +85,11 @@ func registerManagedHTTP() error {
}

func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Transport) (git2go.SmartSubtransport, error) {
traceLog.Info("[http]: httpSmartSubtransportFactory")
sst := &httpSmartSubtransport{
transport: transport,
httpTransport: pool.NewOrIdle(nil),
ctx: context.Background(),
logger: logr.Discard(),
}

return sst, nil
Expand All @@ -93,6 +98,21 @@ func httpSmartSubtransportFactory(remote *git2go.Remote, transport *git2go.Trans
type httpSmartSubtransport struct {
transport *git2go.Transport
httpTransport *http.Transport

// once is used to ensure that logger and ctx is set only once,
// on the initial (or only) Action call. Without this a mutex must
// be applied to ensure that ctx won't be changed, as this would be
// prone to race conditions in the stdout processing goroutine.
once sync.Once
// ctx defines the context to be used across long-running or
// cancellable operations.
// Defaults to context.Background().
ctx context.Context
// logger keeps a Logger instance for logging. This was preferred
// due to the need to have a correlation ID and URL set and
// reused across all log calls.
// If context is not set, this defaults to logr.Discard().
logger logr.Logger
}

func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go.SmartServiceAction) (git2go.SmartSubtransportStream, error) {
Expand Down Expand Up @@ -133,6 +153,15 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
}
t.httpTransport.DisableCompression = false

t.once.Do(func() {
if opts.Context != nil {
t.ctx = opts.Context
t.logger = ctrl.LoggerFrom(t.ctx,
"transportType", "http",
"url", opts.TargetURL)
}
})

client, req, err := createClientRequest(targetURL, action, t.httpTransport, opts.AuthOpts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -176,8 +205,10 @@ func (t *httpSmartSubtransport) Action(transportOptionsURL string, action git2go
opts.TargetURL = trimActionSuffix(newURL.String())
AddTransportOptions(transportOptionsURL, *opts)

debugLog.Info("[http]: server responded with redirect",
"newURL", opts.TargetURL, "StatusCode", req.Response.StatusCode)
// show as info, as this should be visible regardless of the
// chosen log-level.
t.logger.Info("server responded with redirect",
"newUrl", opts.TargetURL, "StatusCode", req.Response.StatusCode)
}
}
}
Expand Down Expand Up @@ -270,15 +301,16 @@ func createClientRequest(targetURL string, action git2go.SmartServiceAction,
}

func (t *httpSmartSubtransport) Close() error {
traceLog.Info("[http]: httpSmartSubtransport.Close()")
t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Close()")
return nil
}

func (t *httpSmartSubtransport) Free() {
traceLog.Info("[http]: httpSmartSubtransport.Free()")
t.logger.V(logger.TraceLevel).Info("httpSmartSubtransport.Free()")

if t.httpTransport != nil {
traceLog.Info("[http]: release http transport back to pool")
t.logger.V(logger.TraceLevel).Info("release http transport back to pool")

pool.Release(t.httpTransport)
t.httpTransport = nil
}
Expand Down Expand Up @@ -345,18 +377,18 @@ func (self *httpSmartSubtransportStream) Write(buf []byte) (int, error) {

func (self *httpSmartSubtransportStream) Free() {
if self.resp != nil {
traceLog.Info("[http]: httpSmartSubtransportStream.Free()")
self.owner.logger.V(logger.TraceLevel).Info("httpSmartSubtransportStream.Free()")

if self.resp.Body != nil {
// ensure body is fully processed and closed
// for increased likelihood of transport reuse in HTTP/1.x.
// it should not be a problem to do this more than once.
if _, err := io.Copy(io.Discard, self.resp.Body); err != nil {
traceLog.Error(err, "[http]: cannot discard response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "cannot discard response body")
}

if err := self.resp.Body.Close(); err != nil {
traceLog.Error(err, "[http]: cannot close response body")
self.owner.logger.V(logger.TraceLevel).Error(err, "cannot close response body")
}
}
}
Expand Down Expand Up @@ -399,7 +431,7 @@ func (self *httpSmartSubtransportStream) sendRequest() error {
req.ContentLength = -1
}

traceLog.Info("[http]: new request", "method", req.Method, "URL", req.URL)
self.owner.logger.V(logger.TraceLevel).Info("new request", "method", req.Method, "postUrl", req.URL)
resp, err = self.client.Do(req)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions pkg/git/libgit2/managed/http_test.go
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/fluxcd/pkg/gittestserver"
"github.com/fluxcd/source-controller/pkg/git"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"

git2go "github.com/libgit2/git2go/v33"
Expand Down Expand Up @@ -170,7 +169,7 @@ func TestHTTPManagedTransport_E2E(t *testing.T) {
defer server.StopHTTP()

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

repoPath := "test.git"
err = server.InitRepo("../../testdata/git/repo", git.DefaultBranch, repoPath)
Expand Down Expand Up @@ -253,7 +252,7 @@ func TestHTTPManagedTransport_HandleRedirect(t *testing.T) {
}

// Force managed transport to be enabled
InitManagedTransport(logr.Discard())
InitManagedTransport()

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
13 changes: 2 additions & 11 deletions pkg/git/libgit2/managed/init.go
Expand Up @@ -19,9 +19,6 @@ package managed
import (
"sync"
"time"

"github.com/fluxcd/pkg/runtime/logger"
"github.com/go-logr/logr"
)

var (
Expand All @@ -38,9 +35,7 @@ var (
// handshake, put/get).
fullHttpClientTimeOut time.Duration = 10 * time.Minute

debugLog logr.Logger
traceLog logr.Logger
enabled bool
enabled bool
)

// Enabled defines whether the use of Managed Transport is enabled which
Expand All @@ -63,14 +58,10 @@ func Enabled() bool {
//
// This function will only register managed transports once, subsequent calls
// leads to no-op.
func InitManagedTransport(log logr.Logger) error {
func InitManagedTransport() error {
var err error

once.Do(func() {
log.Info("Initializing managed transport")
debugLog = log.V(logger.DebugLevel)
traceLog = log.V(logger.TraceLevel)

if err = registerManagedHTTP(); err != nil {
return
}
Expand Down

0 comments on commit 19292d4

Please sign in to comment.