Merge pull request #54304 from frodenas/repair-events
Automatic merge from submit-queue (batch tested with PRs 56308, 54304, 56364, 56388, 55853). If you want to cherry-pick this change to another branch, please follow the instructions at https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Send events on certain service repair controller errors

**What this PR does / why we need it**:

This PR makes the api-server's service cluster IP and node port allocator repair controllers emit events when they encounter an error while repairing a cluster IP or a node port, respectively.
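
Under the hood this is the standard client-go event-recorder pattern: each repair controller now takes an `EventsGetter`, builds a broadcaster, and records warning events against the affected Service. A minimal sketch of that wiring, using the external client-go types for brevity (the package name and helper are illustrative approximations of what the diff below adds to `NewRepair`):

```go
package eventsketch

import (
	"k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

// newRecorder builds an EventRecorder that ships recorded events to the API server.
func newRecorder(eventClient typedcorev1.EventsGetter) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	// Forward every recorded event to the events API.
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventClient.Events("")})
	// Attribute the events to the repair controller component.
	return broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})
}

// Inside the repair loop, a failure for a given Service is then reported as, e.g.:
//   recorder.Eventf(svc, v1.EventTypeWarning, "ClusterIPOutOfRange",
//       "Cluster IP %s is not within the service CIDR %s; please recreate service", ip, network)
```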

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #54303

**Special notes for your reviewer**:

In case of an error, events will be emitted [every 3 minutes](https://github.com/kubernetes/kubernetes/blob/master/pkg/master/controller.go#L93) for each failed Service. That said, event spam protection (#47367) has already been merged and mitigates the risk of excessive events.
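
For context, each repair controller is driven by a simple periodic loop around its `RunOnce`, so a Service that stays broken re-triggers its event on every pass. A rough sketch of that loop (an approximation of the existing `RunUntil` behaviour, not the exact upstream code):

```go
package eventsketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
)

// runRepairLoop calls repair once per interval until stopCh is closed.
// With the default master configuration the interval is 3 minutes.
func runRepairLoop(repair func() error, interval time.Duration, stopCh <-chan struct{}) {
	wait.Until(func() {
		if err := repair(); err != nil {
			// A failing pass is logged; with this PR it also emits a
			// warning event for each Service that could not be repaired.
			runtime.HandleError(err)
		}
	}, interval, stopCh)
}
```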

**Release note**:

```release-note
api-server provides specific events when unable to repair a service cluster ip or node port
```
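
Once this lands, repair failures surface as warning events on the affected Services and can be inspected through the events API. A hedged client-go sketch of how an operator might list them (the kubeconfig path is a placeholder, and the context-less `List` signature matches client-go of this era):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig path; adjust for your environment.
	config, err := clientcmd.BuildConfigFromFlags("", "/etc/kubernetes/admin.conf")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// List warning events emitted by the repair controllers, e.g. out-of-range cluster IPs.
	events, err := client.CoreV1().Events("").List(metav1.ListOptions{
		FieldSelector: "reason=ClusterIPOutOfRange",
	})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s/%s: %s\n", e.InvolvedObject.Namespace, e.InvolvedObject.Name, e.Message)
	}
}
```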
Kubernetes Submit Queue committed Dec 15, 2017
2 parents 1a455f6 + 8ed0bc1 commit dd4f84f
Showing 8 changed files with 55 additions and 24 deletions.
8 changes: 5 additions & 3 deletions pkg/master/controller.go
@@ -48,6 +48,7 @@ const kubernetesServiceName = "kubernetes"
type Controller struct {
ServiceClient coreclient.ServicesGetter
NamespaceClient coreclient.NamespacesGetter
EventClient coreclient.EventsGetter

ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPInterval time.Duration
@@ -77,10 +78,11 @@ type Controller struct {
}

// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter) *Controller {
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter, eventClient coreclient.EventsGetter) *Controller {
return &Controller{
ServiceClient: serviceClient,
NamespaceClient: nsClient,
EventClient: eventClient,

EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
@@ -124,8 +126,8 @@ func (c *Controller) Start() {
return
}

repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)

// run all of the controllers once prior to returning from Start.
if err := repairClusterIPs.RunOnce(); err != nil {
2 changes: 1 addition & 1 deletion pkg/master/master.go
@@ -375,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.
if c.ExtraConfig.EnableCoreControllers {
controllerName := "bootstrap-controller"
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
}
3 changes: 3 additions & 0 deletions pkg/registry/core/service/ipallocator/controller/BUILD
@@ -11,15 +11,18 @@ go_library(
srcs = ["repair.go"],
importpath = "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/helper:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)
24 changes: 18 additions & 6 deletions pkg/registry/core/service/ipallocator/controller/repair.go
@@ -21,11 +21,14 @@ import (
"net"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
@@ -54,6 +57,7 @@ type Repair struct {
network *net.IPNet
alloc rangeallocation.RangeRegistry
leaks map[string]int // counter per leaked IP
recorder record.EventRecorder
}

// How many times we need to detect a leak before we clean up. This is to
@@ -62,13 +66,18 @@ const numRepairsBeforeLeakCleanup = 3

// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})

return &Repair{
interval: interval,
serviceClient: serviceClient,
network: network,
alloc: alloc,
leaks: map[string]int{},
recorder: recorder,
}
}

@@ -136,6 +145,7 @@ func (c *Repair) runOnce() error {
ip := net.ParseIP(svc.Spec.ClusterIP)
if ip == nil {
// cluster IP is corrupt
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s is not a valid IP; please recreate service", svc.Spec.ClusterIP)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
continue
}
@@ -147,22 +157,24 @@ func (c *Repair) runOnce() error {
stored.Release(ip)
} else {
// cluster IP doesn't seem to be allocated
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP %s is not allocated; repairing", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", ip, svc.Name, svc.Namespace))
}
delete(c.leaks, ip.String()) // it is used, so it can't be leaked
case ipallocator.ErrAllocated:
// TODO: send event
// cluster IP is duplicate
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "Cluster IP %s was assigned to multiple services; please recreate service", ip)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
case err.(*ipallocator.ErrNotInRange):
// TODO: send event
// cluster IP is out of range
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPOutOfRange", "Cluster IP %s is not within the service CIDR %s; please recreate service", ip, c.network)
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
case ipallocator.ErrFull:
// TODO: send event
// somehow we are out of IPs
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
return fmt.Errorf("the service CIDR %s is full; you must widen the CIDR in order to create new services", c.network)
default:
c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP %s due to an unknown error", ip)
return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
}
}
pkg/registry/core/service/ipallocator/controller/repair_test.go
@@ -55,7 +55,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
}
_, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)

if err := r.RunOnce(); err != nil {
t.Fatal(err)
@@ -68,7 +68,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r = NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
@@ -96,7 +96,7 @@ func TestRepairLeak(t *testing.T) {
},
}

r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
@@ -169,7 +169,7 @@ func TestRepairWithExisting(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
3 changes: 3 additions & 0 deletions pkg/registry/core/service/portallocator/controller/BUILD
@@ -11,16 +11,19 @@ go_library(
srcs = ["repair.go"],
importpath = "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller",
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service:go_default_library",
"//pkg/registry/core/service/portallocator:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
],
)
23 changes: 17 additions & 6 deletions pkg/registry/core/service/portallocator/controller/repair.go
@@ -20,12 +20,15 @@ import (
"fmt"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
@@ -40,6 +43,7 @@ type Repair struct {
portRange net.PortRange
alloc rangeallocation.RangeRegistry
leaks map[int]int // counter per leaked port
recorder record.EventRecorder
}

// How many times we need to detect a leak before we clean up. This is to
@@ -48,13 +52,18 @@ const numRepairsBeforeLeakCleanup = 3

// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "portallocator-repair-controller"})

return &Repair{
interval: interval,
serviceClient: serviceClient,
portRange: portRange,
alloc: alloc,
leaks: map[int]int{},
recorder: recorder,
}
}

@@ -130,22 +139,24 @@ func (c *Repair) runOnce() error {
stored.Release(port)
} else {
// doesn't seem to be allocated
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port)
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
}
delete(c.leaks, port) // it is used, so it can't be leaked
case portallocator.ErrAllocated:
// TODO: send event
// port is duplicate, reallocate
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortAlreadyAllocated", "Port %d was assigned to multiple services; please recreate service", port)
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
case err.(*portallocator.ErrNotInRange):
// TODO: send event
// port is out of range, reallocate
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %v; please recreate", port, svc.Name, svc.Namespace, c.portRange))
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortOutOfRange", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
case portallocator.ErrFull:
// TODO: send event
// somehow we are out of ports
return fmt.Errorf("the port range %v is full; you must widen the port range in order to create new services", c.portRange)
c.recorder.Eventf(svc, v1.EventTypeWarning, "PortRangeFull", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
default:
c.recorder.Eventf(svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate port %d due to an unknown error", port)
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
}
}
pkg/registry/core/service/portallocator/controller/repair_test.go
@@ -55,7 +55,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "100-200"},
}
pr, _ := net.ParsePortRange(registry.item.Range)
r := NewRepair(0, fakeClient.Core(), *pr, registry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)

if err := r.RunOnce(); err != nil {
t.Fatal(err)
@@ -68,7 +68,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{Range: "100-200"},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, fakeClient.Core(), *pr, registry)
r = NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
@@ -96,7 +96,7 @@ func TestRepairLeak(t *testing.T) {
},
}

r := NewRepair(0, fakeClient.Core(), *pr, registry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
// Run through the "leak detection holdoff" loops.
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
if err := r.RunOnce(); err != nil {
@@ -175,7 +175,7 @@ func TestRepairWithExisting(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, fakeClient.Core(), *pr, registry)
r := NewRepair(0, fakeClient.Core(), fakeClient.Core(), *pr, registry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
