Skip to content

Commit

Permalink
Merge pull request #65094 from hzxuzhonghu/le-client-timeout
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 65094, 65533, 63522, 65694, 65702). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

set leader election client and renew timeout

**What this PR does / why we need it**:

set leader-election client timeout

set timeout for tryAcquireOrRenew

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #65090 #65257

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
  • Loading branch information
Kubernetes Submit Queue committed Jul 3, 2018
2 parents f7d95e2 + 90b287c commit 47020f3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
5 changes: 4 additions & 1 deletion cmd/kube-controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,10 @@ func (s KubeControllerManagerOptions) Config(allControllers []string, disabledBy
return nil, err
}

leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
// shallow copy, do not modify the kubeconfig.Timeout.
config := *kubeconfig
config.Timeout = s.GenericComponent.LeaderElection.RenewDeadline.Duration
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "leader-election"))

eventRecorder := createRecorder(client, KubeControllerManagerUserAgent)

Expand Down
10 changes: 7 additions & 3 deletions cmd/kube-scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"os"
"strconv"
"time"

"github.com/golang/glog"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -203,7 +204,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
}

// prepare kube clients.
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master)
client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func makeLeaderElectionConfig(config componentconfig.KubeSchedulerLeaderElection

// createClients creates a kube client and an event client from the given config and masterOverride.
// TODO remove masterOverride when CLI flags are removed.
func createClients(config componentconfig.ClientConnectionConfiguration, masterOverride string) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) {
func createClients(config componentconfig.ClientConnectionConfiguration, masterOverride string, timeout time.Duration) (clientset.Interface, clientset.Interface, v1core.EventsGetter, error) {
if len(config.KubeConfigFile) == 0 && len(masterOverride) == 0 {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
}
Expand All @@ -289,7 +290,10 @@ func createClients(config componentconfig.ClientConnectionConfiguration, masterO
return nil, nil, nil, err
}

leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "leader-election"))
// shallow copy, do not modify the kubeConfig.Timeout.
restConfig := *kubeConfig
restConfig.Timeout = timeout
leaderElectionClient, err := clientset.NewForConfig(restclient.AddUserAgent(&restConfig, "leader-election"))
if err != nil {
return nil, nil, nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,20 @@ func (le *LeaderElector) renew(ctx context.Context) {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(), nil
done := make(chan bool, 1)
go func() {
defer close(done)
done <- le.tryAcquireOrRenew()
}()

select {
case <-timeoutCtx.Done():
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
case result := <-done:
return result, nil
}
}, timeoutCtx.Done())

le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
Expand Down

0 comments on commit 47020f3

Please sign in to comment.