Skip to content

Commit

Permalink
Replace Parallelize with function ParallelizeUntil and formally depre…
Browse files Browse the repository at this point in the history
…cate the Parallelize
  • Loading branch information
wgliang committed Oct 5, 2018
1 parent 6e88271 commit c2622dd
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pkg/scheduler/algorithm/predicates/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package predicates

import (
"context"
"fmt"
"sync"

Expand Down Expand Up @@ -487,7 +488,7 @@ func getTPMapMatchingIncomingAffinityAntiAffinity(pod *v1.Pod, nodeInfoMap map[s
appendResult(node.Name, nodeTopologyPairsAffinityPodsMaps, nodeTopologyPairsAntiAffinityPodsMaps)
}
}
workqueue.Parallelize(16, len(allNodeNames), processNode)
workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
return topologyPairsAffinityPodsMaps, topologyPairsAntiAffinityPodsMaps, firstError
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/algorithm/priorities/interpod_affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package priorities

import (
"context"
"sync"

"k8s.io/api/core/v1"
Expand Down Expand Up @@ -210,7 +211,7 @@ func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, node
}
}
}
workqueue.Parallelize(16, len(allNodeNames), processNode)
workqueue.ParallelizeUntil(context.TODO(), 16, len(allNodeNames), processNode)
if pm.firstError != nil {
return nil, pm.firstError
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ func PrioritizeNodes(
}
}
}
workqueue.Parallelize(16, len(nodes), processNode)
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), processNode)
for i, priorityConfig := range priorityConfigs {
if priorityConfig.Reduce == nil {
continue
Expand Down Expand Up @@ -914,7 +914,7 @@ func selectNodesForPreemption(pod *v1.Pod,
resultLock.Unlock()
}
}
workqueue.Parallelize(16, len(potentialNodes), checkNode)
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil
}

Expand Down
2 changes: 2 additions & 0 deletions staging/src/k8s.io/client-go/util/workqueue/parallelizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type DoWorkPieceFunc func(piece int)

// Parallelize is a very simple framework that allows for parallelizing
// N independent pieces of work.
//
// Deprecated: Use ParallelizeUntil instead.
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
ParallelizeUntil(nil, workers, pieces, doWorkPiece)
}
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/apimachinery/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apimachinery

import (
"context"
"fmt"
"math/rand"
"reflect"
Expand Down Expand Up @@ -44,7 +45,7 @@ var _ = SIGDescribe("Servers with support for API chunking", func() {
c := f.ClientSet
client := c.CoreV1().PodTemplates(ns)
By("creating a large number of resources")
workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
workqueue.ParallelizeUntil(context.TODO(), 20, numberOfTotalResources, func(i int) {
for tries := 3; tries >= 0; tries-- {
_, err := client.Create(&v1.PodTemplate{
ObjectMeta: metav1.ObjectMeta{
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/apimachinery/table_conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package apimachinery

import (
"bytes"
"context"
"fmt"
"text/tabwriter"

Expand Down Expand Up @@ -79,7 +80,7 @@ var _ = SIGDescribe("Servers with support for Table transformation", func() {
client := c.CoreV1().PodTemplates(ns)

By("creating a large number of resources")
workqueue.Parallelize(5, 20, func(i int) {
workqueue.ParallelizeUntil(context.TODO(), 5, 20, func(i int) {
for tries := 3; tries >= 0; tries-- {
_, err := client.Create(&v1.PodTemplate{
ObjectMeta: metav1.ObjectMeta{
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/network/dns_scale_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package network

import (
"context"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -66,7 +67,7 @@ var _ = SIGDescribe("[Feature:PerformanceDNS][Serial]", func() {
framework.ExpectNoError(testutils.CreateServiceWithRetries(f.ClientSet, services[i].Namespace, services[i]))
}
framework.Logf("Creating %v test services", maxServicesPerCluster)
workqueue.Parallelize(parallelCreateServiceWorkers, len(services), createService)
workqueue.ParallelizeUntil(context.TODO(), parallelCreateServiceWorkers, len(services), createService)
dnsTest := dnsTestCommon{
f: f,
c: f.ClientSet,
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/scalability/density.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package scalability

import (
"context"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -851,7 +852,7 @@ var _ = SIGDescribe("Density", func() {
name := additionalPodsPrefix + "-" + strconv.Itoa(podIndexOffset+i+1)
framework.ExpectNoError(framework.DeleteRCAndWaitForGC(c, rcNameToNsMap[name], name))
}
workqueue.Parallelize(25, nodeCount, deleteRC)
workqueue.ParallelizeUntil(context.TODO(), 25, nodeCount, deleteRC)
podDeletionPhase.End()
}
close(stopCh)
Expand Down
11 changes: 6 additions & 5 deletions test/e2e/scalability/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package scalability

import (
"context"
"fmt"
"math"
"math/rand"
Expand All @@ -37,11 +38,14 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
scaleclient "k8s.io/client-go/scale"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/extensions"
Expand All @@ -52,9 +56,6 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)

const (
Expand Down Expand Up @@ -240,7 +241,7 @@ var _ = SIGDescribe("Load capacity", func() {
defer GinkgoRecover()
framework.ExpectNoError(testutils.CreateServiceWithRetries(clientset, services[i].Namespace, services[i]))
}
workqueue.Parallelize(serviceOperationsParallelism, len(services), createService)
workqueue.ParallelizeUntil(context.TODO(), serviceOperationsParallelism, len(services), createService)
framework.Logf("%v Services created.", len(services))
defer func(services []*v1.Service) {
serviceCleanupPhase := testPhaseDurations.StartPhase(800, "services deletion")
Expand All @@ -250,7 +251,7 @@ var _ = SIGDescribe("Load capacity", func() {
defer GinkgoRecover()
framework.ExpectNoError(testutils.DeleteResourceWithRetries(clientset, api.Kind("Service"), services[i].Namespace, services[i].Name, nil))
}
workqueue.Parallelize(serviceOperationsParallelism, len(services), deleteService)
workqueue.ParallelizeUntil(context.TODO(), serviceOperationsParallelism, len(services), deleteService)
framework.Logf("Services deleted")
}(services)
} else {
Expand Down
5 changes: 3 additions & 2 deletions test/utils/runners.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package utils

import (
"context"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -1061,9 +1062,9 @@ func CreatePod(client clientset.Interface, namespace string, podCount int, podTe
}

if podCount < 30 {
workqueue.Parallelize(podCount, podCount, createPodFunc)
workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
} else {
workqueue.Parallelize(30, podCount, createPodFunc)
workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
}
return createError
}
Expand Down

0 comments on commit c2622dd

Please sign in to comment.