Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for concurrent nodepool CRUD operations #13173

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/6748.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
container: Added support for concurrent node pool mutations on a cluster. Previously, node pool mutations were restricted to run synchronously clientside. NOTE: While this feature is supported in Terraform from this release onwards, only a limited number of GCP projects will support this behavior initially. The provider will automatically process mutations concurrently as the feature rolls out generally.
```
23 changes: 19 additions & 4 deletions google/mutexkv.go
Expand Up @@ -13,7 +13,7 @@ import (
// their access to individual security groups based on SG ID.
type MutexKV struct {
lock sync.Mutex
store map[string]*sync.Mutex
store map[string]*sync.RWMutex
}

// Locks the mutex for the given key. Caller is responsible for calling Unlock
Expand All @@ -31,13 +31,28 @@ func (m *MutexKV) Unlock(key string) {
log.Printf("[DEBUG] Unlocked %q", key)
}

// Acquires a read-lock on the mutex for the given key. Caller is responsible for calling RUnlock
// for the same key
func (m *MutexKV) RLock(key string) {
log.Printf("[DEBUG] RLocking %q", key)
m.get(key).RLock()
log.Printf("[DEBUG] RLocked %q", key)
}

// Releases a read-lock on the mutex for the given key. Caller must have called RLock for the same key first
func (m *MutexKV) RUnlock(key string) {
log.Printf("[DEBUG] RUnlocking %q", key)
m.get(key).RUnlock()
log.Printf("[DEBUG] RUnlocked %q", key)
}

// Returns a mutex for the given key, no guarantee of its lock status
func (m *MutexKV) get(key string) *sync.Mutex {
func (m *MutexKV) get(key string) *sync.RWMutex {
m.lock.Lock()
defer m.lock.Unlock()
mutex, ok := m.store[key]
if !ok {
mutex = &sync.Mutex{}
mutex = &sync.RWMutex{}
m.store[key] = mutex
}
return mutex
Expand All @@ -46,6 +61,6 @@ func (m *MutexKV) get(key string) *sync.Mutex {
// Returns a properly initialized MutexKV
func NewMutexKV() *MutexKV {
return &MutexKV{
store: make(map[string]*sync.Mutex),
store: make(map[string]*sync.RWMutex),
}
}
113 changes: 49 additions & 64 deletions google/resource_container_node_pool.go
Expand Up @@ -364,11 +364,21 @@ func (nodePoolInformation *NodePoolInformation) parent() string {
)
}

func (nodePoolInformation *NodePoolInformation) lockKey() string {
func (nodePoolInformation *NodePoolInformation) clusterLockKey() string {
return containerClusterMutexKey(nodePoolInformation.project,
nodePoolInformation.location, nodePoolInformation.cluster)
}

func (nodePoolInformation *NodePoolInformation) nodePoolLockKey(nodePoolName string) string {
return fmt.Sprintf(
"projects/%s/locations/%s/clusters/%s/nodePools/%s",
nodePoolInformation.project,
nodePoolInformation.location,
nodePoolInformation.cluster,
nodePoolName,
)
}

func extractNodePoolInformation(d *schema.ResourceData, config *Config) (*NodePoolInformation, error) {
cluster := d.Get("cluster").(string)

Expand Down Expand Up @@ -416,8 +426,15 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolInfo.nodePoolLockKey(nodePool.Name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)

req := &container.CreateNodePoolRequest{
NodePool: nodePool,
Expand Down Expand Up @@ -501,12 +518,6 @@ func resourceContainerNodePoolCreate(d *schema.ResourceData, meta interface{}) e
return err
}

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

state, err := containerNodePoolAwaitRestingState(config, d.Id(), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
Expand Down Expand Up @@ -591,12 +602,6 @@ func resourceContainerNodePoolUpdate(d *schema.ResourceData, meta interface{}) e
}
name := getNodePoolName(d.Id())

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return err
Expand Down Expand Up @@ -635,16 +640,6 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e

name := getNodePoolName(d.Id())

//Check cluster is in running state
_, err = containerClusterAwaitRestingState(config, nodePoolInfo.project, nodePoolInfo.location, nodePoolInfo.cluster, userAgent, d.Timeout(schema.TimeoutCreate))
if err != nil {
if isGoogleApiErrorWithCode(err, 404) {
log.Printf("[INFO] GKE cluster %s doesn't exist, skipping node pool %s deletion", nodePoolInfo.cluster, d.Id())
return nil
}
return err
}

_, err = containerNodePoolAwaitRestingState(config, nodePoolInfo.fullyQualifiedName(name), nodePoolInfo.project, userAgent, d.Timeout(schema.TimeoutDelete))
if err != nil {
// If the node pool doesn't get created and then we try to delete it, we get an error,
Expand All @@ -657,8 +652,15 @@ func resourceContainerNodePoolDelete(d *schema.ResourceData, meta interface{}) e
}
}

mutexKV.Lock(nodePoolInfo.lockKey())
defer mutexKV.Unlock(nodePoolInfo.lockKey())
// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Acquire write-lock on nodepool.
npLockKey := nodePoolInfo.nodePoolLockKey(name)
mutexKV.Lock(npLockKey)
defer mutexKV.Unlock(npLockKey)

timeout := d.Timeout(schema.TimeoutDelete)
startTime := time.Now()
Expand Down Expand Up @@ -1075,13 +1077,19 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
config := meta.(*Config)
name := d.Get(prefix + "name").(string)

lockKey := nodePoolInfo.lockKey()

userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

// Acquire read-lock on cluster.
clusterLockKey := nodePoolInfo.clusterLockKey()
mutexKV.RLock(clusterLockKey)
defer mutexKV.RUnlock(clusterLockKey)

// Nodepool write-lock will be acquired when update function is called.
npLockKey := nodePoolInfo.nodePoolLockKey(name)

if d.HasChange(prefix + "autoscaling") {
update := &container.ClusterUpdate{
DesiredNodePoolId: name,
Expand Down Expand Up @@ -1124,11 +1132,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated autoscaling in Node Pool %s", d.Id())
}

Expand Down Expand Up @@ -1164,8 +1170,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

Expand Down Expand Up @@ -1219,11 +1224,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated tags for node pool %s", name)
}

Expand Down Expand Up @@ -1258,7 +1261,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

Expand Down Expand Up @@ -1290,11 +1293,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated image type in Node Pool %s", d.Id())
}

Expand Down Expand Up @@ -1326,11 +1327,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated workload_metadata_config for node pool %s", name)
}

Expand Down Expand Up @@ -1358,12 +1357,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool size", userAgent,
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] GKE node pool %s size has been updated to %d", name, newSize)
}

Expand Down Expand Up @@ -1396,11 +1392,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.location, "updating GKE node pool management", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated management in Node Pool %s", name)
}

Expand All @@ -1425,12 +1419,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
nodePoolInfo.project,
nodePoolInfo.location, "updating GKE node pool version", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated version in Node Pool %s", name)
}

Expand All @@ -1453,11 +1444,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool node locations", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated node locations in Node Pool %s", name)
}

Expand Down Expand Up @@ -1534,12 +1523,9 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
// Wait until it's updated
return containerOperationWait(config, op, nodePoolInfo.project, nodePoolInfo.location, "updating GKE node pool upgrade settings", userAgent, timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

log.Printf("[INFO] Updated upgrade settings in Node Pool %s", name)
}

Expand Down Expand Up @@ -1568,8 +1554,7 @@ func nodePoolUpdate(d *schema.ResourceData, meta interface{}, nodePoolInfo *Node
timeout)
}

// Call update serially.
if err := lockedCall(lockKey, updateF); err != nil {
if err := retryWhileIncompatibleOperation(timeout, npLockKey, updateF); err != nil {
return err
}

Expand Down