Skip to content

Commit

Permalink
fix: use blob digest as cleanup and cache key for refs in registry pr…
Browse files Browse the repository at this point in the history
…oxy mode
  • Loading branch information
yndai committed Jan 2, 2024
1 parent 316e409 commit 9410501
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 44 deletions.
29 changes: 20 additions & 9 deletions registry/proxy/scheduler/scheduler.go
Expand Up @@ -158,17 +158,22 @@ func (ttles *TTLExpirationScheduler) Start() error {
return nil
}

func (ttles *TTLExpirationScheduler) add(r reference.Reference, ttl time.Duration, eType int) {
func (ttles *TTLExpirationScheduler) add(r reference.Canonical, ttl time.Duration, eType int) {
// Use the raw digest as the scheduler entry key so that common blob TTLs can be properly extended as they are
// pulled with other manifests. Note that schedulerEntry.Key remains as the full reference format to avoid info loss
// when recovering the schedule from disk.
entryKey := r.Digest().String()
entry := &schedulerEntry{
Key: r.String(),
Expiry: time.Now().Add(ttl),
EntryType: eType,
}
dcontext.GetLogger(ttles.ctx).Infof("Adding new scheduler entry for %s with ttl=%s", entry.Key, time.Until(entry.Expiry))
if oldEntry, present := ttles.entries[entry.Key]; present && oldEntry.timer != nil {
if oldEntry, present := ttles.entries[entryKey]; present && oldEntry.timer != nil {
oldEntry.timer.Stop()
dcontext.GetLogger(ttles.ctx).Infof("Replacing existing scheduler entry for common blob %s", entryKey)
}
ttles.entries[entry.Key] = entry
ttles.entries[entryKey] = entry
entry.timer = ttles.startTimer(entry, ttl)
ttles.indexDirty = true
}
Expand All @@ -192,15 +197,21 @@ func (ttles *TTLExpirationScheduler) startTimer(entry *schedulerEntry, ttl time.
}

ref, err := reference.Parse(entry.Key)
if err == nil {
if err := f(ref); err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
}
} else {
if err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Error unpacking reference: %s", err)
return
}
cRef, ok := ref.(reference.Canonical)
if !ok {
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler expected canonicalReference, but got : %T", ref)
return
}

if err := f(ref); err != nil {
dcontext.GetLogger(ttles.ctx).Errorf("Scheduler error returned from OnExpire(%s): %s", entry.Key, err)
}

delete(ttles.entries, entry.Key)
delete(ttles.entries, cRef.Digest().String())
ttles.indexDirty = true
})
}
Expand Down
154 changes: 120 additions & 34 deletions registry/proxy/scheduler/scheduler_test.go
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/distribution/reference"
)

