git » summer » commit 67bb7eb

Parallelize walking logic

author Alberto Bertogli
2023-08-28 21:35:53 UTC
committer Alberto Bertogli
2023-08-29 22:52:31 UTC
parent 278a5347d9bd4450956cf921905d50f588bd9a76

Parallelize walking logic

This patch makes walk() run in parallel by default.

Depending on the media and the nature of the files, this can make things
faster or slower.

Simple, unscientific experiments comparing serial to parallel show large
improvements for data in cache or on SSD, and mixed results on HDD
(depending a lot on the drive and the data itself):

       | before | after | difference
cached |   8.8s |  5.2s |       -40%
ssd    |  28.4s | 12.6s |       -56%
hdd 1  |  12.1s | 14.8s |       +22%
hdd 2  |  23.2s | 18.4s |       -20%

.github/workflows/tests.yaml +1 -1
summer.go +11 -0
test/bad_xattr.t +19 -0
test/basic.t +6 -5
test/help.t +4 -0
test/multiroot.t +5 -4
test/singlefile.t +2 -2
walk.go +56 -2

diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml
index 3b881e5..96461be 100644
--- a/.github/workflows/tests.yaml
+++ b/.github/workflows/tests.yaml
@@ -19,7 +19,7 @@ jobs:
         with:
           go-version: ">=1.20"
       - name: install dependencies
-        run: sudo apt install -y python3-cram
+        run: sudo apt install -y python3-cram xattr
 
       - name: test
         run: ./test/cover.sh
diff --git a/summer.go b/summer.go
index 620258e..131ab2e 100644
--- a/summer.go
+++ b/summer.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"runtime"
 
 	"golang.org/x/term"
 )
