Skip to content

Commit

Permalink
Merge pull request #10518 from EdDev/net-netpod-state
Browse files Browse the repository at this point in the history
net, setup: Replace `configstate` for the plug/hotplug flow
  • Loading branch information
kubevirt-bot committed Oct 19, 2023
2 parents ec3dcfd + df66198 commit 1978e37
Show file tree
Hide file tree
Showing 13 changed files with 497 additions and 385 deletions.
3 changes: 1 addition & 2 deletions pkg/network/setup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ go_library(
"//pkg/network/dhcp:go_default_library",
"//pkg/network/domainspec:go_default_library",
"//pkg/network/driver:go_default_library",
"//pkg/network/errors:go_default_library",
"//pkg/network/istio:go_default_library",
"//pkg/network/link:go_default_library",
"//pkg/network/namescheme:go_default_library",
Expand Down Expand Up @@ -57,8 +56,8 @@ go_test(
"//pkg/network/cache:go_default_library",
"//pkg/network/dhcp:go_default_library",
"//pkg/network/driver:go_default_library",
"//pkg/network/errors:go_default_library",
"//pkg/network/namescheme:go_default_library",
"//pkg/network/setup/netpod:go_default_library",
"//pkg/network/sriov:go_default_library",
"//pkg/network/vmispec:go_default_library",
"//pkg/os/fs:go_default_library",
Expand Down
61 changes: 0 additions & 61 deletions pkg/network/setup/configstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
package network

import (
"fmt"

v1 "kubevirt.io/api/core/v1"

"k8s.io/apimachinery/pkg/util/errors"

"kubevirt.io/kubevirt/pkg/network/cache"
neterrors "kubevirt.io/kubevirt/pkg/network/errors"
)

type configStateCacheRUD interface {
Expand All @@ -45,64 +42,6 @@ func NewConfigState(configStateCache configStateCacheRUD, ns NSExecutor) ConfigS
return ConfigState{cache: configStateCache, ns: ns}
}

// Run decides, from the cached per-network preparation state, whether the
// pod network setup has to be executed, and executes it when needed.
//
// The cached state of every network in networkNames is read first:
//   - a cache read failure is returned as-is;
//   - a network already in the "started" state aborts the run with a critical
//     network error, since the configuration step must never be re-entered;
//   - a network in the "pending" state marks the run as required.
//
// When no network is pending, nothing is done and nil is returned.
// Otherwise the plug step is executed through the namespace executor, for
// all the given networks (not only the pending ones).
func (c *ConfigState) Run(networkNames []string, setupFunc func(func() error) error) error {
	hasPending := false
	for _, name := range networkNames {
		state, err := c.cache.Read(name)
		if err != nil {
			return err
		}

		if state == cache.PodIfaceNetworkPreparationStarted {
			return neterrors.CreateCriticalNetworkError(
				fmt.Errorf("network %s preparation cannot be restarted", name),
			)
		}
		if state == cache.PodIfaceNetworkPreparationPending {
			hasPending = true
		}
	}

	if !hasPending {
		return nil
	}

	return c.ns.Do(func() error {
		return c.plug(networkNames, setupFunc)
	})
}

// plug runs setupFunc and records the preparation state of every network in
// networkNames around it.
//
// setupFunc is handed a callback that writes the "started" state to the cache
// for each network; whether and when that callback is invoked is decided by
// setupFunc. If setupFunc returns an error, it is returned unchanged and no
// network is marked as finished. Otherwise every network is marked as
// "finished"; a cache write failure at that point is reported as a critical
// network error.
func (c *ConfigState) plug(networkNames []string, setupFunc func(func() error) error) error {
	markStarted := func() error {
		for _, networkName := range networkNames {
			if werr := c.cache.Write(networkName, cache.PodIfaceNetworkPreparationStarted); werr != nil {
				// Wrap with %w (as the "finished" path below does) so the
				// underlying cache error stays reachable via errors.Is/As.
				return fmt.Errorf("failed to mark configuration as started for %s: %w", networkName, werr)
			}
		}
		return nil
	}

	if err := setupFunc(markStarted); err != nil {
		return err
	}

	for _, networkName := range networkNames {
		if werr := c.cache.Write(networkName, cache.PodIfaceNetworkPreparationFinished); werr != nil {
			return neterrors.CreateCriticalNetworkError(
				fmt.Errorf("failed to mark configuration as finished for %s: %w", networkName, werr),
			)
		}
	}
	return nil
}

func (c *ConfigState) Unplug(networks []v1.Network, filterFunc func([]v1.Network) ([]string, error), cleanupFunc func(string) error) error {
var nonPendingNetworks []v1.Network
var err error
Expand Down
160 changes: 0 additions & 160 deletions pkg/network/setup/configstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package network

import (
"errors"
"fmt"

v1 "kubevirt.io/api/core/v1"
Expand All @@ -29,7 +28,6 @@ import (
. "github.com/onsi/gomega"

"kubevirt.io/kubevirt/pkg/network/cache"
neterrors "kubevirt.io/kubevirt/pkg/network/errors"
)

const (
Expand All @@ -44,167 +42,9 @@ var _ = Describe("config state", func() {
var (
configState ConfigState
configStateCache configStateCacheStub
networkNames []string
ns nsExecutorStub
)

Context("Run", func() {
BeforeEach(func() {
configStateCache = newConfigStateCacheStub()
ns = nsExecutorStub{}
configState = NewConfigState(&configStateCache, ns)
networkNames = []string{testNet0}
})

It("runs with no current state (cache is empty)", func() {
config := &configStub{}

Expect(configState.Run(networkNames, config.f)).To(Succeed())

Expect(configStateCache.Read(testNet0)).To(Equal(cache.PodIfaceNetworkPreparationFinished))
})

It("runs with current pending state", func() {
Expect(configStateCache.Write(testNet0, cache.PodIfaceNetworkPreparationPending)).To(Succeed())
config := &configStub{}

Expect(configState.Run(networkNames, config.f)).To(Succeed())

Expect(configStateCache.Read(testNet0)).To(Equal(cache.PodIfaceNetworkPreparationFinished))
})

It("runs with current started state", func() {
	Expect(configStateCache.Write(testNet0, cache.PodIfaceNetworkPreparationStarted)).To(Succeed())
	config := &configStub{}

	// The ns stub is handed to NewConfigState by value, so the flag has to be
	// set before the ConfigState under test is (re)built; otherwise the
	// executor it holds never sees it.
	ns.shouldNotBeExecuted = true
	configState = NewConfigState(&configStateCache, ns)
	err := configState.Run(networkNames, config.f)

	Expect(err).To(HaveOccurred())
	var criticalNetErr *neterrors.CriticalNetworkError
	Expect(errors.As(err, &criticalNetErr)).To(BeTrue())

	Expect(config.executed).To(BeFalse(), "the config step should not execute")

	// A rejected run must leave the cached state untouched.
	Expect(configStateCache.Read(testNet0)).To(Equal(cache.PodIfaceNetworkPreparationStarted))
})

It("runs with current finished state", func() {
	Expect(configStateCache.Write(testNet0, cache.PodIfaceNetworkPreparationFinished)).To(Succeed())
	config := &configStub{}

	// The ns stub is handed to NewConfigState by value, so the flag has to be
	// set before the ConfigState under test is (re)built; otherwise the
	// executor it holds never sees it.
	ns.shouldNotBeExecuted = true
	configState = NewConfigState(&configStateCache, ns)
	Expect(configState.Run(networkNames, config.f)).To(Succeed())

	Expect(config.executed).To(BeFalse(), "the config step should not execute")

	// Nothing was pending, so the cached state stays as it was.
	Expect(configStateCache.Read(testNet0)).To(Equal(cache.PodIfaceNetworkPreparationFinished))
})

It("runs and fails at the setup step", func() {
injectedErr := fmt.Errorf("fail config")
config := &configStub{errRun: injectedErr}

Expect(configState.Run(networkNames, config.f)).To(MatchError(injectedErr))

Expect(config.executed).To(BeTrue(), "the config step should execute")

Expect(configStateCache.Read(testNet0)).To(Equal(cache.PodIfaceNetworkPreparationStarted))
})

It("runs and fails reading the cache", func() {
	injectedErr := fmt.Errorf("fail read cache")
	configStateCache.readErr = injectedErr
	// The ns stub is handed to NewConfigState by value, so the flag must be
	// set before the ConfigState under test is built for it to take effect.
	ns.shouldNotBeExecuted = true
	configState = NewConfigState(&configStateCache, ns)

	config := &configStub{}

	Expect(configState.Run(networkNames, config.f)).To(MatchError(injectedErr))

	Expect(config.executed).To(BeFalse(), "the config step shouldn't execute")
})

It("runs and fails writing the cache", func() {
injectedErr := fmt.Errorf("fail write cache")
configStateCache.writeErr = injectedErr
configState = NewConfigState(&configStateCache, ns)

config := &configStub{}

Expect(configState.Run(networkNames, config.f)).To(MatchError(ContainSubstring(injectedErr.Error())))

Expect(config.executed).To(BeFalse(), "the config step shouldn't execute")
})

When("with multiple interfaces", func() {
BeforeEach(func() {
networkNames = append(networkNames, testNet1, testNet2)
})

It("runs with no current state (cache is empty)", func() {
config := &configStub{}

Expect(configState.Run(networkNames, config.f)).To(Succeed())

Expect(config.executed).To(BeTrue())

for _, testNet := range []string{testNet0, testNet1, testNet2} {
Expect(configStateCache.Read(testNet)).To(Equal(cache.PodIfaceNetworkPreparationFinished))
}
})

It("runs with current state set as pending and finished", func() {
Expect(configStateCache.Write(testNet1, cache.PodIfaceNetworkPreparationPending)).To(Succeed())
Expect(configStateCache.Write(testNet2, cache.PodIfaceNetworkPreparationFinished)).To(Succeed())

config := &configStub{}

Expect(configState.Run(networkNames, config.f)).To(Succeed())

Expect(config.executed).To(BeTrue())

for _, testNet := range []string{testNet0, testNet1, testNet2} {
Expect(configStateCache.Read(testNet)).To(Equal(cache.PodIfaceNetworkPreparationFinished))
}
})

It("runs with current state (for one network) set as started causes critical error", func() {
Expect(configStateCache.Write(testNet0, cache.PodIfaceNetworkPreparationPending)).To(Succeed())
Expect(configStateCache.Write(testNet1, cache.PodIfaceNetworkPreparationFinished)).To(Succeed())
Expect(configStateCache.Write(testNet2, cache.PodIfaceNetworkPreparationStarted)).To(Succeed())

config := &configStub{}

err := configState.Run(networkNames, config.f)

var criticalNetErr *neterrors.CriticalNetworkError
Expect(errors.As(err, &criticalNetErr)).To(BeTrue())
Expect(err).To(MatchError("Critical network error: network testnet2 preparation cannot be restarted"))

Expect(config.executed).To(BeFalse())

Expect(configStateCache.Read(testNet0)).To(Equal(cache.PodIfaceNetworkPreparationPending))
Expect(configStateCache.Read(testNet1)).To(Equal(cache.PodIfaceNetworkPreparationFinished))
Expect(configStateCache.Read(testNet2)).To(Equal(cache.PodIfaceNetworkPreparationStarted))
})

It("runs and fails at the setup step", func() {
injectedErr := fmt.Errorf("fail write cache")
configStateCache.writeErr = injectedErr
configState = NewConfigState(&configStateCache, ns)
config := &configStub{}

Expect(configState.Run(networkNames, config.f)).To(MatchError(ContainSubstring(injectedErr.Error())))

Expect(config.executed).To(BeFalse(), "the config step shouldn't execute")
for _, testNet := range []string{testNet0, testNet1, testNet2} {
Expect(configStateCache.Read(testNet)).To(Equal(cache.PodIfaceNetworkPreparationPending))
}
})
})
})

Context("Unplug", func() {
var (
filterFunc *filterFuncStub
Expand Down
54 changes: 27 additions & 27 deletions pkg/network/setup/netconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ type cacheCreator interface {

// ConfigStateExecutor is the set of config-state operations that NetConf
// invokes for a VMI.
type ConfigStateExecutor interface {
// Unplug takes the candidate networks, a function that filters them down to
// network names, and a per-network cleanup function.
// NOTE(review): the exact unplug semantics are defined by the implementation,
// which is not fully visible here — confirm before relying on this summary.
Unplug(networks []v1.Network, filterFunc func([]v1.Network) ([]string, error), cleanupFunc func(string) error) error
// Run takes the names of the networks to set up and a setup function; the
// setup function itself receives a callback to invoke as part of the setup.
Run(networkNames []string, setupFunc func(func() error) error) error
}

// NetConf holds the dependencies and the per-VMI bookkeeping used when
// setting up the pod network of a VMI.
type NetConf struct {
// cacheCreator is passed on to the components that need cache access.
cacheCreator cacheCreator
// nsFactory builds a namespace executor for a given launcher PID.
nsFactory nsFactory
// configState holds one config-state executor per VMI, keyed by the VMI UID.
configState map[string]ConfigStateExecutor
// state holds one netpod state per VMI, keyed by the VMI UID.
state map[string]*netpod.State
// configStateMutex is held while reading or writing the configState and
// state maps.
configStateMutex *sync.RWMutex
}

Expand All @@ -69,12 +69,13 @@ func NewNetConf() *NetConf {
var cacheFactory cache.CacheCreator
return NewNetConfWithCustomFactoryAndConfigState(func(pid int) NSExecutor {
return netns.New(pid)
}, cacheFactory, map[string]ConfigStateExecutor{})
}, cacheFactory, map[string]ConfigStateExecutor{}, map[string]*netpod.State{})
}

func NewNetConfWithCustomFactoryAndConfigState(nsFactory nsFactory, cacheCreator cacheCreator, configState map[string]ConfigStateExecutor) *NetConf {
func NewNetConfWithCustomFactoryAndConfigState(nsFactory nsFactory, cacheCreator cacheCreator, configState map[string]ConfigStateExecutor, state map[string]*netpod.State) *NetConf {
return &NetConf{
configState: configState,
state: state,
configStateMutex: &sync.RWMutex{},
cacheCreator: cacheCreator,
nsFactory: nsFactory,
Expand All @@ -87,25 +88,9 @@ func (c *NetConf) Setup(vmi *v1.VirtualMachineInstance, networks []v1.Network, l
return fmt.Errorf("setup failed at pre-setup stage, err: %w", err)
}

ownerID, _ := strconv.Atoi(netdriver.LibvirtUserAndGroupId)
if util.IsNonRootVMI(vmi) {
ownerID = util.NonRootUID
}
queuesCapacity := int(converter.NetworkQueuesCapacity(vmi))
netpod := netpod.NewNetPod(
vmi.Spec.Networks,
vmi.Spec.Domain.Devices.Interfaces,
string(vmi.UID),
launcherPid,
ownerID,
queuesCapacity,
netpod.WithMasqueradeAdapter(newMasqueradeAdapter(vmi)),
netpod.WithCacheCreator(c.cacheCreator),
)
netConfigurator := NewVMNetworkConfigurator(vmi, c.cacheCreator, WithNetSetup(netpod), WithLauncherPid(launcherPid))

c.configStateMutex.RLock()
configState, ok := c.configState[string(vmi.UID)]
state := c.state[string(vmi.UID)]
c.configStateMutex.RUnlock()
if !ok {
cache := NewConfigStateCache(string(vmi.UID), c.cacheCreator)
Expand All @@ -116,14 +101,31 @@ func (c *NetConf) Setup(vmi *v1.VirtualMachineInstance, networks []v1.Network, l
ns := c.nsFactory(launcherPid)
newConfigState := NewConfigState(configStateCache, ns)
configState = &newConfigState
state = netpod.NewState(configStateCache, ns)
c.configStateMutex.Lock()
c.configState[string(vmi.UID)] = configState
c.state[string(vmi.UID)] = state
c.configStateMutex.Unlock()
}

err := netConfigurator.SetupPodNetworkPhase1(networks, configState)
ownerID, _ := strconv.Atoi(netdriver.LibvirtUserAndGroupId)
if util.IsNonRootVMI(vmi) {
ownerID = util.NonRootUID
}
queuesCapacity := int(converter.NetworkQueuesCapacity(vmi))
netpod := netpod.NewNetPod(
vmi.Spec.Networks,
vmi.Spec.Domain.Devices.Interfaces,
string(vmi.UID),
launcherPid,
ownerID,
queuesCapacity,
state,
netpod.WithMasqueradeAdapter(newMasqueradeAdapter(vmi)),
netpod.WithCacheCreator(c.cacheCreator),
)

if err != nil {
if err := netpod.Setup(); err != nil {
return fmt.Errorf("setup failed, err: %w", err)
}

Expand All @@ -132,11 +134,9 @@ func (c *NetConf) Setup(vmi *v1.VirtualMachineInstance, networks []v1.Network, l
})
absentNets := netvmispec.FilterNetworksByInterfaces(vmi.Spec.Networks, absentIfaces)
if len(absentIfaces) != 0 {
err = c.hotUnplugInterfaces(vmi, absentNets, configState, launcherPid)
}

if err != nil {
return err
if err := c.hotUnplugInterfaces(vmi, absentNets, configState, launcherPid); err != nil {
return err
}
}
return nil
}
Expand Down

0 comments on commit 1978e37

Please sign in to comment.