Merge pull request #122145 from pohly/cloud-providers-context-support
cloud providers: enhance context support
k8s-ci-robot committed Apr 25, 2024
2 parents 8a9031f + 50c1243 commit ae02f87
Showing 9 changed files with 182 additions and 109 deletions.
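
Taken together, the changes follow one pattern: the channel-based Run entry points stay as thin wrappers, a context-based RunWithContext (or a context-aware Run) does the real work, and the event broadcaster is created inside the run method with record.WithContext so events, logging, and cancellation share the caller's lifetime. Below is a minimal sketch of that shape; the ExampleController type, the "example-controller" component name, and the one-second period are illustrative assumptions, not code from this commit.

package example

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

// ExampleController mirrors the shape the cloud controllers take in this
// commit: the broadcaster and recorder are no longer built in the
// constructor, but in RunWithContext, where a context is available.
type ExampleController struct {
	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder
}

// Run keeps the old channel-based signature for existing callers.
//
//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging.
func (c *ExampleController) Run(stopCh <-chan struct{}) {
	c.RunWithContext(wait.ContextForChannel(stopCh))
}

// RunWithContext does the actual work; cancelling ctx stops both the
// event broadcaster and the periodic loop.
func (c *ExampleController) RunWithContext(ctx context.Context) {
	c.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
	c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})

	wait.UntilWithContext(ctx, c.sync, time.Second)
}

// sync is a stand-in for the real reconcile work; it receives ctx so API
// calls and logging can be cancelled together with the caller.
func (c *ExampleController) sync(ctx context.Context) {
	_ = ctx
}
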
@@ -120,9 +120,6 @@ func NewCloudNodeController(
nodeStatusUpdateFrequency time.Duration,
workerCount int32) (*CloudNodeController, error) {

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})

_, instancesSupported := cloud.Instances()
_, instancesV2Supported := cloud.InstancesV2()
if !instancesSupported && !instancesV2Supported {
@@ -132,8 +129,6 @@ func NewCloudNodeController(
cnc := &CloudNodeController{
nodeInformer: nodeInformer,
kubeClient: kubeClient,
broadcaster: eventBroadcaster,
recorder: recorder,
cloud: cloud,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
workerCount: workerCount,
@@ -156,7 +151,21 @@ func NewCloudNodeController(
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking so should be called
// via a goroutine
//
//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging.
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
cnc.RunWithContext(wait.ContextForChannel(stopCh), controllerManagerMetrics)
}

// RunWithContext will sync informer caches and start workers.
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking so should be called
// via a goroutine
func (cnc *CloudNodeController) RunWithContext(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
cnc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
cnc.recorder = cnc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
stopCh := ctx.Done()

defer utilruntime.HandleCrash()
defer cnc.workqueue.ShutDown()

@@ -178,16 +187,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet

// The periodic loop for updateNodeStatus polls the Cloud Provider periodically
// to reconcile the nodes addresses and labels.
go wait.Until(func() {
if err := cnc.UpdateNodeStatus(context.TODO()); err != nil {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
if err := cnc.UpdateNodeStatus(ctx); err != nil {
klog.Errorf("failed to update node status: %v", err)
}
}, cnc.nodeStatusUpdateFrequency, stopCh)
}, cnc.nodeStatusUpdateFrequency)

// These workers initialize the nodes added to the cluster,
// those that are Tainted with TaintExternalCloudProvider.
for i := int32(0); i < cnc.workerCount; i++ {
go wait.Until(cnc.runWorker, time.Second, stopCh)
go wait.UntilWithContext(ctx, cnc.runWorker, time.Second)
}

<-stopCh
@@ -196,14 +205,14 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (cnc *CloudNodeController) runWorker() {
for cnc.processNextWorkItem() {
func (cnc *CloudNodeController) runWorker(ctx context.Context) {
for cnc.processNextWorkItem(ctx) {
}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (cnc *CloudNodeController) processNextWorkItem() bool {
func (cnc *CloudNodeController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := cnc.workqueue.Get()
if shutdown {
return false
@@ -223,7 +232,7 @@ func (cnc *CloudNodeController) processNextWorkItem() bool {

// Run the syncHandler, passing it the key of the
// Node resource to be synced.
if err := cnc.syncHandler(key); err != nil {
if err := cnc.syncHandler(ctx, key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
cnc.workqueue.AddRateLimited(key)
klog.Infof("error syncing '%s': %v, requeuing", key, err)
@@ -245,14 +254,14 @@ func (cnc *CloudNodeController) processNextWorkItem() bool {
}

// syncHandler implements the logic of the controller.
func (cnc *CloudNodeController) syncHandler(key string) error {
func (cnc *CloudNodeController) syncHandler(ctx context.Context, key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

return cnc.syncNode(context.TODO(), name)
return cnc.syncNode(ctx, name)
}

// UpdateNodeStatus updates the node status, such as node addresses
@@ -456,7 +465,7 @@ func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) e
modify(newNode)
}

_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
_, err = cnc.kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
if err != nil {
return err
}
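
For callers, the wrapper above means nothing breaks: code that only has a stop channel keeps calling Run, while context-aware code can call RunWithContext directly. The following is a hedged sketch of both call sites against a hypothetical local interface; the runner interface and function names are illustrative, and the real controllers also take a *ControllerManagerMetrics argument, omitted here for brevity.

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runner captures just the two entry points relevant here; it is not an
// interface defined by the cloud-provider packages.
type runner interface {
	Run(stopCh <-chan struct{})
	RunWithContext(ctx context.Context)
}

// startWithContext is the preferred wiring after this commit: the caller's
// context drives cancellation, contextual logging, and event recording.
func startWithContext(ctx context.Context, r runner) {
	go r.RunWithContext(ctx)
}

// startWithChannel shows what the legacy path amounts to: wrapping the
// stop channel, which is exactly what the new Run wrapper does internally.
func startWithChannel(stopCh <-chan struct{}, r runner) {
	go r.RunWithContext(wait.ContextForChannel(stopCh))
}
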
@@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -1777,10 +1778,14 @@ func Test_syncNode(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

clientset := fake.NewSimpleClientset(test.existingNode)
factory := informers.NewSharedInformerFactory(clientset, 0)

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: factory.Core().V1().Nodes(),
@@ -1799,12 +1804,12 @@ func Test_syncNode(t *testing.T) {
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()

err := cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
err := cloudNodeController.syncNode(ctx, test.existingNode.Name)
if (err != nil) != test.expectedErr {
t.Fatalf("error got: %v expected: %v", err, test.expectedErr)
}

updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, test.existingNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated nodes: %v", err)
}
@@ -1884,6 +1889,11 @@ func Test_reconcileNodeLabels(t *testing.T) {

for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stopCh := ctx.Done()

testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node01",
@@ -1899,9 +1909,6 @@ func Test_reconcileNodeLabels(t *testing.T) {
nodeInformer: factory.Core().V1().Nodes(),
}

stopCh := make(chan struct{})
defer close(stopCh)

// activate node informer
factory.Core().V1().Nodes().Informer()
factory.Start(stopCh)
@@ -1914,7 +1921,7 @@ func Test_reconcileNodeLabels(t *testing.T) {
t.Errorf("unexpected error")
}

actualNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), "node01", metav1.GetOptions{})
actualNode, err := clientset.CoreV1().Nodes().Get(ctx, "node01", metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated node: %v", err)
}
@@ -2062,6 +2069,10 @@ func TestNodeAddressesChangeDetected(t *testing.T) {

// Test updateNodeAddress with instanceV2, same test case with TestNodeAddressesNotUpdate.
func TestNodeAddressesNotUpdateV2(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

existingNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
@@ -2119,13 +2130,13 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) {
cloud: fakeCloud,
}

instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode)
instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode)
if err != nil {
t.Errorf("get instance metadata with error %v", err)
}
cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta)
cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta)

updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated nodes: %v", err)
}
@@ -2138,6 +2149,10 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) {
// This test checks that a node with the external cloud provider taint is cloudprovider initialized
// and node addresses will not be updated when the node isn't present according to the cloudprovider
func TestNodeAddressesNotUpdate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

existingNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
@@ -2195,13 +2210,13 @@ func TestNodeAddressesNotUpdate(t *testing.T) {
cloud: fakeCloud,
}

instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode)
instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode)
if err != nil {
t.Errorf("get instance metadata with error %v", err)
}
cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta)
cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta)

updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated nodes: %v", err)
}
@@ -2514,11 +2529,13 @@ func TestGetInstanceMetadata(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)

cloudNodeController := &CloudNodeController{
cloud: test.fakeCloud,
}

metadata, err := cloudNodeController.getInstanceMetadata(context.TODO(), test.existingNode)
metadata, err := cloudNodeController.getInstanceMetadata(ctx, test.existingNode)
if (err != nil) != test.expectErr {
t.Fatalf("error expected %v got: %v", test.expectErr, err)
}
@@ -2574,6 +2591,10 @@ func TestUpdateNodeStatus(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

fakeCloud := &fakecloud.Cloud{
EnableInstancesV2: false,
Addresses: []v1.NodeAddress{
@@ -2600,7 +2621,7 @@ func TestUpdateNodeStatus(t *testing.T) {
})

factory := informers.NewSharedInformerFactory(clientset, 0)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
nodeInformer := factory.Core().V1().Nodes()
nodeIndexer := nodeInformer.Informer().GetIndexer()
cloudNodeController := &CloudNodeController{
@@ -2621,11 +2642,13 @@ func TestUpdateNodeStatus(t *testing.T) {
}
}

w := eventBroadcaster.StartLogging(klog.Infof)
w := eventBroadcaster.StartStructuredLogging(0)
defer w.Stop()

start := time.Now()
cloudNodeController.UpdateNodeStatus(context.TODO())
if err := cloudNodeController.UpdateNodeStatus(ctx); err != nil {
t.Fatalf("error updating node status: %v", err)
}
t.Logf("%d workers: processed %d nodes int %v ", test.workers, test.nodes, time.Since(start))
if len(fakeCloud.Calls) != test.nodes {
t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls))
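
The test changes above all follow the same recipe: ktesting.NewTestContext provides a per-test context carrying the test logger, context.WithCancel scopes it to the test, and that context then replaces both context.TODO() in client calls and hand-rolled stop channels via ctx.Done(). A minimal standalone sketch of that setup follows; the test name and the fake-client lookup are assumptions, not code from this commit.

package example

import (
	"context"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/klog/v2/ktesting"
)

func TestContextPlumbing(t *testing.T) {
	// ktesting routes klog output to t.Log and returns a context that
	// carries the test logger.
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	clientset := fake.NewSimpleClientset()

	// ctx replaces context.TODO() in client calls; ctx.Done() would
	// likewise replace a hand-made stop channel for informer factories.
	_, err := clientset.CoreV1().Nodes().Get(ctx, "node0", metav1.GetOptions{})
	if err == nil {
		t.Fatalf("expected a not-found error from the empty fake clientset")
	}
}
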
@@ -21,7 +21,7 @@ import (
"errors"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@@ -74,9 +74,6 @@ func NewCloudNodeLifecycleController(
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})

if kubeClient == nil {
return nil, errors.New("kubernetes client is nil")
}
@@ -94,8 +91,6 @@ func NewCloudNodeLifecycleController(
c := &CloudNodeLifecycleController{
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
broadcaster: eventBroadcaster,
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
}
@@ -106,6 +101,9 @@ func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManage
// Run starts the main loop for this controller. Run is blocking so should
// be called via a goroutine
func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
c.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})

defer utilruntime.HandleCrash()
controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle")
defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle")
