Skip to content

Commit

Permalink
Removes old, empty directories from datatype root directories (#99)
Browse files Browse the repository at this point in the history
* Removes old, empty directories

Pusher did not remove old directories. Every extant directory creates an
inotify watch in pusher. Over time and/or for experiments (e.g., wehe)
that create a large amount of directories, this can create a very large
number of useless watches, which eat up memory, and eventually may run
up against fs.inotify.max_user_watches, causing other containers on the
system to fail.

This commit also fixes an apparent variable name bug in which the value
for maxFileAge was put into a variable named minFileAge.

* Removes old, empty directories when walking dir

Previously, directories were not touched. On long-running containers
directories would begin to pile up causing a large amount of useless
pusher inotify directories watches.

* Uses Go v1.18 to build Docker image, and Alpine 3.15

* Updates Go modules

* Reverts prometheus/client_golang to v1.11.0

I was running into this with newer versions:

prometheus/prometheus#10574

* Updates Travis to use Go v1.18

* Reverts Go to v1.17 (from v1.18)

The travis build was failing with something like this:

mattn/goveralls#207

... and the Docker build was failing with this error:

Step 8/10 : COPY --from=build /go/bin/pusher /
COPY failed: stat go/bin/pusher: file does not exist

* Adds a clarifying comment about removing dirs

* Simplifies dir removal logic, removes branching
  • Loading branch information
nkinkade committed May 12, 2022
1 parent 174abf0 commit e64e83e
Show file tree
Hide file tree
Showing 6 changed files with 573 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,7 +1,7 @@
language: go

go:
- "1.13"
- "1.17"

services:
- docker
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
FROM golang:1.13 as build
FROM golang:1.17 as build
# Add the local files to be sure we are building the local source code instead
# of downloading from GitHub.
# Don't add any of the other libraries, because we live at HEAD.
Expand All @@ -13,7 +13,7 @@ RUN go get \
github.com/m-lab/pusher

# Now copy the built binary into a minimal base image.
FROM alpine:3.7
FROM alpine:3.15
# By default, alpine has no root certs. Add them so pusher can use PKI to
# verify that Google Cloud Storage is actually Google Cloud Storage.
RUN apk add --no-cache ca-certificates
Expand Down
53 changes: 50 additions & 3 deletions finder/findfiles.go
Expand Up @@ -15,6 +15,7 @@ package finder

import (
"context"
"io"
"log"
"os"
"path/filepath"
Expand All @@ -27,6 +28,11 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

// The minimum age of a directory before it will be considered for removal, if
// it is also empty. 25h should ensure that the current day's directory is never
// removed prematurely.
const minDirectoryAge time.Duration = 25 * time.Hour

// Set up the prometheus metrics.
var (
pusherFinderRuns = promauto.NewCounter(prometheus.CounterOpts{
Expand All @@ -52,20 +58,22 @@ var (

// findFiles recursively searches through a given directory to find all the files which are old enough to be eligible for upload.
// The list of files returned is sorted by mtime.
func findFiles(datatype string, directory filename.System, minFileAge time.Duration) []filename.System {
func findFiles(datatype string, directory filename.System, maxFileAge time.Duration) []filename.System {
// Give an initial capacity to the slice. 1024 chosen because it's a nice round number.
// TODO: Choose a better default.
eligibleFiles := make(map[filename.System]os.FileInfo)
eligibleTime := time.Now().Add(-minFileAge)
eligibleTime := time.Now().Add(-maxFileAge)
totalEligibleSize := int64(0)

err := filepath.Walk(string(directory), func(path string, info os.FileInfo, err error) error {
if err != nil {
// Any error terminates the walk.
return err
}
// Check whether a directory is very old and empty, and removes it if so.
if info.IsDir() {
return nil
err = checkDirectory(datatype, path, info.ModTime())
return err
}
if eligibleTime.After(info.ModTime()) {
eligibleFiles[filename.System(path)] = info
Expand Down Expand Up @@ -100,6 +108,45 @@ func findFiles(datatype string, directory filename.System, minFileAge time.Durat
return fileList
}

// checkDirectory checks to see if a directory is sufficiently old and empty.
// If so, it removes the directory from the filesystem to prevent old, empty
// directories from piling up in the filesystem.
func checkDirectory(datatype string, path string, mTime time.Time) error {
// Do not delete the root datatype directory.
if datatype == filepath.Base(path) {
return nil
}
// Do nothing if the directory is less than constant minDirectoryAge. This
// could probably be more aggressive.
eligibleTime := time.Now().Add(-minDirectoryAge)
if mTime.After(eligibleTime) {
return nil
}
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
// Read the contents of the directory, looking only as far as the first file
// found. We don't care how many files there are, only that at least one
// exists. An error of type io.EOF indicates an empty directory.
// https://pkg.go.dev/os#File.Readdirnames
// https://stackoverflow.com/a/30708914
// Implementation note: we are using Readdirnames() instead of Readdir()
// because the former does not stat each file, but only returns file names,
// which is more efficient for our use case.
_, err = f.Readdirnames(1)
if err != io.EOF {
return err
}
err = os.Remove(path)
if err != nil {
return err
}
log.Printf("Removed old, empty directory %s.", path)
return nil
}

// FindForever repeatedly runs FindFiles until its context is canceled.
//
// It randomizes the inter-`find` sleep time in an effort to avoid thundering
Expand Down
42 changes: 37 additions & 5 deletions finder/findfiles_test.go
Expand Up @@ -2,6 +2,8 @@ package finder_test

import (
"context"
"errors"
"io/fs"
"io/ioutil"
"os"
"testing"
Expand All @@ -25,6 +27,21 @@ func TestFindForever(t *testing.T) {
rtx.Must(ioutil.WriteFile(tempdir+"/next_oldest_file", []byte("moredata\n"), 0644), "WriteFile failed")
newtime = time.Now().Add(time.Duration(-12) * time.Hour)
rtx.Must(os.Chtimes(tempdir+"/next_oldest_file", newtime, newtime), "Chtimes failed")
// Set up the directories.
//
// An old, empty directory.
rtx.Must(os.Mkdir(tempdir+"/old_empty_dir", 0750), "Mkdir failed")
newtime = time.Now().Add(time.Duration(-26) * time.Hour)
rtx.Must(os.Chtimes(tempdir+"/old_empty_dir", newtime, newtime), "Chtimes failed")
// An old directory, but not empty.
rtx.Must(os.Mkdir(tempdir+"/old_not_empty_dir", 0750), "Mkdir failed")
newtime = time.Now().Add(time.Duration(-30) * time.Hour)
rtx.Must(os.Chtimes(tempdir+"/old_not_empty_dir", newtime, newtime), "Chtimes failed")
rtx.Must(ioutil.WriteFile(tempdir+"/old_not_empty_dir/test_file", []byte("data\n"), 0644), "WriteFile failed")
newtime = time.Now().Add(time.Duration(-27) * time.Hour)
rtx.Must(os.Chtimes(tempdir+"/old_not_empty_dir/test_file", newtime, newtime), "Chtimes failed")
// A new directory.
rtx.Must(os.Mkdir(tempdir+"/new_dir", 0750), "Mkdir failed")
// Set up the receiver channel.
foundFiles := make(chan filename.System)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -38,15 +55,30 @@ func TestFindForever(t *testing.T) {
localfiles := []filename.System{
<-foundFiles,
<-foundFiles,
<-foundFiles,
}
if len(localfiles) != 2 {
// Test files.
if len(localfiles) != 3 {
t.Errorf("len(localfiles) (%d) != 2", len(localfiles))
}
if string(localfiles[0]) != tempdir+"/oldest_file" {
t.Errorf("wrong name[0]: %s", localfiles[0])
if string(localfiles[0]) != tempdir+"/old_not_empty_dir/test_file" {
t.Errorf("wrong name[1]: %s", localfiles[0])
}
if string(localfiles[1]) != tempdir+"/oldest_file" {
t.Errorf("wrong name[1]: %s", localfiles[0])
}
if string(localfiles[2]) != tempdir+"/next_oldest_file" {
t.Errorf("wrong name[2]: %s", localfiles[1])
}
// Test directories.
if _, err = os.Stat(tempdir + "/old_empty_dir"); errors.Is(err, fs.ErrExist) {
t.Errorf("Directory %s/old_empty_dir exists, but shouldn't", tempdir)
}
if _, err = os.Stat(tempdir + "/old_not_empty_dir"); errors.Is(err, fs.ErrNotExist) {
t.Errorf("Directory %s/old_not_empty_dir does not exist, but should", tempdir)
}
if string(localfiles[1]) != tempdir+"/next_oldest_file" {
t.Errorf("wrong name[1]: %s", localfiles[1])
if _, err = os.Stat(tempdir + "/new_dir"); errors.Is(err, fs.ErrNotExist) {
t.Errorf("Directory %s/new_dir does not exist, but should", tempdir)
}
}

Expand Down
23 changes: 15 additions & 8 deletions go.mod
Expand Up @@ -3,13 +3,20 @@ module github.com/m-lab/pusher
go 1.13

require (
cloud.google.com/go/storage v1.6.0
github.com/go-test/deep v1.0.5 // indirect
github.com/googleapis/google-cloud-go-testing v0.0.0-20191008195207-8e1d251e947d
github.com/m-lab/go v1.3.0
github.com/prometheus/client_golang v1.3.0
cloud.google.com/go v0.101.1 // indirect
cloud.google.com/go/storage v1.22.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/googleapis/google-cloud-go-testing v0.0.0-20210719221736-1c9a4c676720
github.com/json-iterator/go v1.1.12 // indirect
github.com/m-lab/go v0.1.47
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rjeczalik/notify v0.9.2
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae
google.golang.org/api v0.21.0
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6
google.golang.org/api v0.79.0
)

0 comments on commit e64e83e

Please sign in to comment.