Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support overwriting caches #2265

Merged
merged 11 commits into from
Mar 28, 2024
151 changes: 80 additions & 71 deletions pkg/artifactcache/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
server *http.Server
logger logrus.FieldLogger

gcing int32 // TODO: use atomic.Bool when we can use Go 1.19
gcing atomic.Bool
gcAt time.Time

outboundIP string
Expand Down Expand Up @@ -170,7 +170,7 @@
}
defer db.Close()

cache, err := h.findCache(db, keys, version)
cache, err := findCache(db, keys, version)
if err != nil {
h.responseJSON(w, r, 500, err)
return
Expand Down Expand Up @@ -206,32 +206,17 @@
api.Key = strings.ToLower(api.Key)

cache := api.ToCache()
cache.FillKeyVersionHash()
db, err := h.openDB()
if err != nil {
h.responseJSON(w, r, 500, err)
return
}
defer db.Close()
if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
if !errors.Is(err, bolthold.ErrNotFound) {
h.responseJSON(w, r, 500, err)
return
}
} else {
h.responseJSON(w, r, 400, fmt.Errorf("already exist"))
return
}

now := time.Now().Unix()
cache.CreatedAt = now
cache.UsedAt = now
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
h.responseJSON(w, r, 500, err)
return
}
// write back id to db
if err := db.Update(cache.ID, cache); err != nil {
if err := insertCache(db, cache); err != nil {
h.responseJSON(w, r, 500, err)
return
}
Expand Down Expand Up @@ -364,56 +349,40 @@
}

