
Merge pull request #124765 from chengjoey/manual-cherry-pick-of-#124559-upstream-release-1.27

Manual cherry pick of #124559 upstream release 1.27
k8s-ci-robot committed May 10, 2024
2 parents 07b526c + cd50a0a commit 7b69ee1
Showing 6 changed files with 235 additions and 18 deletions.
27 changes: 16 additions & 11 deletions pkg/scheduler/schedule_one.go
@@ -386,14 +386,17 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
 // Filters the nodes to find the ones that fit the pod based on the framework
 // filter plugins and filter extenders.
 func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
-	diagnosis := framework.Diagnosis{
-		NodeToStatusMap:      make(framework.NodeToStatusMap),
-		UnschedulablePlugins: sets.NewString(),
-	}
-
 	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
 	if err != nil {
-		return nil, diagnosis, err
+		return nil, framework.Diagnosis{
+			NodeToStatusMap:      make(framework.NodeToStatusMap),
+			UnschedulablePlugins: sets.NewString(),
+		}, err
 	}
+
+	diagnosis := framework.Diagnosis{
+		NodeToStatusMap:      make(framework.NodeToStatusMap, len(allNodes)),
+		UnschedulablePlugins: sets.NewString(),
+	}
 	// Run "prefilter" plugins.
 	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
@@ -434,12 +437,14 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
 	nodes := allNodes
 	if !preRes.AllNodes() {
 		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
-		for n := range preRes.NodeNames {
-			nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
-			if err != nil {
-				return nil, diagnosis, err
+		for _, n := range allNodes {
+			if !preRes.NodeNames.Has(n.Node().Name) {
+				// We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
+				// We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
+				diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
+				continue
 			}
-			nodes = append(nodes, nInfo)
+			nodes = append(nodes, n)
 		}
 	}
 	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
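The behavioral change in findNodesThatFitPod, seen in isolation: the removed loop resolved every name in the PreFilterResult through the snapshot's Get call, so a stale or non-existent node name aborted the whole scheduling attempt with an error, while the new loop walks only the nodes that actually exist and records anything outside the PreFilterResult as UnschedulableAndUnresolvable. A minimal runnable Go sketch of that pattern, with the scheduler's types simplified to strings and maps (the names here are illustrative, not the real scheduler API):

package main

import "fmt"

func main() {
	// Nodes that actually exist in the scheduler's snapshot.
	allNodes := []string{"node1", "node2", "node3"}
	// Names returned by a PreFilter plugin; "ghost" matches no real node.
	preFilterNames := map[string]bool{"node2": true, "ghost": true}

	nodeToStatus := make(map[string]string)
	var candidates []string
	for _, n := range allNodes {
		if !preFilterNames[n] {
			// Record the rejection so preemption will not consider this node,
			// mirroring the UnschedulableAndUnresolvable status in the diff above.
			nodeToStatus[n] = "UnschedulableAndUnresolvable"
			continue
		}
		candidates = append(candidates, n)
	}
	// "ghost" never appears in allNodes, so it is silently ignored instead of
	// aborting scheduling the way the removed Get lookup did.
	fmt.Println(candidates)   // [node2]
	fmt.Println(nodeToStatus) // map[node1:UnschedulableAndUnresolvable node3:UnschedulableAndUnresolvable]
}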
2 changes: 1 addition & 1 deletion pkg/scheduler/schedule_one_test.go
@@ -1965,7 +1965,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
 			nodes:              []string{"node1", "node2", "node3"},
 			pod:                st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
 			wantNodes:          sets.NewString("node2"),
-			wantEvaluatedNodes: pointer.Int32(1),
+			wantEvaluatedNodes: pointer.Int32(3),
 		},
 		{
 			name: "test prefilter plugin returning non-intersecting nodes",
14 changes: 14 additions & 0 deletions pkg/scheduler/testing/wrappers.go
@@ -845,6 +845,20 @@ func (p *PersistentVolumeWrapper) HostPathVolumeSource(src *v1.HostPathVolumeSou
	return p
}

// NodeAffinityIn creates a HARD node affinity (with the operator In)
// and injects it into the pv.
func (p *PersistentVolumeWrapper) NodeAffinityIn(key string, vals []string) *PersistentVolumeWrapper {
	if p.Spec.NodeAffinity == nil {
		p.Spec.NodeAffinity = &v1.VolumeNodeAffinity{}
	}
	if p.Spec.NodeAffinity.Required == nil {
		p.Spec.NodeAffinity.Required = &v1.NodeSelector{}
	}
	nodeSelector := MakeNodeSelector().In(key, vals).Obj()
	p.Spec.NodeAffinity.Required.NodeSelectorTerms = append(p.Spec.NodeAffinity.Required.NodeSelectorTerms, nodeSelector.NodeSelectorTerms...)
	return p
}

// ResourceClaimWrapper wraps a ResourceClaim inside.
type ResourceClaimWrapper struct{ resourcev1alpha2.ResourceClaim }

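The new wrapper is exercised by the integration test added later in this commit, which pins a PV to one existing and one non-existent node. An abridged excerpt from filters_test.go below (st is this testing package):

pv := st.MakePersistentVolume().
	Name("pv-has-non-existent-nodes").
	NodeAffinityIn("kubernetes.io/hostname", []string{"node-available", "non-existing"}).
	Obj()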
73 changes: 73 additions & 0 deletions test/e2e/scheduling/predicates.go
@@ -38,6 +38,7 @@ import (
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2eruntimeclass "k8s.io/kubernetes/test/e2e/framework/node/runtimeclass"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	testutils "k8s.io/kubernetes/test/utils"
@@ -855,6 +856,78 @@ var _ = SIGDescribe("SchedulerPredicates [Serial]", func() {
		ginkgo.By("Expect all pods are scheduled and running")
		framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
	})

	// Regression test for an extended scenario for https://issues.k8s.io/123465
	ginkgo.It("when PVC has node-affinity to non-existent/illegal nodes, the pod should be scheduled normally if suitable nodes exist", func(ctx context.Context) {
		nodeName := GetNodeThatCanRunPod(ctx, f)
		nonExistentNodeName1 := string(uuid.NewUUID())
		nonExistentNodeName2 := string(uuid.NewUUID())
		hostLabel := "kubernetes.io/hostname"
		localPath := "/tmp"
		podName := "bind-pv-with-non-existent-nodes"
		pvcName := "pvc-" + string(uuid.NewUUID())
		_, pvc, err := e2epv.CreatePVPVC(ctx, cs, f.Timeouts, e2epv.PersistentVolumeConfig{
			PVSource: v1.PersistentVolumeSource{
				Local: &v1.LocalVolumeSource{
					Path: localPath,
				},
			},
			Prebind: &v1.PersistentVolumeClaim{
				ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: ns},
			},
			NodeAffinity: &v1.VolumeNodeAffinity{
				Required: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      hostLabel,
									Operator: v1.NodeSelectorOpIn,
									// add non-existent nodes to the list
									Values: []string{nodeName, nonExistentNodeName1, nonExistentNodeName2},
								},
							},
						},
					},
				},
			},
		}, e2epv.PersistentVolumeClaimConfig{
			Name: pvcName,
		}, ns, true)
		framework.ExpectNoError(err)
		bindPvPod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: podName,
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{
					{
						Name:  "pause",
						Image: imageutils.GetE2EImage(imageutils.Pause),
						VolumeMounts: []v1.VolumeMount{
							{
								Name:      "data",
								MountPath: "/tmp",
							},
						},
					},
				},
				Volumes: []v1.Volume{
					{
						Name: "data",
						VolumeSource: v1.VolumeSource{
							PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
								ClaimName: pvc.Name,
							},
						},
					},
				},
			},
		}
		_, err = f.ClientSet.CoreV1().Pods(ns).Create(ctx, bindPvPod, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitForPodNotPending(ctx, f.ClientSet, ns, podName))
	})
})

func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) {
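Assuming a test cluster is already configured, the new case should be runnable on its own with the repository's usual Ginkgo wrapper and a focus regex (the script and flag are the standard kubernetes/kubernetes e2e workflow, not something this commit adds):

./hack/ginkgo-e2e.sh --ginkgo.focus="PVC has node-affinity to non-existent"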
106 changes: 106 additions & 0 deletions test/integration/scheduler/filters/filters_test.go
@@ -42,6 +42,7 @@ var (
	createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
	createNamespacesWithLabels   = testutils.CreateNamespacesWithLabels
	createNode                   = testutils.CreateNode
	updateNode                   = testutils.UpdateNode
	createPausePod               = testutils.CreatePausePod
	deletePod                    = testutils.DeletePod
	getPod                       = testutils.GetPod
@@ -1755,6 +1756,111 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
			},
			enableReadWriteOncePod: true,
		},
		{
			name: "pod with pvc has node-affinity to non-existent/illegal nodes",
			init: func(cs kubernetes.Interface, ns string) error {
				storage := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}
				volType := v1.HostPathDirectoryOrCreate
				pv, err := testutils.CreatePV(cs, st.MakePersistentVolume().
					Name("pv-has-non-existent-nodes").
					AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteMany}).
					Capacity(storage.Requests).
					HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: &volType}).
					NodeAffinityIn("kubernetes.io/hostname", []string{"node-available", "non-existing"}). // one node exists, one doesn't
					Obj())
				if err != nil {
					return fmt.Errorf("cannot create pv: %w", err)
				}
				_, err = testutils.CreatePVC(cs, st.MakePersistentVolumeClaim().
					Name("pvc-has-non-existent-nodes").
					Namespace(ns).
					Annotation(volume.AnnBindCompleted, "true").
					VolumeName(pv.Name).
					AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteMany}).
					Resources(storage).
					Obj())
				if err != nil {
					return fmt.Errorf("cannot create pvc: %w", err)
				}
				return nil
			},
			pod: &testutils.PausePodConfig{
				Name: "pod-with-pvc-has-non-existent-nodes",
				Volumes: []v1.Volume{{
					Name: "volume",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: "pvc-has-non-existent-nodes",
						},
					},
				}},
			},
			update: func(cs kubernetes.Interface, ns string) error {
				_, err := createNode(cs, st.MakeNode().Label("kubernetes.io/hostname", "node-available").Name("node-available").Obj())
				if err != nil {
					return fmt.Errorf("cannot create node: %w", err)
				}
				return nil
			},
		},
		{
			name: "pod with pvc got scheduled after node updated its label",
			init: func(cs kubernetes.Interface, ns string) error {
				_, err := createNode(cs, st.MakeNode().Label("foo", "foo").Name("node-foo").Obj())
				if err != nil {
					return fmt.Errorf("cannot create node: %w", err)
				}
				storage := v1.ResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}
				volType := v1.HostPathDirectoryOrCreate
				pv, err := testutils.CreatePV(cs, st.MakePersistentVolume().
					Name("pv-foo").
					AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteMany}).
					Capacity(storage.Requests).
					HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: &volType}).
					NodeAffinityIn("foo", []string{"bar"}).
					Obj())
				if err != nil {
					return fmt.Errorf("cannot create pv: %w", err)
				}
				_, err = testutils.CreatePVC(cs, st.MakePersistentVolumeClaim().
					Name("pvc-foo").
					Namespace(ns).
					Annotation(volume.AnnBindCompleted, "true").
					VolumeName(pv.Name).
					AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteMany}).
					Resources(storage).
					Obj())
				if err != nil {
					return fmt.Errorf("cannot create pvc: %w", err)
				}
				return nil
			},
			pod: &testutils.PausePodConfig{
				Name: "pod-with-pvc-foo",
				Volumes: []v1.Volume{{
					Name: "volume",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: "pvc-foo",
						},
					},
				}},
			},
			update: func(cs kubernetes.Interface, ns string) error {
				_, err := updateNode(cs, &v1.Node{
					ObjectMeta: metav1.ObjectMeta{
						Name: "node-foo",
						Labels: map[string]string{
							"foo": "bar",
						},
					},
				})
				if err != nil {
					return fmt.Errorf("cannot update node: %w", err)
				}
				return nil
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
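Both new table entries follow the pattern TestUnschedulablePodBecomesSchedulable establishes for the surrounding cases: init creates objects that leave the pod unschedulable (a PV/PVC whose node affinity names a node that does not exist yet, or whose label does not match), pod is the pod expected to stay pending, and update makes the cluster change (creating or relabeling the node) after which the test expects the pod to be scheduled. Before the schedule_one.go fix above, the first case would have made the scheduling attempt error out instead of merely staying pending, because the prefilter result derived from the PV's node affinity contains a name that cannot be resolved from the snapshot (the scenario in https://issues.k8s.io/123465).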
31 changes: 25 additions & 6 deletions test/integration/scheduler/plugins/plugins_test.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/util/feature"
 	clientset "k8s.io/client-go/kubernetes"
@@ -70,9 +71,10 @@ type PreEnqueuePlugin struct {
 }
 
 type PreFilterPlugin struct {
-	numPreFilterCalled int
-	failPreFilter      bool
-	rejectPreFilter    bool
+	numPreFilterCalled   int
+	failPreFilter        bool
+	rejectPreFilter      bool
+	preFilterResultNodes sets.String
 }
 
 type ScorePlugin struct {
@@ -453,6 +455,9 @@ func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.Cycle
 	if pp.rejectPreFilter {
 		return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
 	}
+	if len(pp.preFilterResultNodes) != 0 {
+		return &framework.PreFilterResult{NodeNames: pp.preFilterResultNodes}, nil
+	}
 	return nil, nil
}

@@ -578,9 +583,10 @@ func TestPreFilterPlugin(t *testing.T) {
 	defer testutils.CleanupTest(t, testCtx)
 
 	tests := []struct {
-		name   string
-		fail   bool
-		reject bool
+		name                 string
+		fail                 bool
+		reject               bool
+		preFilterResultNodes sets.String
 	}{
 		{
 			name: "disable fail and reject flags",
@@ -597,12 +603,25 @@
 			fail:   false,
 			reject: true,
 		},
+		{
+			name:                 "inject legal node names in PreFilterResult",
+			fail:                 false,
+			reject:               false,
+			preFilterResultNodes: sets.NewString("test-node-0", "test-node-1"),
+		},
+		{
+			name:                 "inject legal and illegal node names in PreFilterResult",
+			fail:                 false,
+			reject:               false,
+			preFilterResultNodes: sets.NewString("test-node-0", "non-existent-node"),
+		},
 	}
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
 			preFilterPlugin.failPreFilter = test.fail
 			preFilterPlugin.rejectPreFilter = test.reject
+			preFilterPlugin.preFilterResultNodes = test.preFilterResultNodes
 			// Create a best effort pod.
 			pod, err := createPausePod(testCtx.ClientSet,
 				initPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
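The two added cases exercise both sides of the schedule_one.go change: a PreFilterResult naming only real nodes restricts filtering to those nodes, while one that mixes in a non-existent name no longer errors out the scheduling cycle — the unknown name simply never matches a node in the snapshot, and the pod should still land on test-node-0.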
