Skip to content

Commit

Permalink
fix: modified lifecycle rule to fix setBucketLifecycleRule error in O…
Browse files Browse the repository at this point in the history
…SS Artifact Driver.

Signed-off-by: AlbeeSo <suyashi1321@163.com>
  • Loading branch information
AlbeeSo committed Apr 11, 2024
1 parent d62aa26 commit 8cf5884
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 84 deletions.
42 changes: 21 additions & 21 deletions workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/aliyun/credentials-go/credentials"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/utils/pointer"

"github.com/argoproj/argo-workflows/v3/errors"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand Down Expand Up @@ -202,7 +201,9 @@ func (ossDriver *ArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact
objectName := outputArtifact.OSS.Key
if outputArtifact.OSS.LifecycleRule != nil {
err = setBucketLifecycleRule(osscli, outputArtifact.OSS)
return !isTransientOSSErr(err), err
if err != nil {
return !isTransientOSSErr(err), err
}
}
if isDir {
if err = putDirectory(bucket, objectName, path); err != nil {
Expand Down Expand Up @@ -315,33 +316,32 @@ func setBucketLifecycleRule(client *oss.Client, ossArtifact *wfv1.OSSArtifact) e
return fmt.Errorf("markInfrequentAccessAfterDays cannot be large than markDeletionAfterDays")
}

// Set expiration rule.
expirationRule := oss.BuildLifecycleRuleByDays("expiration-rule", ossArtifact.Key, true, markInfrequentAccessAfterDays)
// Automatically delete the expired delete tag so we don't have to manage it ourselves.
// Delete the current version objects after a period of time.
// If BucketVersioning is enbaled, the objects will turn to non-current version.
expiration := oss.LifecycleExpiration{
ExpiredObjectDeleteMarker: pointer.Bool(true),
Days: markDeletionAfterDays,
}
// Convert to Infrequent Access (IA) storage type for objects that are expired after a period of time.
versionTransition := oss.LifecycleVersionTransition{
NoncurrentDays: markInfrequentAccessAfterDays,
StorageClass: oss.StorageIA,
transition := oss.LifecycleTransition{
Days: markInfrequentAccessAfterDays,
StorageClass: oss.StorageIA,
}
// Mark deletion after a period of time.
versionExpiration := oss.LifecycleVersionExpiration{
NoncurrentDays: markDeletionAfterDays,
// Delete the aborted uploaded parts after a period of time.
abortMultipartUpload := oss.LifecycleAbortMultipartUpload{
Days: markDeletionAfterDays,
}
versionTransitionRule := oss.LifecycleRule{
ID: "version-transition-rule",
Prefix: ossArtifact.Key,
Status: string(oss.VersionEnabled),
Expiration: &expiration,
NonVersionExpiration: &versionExpiration,
NonVersionTransitions: []oss.LifecycleVersionTransition{versionTransition},

rule := oss.LifecycleRule{
ID: "argo-workflows-rule",
Prefix: ossArtifact.Key,
Status: string(oss.VersionEnabled),
Expiration: &expiration,
Transitions: []oss.LifecycleTransition{transition},
AbortMultipartUpload: &abortMultipartUpload,
}

// Set lifecycle rules to the bucket.
rules := []oss.LifecycleRule{expirationRule, versionTransitionRule}
err := client.SetBucketLifecycle(ossArtifact.Bucket, rules)
err := client.SetBucketLifecycle(ossArtifact.Bucket, []oss.LifecycleRule{rule})
return err
}

Expand Down
73 changes: 22 additions & 51 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,37 +292,17 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)

var err error
wfc.wfTaskSetInformer, err = wfc.newWorkflowTaskSetInformer()
if err != nil {
log.Fatal(err)
}

wfc.artGCTaskInformer, err = wfc.newArtGCTaskInformer()
if err != nil {
log.Fatal(err)
}

wfc.taskResultInformer, err = wfc.newWorkflowTaskResultInformer()
if err != nil {
log.Fatal(err)
}

err = wfc.addWorkflowInformerHandlers(ctx)
if err != nil {
log.Fatal(err)
}

wfc.podInformer, err = wfc.newPodInformer(ctx)
wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
err := wfc.addWorkflowInformerHandlers(ctx)
if err != nil {
log.Fatal(err)
}
wfc.podInformer = wfc.newPodInformer(ctx)
wfc.updateEstimatorFactory()

wfc.configMapInformer, err = wfc.newConfigMapInformer()
if err != nil {
log.Fatal(err)
}
wfc.configMapInformer = wfc.newConfigMapInformer()

// Create Synchronization Manager
wfc.createSynchronizationManager(ctx)
Expand Down Expand Up @@ -1219,14 +1199,15 @@ func (wfc *WorkflowController) newWorkflowPodWatch(ctx context.Context) *cache.L
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.SharedIndexInformer, error) {
func (wfc *WorkflowController) newPodInformer(ctx context.Context) cache.SharedIndexInformer {
source := wfc.newWorkflowPodWatch(ctx)
informer := cache.NewSharedIndexInformer(source, &apiv1.Pod{}, podResyncPeriod, cache.Indexers{
indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc,
indexes.NodeIDIndex: indexes.MetaNodeIDIndexFunc,
indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc,
})
_, err := informer.AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
err := wfc.enqueueWfFromPodLabel(obj)
Expand Down Expand Up @@ -1264,21 +1245,19 @@ func (wfc *WorkflowController) newPodInformer(ctx context.Context) (cache.Shared
},
},
)
if err != nil {
return nil, err
}
return informer, nil
return informer
}

func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer, error) {
func (wfc *WorkflowController) newConfigMapInformer() cache.SharedIndexInformer {
indexInformer := v1.NewFilteredConfigMapInformer(wfc.kubeclientset, wfc.GetManagedNamespace(), 20*time.Minute, cache.Indexers{
indexes.ConfigMapLabelsIndex: indexes.ConfigMapIndexFunc,
}, func(opts *metav1.ListOptions) {
opts.LabelSelector = common.LabelKeyConfigMapType
})
log.WithField("executorPlugins", wfc.executorPlugins != nil).Info("Plugins")
if wfc.executorPlugins != nil {
_, err := indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
indexInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
cm, err := meta.Accessor(obj)
if err != nil {
Expand Down Expand Up @@ -1329,12 +1308,8 @@ func (wfc *WorkflowController) newConfigMapInformer() (cache.SharedIndexInformer
},
},
})
if err != nil {
return nil, err
}

}
return indexInformer, nil
return indexInformer
}