@@ -47,6 +48,8 @@ var (
 	forceTTY      = flag.Bool("forcetty", false, "force TTY output")
 	exclude       = &RepeatedStringFlag{}
 	excludeRe     = &RepeatedStringFlag{}
+	parallel      = flag.Int("parallel", 0,
+		"number of files to process in parallel (0 = number of CPUs)")
 )
 
 var options = struct {
@@ -64,6 +67,9 @@ var options = struct {
 
 	// Regexp patterns to exclude.
 	excludeRe []*regexp.Regexp
+
+	// How many files to process in parallel.
+	parallel int
 }{}
 
 func Usage() {
@@ -94,6 +100,11 @@ func main() {
 		options.excludeRe = append(options.excludeRe, regexp.MustCompile(s))
 	}
 
+	options.parallel = *parallel
+	if options.parallel == 0 {
+		options.parallel = runtime.NumCPU()
+	}
+
 	op := flag.Arg(0)
 	roots := []string{}
 	if flag.NArg() > 1 {
diff --git a/test/bad_xattr.t b/test/bad_xattr.t
new file mode 100644
index 0000000..59ada0e
--- /dev/null
+++ b/test/bad_xattr.t
@@ -0,0 +1,19 @@
+Test how a bad/invalid xattr value is handled. This shouldn't happen normally,
+but it might if the xattr itself gets corrupted.
+
+  $ alias summer="$TESTDIR/../summer"
+  $ echo marola > hola
+  $ summer generate .
+  0s: 0 matched, 0 modified, 1 new, 0 corrupted
+
+Corrupt the xattr by writing data that does not deserialize into ChecksumV1. We
+achieve that by having less data than it expects.
+
+  $ xattr -w user.summer-v1 "xxxx" hola
+
+Verify and check the error.
+
+  $ summer verify .
+  0s: 0 matched, 0 modified, 0 new, 0 corrupted
+  error in "hola": unexpected EOF
+  [1]
diff --git a/test/basic.t b/test/basic.t
index b28d813..d79bce7 100644
--- a/test/basic.t
+++ b/test/basic.t
@@ -58,16 +58,17 @@ Editing the file makes us ignore the previous checksum.
   $ summer verify .
   0s: 3 matched, 0 modified, 0 new, 0 corrupted
 
-Check verbose and quiet.
+Check verbose and quiet. For verbose, use --parallel=1 to ensure reproducible
+output.
 
   $ touch denuevo
-  $ summer -v verify .
+  $ summer --parallel=1 -v verify .
   "denuevo": missing checksum attribute
   "empty": match \(checksum:0, mtime:\d+\) (re)
   "hola": match \(checksum:916db13f, mtime:\d+\) (re)
   "nueva": match \(checksum:91f3a28e, mtime:\d+\) (re)
   0s: 3 matched, 0 modified, 1 new, 0 corrupted
-  $ summer -v generate .
+  $ summer --parallel=1 -v generate .
   "denuevo": writing checksum \(checksum:0, mtime:\d+\) (re)
   0s: 0 matched, 0 modified, 1 new, 0 corrupted
   $ summer -q verify .
@@ -79,7 +80,7 @@ Check verbose and quiet.
 Check that symlinks are ignored.
 
   $ ln -s hola thisisasymlink
-  $ summer -v verify .
+  $ summer --parallel=1 -v verify .
   "empty": match \(checksum:0, mtime:\d+\) (re)
   "hola": match \(checksum:916db13f, mtime:\d+\) (re)
   "nueva": match \(checksum:91f3a28e, mtime:\d+\) (re)
@@ -87,7 +88,7 @@ Check that symlinks are ignored.
 
 Check that the root path doesn't confuse us.
 
-  $ summer -v verify $PWD
+  $ summer --parallel=1 -v verify $PWD
   "/.*/empty": match \(checksum:0, mtime:\d+\) (re)
   "/.*/hola": match \(checksum:916db13f, mtime:\d+\) (re)
   "/.*/nueva": match \(checksum:91f3a28e, mtime:\d+\) (re)
diff --git a/test/help.t b/test/help.t
index d3b11a1..4a2ec9f 100644
--- a/test/help.t
+++ b/test/help.t
@@ -38,6 +38,8 @@ No arguments.
     -forcetty
       \tforce TTY output (esc)
     -n\tdry-run mode (do not write anything) (esc)
+    -parallel int
+      \tnumber of files to process in parallel (0 = number of CPUs) (esc)
     -q\tquiet mode (esc)
     -v\tverbose mode (list each file) (esc)
     -x\tdon't cross filesystem boundaries (esc)
@@ -80,6 +82,8 @@ Too few arguments.
     -forcetty
       \tforce TTY output (esc)
     -n\tdry-run mode (do not write anything) (esc)
+    -parallel int
+      \tnumber of files to process in parallel (0 = number of CPUs) (esc)
     -q\tquiet mode (esc)
     -v\tverbose mode (list each file) (esc)
     -x\tdon't cross filesystem boundaries (esc)
diff --git a/test/multiroot.t b/test/multiroot.t
index 4253d7a..1b3a427 100644
--- a/test/multiroot.t
+++ b/test/multiroot.t
@@ -24,9 +24,10 @@ Test that individual files work well as roots (common use case).
   0s: 5 matched, 0 modified, 0 new, 0 corrupted
 
 
-Check the order is as expected.
+Check the order is as expected (when parallel=1, otherwise the order is not
+reproducible).
 
-  $ summer -v update A B C
+  $ summer --parallel=1 -v update A B C
   "A/a1": match \(checksum:0, mtime:\d+\) (re)
   "A/a2": match \(checksum:0, mtime:\d+\) (re)
   "B/b1": match \(checksum:0, mtime:\d+\) (re)
@@ -39,14 +40,14 @@ Check how we handle getting an error in the middle.
 
   $ chmod 0000 B/b1
 
-  $ summer -v verify A B C
+  $ summer --parallel=1 -v verify A B C
   "A/a1": match \(checksum:0, mtime:\d+\) (re)
   "A/a2": match \(checksum:0, mtime:\d+\) (re)
   0s: 2 matched, 0 modified, 0 new, 0 corrupted
   open B/b1: permission denied
   [1]
 
-  $ summer -v update A B C
+  $ summer --parallel=1 -v update A B C
   "A/a1": match \(checksum:0, mtime:\d+\) (re)
   "A/a2": match \(checksum:0, mtime:\d+\) (re)
   0s: 2 matched, 0 modified, 0 new, 0 corrupted
diff --git a/test/singlefile.t b/test/singlefile.t
index e9068bc..fee8406 100644
--- a/test/singlefile.t
+++ b/test/singlefile.t
@@ -8,7 +8,7 @@ Test that summer works fine when given a file instead of a directory.
   "./empty": writing checksum \(checksum:0, mtime:\d+\) (re)
   0s: 0 matched, 0 modified, 1 new, 0 corrupted
 
-  $ summer -v verify .
+  $ summer --parallel=1 -v verify .
   "empty": match \(checksum:0, mtime:\d+\) (re)
   "hola": missing checksum attribute
   0s: 1 matched, 0 modified, 1 new, 0 corrupted
@@ -16,7 +16,7 @@ Test that summer works fine when given a file instead of a directory.
   $ summer update ./hola
   0s: 0 matched, 0 modified, 1 new, 0 corrupted
 
-  $ summer -v verify .
+  $ summer --parallel=1 -v verify .
   "empty": match \(checksum:0, mtime:\d+\) (re)
   "hola": match \(checksum:239059f6, mtime:\d+\) (re)
   0s: 2 matched, 0 modified, 0 new, 0 corrupted
diff --git a/walk.go b/walk.go
index 403a390..fa6faa3 100644
--- a/walk.go
+++ b/walk.go
@@ -5,6 +5,7 @@ import (
 	"io/fs"
 	"os"
 	"path/filepath"
+	"sync"
 	"syscall"
 )
 
@@ -64,18 +65,44 @@ func getDeviceForPath(path string) deviceID {
 
 type walkFn func(fd *os.File, info fs.FileInfo, p *Progress) error
 
+// walkItem is a single unit of work sent from walk() to the workers over a
+// channel. It bundles the three arguments that a walkFn takes.
+type walkItem struct {
+	// Open file to process. The worker that receives the item closes it.
+	fd   *os.File
+	// File information, as returned by openAndInfo.
+	info fs.FileInfo
+	// Progress tracker, passed through to the walkFn.
+	p    *Progress
+}
+
 func walk(roots []string, fn walkFn) error {
 	rootDev := deviceID(0)
 	p := NewProgress(options.isTTY)
 	defer p.Stop()
 
+	// Launch the workers.
+	wg := sync.WaitGroup{}
+	workC := make(chan walkItem)
+	workerErrs := make(chan error, options.parallel)
+	for i := 0; i < options.parallel; i++ {
+		wg.Add(1)
+		go worker(&wg, workC, fn, workerErrs)
+	}
+
+	// Helper function used by filepath.WalkDir to send items to the workers.
 	wfn := func(path string, d fs.DirEntry, err error) error {
+		// On each iteration, check if any of the workers had an error.
+		// If so, return it, which stops the walk immediately.
+		if werr, ok := hasErr(workerErrs); ok {
+			return werr
+		}
+
+		// Open the file one by one, because as part of doing so, the function
+		// will return fs.SkipDir as needed, so we can't parallelize it.
 		ok, fd, info, err := openAndInfo(path, d, err, rootDev)
 		if !ok || err != nil {
 			return err
 		}
-		defer fd.Close()
-		return fn(fd, info, p)
+
+		// Send the work to the workers. They will close the fd.
+		workC <- walkItem{fd, info, p}
+		return nil
 	}
 
 	var err error
@@ -86,9 +113,36 @@ func walk(roots []string, fn walkFn) error {
 			break
 		}
 	}
+	close(workC)
+	wg.Wait()
+
+	// Check for any errors in the last iterations.
+	if werr, ok := hasErr(workerErrs); err == nil && ok {
+		err = werr
+	}
 
 	if p.corrupted > 0 && err == nil {
 		err = fmt.Errorf("detected %d corrupted files", p.corrupted)
 	}
 	return err
 }
+
+// worker processes items received from c until the channel is closed. For
+// each item it calls fn with the item's file, info and progress tracker, and
+// then closes the file. It calls wg.Done() when it returns.
+//
+// If fn returns an error, it is wrapped with the file name and sent to errc.
+// The send never blocks: when errc has no room, the error is dropped. errc
+// is expected to be buffered (walk() creates it with one slot per worker),
+// so a full errc already holds an error for the caller to report.
+func worker(wg *sync.WaitGroup, c chan walkItem, fn walkFn, errc chan error) {
+	defer wg.Done()
+	for item := range c {
+		err := fn(item.fd, item.info, item.p)
+		item.fd.Close()
+		if err == nil {
+			continue
+		}
+
+		// Do not block when reporting the error. walk() only drains errc
+		// from its per-entry callback and after wg.Wait(), so if errc is
+		// full once the walk is over, a blocking send would hang this
+		// worker forever and deadlock wg.Wait().
+		select {
+		case errc <- fmt.Errorf("error in %q: %w", item.fd.Name(), err):
+		default:
+		}
+	}
+}
+
+// hasErr does a non-blocking receive on errc. If a value was available, it
+// returns that value and true; otherwise it returns nil and false right
+// away, without waiting.
+func hasErr(errc chan error) (error, bool) {
+	var pending error
+	found := false
+
+	select {
+	case pending = <-errc:
+		found = true
+	default:
+		// Nothing queued; fall through with the zero values.
+	}
+
+	return pending, found
+}