Skip to content

Commit

Permalink
controlplane/apiserver: move peer endpoint reconciler to allow aggreg…
Browse files Browse the repository at this point in the history
…ator construction

This reverts commit 0bb6ff5e454cfc99c0b611e22162391eaa31e103.
  • Loading branch information
sttts committed Apr 27, 2024
1 parent 7b73ee0 commit 7527c49
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/config.go
Expand Up @@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions

aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.Extra.PeerProxy, pluginInitializer)
aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/kube-apiserver/app/server.go
Expand Up @@ -265,14 +265,14 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
config.Extra.PeerEndpointLeaseReconciler, err = controlplane.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
config.ControlPlane.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
if err != nil {
return nil, nil, nil, err
}
// build peer proxy config only if peer ca file exists
if opts.PeerCAFile != "" {
config.Extra.PeerProxy, err = controlplane.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
config.ControlPlane.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ControlPlane.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
if err != nil {
return nil, nil, nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controlplane/apiserver/completion.go
Expand Up @@ -40,5 +40,9 @@ func (c *Config) Complete() CompletedConfig {
discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.Generic.ExternalAddress}
cfg.Generic.DiscoveryAddresses = discoveryAddresses

if cfg.Extra.PeerEndpointReconcileInterval == 0 {
cfg.Extra.PeerEndpointReconcileInterval = DefaultPeerEndpointReconcileInterval
}

return CompletedConfig{&cfg}
}
14 changes: 14 additions & 0 deletions pkg/controlplane/apiserver/config.go
Expand Up @@ -31,12 +31,14 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/version"
Expand Down Expand Up @@ -67,6 +69,18 @@ type Extra struct {
EnableLogsSupport bool
ProxyTransport *http.Transport

// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointReconcileInterval defines how often the endpoint leases are reconciled in etcd.
PeerEndpointReconcileInterval time.Duration
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress

ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountMaxExpiration time.Duration
ExtendExpiration bool
Expand Down
89 changes: 89 additions & 0 deletions pkg/controlplane/apiserver/peer.go
@@ -0,0 +1,89 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
)

const (
// DefaultPeerEndpointReconcileInterval is the default amount of time for how often
// the peer endpoint leases are reconciled.
DefaultPeerEndpointReconcileInterval = 10 * time.Second
// DefaultPeerEndpointReconcilerTTL is the default TTL timeout for peer endpoint
// leases on the storage layer
DefaultPeerEndpointReconcilerTTL = 15 * time.Second
)

func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}

// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}

// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultPeerEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}
77 changes: 10 additions & 67 deletions pkg/controlplane/instance.go
Expand Up @@ -54,7 +54,6 @@ import (
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -67,14 +66,10 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/client-go/transport"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
Expand Down Expand Up @@ -157,16 +152,6 @@ type Extra struct {
EndpointReconcilerConfig EndpointReconcilerConfig
KubeletClientConfig kubeletclient.KubeletClientConfig

// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress

// Values to build the IP addresses used by discovery
// The range of IPs to be assigned to services with type=ClusterIP or greater
ServiceIPRange net.IPNet
Expand Down Expand Up @@ -290,6 +275,11 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {

// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
if c.ControlPlane.PeerEndpointReconcileInterval == 0 && c.EndpointReconcilerConfig.Interval != 0 {
// default before the generic controlplane completion can
c.ControlPlane.PeerEndpointReconcileInterval = c.EndpointReconcilerConfig.Interval
}

cfg := completedConfig{
c.ControlPlane.Complete(),
&c.Extra,
Expand Down Expand Up @@ -508,11 +498,11 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}

if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.ControlPlane.Generic.APIServerID,
peeraddress,
c.Extra.PeerEndpointLeaseReconciler,
c.ControlPlane.Extra.PeerEndpointLeaseReconciler,
c.Extra.EndpointReconcilerConfig.Interval,
client)
if err != nil {
Expand All @@ -529,9 +519,9 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.Extra.PeerProxy != nil {
if c.ControlPlane.Extra.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
Expand Down Expand Up @@ -579,7 +569,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())

peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
Expand Down Expand Up @@ -782,53 +772,6 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}

// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (peerreconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := peerreconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}

func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler peerreconcilers.PeerEndpointLeaseReconciler, serializer kruntime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}

// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}

// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
Expand Down

0 comments on commit 7527c49

Please sign in to comment.