Support for concurrent nodepool CRUD operations (#6748) (#13173)
Signed-off-by: Modular Magician <magic-modules@google.com>

modular-magician committed Dec 5, 2022
1 parent 2a74cde commit 204717f
Showing 5 changed files with 180 additions and 68 deletions.
3 changes: 3 additions & 0 deletions .changelog/6748.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
container: Added support for concurrent node pool mutations on a cluster. Previously, node pool mutations were serialized client-side. NOTE: While this feature is supported in Terraform from this release onwards, only a limited number of GCP projects will support this behavior initially; the provider will automatically process mutations concurrently as the feature rolls out generally.
```
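The hunks below implement this by replacing the single per-cluster exclusive lock with a reader/writer scheme: each node pool mutation takes a shared (read) lock on its cluster key plus an exclusive lock on its own node pool key, so different node pools in the same cluster can be mutated in parallel. A minimal, self-contained sketch of that locking shape using only the standard library (names and keys are illustrative, not the provider's):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var cluster sync.RWMutex // cluster-level operations would take the write lock
	poolLocks := map[string]*sync.Mutex{ // one exclusive lock per node pool
		"pool-a": {},
		"pool-b": {},
	}

	var wg sync.WaitGroup
	for name, poolLock := range poolLocks {
		wg.Add(1)
		go func(name string, poolLock *sync.Mutex) {
			defer wg.Done()
			cluster.RLock() // shared: sibling node pools may mutate concurrently
			defer cluster.RUnlock()
			poolLock.Lock() // exclusive: one in-flight mutation per node pool
			defer poolLock.Unlock()
			fmt.Println("mutating", name) // both goroutines can reach here at the same time
		}(name, poolLock)
	}
	wg.Wait()
}
```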
23 changes: 19 additions & 4 deletions google/mutexkv.go
@@ -13,7 +13,7 @@ import (
// their access to individual security groups based on SG ID.
type MutexKV struct {
lock sync.Mutex
- store map[string]*sync.Mutex
+ store map[string]*sync.RWMutex
}

// Locks the mutex for the given key. Caller is responsible for calling Unlock
@@ -31,13 +31,28 @@ func (m *MutexKV) Unlock(key string) {
log.Printf("[DEBUG] Unlocked %q", key)
}

+ // Acquires a read-lock on the mutex for the given key. Caller is responsible for calling RUnlock
+ // for the same key
+ func (m *MutexKV) RLock(key string) {
+ log.Printf("[DEBUG] RLocking %q", key)
+ m.get(key).RLock()
+ log.Printf("[DEBUG] RLocked %q", key)
+ }
+
+ // Releases a read-lock on the mutex for the given key. Caller must have called RLock for the same key first
+ func (m *MutexKV) RUnlock(key string) {
+ log.Printf("[DEBUG] RUnlocking %q", key)
+ m.get(key).RUnlock()
+ log.Printf("[DEBUG] RUnlocked %q", key)
+ }

// Returns a mutex for the given key, no guarantee of its lock status
- func (m *MutexKV) get(key string) *sync.Mutex {
+ func (m *MutexKV) get(key string) *sync.RWMutex {
m.lock.Lock()
defer m.lock.Unlock()
mutex, ok := m.store[key]
if !ok {
- mutex = &sync.Mutex{}
+ mutex = &sync.RWMutex{}
m.store[key] = mutex
}
return mutex
@@ -46,6 +61,6 @@ func (m *MutexKV) get(key string) *sync.Mutex {
// Returns a properly initialized MutexKV
func NewMutexKV() *MutexKV {
return &MutexKV{
- store: make(map[string]*sync.Mutex),
+ store: make(map[string]*sync.RWMutex),
}
}
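With the store now holding *sync.RWMutex values, callers can take either shared or exclusive ownership of a key. A hedged usage sketch of the pattern the node pool resource adopts further down, written as if it lived in the same package (the key strings are placeholders; the provider builds its keys via clusterLockKey() and nodePoolLockKey()):

```go
// Sketch only: pair a shared cluster lock with an exclusive node pool lock.
func exampleNodePoolMutation(kv *MutexKV) {
	clusterKey := "cluster/my-project/us-central1/my-cluster" // placeholder key
	nodePoolKey := clusterKey + "/nodePools/my-pool"          // placeholder key

	kv.RLock(clusterKey) // shared: other node pools in the cluster may proceed
	defer kv.RUnlock(clusterKey)

	kv.Lock(nodePoolKey) // exclusive: serialize work on this one node pool
	defer kv.Unlock(nodePoolKey)

	// ... perform the node pool create/update/delete here ...
}
```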
113 changes: 49 additions & 64 deletions google/resource_container_node_pool.go
@@ -364,11 +364,21 @@ func (nodePoolInformation *NodePoolInformation) parent() string {
)
}

- func (nodePoolInformation *NodePoolInformation) lockKey() string {
+ func (nodePoolInformation *NodePoolInformation) clusterLockKey() string {
return containerClusterMutexKey(nodePoolInformation.project,
nodePoolInformation.location, nodePoolInformation.cluster)
}

+ func (nodePoolInformation *NodePoolInformation) nodePoolLockKey(nodePoolName string) string {
+ return fmt.Sprintf(
+ "projects/%s/locations/%s/clusters/%s/nodePools/%s",
+ nodePoolInformation.project,
+ nodePoolInformation.location,
+ nodePoolInformation.cluster,
+ nodePoolName,
+ )
+ }

func extractNodePoolInformation(d *schema.ResourceData, config *Config) (*NodePoolInformation, error) {
cluster := d.Get("cluster").(string)

@@ -416,8 +426,15 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

- mutexKV.Lock(nodePoolInfo.lockKey())
- defer mutexKV.Unlock(nodePoolInfo.lockKey())
+ // Acquire read-lock on cluster.
+ clusterLockKey := nodePoolInfo.clusterLockKey()
+ mutexKV.RLock(clusterLockKey)
+ defer mutexKV.RUnlock(clusterLockKey)
+
+ // Acquire write-lock on nodepool.
+ npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name)
+ mutexKV.Lock(npLockKey)
+ defer mutexKV.Unlock(npLockKey)

req := &container.CreateNodePoolRequest{
NodePool: nodePool,
@@ -501,12 +518,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

- //Check cluster is in running state
- _, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
- if err != nil {
- return err
- }

state, err := containerNodePoolAwaitRestingState(config, d.Id(), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
@@ -591,12 +602,6 @@ func resourceContainerNodePoolUpdate(d *schema.ResourceData, meta interface{}) e
}
name := getNodePoolName(d.Id())

- //Check cluster is in running state
- _, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
- if err != nil {
- return err
- }

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return err
@@ -635,16 +640,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e

name := getNodePoolName(d.Id())

- //Check cluster is in running state
- _, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
- if err != nil {
- if isGoogleApiErrorWithCode(err, 404) {
- log.Printf("[INFO] GKE cluster %s doesn't exist, skipping node pool %s deletion", nodePoolInfo.cluster, d.Id())
- return nil
- }
- return err
- }

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutDelete))
if err != nil {
// If the node pool doesn't get created and then we try to delete it, we get an error,
@@ -657,8 +652,15 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
}
}

- mutexKV.Lock(nodePoolInfo.lockKey())
- defer mutexKV.Unlock(nodePoolInfo.lockKey())
+ // Acquire read-lock on cluster.
+ clusterLockKey := nodePoolInfo.clusterLockKey()
+ mutexKV.RLock(clusterLockKey)
+ defer mutexKV.RUnlock(clusterLockKey)
+
+ // Acquire write-lock on nodepool.
+ npLockKey := nodePoolInfo.nodePoolLockKey(name)
+ mutexKV.Lock(npLockKey)
+ defer mutexKV.Unlock(npLockKey)

timeout := d.Timeout(schema.TimeoutDelete)
startTime := time.Now()
@@ -1075,13 +1077,19 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
config := meta.(*Config)
name := d.Get(prefix + "name").(string)

- lockKey := nodePoolInfo.lockKey()

userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

+ // Acquire read-lock on cluster.
+ clusterLockKey := nodePoolInfo.clusterLockKey()
+ mutexKV.RLock(clusterLockKey)
+ defer mutexKV.RUnlock(clusterLockKey)
+
+ // Nodepool write-lock will be acquired when update function is called.
+ npLockKey := nodePoolInfo.nodePoolLockKey(name)

if d.HasChange(prefix + "autoscaling") {
update := &container.ClusterUpdate{
DesiredNodePoolId: name,
@@ -1124,11 +1132,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id())
}

@@ -1164,8 +1170,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

@@ -1219,11 +1224,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated tags for node pool %s", name)
}

@@ -1258,7 +1261,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
}

// Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

@@ -1290,11 +1293,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated image type in Node Pool %s", d.Id())
}

@@ -1326,11 +1327,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
}

@@ -1358,12 +1357,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool size", userAgent,
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize)
}

@@ -1396,11 +1392,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated management in Node Pool %s", name)
}

@@ -1425,12 +1419,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.project,
nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated version in Node Pool %s", name)
}

@@ -1453,11 +1444,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated node locations in Node Pool %s", name)
}

@@ -1534,12 +1523,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
// Wait until it's updated
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name)
}

@@ -1568,8 +1554,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

- // Call update serially.
- if err := lockedCall(lockKey, updateF); err != nil {
+ if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}
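Every update path above now goes through retryWhileIncompatibleOperation, whose definition is not among the hunks shown on this page. Based on its call sites (a timeout, the node pool lock key, and the update closure), a plausible sketch follows; the retry predicate and the helper names isFailedPreconditionError and lockedCall are assumptions inferred from the surrounding code, not confirmed by this diff:

```go
// Assumed shape, not the file's verbatim contents: take the per-node-pool lock,
// run the update, and retry while GKE rejects it because an incompatible
// operation is already running on the cluster. Assumes the SDK's
// helper/resource package is imported as "resource".
func retryWhileIncompatibleOperation(timeout time.Duration, lockKey string, f func() error) error {
	return resource.Retry(timeout, func() *resource.RetryError {
		if err := lockedCall(lockKey, f); err != nil {
			if isFailedPreconditionError(err) { // FAILED_PRECONDITION => conflicting operation in flight
				return resource.RetryableError(err)
			}
			return resource.NonRetryableError(err)
		}
		return nil
	})
}
```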
