Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: limit copy concurrency #351

Merged
merged 1 commit into from Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 39 additions & 16 deletions tools/common/copy.go
Expand Up @@ -26,38 +26,61 @@ func CopyFile(src string, dst string) error {
return err
}

func Copy(src string, dst string, info fs.FileInfo, link bool, verbose bool, wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
func Copy(opts CopyOpts) {
if !opts.info.Mode().IsRegular() {
log.Fatalf("%s is not a regular file", opts.src)
}
if !info.Mode().IsRegular() {
log.Fatalf("%s is not a regular file", src)
}
if link {
if opts.hardlink {
// hardlink this file
if verbose {
fmt.Printf("hardlink %v => %v\n", src, dst)
if opts.verbose {
fmt.Printf("hardlink %v => %v\n", opts.src, opts.dst)
}
err := os.Link(src, dst)
err := os.Link(opts.src, opts.dst)
if err != nil {
// fallback to copy
if verbose {
if opts.verbose {
fmt.Printf("hardlink failed: %v\n", err)
fmt.Printf("copy (fallback) %v => %v\n", src, dst)
fmt.Printf("copy (fallback) %v => %v\n", opts.src, opts.dst)
}
err = CopyFile(src, dst)
err = CopyFile(opts.src, opts.dst)
if err != nil {
log.Fatal(err)
}
}
} else {
// copy this file
if verbose {
fmt.Printf("copy %v => %v\n", src, dst)
if opts.verbose {
fmt.Printf("copy %v => %v\n", opts.src, opts.dst)
}
err := CopyFile(src, dst)
err := CopyFile(opts.src, opts.dst)
if err != nil {
log.Fatal(err)
}
}
}

type CopyWorker struct {
queue <-chan CopyOpts
}

func NewCopyWorker(queue <-chan CopyOpts) *CopyWorker {
return &CopyWorker{queue: queue}
}

func (w *CopyWorker) Run(wg *sync.WaitGroup) {
defer wg.Done()
for opts := range w.queue {
Copy(opts)
}
}

type CopyOpts struct {
src, dst string
info fs.FileInfo
hardlink bool
verbose bool
}

func NewCopyOpts(src string, dst string, info fs.FileInfo, hardlink bool, verbose bool) CopyOpts {
return CopyOpts{src: src, dst: dst, info: info, hardlink: hardlink, verbose: verbose}
}
30 changes: 21 additions & 9 deletions tools/copy_directory/main.go
Expand Up @@ -14,11 +14,14 @@ import (
type pathSet map[string]bool

var srcPaths = pathSet{}
var copyWaitGroup sync.WaitGroup
var hardlink = false
var verbose = false

func copyDir(src string, dst string) error {
type walker struct {
queue chan<- common.CopyOpts
}

func (w *walker) copyDir(src string, dst string) error {
// filepath.WalkDir walks the file tree rooted at root, calling fn for each file or directory in
// the tree, including root. See https://pkg.go.dev/path/filepath#WalkDir for more info.
return filepath.WalkDir(src, func(p string, dirEntry fs.DirEntry, err error) error {
Expand Down Expand Up @@ -59,18 +62,16 @@ func copyDir(src string, dst string) error {
}
if stat.IsDir() {
// symlink points to a directory
return copyDir(linkPath, d)
return w.copyDir(linkPath, d)
} else {
// symlink points to a regular file
copyWaitGroup.Add(1)
go common.Copy(linkPath, d, stat, hardlink, verbose, &copyWaitGroup)
w.queue <- common.NewCopyOpts(linkPath, d, stat, hardlink, verbose)
return nil
}
}

// a regular file
copyWaitGroup.Add(1)
go common.Copy(p, d, info, hardlink, verbose, &copyWaitGroup)
w.queue <- common.NewCopyOpts(p, d, info, hardlink, verbose)
return nil
})
}
Expand Down Expand Up @@ -103,8 +104,19 @@ func main() {
}
}

if err := copyDir(src, dst); err != nil {
queue := make(chan common.CopyOpts, 100)
var wg sync.WaitGroup

const numWorkers = 10
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go common.NewCopyWorker(queue).Run(&wg)
}

walker := &walker{queue}
if err := walker.copyDir(src, dst); err != nil {
log.Fatal(err)
}
copyWaitGroup.Wait()
close(queue)
f0rmiga marked this conversation as resolved.
Show resolved Hide resolved
wg.Wait()
}
51 changes: 30 additions & 21 deletions tools/copy_to_directory/main.go
Expand Up @@ -3,8 +3,8 @@ package main
import (
"encoding/json"
"fmt"
"io"
"io/fs"
"io/ioutil"
"log"
"os"
"path"
Expand Down Expand Up @@ -49,7 +49,6 @@ type config struct {
type copyMap map[string]fileInfo
type pathSet map[string]bool

var copyWaitGroup sync.WaitGroup
var copySet = copyMap{}
var mkdirSet = pathSet{}

Expand All @@ -60,7 +59,7 @@ func parseConfig(configPath string) (*config, error) {
}
defer f.Close()

byteValue, err := ioutil.ReadAll(f)
byteValue, err := io.ReadAll(f)
f0rmiga marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
Expand Down Expand Up @@ -118,7 +117,11 @@ func longestGlobMatch(g string, test string) (string, error) {
return "", nil
}

func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
type walker struct {
queue chan<- common.CopyOpts
}

func (w *walker) copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
if srcPaths == nil {
srcPaths = pathSet{}
}
Expand Down Expand Up @@ -178,7 +181,7 @@ func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
Hardlink: file.Hardlink,
FileInfo: stat,
}
return copyDir(cfg, srcPaths, f)
return w.copyDir(cfg, srcPaths, f)
} else {
// symlink points to a regular file
f := fileInfo{
Expand All @@ -191,7 +194,7 @@ func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
Hardlink: file.Hardlink,
FileInfo: stat,
}
return copyPath(cfg, f)
return w.copyPath(cfg, f)
}
}

Expand All @@ -206,11 +209,11 @@ func copyDir(cfg *config, srcPaths pathSet, file fileInfo) error {
Hardlink: file.Hardlink,
FileInfo: info,
}
return copyPath(cfg, f)
return w.copyPath(cfg, f)
})
}