func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference.Reference) {
func testRefs(t *testing.T) (reference.Canonical, reference.Canonical, reference.Canonical) {
ref1, err := reference.Parse("testrepo@sha256:aaaaeaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
if err != nil {
t.Fatalf("could not parse reference: %v", err)
Expand All @@ -27,31 +27,37 @@ func testRefs(t *testing.T) (reference.Reference, reference.Reference, reference
t.Fatalf("could not parse reference: %v", err)
}

return ref1, ref2, ref3
return ref1.(reference.Canonical), ref2.(reference.Canonical), ref3.(reference.Canonical)
}

func TestSchedule(t *testing.T) {
ref1, ref2, ref3 := testRefs(t)
timeUnit := time.Millisecond
remainingRepos := map[string]bool{
ref1.String(): true,
ref2.String(): true,
ref3.String(): true,
remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
ref3.Digest().String(): true,
}

var mu sync.Mutex
s := New(dcontext.Background(), inmemory.New(), "/ttl")
deleteFunc := func(repoName reference.Reference) error {
if len(remainingRepos) == 0 {
deleteFunc := func(ref reference.Reference) error {
cRef, ok := ref.(reference.Canonical)
if !ok {
t.Fatalf("reference is not cannonical (includes name & digest): %v", ref)
}
refKey := cRef.Digest().String()

if len(remainingRefs) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok := remainingRepos[repoName.String()]
_, ok = remainingRefs[refKey]
if !ok {
t.Fatalf("Trying to remove nonexistent repo: %s", repoName)
t.Fatalf("Trying to remove nonexistent ref: %s", refKey)
}
t.Log("removing", repoName)
t.Log("removing", refKey)
mu.Lock()
delete(remainingRepos, repoName.String())
delete(remainingRefs, refKey)
mu.Unlock()

return nil
Expand All @@ -71,49 +77,56 @@ func TestSchedule(t *testing.T) {
s.Unlock()
}()

// Ensure all repos are deleted
// Ensure all refs are deleted
<-time.After(50 * timeUnit)

mu.Lock()
defer mu.Unlock()
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
if len(remainingRefs) != 0 {
t.Fatalf("Refs remaining: %#v", remainingRefs)
}
}

func TestRestoreOld(t *testing.T) {
ref1, ref2, _ := testRefs(t)
remainingRepos := map[string]bool{
ref1.String(): true,
ref2.String(): true,
remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
}

var wg sync.WaitGroup
wg.Add(len(remainingRepos))
wg.Add(len(remainingRefs))
var mu sync.Mutex
deleteFunc := func(r reference.Reference) error {
deleteFunc := func(ref reference.Reference) error {
mu.Lock()
defer mu.Unlock()
if r.String() == ref1.String() && len(remainingRepos) == 2 {

cRef, ok := ref.(reference.Canonical)
if !ok {
t.Fatalf("reference is not cannonical (includes name & digest): %v", ref)
}
refKey := cRef.Digest().String()

if cRef.(reference.Canonical).Digest() == ref1.Digest() && len(remainingRefs) == 2 {
t.Errorf("ref1 should not be removed first")
}
_, ok := remainingRepos[r.String()]
_, ok = remainingRefs[refKey]
if !ok {
t.Fatalf("Trying to remove nonexistent repo: %s", r)
t.Fatalf("Trying to remove nonexistent ref: %s", refKey)
}
delete(remainingRepos, r.String())
delete(remainingRefs, refKey)
wg.Done()
return nil
}

timeUnit := time.Millisecond
serialized, err := json.Marshal(&map[string]schedulerEntry{
ref1.String(): {
ref1.Digest().String(): {
Expiry: time.Now().Add(10 * timeUnit),
Key: ref1.String(),
EntryType: 0,
},
ref2.String(): {
ref2.Digest().String(): {
Expiry: time.Now().Add(-3 * timeUnit), // TTL passed, should be removed first
Key: ref2.String(),
EntryType: 0,
Expand Down Expand Up @@ -141,24 +154,24 @@ func TestRestoreOld(t *testing.T) {
wg.Wait()
mu.Lock()
defer mu.Unlock()
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
if len(remainingRefs) != 0 {
t.Fatalf("Refs remaining: %#v", remainingRefs)
}
}

func TestStopRestore(t *testing.T) {
ref1, ref2, _ := testRefs(t)

timeUnit := time.Millisecond
remainingRepos := map[string]bool{
ref1.String(): true,
ref2.String(): true,
remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
}

var mu sync.Mutex
deleteFunc := func(r reference.Reference) error {
mu.Lock()
delete(remainingRepos, r.String())
delete(remainingRefs, r.(reference.Canonical).Digest().String())
mu.Unlock()
return nil
}
Expand Down Expand Up @@ -191,8 +204,8 @@ func TestStopRestore(t *testing.T) {
<-time.After(500 * timeUnit)
mu.Lock()
defer mu.Unlock()
if len(remainingRepos) != 0 {
t.Fatalf("Repositories remaining: %#v", remainingRepos)
if len(remainingRefs) != 0 {
t.Fatalf("Refs remaining: %#v", remainingRefs)
}
}

Expand All @@ -207,3 +220,76 @@ func TestDoubleStart(t *testing.T) {
t.Fatalf("Scheduler started twice without error")
}
}

func TestCommonRef(t *testing.T) {
ref1, ref2, ref3 := testRefs(t)

timeUnit := time.Millisecond

// Create a shared blob reference for ref3
ref3Copy, err := reference.Parse("anothertestrepo@" + ref3.Digest().String())
if err != nil {
t.Fatalf("could not parse reference: %v", err)
}
cRef3Copy := ref3Copy.(reference.Canonical)

remainingRefs := map[string]bool{
ref1.Digest().String(): true,
ref2.Digest().String(): true,
ref3.Digest().String(): true,
}

var mu sync.Mutex
s := New(dcontext.Background(), inmemory.New(), "/ttl")
deleteFunc := func(ref reference.Reference) error {
cRef, ok := ref.(reference.Canonical)
if !ok {
t.Fatalf("reference is not cannonical (includes name & digest): %v", ref)
}
refKey := cRef.Digest().String()

if len(remainingRefs) == 0 {
t.Fatalf("Incorrect expiry count")
}
_, ok = remainingRefs[refKey]
if !ok {
t.Fatalf("Trying to remove nonexistent ref: %s", refKey)
}
t.Log("removing", refKey)
mu.Lock()
delete(remainingRefs, refKey)
mu.Unlock()

return nil
}
s.onBlobExpire = deleteFunc
err = s.Start()
if err != nil {
t.Fatalf("Error starting ttlExpirationScheduler: %s", err)
}

s.add(ref1, 3*timeUnit, entryTypeBlob)
s.add(ref2, 1*timeUnit, entryTypeBlob)

func() {
s.Lock()
s.add(ref3, 1*timeUnit, entryTypeBlob)
// This should override the existing expiry of ref3
s.add(cRef3Copy, 60000*timeUnit, entryTypeBlob)
s.Unlock()
}()

// Wait for refs to be deleted
<-time.After(50 * timeUnit)

mu.Lock()
defer mu.Unlock()

// Only the common blob should be reminaing
if len(remainingRefs) != 1 {
t.Fatalf("Expected 1 ref remaining, but got: %#v", remainingRefs)
}
if _, ok := remainingRefs[ref3.Digest().String()]; !ok {
t.Fatalf("Expected ref3 to be remaining, but got: %#v", remainingRefs)
}
}
2 changes: 1 addition & 1 deletion registry/storage/cache/memory/memory.go
Expand Up @@ -57,7 +57,7 @@ func (imbdcp *inMemoryBlobDescriptorCacheProvider) RepositoryScoped(repo string)
}

return &repositoryScopedInMemoryBlobDescriptorCache{
repo: repo,
repo: "", // todo: removed repo scope to handle common blob invalidation
parent: imbdcp,
}, nil
}
Expand Down

0 comments on commit 9410501

Please sign in to comment.