Skip to content

Commit

Permalink
fix: limit copy concurrency (#351)
Browse files Browse the repository at this point in the history
Signed-off-by: Thulio Ferraz Assis <3149049+f0rmiga@users.noreply.github.com>
  • Loading branch information
f0rmiga committed Feb 1, 2023
1 parent 3d58a71 commit 2854f86
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 46 deletions.
55 changes: 39 additions & 16 deletions tools/common/copy.go
Expand Up @@ -26,38 +26,61 @@ func CopyFile(src string, dst string) error {
return err
}

func Copy(src string, dst string, info fs.FileInfo, link bool, verbose bool, wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
func Copy(opts CopyOpts) {
if !opts.info.Mode().IsRegular() {
log.Fatalf("%s is not a regular file", opts.src)
}
if !info.Mode().IsRegular() {
log.Fatalf("%s is not a regular file", src)
}
if link {
if opts.hardlink {
// hardlink this file
if verbose {
fmt.Printf("hardlink %v => %v\n", src, dst)
if opts.verbose {
fmt.Printf("hardlink %v => %v\n", opts.src, opts.dst)
}
err := os.Link(src, dst)
err := os.Link(opts.src, opts.dst)
if err != nil {
// fallback to copy
if verbose {
if opts.verbose {
fmt.Printf("hardlink failed: %v\n", err)
fmt.Printf("copy (fallback) %v => %v\n", src, dst)
fmt.Printf("copy (fallback) %v => %v\n", opts.src, opts.dst)
}
err = CopyFile(src, dst)
err = CopyFile(opts.src, opts.dst)
if err != nil {
log.Fatal(err)
}
}
} else {
// copy this file
if verbose {
fmt.Printf("copy %v => %v\n", src, dst)
if opts.verbose {
fmt.Printf("copy %v => %v\n", opts.src, opts.dst)
}
err := CopyFile(src, dst)
err := CopyFile(opts.src, opts.dst)
if err != nil {
log.Fatal(err)
}
}
}

// CopyWorker drains a queue of copy requests, handing each one to Copy.
type CopyWorker struct {
	// queue is the receive-only channel the worker reads its requests from.
	queue <-chan CopyOpts
}

// NewCopyWorker returns a CopyWorker that reads its requests from queue.
func NewCopyWorker(queue <-chan CopyOpts) *CopyWorker {
	worker := new(CopyWorker)
	worker.queue = queue
	return worker
}

// Run calls Copy for every request received from the worker's queue and
// returns once the queue has been closed and drained. wg.Done is deferred, so
// it is called when Run returns; wg must not be nil.
func (w *CopyWorker) Run(wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		job, open := <-w.queue
		if !open {
			// The queue was closed and no buffered requests remain.
			return
		}
		Copy(job)
	}
}

// CopyOpts bundles the arguments of a single copy request handled by Copy.
type CopyOpts struct {
	// src is the path of the file to copy; dst is the path to create.
	src string
	dst string
	// info is the file info for src; Copy aborts unless it reports a regular file.
	info fs.FileInfo
	// hardlink makes Copy try os.Link first, falling back to CopyFile on error.
	hardlink bool
	// verbose makes Copy print each operation it performs.
	verbose bool
}

// NewCopyOpts builds a CopyOpts from its individual values; every argument is
// stored unchanged in the field of the same name.
func NewCopyOpts(src string, dst string, info fs.FileInfo, hardlink bool, verbose bool) CopyOpts {
	var opts CopyOpts
	opts.src, opts.dst = src, dst
	opts.info = info
	opts.hardlink = hardlink
	opts.verbose = verbose
	return opts
}
30 changes: 21 additions & 9 deletions tools/copy_directory/main.go
Expand Up @@ -14,11 +14,14 @@ import (
type pathSet map[string]bool

var srcPaths = pathSet{}
var copyWaitGroup sync.WaitGroup
var hardlink = false
var verbose = false

func copyDir(src string, dst string) error {
// walker walks a source tree and, instead of copying files itself, sends one
// copy request per file to a queue.
type walker struct {
	// queue is the send-only channel that copyDir pushes common.CopyOpts onto.
	queue chan<- common.CopyOpts
}

func (w *walker) copyDir(src string, dst string) error {
// filepath.WalkDir walks the file tree rooted at root, calling fn for each file or directory in
// the tree, including root. See https://pkg.go.dev/path/filepath#WalkDir for more info.
return filepath.WalkDir(src, func(p string, dirEntry fs.DirEntry, err error) error {
Expand Down Expand Up @@ -59,18 +62,16 @@ func copyDir(src string, dst string) error {
}
if stat.IsDir() {
// symlink points to a directory
return copyDir(linkPath, d)
return w.copyDir(linkPath, d)
} else {
// symlink points to a regular file
copyWaitGroup.Add(1)
go common.Copy(linkPath, d, stat, hardlink, verbose, &copyWaitGroup)
w.queue <- common.NewCopyOpts(linkPath, d, stat, hardlink, verbose)
return nil
}
}

// a regular file
copyWaitGroup.Add(1)
go common.Copy(p, d, info, hardlink, verbose, &copyWaitGroup)
w.queue <- common.NewCopyOpts(p, d, info, hardlink, verbose)
return nil
})
}
Expand Down Expand Up @@ -103,8 +104,19 @@ func main() {
}
}

if err := copyDir(src, dst); err != nil {
queue := make(chan common.CopyOpts, 100)
var wg sync.WaitGroup

const numWorkers = 10
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go common.NewCopyWorker(queue).Run(&wg)
}

walker := &walker{queue}
if err := walker.copyDir(src, dst); err != nil {
log.Fatal(err)
}
copyWaitGroup.Wait()
close(queue)
wg.Wait()
}
51 changes: 30 additions & 21 deletions tools/copy_to_directory/main.go
Expand Up @@ -3,8 +3,8 @@ package main
import (
"encoding/json"
"fmt"
"io"
"io/fs"
"io/ioutil"
"log"
"os"
"path"
Expand Down Expand Up @@ -49,7 +49,6 @@ type config struct {
type copyMap map[string]fileInfo
type pathSet map[string]bool

var copyWaitGroup sync.WaitGroup
var copySet = copyMap{}
var mkdirSet = pathSet{}

Expand All @@ -60,7 +59,7 @@ func parseConfig(configPath string) (*config, error) {
}
defer f.Close()

byteValue, err := ioutil.ReadAll(f)
byteValue, err := io.ReadAll(f)
if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
Expand Down Expand Up @@ -118,7 +117,11 @@ func longestGlobMatch(g string, test string) (string, error) {
return "", nil
}

func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
// walker resolves the configured input paths and sends the resulting copy
// requests to a queue rather than performing the copies itself.
type walker struct {
	// queue is the send-only channel that copyPath pushes common.CopyOpts onto
	// when overwrites are not allowed.
	queue chan<- common.CopyOpts
}

func (w *walker) copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
if srcPaths == nil {
srcPaths = pathSet{}
}
Expand Down Expand Up @@ -178,7 +181,7 @@ func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
Hardlink: file.Hardlink,
FileInfo: stat,
}
return copyDir(cfg, srcPaths, f)
return w.copyDir(cfg, srcPaths, f)
} else {
// symlink points to a regular file
f := fileInfo{
Expand All @@ -191,7 +194,7 @@ func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
Hardlink: file.Hardlink,
FileInfo: stat,
}
return copyPath(cfg, f)
return w.copyPath(cfg, f)
}
}

Expand All @@ -206,11 +209,11 @@ func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
Hardlink: file.Hardlink,
FileInfo: info,
}
return copyPath(cfg, f)
return w.copyPath(cfg, f)
})
}