func copyPath(cfg *config, file fileInfo) error {
func (w *walker) copyPath(cfg *config, file fileInfo) error {
// Apply filters and transformations in the following order:
//
// - `include_external_repositories`
Expand Down Expand Up @@ -261,10 +264,7 @@ func copyPath(cfg *config, file fileInfo) error {
return err
}
if rootPathMatch != "" {
outputPath = outputPath[len(rootPathMatch):]
if strings.HasPrefix(outputPath, "/") {
outputPath = outputPath[1:]
}
outputPath = strings.TrimPrefix(outputPath[len(rootPathMatch):], "/")
}

// apply include_srcs_patterns
Expand Down Expand Up @@ -323,14 +323,13 @@ func copyPath(cfg *config, file fileInfo) error {

if !cfg.AllowOverwrites {
// if we don't allow overwrites then we can start copying as soon as a copy is calculated
copyWaitGroup.Add(1)
go common.Copy(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose, &copyWaitGroup)
w.queue <- common.NewCopyOpts(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose)
}

return nil
}

func copyPaths(cfg *config) error {
func (w *walker) copyPaths(cfg *config) error {
for _, file := range cfg.Files {
info, err := os.Lstat(file.Path)
if err != nil {
Expand All @@ -355,11 +354,11 @@ func copyPaths(cfg *config) error {
}

if file.FileInfo.IsDir() {
if err := copyDir(cfg, nil, file); err != nil {
if err := w.copyDir(cfg, nil, file); err != nil {
return err
}
} else {
if err := copyPath(cfg, file); err != nil {
if err := w.copyPath(cfg, file); err != nil {
return err
}
}
Expand Down Expand Up @@ -387,18 +386,28 @@ func main() {
log.Fatal(err)
}

if err = copyPaths(cfg); err != nil {
queue := make(chan common.CopyOpts, 100)
var wg sync.WaitGroup

const numWorkers = 10
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go common.NewCopyWorker(queue).Run(&wg)
}

walker := &walker{queue}
if err = walker.copyPaths(cfg); err != nil {
log.Fatal(err)
}

if cfg.AllowOverwrites {
// if we allow overwrites then we must wait until all copy paths are calculated before starting
// any copy operations
for outputPath, file := range copySet {
copyWaitGroup.Add(1)
go common.Copy(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose, &copyWaitGroup)
queue <- common.NewCopyOpts(file.Path, outputPath, file.FileInfo, file.Hardlink, cfg.Verbose)
}
}

copyWaitGroup.Wait()
close(queue)
wg.Wait()
}