// if not found, return (nil, nil) instead of an error.
func (h *Handler) findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) {
if len(keys) == 0 {
return nil, nil
}
key := keys[0] // the first key is for exact match.

cache := &Cache{
Key: key,
Version: version,
}
cache.FillKeyVersionHash()

if err := db.FindOne(cache, bolthold.Where("KeyVersionHash").Eq(cache.KeyVersionHash)); err != nil {
if !errors.Is(err, bolthold.ErrNotFound) {
return nil, err
}
} else if cache.Complete {
return cache, nil
}
ChristopherHX marked this conversation as resolved.
Show resolved Hide resolved
stop := fmt.Errorf("stop")

func findCache(db *bolthold.Store, keys []string, version string) (*Cache, error) {
cache := &Cache{}
for _, prefix := range keys {
found := false
prefixPattern := fmt.Sprintf("^%s", regexp.QuoteMeta(prefix))
re, err := regexp.Compile(prefixPattern)
if err != nil {
continue
}
if err := db.ForEach(bolthold.Where("Key").RegExp(re).And("Version").Eq(version).SortBy("CreatedAt").Reverse(), func(v *Cache) error {
if !strings.HasPrefix(v.Key, prefix) {
return stop
}
if v.Complete {
cache = v
found = true
return stop
}
return nil
}); err != nil {
if !errors.Is(err, stop) {
return nil, err
if err := db.FindOne(cache,
bolthold.Where("Key").RegExp(re).
And("Version").Eq(version).
And("Complete").Eq(true).
SortBy("CreatedAt").Reverse()); err != nil {
if errors.Is(err, bolthold.ErrNotFound) {
continue
}
return nil, fmt.Errorf("find cache: %w", err)

Check warning on line 368 in pkg/artifactcache/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/artifactcache/handler.go#L368

Added line #L368 was not covered by tests
}
if found {
return cache, nil
}
return cache, nil
}
return nil, nil
}

func insertCache(db *bolthold.Store, cache *Cache) error {
if err := db.Insert(bolthold.NextSequence(), cache); err != nil {
return fmt.Errorf("insert cache: %w", err)
}

Check warning on line 378 in pkg/artifactcache/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/artifactcache/handler.go#L377-L378

Added lines #L377 - L378 were not covered by tests
// write back id to db
if err := db.Update(cache.ID, cache); err != nil {
return fmt.Errorf("write back id to db: %w", err)
}

Check warning on line 382 in pkg/artifactcache/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/artifactcache/handler.go#L381-L382

Added lines #L381 - L382 were not covered by tests
return nil
}

func (h *Handler) useCache(id int64) {
db, err := h.openDB()
if err != nil {
Expand All @@ -428,14 +397,21 @@
_ = db.Update(cache.ID, cache)
}

const (
keepUsed = 30 * 24 * time.Hour
keepUnused = 7 * 24 * time.Hour
keepTemp = 5 * time.Minute
keepOld = 5 * time.Minute
)

func (h *Handler) gcCache() {
if atomic.LoadInt32(&h.gcing) != 0 {
if h.gcing.Load() {
return
}
if !atomic.CompareAndSwapInt32(&h.gcing, 0, 1) {
if !h.gcing.CompareAndSwap(false, true) {
return
}
defer atomic.StoreInt32(&h.gcing, 0)
defer h.gcing.Store(false)

if time.Since(h.gcAt) < time.Hour {
h.logger.Debugf("skip gc: %v", h.gcAt.String())
Expand All @@ -444,26 +420,21 @@
h.gcAt = time.Now()
h.logger.Debugf("gc: %v", h.gcAt.String())

const (
keepUsed = 30 * 24 * time.Hour
keepUnused = 7 * 24 * time.Hour
keepTemp = 5 * time.Minute
)

db, err := h.openDB()
if err != nil {
return
}
defer db.Close()

// Remove the caches which are not completed for a while, they are most likely to be broken.
var caches []*Cache
if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix())); err != nil {
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepTemp).Unix()).
And("Complete").Eq(false),
); err != nil {
h.logger.Warnf("find caches: %v", err)
} else {
for _, cache := range caches {
if cache.Complete {
continue
}
h.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
h.logger.Warnf("delete cache: %v", err)
Expand All @@ -473,8 +444,11 @@
}
}

// Remove the old caches which have not been used recently.
caches = caches[:0]
if err := db.Find(&caches, bolthold.Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix())); err != nil {
if err := db.Find(&caches, bolthold.
Where("UsedAt").Lt(time.Now().Add(-keepUnused).Unix()),
); err != nil {
h.logger.Warnf("find caches: %v", err)
} else {
for _, cache := range caches {
Expand All @@ -487,8 +461,11 @@
}
}

// Remove the old caches which are too old.
caches = caches[:0]
if err := db.Find(&caches, bolthold.Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix())); err != nil {
if err := db.Find(&caches, bolthold.
Where("CreatedAt").Lt(time.Now().Add(-keepUsed).Unix()),
); err != nil {
h.logger.Warnf("find caches: %v", err)
} else {
for _, cache := range caches {
Expand All @@ -500,6 +477,38 @@
h.logger.Infof("deleted cache: %+v", cache)
}
}

// Remove the old caches with the same key and version, keep the latest one.
// Also keep the olds which have been used recently for a while in case of the cache is still in use.
if results, err := db.FindAggregate(
&Cache{},
bolthold.Where("Complete").Eq(true),
"Key", "Version",
); err != nil {
h.logger.Warnf("find aggregate caches: %v", err)

Check warning on line 488 in pkg/artifactcache/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/artifactcache/handler.go#L488

Added line #L488 was not covered by tests
} else {
for _, result := range results {
if result.Count() <= 1 {
continue

Check warning on line 492 in pkg/artifactcache/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/artifactcache/handler.go#L492

Added line #L492 was not covered by tests
}
result.Sort("CreatedAt")
caches = caches[:0]
result.Reduction(&caches)
for _, cache := range caches[:len(caches)-1] {
if time.Since(time.Unix(cache.UsedAt, 0)) < keepOld {
// Keep it since it has been used recently, even if it's old.
// Or it could break downloading in process.
continue
}
h.storage.Remove(cache.ID)
if err := db.Delete(cache.ID, cache); err != nil {
h.logger.Warnf("delete cache: %v", err)
continue

Check warning on line 506 in pkg/artifactcache/handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/artifactcache/handler.go#L505-L506

Added lines #L505 - L506 were not covered by tests
}
h.logger.Infof("deleted cache: %+v", cache)
}
}
}
}

func (h *Handler) responseJSON(w http.ResponseWriter, r *http.Request, code int, v ...any) {
Expand Down