func copyPath(cfg *config, file fileInfo) error {
func (w *walker) copyPath(cfg *config, file fileInfo) error {
// Apply filters and transformations in the following order:
//
// - `include_external_repositories`
Expand Down Expand Up @@ -261,10 +264,7 @@ func copyPath(cfg *config, file fileInfo) error {
return err
}
if rootPathMatch != "" {
outputPath = outputPath[len(rootPathMatch):]
if strings.HasPrefix(outputPath, "/") {
outputPath = outputPath[1:]
}
outputPath = strings.TrimPrefix(outputPath[len(rootPathMatch):], "/")
}

// apply include_srcs_patterns
Expand Down Expand Up @@ -323,14 +323,13 @@ func copyPath(cfg *config, file fileInfo) error {

if !cfg.AllowOverwrites {
// if we don't allow overwrites then we can start copying as soon as a copy is calculated
copyWaitGroup.Add(1)
go common.Copy(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose, &copyWaitGroup)
w.queue <- common.NewCopyOpts(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose)
}

return nil
}

func copyPaths(cfg *config) error {
func (w *walker) copyPaths(cfg *config) error {
for _, file := range cfg.Files {
info, err := os.Lstat(file.Path)
if err != nil {
Expand All @@ -355,11 +354,11 @@ func copyPaths(cfg *config) error {
}

if file.FileInfo.IsDir() {
if err := copyDir(cfg, nil, file); err != nil {
if err := w.copyDir(cfg, nil, file); err != nil {
return err
}
} else {
if err := copyPath(cfg, file); err != nil {
if err := w.copyPath(cfg, file); err != nil {
return err
}
}
Expand Down Expand Up @@ -387,18 +386,28 @@ func main() {
log.Fatal(err)
}

if err = copyPaths(cfg); err != nil {
queue := make(chan common.CopyOpts, 100)
var wg sync.WaitGroup

const numWorkers = 10
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go common.NewCopyWorker(queue).Run(&wg)
}

walker := &walker{queue}
if err = walker.copyPaths(cfg); err != nil {
log.Fatal(err)
}

if cfg.AllowOverwrites {
// if we allow overwrites then we must wait until all copy paths are calculated before starting
// any copy operations
for outputPath, file := range copySet {
copyWaitGroup.Add(1)
go common.Copy(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose, &copyWaitGroup)
queue <- common.NewCopyOpts(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose)
}
}

copyWaitGroup.Wait()
close(queue)
wg.Wait()
}

0 comments on commit 2854f86

Please sign in to comment.