diff --git a/cmd/registry/config-dev.yml b/cmd/registry/config-dev.yml index 9bf36583ea..76010d63cb 100644 --- a/cmd/registry/config-dev.yml +++ b/cmd/registry/config-dev.yml @@ -14,6 +14,8 @@ storage: maintenance: uploadpurging: enabled: false + tag: + concurrencylimit: 10 http: addr: :5000 debug: diff --git a/configuration/configuration.go b/configuration/configuration.go index 427081977c..aa74e3cb4e 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -441,6 +441,8 @@ func (storage Storage) Type() string { // allow configuration of delete case "redirect": // allow configuration of redirect + case "tag": + // allow configuration of tag default: storageType = append(storageType, k) } @@ -454,6 +456,19 @@ func (storage Storage) Type() string { return "" } +// TagParameters returns the Parameters map for a Storage tag configuration +func (storage Storage) TagParameters() Parameters { + return storage["tag"] +} + +// setTagParameter changes the parameter at the provided key to the new value +func (storage Storage) setTagParameter(key string, value interface{}) { + if _, ok := storage["tag"]; !ok { + storage["tag"] = make(Parameters) + } + storage["tag"][key] = value +} + // Parameters returns the Parameters map for a Storage configuration func (storage Storage) Parameters() Parameters { return storage[storage.Type()] @@ -482,6 +497,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error { // allow configuration of delete case "redirect": // allow configuration of redirect + case "tag": + // allow configuration of tag default: types = append(types, k) } diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 2139f8f1a9..e3bf1bf1c9 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -39,6 +39,9 @@ var configStruct = Configuration{ "url1": "https://foo.example.com", "path1": "/some-path", }, + "tag": Parameters{ + "concurrencylimit": 10, + }, }, Auth: Auth{ "silly": Parameters{ @@ -167,6 +170,8 @@ storage: int1: 42 url1: "https://foo.example.com" path1: "/some-path" + tag: + concurrencylimit: 10 auth: silly: realm: silly @@ -542,6 +547,9 @@ func copyConfig(config Configuration) *Configuration { for k, v := range config.Storage.Parameters() { configCopy.Storage.setParameter(k, v) } + for k, v := range config.Storage.TagParameters() { + configCopy.Storage.setTagParameter(k, v) + } configCopy.Auth = Auth{config.Auth.Type(): Parameters{}} for k, v := range config.Auth.Parameters() { diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 2983176b85..709a0e919a 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -184,6 +184,18 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { } } + // configure tag lookup concurrency limit + if p := config.Storage.TagParameters(); p != nil { + l, ok := p["concurrencylimit"] + if ok { + limit, ok := l.(int) + if !ok { + panic("tag lookup concurrency limit config key must have a integer value") + } + options = append(options, storage.TagLookupConcurrencyLimit(limit)) + } + } + // configure redirects var redirectDisabled bool if redirectConfig, ok := config.Storage["redirect"]; ok { diff --git a/registry/handlers/manifests.go b/registry/handlers/manifests.go index 704a8ab7f2..144c04482e 100644 --- a/registry/handlers/manifests.go +++ b/registry/handlers/manifests.go @@ -6,6 +6,7 @@ import ( "mime" "net/http" "strings" + "sync" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/internal/dcontext" @@ -13,11 +14,13 @@ import ( "github.com/distribution/distribution/v3/manifest/ocischema" "github.com/distribution/distribution/v3/manifest/schema2" "github.com/distribution/distribution/v3/registry/api/errcode" + "github.com/distribution/distribution/v3/registry/storage" "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/reference" "github.com/gorilla/handlers" "github.com/opencontainers/go-digest" v1 "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/errgroup" ) const ( @@ -481,12 +484,26 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques return } + var ( + errs []error + mu sync.Mutex + ) + g := errgroup.Group{} + g.SetLimit(storage.DefaultConcurrencyLimit) for _, tag := range referencedTags { - if err := tagService.Untag(imh, tag); err != nil { - imh.Errors = append(imh.Errors, err) - return - } + tag := tag + + g.Go(func() error { + if err := tagService.Untag(imh, tag); err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + return nil + }) } + _ = g.Wait() // imh will record all errors, so ignore the error of Wait() + imh.Errors = errs w.WriteHeader(http.StatusAccepted) } diff --git a/registry/storage/registry.go b/registry/storage/registry.go index ecf483bf9c..5bc5295c9d 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -3,6 +3,7 @@ package storage import ( "context" "regexp" + "runtime" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/registry/storage/cache" @@ -10,6 +11,10 @@ import ( "github.com/distribution/reference" ) +var ( + DefaultConcurrencyLimit = runtime.GOMAXPROCS(0) +) + // registry is the top-level implementation of Registry for use in the storage // package. All instances should descend from this object. type registry struct { @@ -18,6 +23,7 @@ type registry struct { statter *blobStatter // global statter service. blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider deleteEnabled bool + tagLookupConcurrencyLimit int resumableDigestEnabled bool blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory manifestURLs manifestURLs @@ -40,6 +46,13 @@ func EnableRedirect(registry *registry) error { return nil } +func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption { + return func(registry *registry) error { + registry.tagLookupConcurrencyLimit = concurrencyLimit + return nil + } +} + // EnableDelete is a functional option for NewRegistry. It enables deletion on // the registry. func EnableDelete(registry *registry) error { @@ -184,9 +197,14 @@ func (repo *repository) Named() reference.Named { } func (repo *repository) Tags(ctx context.Context) distribution.TagService { + limit := DefaultConcurrencyLimit + if repo.tagLookupConcurrencyLimit > 0 { + limit = repo.tagLookupConcurrencyLimit + } tags := &tagStore{ - repository: repo, - blobStore: repo.registry.blobStore, + repository: repo, + blobStore: repo.registry.blobStore, + concurrencyLimit: limit, } return tags diff --git a/registry/storage/tagstore.go b/registry/storage/tagstore.go index 29dcf4e3f1..ed384b9fc6 100644 --- a/registry/storage/tagstore.go +++ b/registry/storage/tagstore.go @@ -4,10 +4,13 @@ import ( "context" "path" "sort" + "sync" + + "github.com/opencontainers/go-digest" + "golang.org/x/sync/errgroup" "github.com/distribution/distribution/v3" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" - "github.com/opencontainers/go-digest" ) var _ distribution.TagService = &tagStore{} @@ -18,8 +21,9 @@ var _ distribution.TagService = &tagStore{} // which only makes use of the Digest field of the returned distribution.Descriptor // but does not enable full roundtripping of Descriptor objects type tagStore struct { - repository *repository - blobStore *blobStore + repository *repository + blobStore *blobStore + concurrencyLimit int } // All returns all tags @@ -145,26 +149,45 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([ return nil, err } - var tags []string + g := errgroup.Group{} + g.SetLimit(ts.concurrencyLimit) + + var ( + tags []string + mu sync.Mutex + ) for _, tag := range allTags { - tagLinkPathSpec := manifestTagCurrentPathSpec{ - name: ts.repository.Named().Name(), - tag: tag, - } + tag := tag - tagLinkPath, _ := pathFor(tagLinkPathSpec) - tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) - if err != nil { - switch err.(type) { - case storagedriver.PathNotFoundError: - continue + g.Go(func() error { + tagLinkPathSpec := manifestTagCurrentPathSpec{ + name: ts.repository.Named().Name(), + tag: tag, } - return nil, err - } - if tagDigest == desc.Digest { - tags = append(tags, tag) - } + tagLinkPath, _ := pathFor(tagLinkPathSpec) + tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) + if err != nil { + switch err.(type) { + case storagedriver.PathNotFoundError: + return nil + } + return err + } + + if tagDigest == desc.Digest { + mu.Lock() + tags = append(tags, tag) + mu.Unlock() + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + return nil, err } return tags, nil