diff --git a/registry/proxy/scheduler/scheduler.go b/registry/proxy/scheduler/scheduler.go index ed1d9d41983..847685a09b1 100644 --- a/registry/proxy/scheduler/scheduler.go +++ b/registry/proxy/scheduler/scheduler.go @@ -158,17 +158,22 @@ func (ttles *TTLExpirationScheduler) Start() error { return nil } -func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) { +func (ttles *TTLExpirationScheduler) add(r reference.Canonical, ttl time.Duration, eType int) { + // Use the raw digest as the scheduler entry key so that common blob TTLs can be properly extended as they are + // pulled with other manifests. Note that schedulerEntry.Key remains as the full reference format to avoid info loss + // when recovering the schedule from disk. + entryKey := r.Digest().String() entry := &schedulerEntry{ Key: r.String(), Expiry: time.Now().Add(ttl), EntryType: eType, } dcontext.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, time.Until(entry.Expiry)) - if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil { + if oldEntry, present := ttles.entries[entryKey]; present && oldEntry.timer != nil { oldEntry.timer.Stop() + dcontext.GetLogger(ttles.ctx).Infof("Replacing existing scheduler entry for common blob %s", entryKey) } - ttles.entries[entry.Key] = entry + ttles.entries[entryKey] = entry entry.timer = ttles.startTimer(entry, ttl) ttles.indexDirty = true } @@ -192,15 +197,21 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time. } ref, err := reference.Parse(entry.Key) - if err == nil { - if err := f(ref); err != nil { - dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err) - } - } else { + if err != nil { dcontext.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err) + return + } + cRef, ok := ref.(reference.Canonical) + if !ok { + dcontext.GetLogger(ttles.ctx).Errorf("Scheduler expected canonicalReference, but got : %T", ref) + return + } + + if err := f(ref); err != nil { + dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err) } - delete(ttles.entries, entry.Key) + delete(ttles.entries, cRef.Digest().String()) ttles.indexDirty = true }) } diff --git a/registry/proxy/scheduler/scheduler_test.go b/registry/proxy/scheduler/scheduler_test.go index 38fa0f58077..9d9afdce081 100644 --- a/registry/proxy/scheduler/scheduler_test.go +++ b/registry/proxy/scheduler/scheduler_test.go @@ -11,7 +11,7 @@ import ( "github.com/distribution/reference" ) -func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) { +func testRefs(t *testing.T) (reference.Canonical, reference.Canonical, reference.Canonical) { ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") if err != nil { t.Fatalf("could not parse reference: %v", err) @@ -27,31 +27,37 @@ func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference t.Fatalf("could not parse reference: %v", err) } - return ref1, ref2, ref3 + return ref1.(reference.Canonical), ref2.(reference.Canonical), ref3.(reference.Canonical) } func TestSchedule(t *testing.T) { ref1, ref2, ref3 := testRefs(t) timeUnit := time.Millisecond - remainingRepos := map[string]bool{ - ref1.String(): true, - ref2.String(): true, - ref3.String(): true, + remainingRefs := map[string]bool{ + ref1.Digest().String(): true, + ref2.Digest().String(): true, + ref3.Digest().String(): true, } var mu sync.Mutex s := New(dcontext.Background(), inmemory.New(), "/ttl") - deleteFunc := func(repoName reference.Reference) error { - if len(remainingRepos) == 0 { + deleteFunc := func(ref reference.Reference) error { + cRef, ok := ref.(reference.Canonical) + if !ok { + t.Fatalf("reference is not cannonical (includes name & digest): %v", ref) + } + refKey := cRef.Digest().String() + + if len(remainingRefs) == 0 { t.Fatalf("Incorrect expiry count") } - _, ok := remainingRepos[repoName.String()] + _, ok = remainingRefs[refKey] if !ok { - t.Fatalf("Trying to remove nonexistent repo: %s", repoName) + t.Fatalf("Trying to remove nonexistent ref: %s", refKey) } - t.Log("removing", repoName) + t.Log("removing", refKey) mu.Lock() - delete(remainingRepos, repoName.String()) + delete(remainingRefs, refKey) mu.Unlock() return nil @@ -71,49 +77,56 @@ func TestSchedule(t *testing.T) { s.Unlock() }() - // Ensure all repos are deleted + // Ensure all refs are deleted <-time.After(50 * timeUnit) mu.Lock() defer mu.Unlock() - if len(remainingRepos) != 0 { - t.Fatalf("Repositories remaining: %#v", remainingRepos) + if len(remainingRefs) != 0 { + t.Fatalf("Refs remaining: %#v", remainingRefs) } } func TestRestoreOld(t *testing.T) { ref1, ref2, _ := testRefs(t) - remainingRepos := map[string]bool{ - ref1.String(): true, - ref2.String(): true, + remainingRefs := map[string]bool{ + ref1.Digest().String(): true, + ref2.Digest().String(): true, } var wg sync.WaitGroup - wg.Add(len(remainingRepos)) + wg.Add(len(remainingRefs)) var mu sync.Mutex - deleteFunc := func(r reference.Reference) error { + deleteFunc := func(ref reference.Reference) error { mu.Lock() defer mu.Unlock() - if r.String() == ref1.String() && len(remainingRepos) == 2 { + + cRef, ok := ref.(reference.Canonical) + if !ok { + t.Fatalf("reference is not cannonical (includes name & digest): %v", ref) + } + refKey := cRef.Digest().String() + + if cRef.(reference.Canonical).Digest() == ref1.Digest() && len(remainingRefs) == 2 { t.Errorf("ref1 should not be removed first") } - _, ok := remainingRepos[r.String()] + _, ok = remainingRefs[refKey] if !ok { - t.Fatalf("Trying to remove nonexistent repo: %s", r) + t.Fatalf("Trying to remove nonexistent ref: %s", refKey) } - delete(remainingRepos, r.String()) + delete(remainingRefs, refKey) wg.Done() return nil } timeUnit := time.Millisecond serialized, err := json.Marshal(&map[string]schedulerEntry{ - ref1.String(): { + ref1.Digest().String(): { Expiry: time.Now().Add(10 * timeUnit), Key: ref1.String(), EntryType: 0, }, - ref2.String(): { + ref2.Digest().String(): { Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first Key: ref2.String(), EntryType: 0, @@ -141,8 +154,8 @@ func TestRestoreOld(t *testing.T) { wg.Wait() mu.Lock() defer mu.Unlock() - if len(remainingRepos) != 0 { - t.Fatalf("Repositories remaining: %#v", remainingRepos) + if len(remainingRefs) != 0 { + t.Fatalf("Refs remaining: %#v", remainingRefs) } } @@ -150,15 +163,15 @@ func TestStopRestore(t *testing.T) { ref1, ref2, _ := testRefs(t) timeUnit := time.Millisecond - remainingRepos := map[string]bool{ - ref1.String(): true, - ref2.String(): true, + remainingRefs := map[string]bool{ + ref1.Digest().String(): true, + ref2.Digest().String(): true, } var mu sync.Mutex deleteFunc := func(r reference.Reference) error { mu.Lock() - delete(remainingRepos, r.String()) + delete(remainingRefs, r.(reference.Canonical).Digest().String()) mu.Unlock() return nil } @@ -191,8 +204,8 @@ func TestStopRestore(t *testing.T) { <-time.After(500 * timeUnit) mu.Lock() defer mu.Unlock() - if len(remainingRepos) != 0 { - t.Fatalf("Repositories remaining: %#v", remainingRepos) + if len(remainingRefs) != 0 { + t.Fatalf("Refs remaining: %#v", remainingRefs) } } @@ -207,3 +220,76 @@ func TestDoubleStart(t *testing.T) { t.Fatalf("Scheduler started twice without error") } } + +func TestCommonRef(t *testing.T) { + ref1, ref2, ref3 := testRefs(t) + + timeUnit := time.Millisecond + + // Create a shared blob reference for ref3 + ref3Copy, err := reference.Parse("anothertestrepo@" + ref3.Digest().String()) + if err != nil { + t.Fatalf("could not parse reference: %v", err) + } + cRef3Copy := ref3Copy.(reference.Canonical) + + remainingRefs := map[string]bool{ + ref1.Digest().String(): true, + ref2.Digest().String(): true, + ref3.Digest().String(): true, + } + + var mu sync.Mutex + s := New(dcontext.Background(), inmemory.New(), "/ttl") + deleteFunc := func(ref reference.Reference) error { + cRef, ok := ref.(reference.Canonical) + if !ok { + t.Fatalf("reference is not cannonical (includes name & digest): %v", ref) + } + refKey := cRef.Digest().String() + + if len(remainingRefs) == 0 { + t.Fatalf("Incorrect expiry count") + } + _, ok = remainingRefs[refKey] + if !ok { + t.Fatalf("Trying to remove nonexistent ref: %s", refKey) + } + t.Log("removing", refKey) + mu.Lock() + delete(remainingRefs, refKey) + mu.Unlock() + + return nil + } + s.onBlobExpire = deleteFunc + err = s.Start() + if err != nil { + t.Fatalf("Error starting ttlExpirationScheduler: %s", err) + } + + s.add(ref1, 3*timeUnit, entryTypeBlob) + s.add(ref2, 1*timeUnit, entryTypeBlob) + + func() { + s.Lock() + s.add(ref3, 1*timeUnit, entryTypeBlob) + // This should override the existing expiry of ref3 + s.add(cRef3Copy, 60000*timeUnit, entryTypeBlob) + s.Unlock() + }() + + // Wait for refs to be deleted + <-time.After(50 * timeUnit) + + mu.Lock() + defer mu.Unlock() + + // Only the common blob should be reminaing + if len(remainingRefs) != 1 { + t.Fatalf("Expected 1 ref remaining, but got: %#v", remainingRefs) + } + if _, ok := remainingRefs[ref3.Digest().String()]; !ok { + t.Fatalf("Expected ref3 to be remaining, but got: %#v", remainingRefs) + } +} diff --git a/registry/storage/cache/memory/memory.go b/registry/storage/cache/memory/memory.go index f83894bf54c..8b6072d400d 100644 --- a/registry/storage/cache/memory/memory.go +++ b/registry/storage/cache/memory/memory.go @@ -57,7 +57,7 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) RepositoryScoped(repo string) } return &repositoryScopedInMemoryBlobDescriptorCache{ - repo: repo, + repo: "", // todo: removed repo scope to handle common blob invalidation parent: imbdcp, }, nil }