// call this func whenever the configuration changes, or when the workflow informer changes
Expand Down Expand Up @@ -1459,7 +1434,7 @@ func (wfc *WorkflowController) syncPodPhaseMetrics() {
}
}

func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.WorkflowTaskSetInformer, error) {
func (wfc *WorkflowController) newWorkflowTaskSetInformer() wfextvv1alpha1.WorkflowTaskSetInformer {
informer := externalversions.NewSharedInformerFactoryWithOptions(
wfc.wfclientset,
workflowTaskSetResyncPeriod,
Expand All @@ -1468,7 +1443,8 @@ func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.Work
r := util.InstanceIDRequirement(wfc.Config.InstanceID)
x.LabelSelector = r.String()
})).Argoproj().V1alpha1().WorkflowTaskSets()
_, err := informer.Informer().AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
Expand All @@ -1477,13 +1453,10 @@ func (wfc *WorkflowController) newWorkflowTaskSetInformer() (wfextvv1alpha1.Work
}
},
})
if err != nil {
return nil, err
}
return informer, nil
return informer
}

func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowArtifactGCTaskInformer, error) {
func (wfc *WorkflowController) newArtGCTaskInformer() wfextvv1alpha1.WorkflowArtifactGCTaskInformer {
informer := externalversions.NewSharedInformerFactoryWithOptions(
wfc.wfclientset,
workflowTaskSetResyncPeriod,
Expand All @@ -1492,7 +1465,8 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr
r := util.InstanceIDRequirement(wfc.Config.InstanceID)
x.LabelSelector = r.String()
})).Argoproj().V1alpha1().WorkflowArtifactGCTasks()
_, err := informer.Informer().AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
Expand All @@ -1501,8 +1475,5 @@ func (wfc *WorkflowController) newArtGCTaskInformer() (wfextvv1alpha1.WorkflowAr
}
},
})
if err != nil {
return nil, err
}
return informer, nil
return informer
}
6 changes: 3 additions & 3 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,11 @@ func newController(options ...interface{}) (context.CancelFunc, *WorkflowControl
wfc.wfInformer = util.NewWorkflowInformer(dynamicClient, "", 0, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
wfc.wfTaskSetInformer = informerFactory.Argoproj().V1alpha1().WorkflowTaskSets()
wfc.artGCTaskInformer = informerFactory.Argoproj().V1alpha1().WorkflowArtifactGCTasks()
wfc.taskResultInformer, _ = wfc.newWorkflowTaskResultInformer()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
_ = wfc.addWorkflowInformerHandlers(ctx)
wfc.podInformer, _ = wfc.newPodInformer(ctx)
wfc.configMapInformer, _ = wfc.newConfigMapInformer()
wfc.podInformer = wfc.newPodInformer(ctx)
wfc.configMapInformer = wfc.newConfigMapInformer()
wfc.createSynchronizationManager(ctx)
_ = wfc.initManagers(ctx)

Expand Down
11 changes: 4 additions & 7 deletions workflow/controller/taskresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/argoproj/argo-workflows/v3/workflow/controller/indexes"
)

func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedIndexInformer, error) {
func (wfc *WorkflowController) newWorkflowTaskResultInformer() cache.SharedIndexInformer {
labelSelector := labels.NewSelector().
Add(*workflowReq).
Add(wfc.instanceIDReq()).
Expand All @@ -36,7 +36,8 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedInde
options.ResourceVersion = ""
},
)
_, err := informer.AddEventHandler(
//nolint:errcheck // the error only happens if the informer was stopped, and it hasn't even started (https://github.com/kubernetes/client-go/blob/46588f2726fa3e25b1704d6418190f424f95a990/tools/cache/shared_informer.go#L580)
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(new interface{}) {
result := new.(*wfv1.WorkflowTaskResult)
Expand All @@ -49,14 +50,10 @@ func (wfc *WorkflowController) newWorkflowTaskResultInformer() (cache.SharedInde
wfc.wfQueue.AddRateLimited(result.Namespace + "/" + workflow)
},
})
if err != nil {
return nil, err
}
return informer, nil
return informer
}

func (woc *wfOperationCtx) taskResultReconciliation() {

objs, _ := woc.controller.taskResultInformer.GetIndexer().ByIndex(indexes.WorkflowIndex, woc.wf.Namespace+"/"+woc.wf.Name)
woc.log.WithField("numObjs", len(objs)).Info("Task-result reconciliation")
for _, obj := range objs {
Expand Down
2 changes: 0 additions & 2 deletions workflow/gccontroller/gc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Controller struct {

// NewController returns a new workflow ttl controller
func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer, metrics *metrics.Metrics, retentionPolicy *config.RetentionPolicy) *Controller {

orderedQueue := map[wfv1.WorkflowPhase]*gcHeap{
wfv1.WorkflowFailed: NewHeap(),
wfv1.WorkflowError: NewHeap(),
Expand Down Expand Up @@ -180,7 +179,6 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {

// enqueueWF conditionally queues a workflow to the ttl queue if it is within the deletion period
func (c *Controller) enqueueWF(obj interface{}) {

un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("'%v' is not an unstructured", obj)
Expand Down

0 comments on commit 8cf5884

Please sign in to comment.