author | Alberto Bertogli
<albertito@blitiri.com.ar> 2023-08-28 21:35:53 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2023-08-29 22:52:31 UTC |
parent | 278a5347d9bd4450956cf921905d50f588bd9a76 |
.github/workflows/tests.yaml | +1 | -1 |
summer.go | +11 | -0 |
test/bad_xattr.t | +19 | -0 |
test/basic.t | +6 | -5 |
test/help.t | +4 | -0 |
test/multiroot.t | +5 | -4 |
test/singlefile.t | +2 | -2 |
walk.go | +56 | -2 |
diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 3b881e5..96461be 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -19,7 +19,7 @@ jobs: with: go-version: ">=1.20" - name: install dependencies - run: sudo apt install -y python3-cram + run: sudo apt install -y python3-cram xattr - name: test run: ./test/cover.sh diff --git a/summer.go b/summer.go index 620258e..131ab2e 100644 --- a/summer.go +++ b/summer.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "golang.org/x/term" ) @@ -47,6 +48,8 @@ var ( forceTTY = flag.Bool("forcetty", false, "force TTY output") exclude = &RepeatedStringFlag{} excludeRe = &RepeatedStringFlag{} + parallel = flag.Int("parallel", 0, + "number of files to process in parallel (0 = number of CPUs)") ) var options = struct { @@ -64,6 +67,9 @@ var options = struct { // Regexp patterns to exclude. excludeRe []*regexp.Regexp + + // How many files to process in parallel. + parallel int }{} func Usage() { @@ -94,6 +100,11 @@ func main() { options.excludeRe = append(options.excludeRe, regexp.MustCompile(s)) } + options.parallel = *parallel + if options.parallel == 0 { + options.parallel = runtime.NumCPU() + } + op := flag.Arg(0) roots := []string{} if flag.NArg() > 1 { diff --git a/test/bad_xattr.t b/test/bad_xattr.t new file mode 100644 index 0000000..59ada0e --- /dev/null +++ b/test/bad_xattr.t @@ -0,0 +1,19 @@ +Test how a bad/invalid xattr value is handled. This shouldn't happen normally, +but it might if the xattr itself gets corrupted. + + $ alias summer="$TESTDIR/../summer" + $ echo marola > hola + $ summer generate . + 0s: 0 matched, 0 modified, 1 new, 0 corrupted + +Corrupt the xattr by writing data that does not serialize into ChecksumV1. We +achieve that by having less data than it expects. + + $ xattr -w user.summer-v1 "xxxx" hola + +Verify and check the error. + + $ summer verify . + 0s: 0 matched, 0 modified, 0 new, 0 corrupted + error in "hola": unexpected EOF + [1] diff --git a/test/basic.t b/test/basic.t index b28d813..d79bce7 100644 --- a/test/basic.t +++ b/test/basic.t @@ -58,16 +58,17 @@ Editing the file makes us ignore the previous checksum. $ summer verify . 0s: 3 matched, 0 modified, 0 new, 0 corrupted -Check verbose and quiet. +Check verbose and quiet. For verbose, use --parallel=1 to ensure reproducible +output. $ touch denuevo - $ summer -v verify . + $ summer --parallel=1 -v verify . "denuevo": missing checksum attribute "empty": match \(checksum:0, mtime:\d+\) (re) "hola": match \(checksum:916db13f, mtime:\d+\) (re) "nueva": match \(checksum:91f3a28e, mtime:\d+\) (re) 0s: 3 matched, 0 modified, 1 new, 0 corrupted - $ summer -v generate . + $ summer --parallel=1 -v generate . "denuevo": writing checksum \(checksum:0, mtime:\d+\) (re) 0s: 0 matched, 0 modified, 1 new, 0 corrupted $ summer -q verify . @@ -79,7 +80,7 @@ Check verbose and quiet. Check that symlinks are ignored. $ ln -s hola thisisasymlink - $ summer -v verify . + $ summer --parallel=1 -v verify . "empty": match \(checksum:0, mtime:\d+\) (re) "hola": match \(checksum:916db13f, mtime:\d+\) (re) "nueva": match \(checksum:91f3a28e, mtime:\d+\) (re) @@ -87,7 +88,7 @@ Check that symlinks are ignored. Check that the root path doesn't confuse us. - $ summer -v verify $PWD + $ summer --parallel=1 -v verify $PWD "/.*/empty": match \(checksum:0, mtime:\d+\) (re) "/.*/hola": match \(checksum:916db13f, mtime:\d+\) (re) "/.*/nueva": match \(checksum:91f3a28e, mtime:\d+\) (re) diff --git a/test/help.t b/test/help.t index d3b11a1..4a2ec9f 100644 --- a/test/help.t +++ b/test/help.t @@ -38,6 +38,8 @@ No arguments. -forcetty \tforce TTY output (esc) -n\tdry-run mode (do not write anything) (esc) + -parallel int + \tnumber of files to process in parallel (0 = number of CPUs) (esc) -q\tquiet mode (esc) -v\tverbose mode (list each file) (esc) -x\tdon't cross filesystem boundaries (esc) @@ -80,6 +82,8 @@ Too few arguments. -forcetty \tforce TTY output (esc) -n\tdry-run mode (do not write anything) (esc) + -parallel int + \tnumber of files to process in parallel (0 = number of CPUs) (esc) -q\tquiet mode (esc) -v\tverbose mode (list each file) (esc) -x\tdon't cross filesystem boundaries (esc) diff --git a/test/multiroot.t b/test/multiroot.t index 4253d7a..1b3a427 100644 --- a/test/multiroot.t +++ b/test/multiroot.t @@ -24,9 +24,10 @@ Test that individual files work well as roots (common use case). 0s: 5 matched, 0 modified, 0 new, 0 corrupted -Check the order is as expected. +Check the order is as expected (when parallel=1, otherwise the order is not +reproducible). - $ summer -v update A B C + $ summer --parallel=1 -v update A B C "A/a1": match \(checksum:0, mtime:\d+\) (re) "A/a2": match \(checksum:0, mtime:\d+\) (re) "B/b1": match \(checksum:0, mtime:\d+\) (re) @@ -39,14 +40,14 @@ Check how we handle getting an error in the middle. $ chmod 0000 B/b1 - $ summer -v verify A B C + $ summer --parallel=1 -v verify A B C "A/a1": match \(checksum:0, mtime:\d+\) (re) "A/a2": match \(checksum:0, mtime:\d+\) (re) 0s: 2 matched, 0 modified, 0 new, 0 corrupted open B/b1: permission denied [1] - $ summer -v update A B C + $ summer --parallel=1 -v update A B C "A/a1": match \(checksum:0, mtime:\d+\) (re) "A/a2": match \(checksum:0, mtime:\d+\) (re) 0s: 2 matched, 0 modified, 0 new, 0 corrupted diff --git a/test/singlefile.t b/test/singlefile.t index e9068bc..fee8406 100644 --- a/test/singlefile.t +++ b/test/singlefile.t @@ -8,7 +8,7 @@ Test that summer works fine when given a file instead of a directory. "./empty": writing checksum \(checksum:0, mtime:\d+\) (re) 0s: 0 matched, 0 modified, 1 new, 0 corrupted - $ summer -v verify . + $ summer --parallel=1 -v verify . "empty": match \(checksum:0, mtime:\d+\) (re) "hola": missing checksum attribute 0s: 1 matched, 0 modified, 1 new, 0 corrupted @@ -16,7 +16,7 @@ Test that summer works fine when given a file instead of a directory. $ summer update ./hola 0s: 0 matched, 0 modified, 1 new, 0 corrupted - $ summer -v verify . + $ summer --parallel=1 -v verify . "empty": match \(checksum:0, mtime:\d+\) (re) "hola": match \(checksum:239059f6, mtime:\d+\) (re) 0s: 2 matched, 0 modified, 0 new, 0 corrupted diff --git a/walk.go b/walk.go index 403a390..fa6faa3 100644 --- a/walk.go +++ b/walk.go @@ -5,6 +5,7 @@ import ( "io/fs" "os" "path/filepath" + "sync" "syscall" ) @@ -64,18 +65,44 @@ func getDeviceForPath(path string) deviceID { type walkFn func(fd *os.File, info fs.FileInfo, p *Progress) error +type walkItem struct { + fd *os.File + info fs.FileInfo + p *Progress +} + func walk(roots []string, fn walkFn) error { rootDev := deviceID(0) p := NewProgress(options.isTTY) defer p.Stop() + // Launch the workers. + wg := sync.WaitGroup{} + workC := make(chan walkItem) + workerErrs := make(chan error, options.parallel) + for i := 0; i < options.parallel; i++ { + wg.Add(1) + go worker(&wg, workC, fn, workerErrs) + } + + // Helper function used by filepath.WalkDir to send items to the workers. wfn := func(path string, d fs.DirEntry, err error) error { + // On each iteration, check if any of the workers had an error. + // If so, return it, which stops the walk immediately. + if werr, ok := hasErr(workerErrs); ok { + return werr + } + + // Open the file one by one, because as part of doing so, the function + // will return fs.SkipDir as needed, so we can't parallelize it. ok, fd, info, err := openAndInfo(path, d, err, rootDev) if !ok || err != nil { return err } - defer fd.Close() - return fn(fd, info, p) + + // Send the work to the workers. They will close the fd. + workC <- walkItem{fd, info, p} + return nil } var err error @@ -86,9 +113,36 @@ func walk(roots []string, fn walkFn) error { break } } + close(workC) + wg.Wait() + + // Check for any errors in the last iterations. + if werr, ok := hasErr(workerErrs); err == nil && ok { + err = werr + } if p.corrupted > 0 && err == nil { err = fmt.Errorf("detected %d corrupted files", p.corrupted) } return err } + +func worker(wg *sync.WaitGroup, c chan walkItem, fn walkFn, errc chan error) { + defer wg.Done() + for item := range c { + err := fn(item.fd, item.info, item.p) + item.fd.Close() + if err != nil { + errc <- fmt.Errorf("error in %q: %w", item.fd.Name(), err) + } + } +} + +func hasErr(errc chan error) (error, bool) { + select { + case err := <-errc: + return err, true + default: + return nil, false + } +}