X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b6d7efab2c4bffa3fabd55b166e44cca8ac1391f..fb429aa6a8dd1d28d08038abd8de8b9206a1d51e:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index ceccd11c92..a053ba3e6b 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "context" @@ -135,6 +135,7 @@ func (v *UnixVolume) GetDeviceID() string { if err != nil { return giveup("opening %q: %s", udir, err) } + defer d.Close() uuids, err := d.Readdirnames(0) if err != nil { return giveup("reading %q: %s", udir, err) @@ -172,10 +173,10 @@ func (v *UnixVolume) Touch(loc string) error { return e } defer v.unlockfile(f) - ts := syscall.NsecToTimespec(time.Now().UnixNano()) + ts := time.Now() v.os.stats.TickOps("utimes") v.os.stats.Tick(&v.os.stats.UtimesOps) - err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts}) + err = os.Chtimes(p, ts, ts) v.os.stats.TickErr(err) return err } @@ -274,34 +275,39 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return fmt.Errorf("error creating directory %s: %s", bdir, err) } - tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc) - if tmperr != nil { - return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr) - } - bpath := v.blockPath(loc) + tmpfile, err := v.os.TempFile(bdir, "tmp"+loc) + if err != nil { + return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err) + } + defer v.os.Remove(tmpfile.Name()) + defer tmpfile.Close() - if err := v.lock(ctx); err != nil { + if err = v.lock(ctx); err != nil { return err } defer v.unlock() n, err := io.Copy(tmpfile, rdr) v.os.stats.TickOutBytes(uint64(n)) if err != nil { - err = fmt.Errorf("error writing %s: %s", bpath, err) - tmpfile.Close() - v.os.Remove(tmpfile.Name()) - return err + return fmt.Errorf("error writing %s: %s", bpath, err) } - if err := tmpfile.Close(); err != nil { - err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err) - v.os.Remove(tmpfile.Name()) - return err + if err = tmpfile.Close(); err != nil { + return fmt.Errorf("error closing %s: %s", tmpfile.Name(), err) } - if err := v.os.Rename(tmpfile.Name(), bpath); err != nil { - err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err) - v.os.Remove(tmpfile.Name()) - return err + // ext4 uses a low-precision clock and effectively backdates + // files by up to 10 ms, sometimes across a 1-second boundary, + // which produces confusing results in logs and tests. We + // avoid this by setting the output file's timestamps + // explicitly, using a higher resolution clock. + ts := time.Now() + v.os.stats.TickOps("utimes") + v.os.stats.Tick(&v.os.stats.UtimesOps) + if err = os.Chtimes(tmpfile.Name(), ts, ts); err != nil { + return fmt.Errorf("error setting timestamps on %s: %s", tmpfile.Name(), err) + } + if err = v.os.Rename(tmpfile.Name(), bpath); err != nil { + return fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err) } return nil } @@ -353,47 +359,55 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 // func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { - var lastErr error rootdir, err := v.os.Open(v.Root) if err != nil { return err } - defer rootdir.Close() v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) - for { - names, err := rootdir.Readdirnames(1) - if err == io.EOF { - return lastErr - } else if err != nil { - return err - } - if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) { + subdirs, err := rootdir.Readdirnames(-1) + rootdir.Close() + if err != nil { + return err + } + for _, subdir := range subdirs { + if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) { // prefix excludes all blocks stored in this dir continue } - if !blockDirRe.MatchString(names[0]) { + if !blockDirRe.MatchString(subdir) { continue } - blockdirpath := filepath.Join(v.Root, names[0]) - blockdir, err := v.os.Open(blockdirpath) - if err != nil { - v.logger.WithError(err).Errorf("error reading %q", blockdirpath) - lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err) - continue - } - v.os.stats.TickOps("readdir") - v.os.stats.Tick(&v.os.stats.ReaddirOps) - for { - fileInfo, err := blockdir.Readdir(1) - if err == io.EOF { + blockdirpath := filepath.Join(v.Root, subdir) + + var dirents []os.DirEntry + for attempt := 0; ; attempt++ { + v.os.stats.TickOps("readdir") + v.os.stats.Tick(&v.os.stats.ReaddirOps) + dirents, err = os.ReadDir(blockdirpath) + if err == nil { break + } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") { + // EBADCOOKIE (NFS stopped accepting + // our readdirent cookie) -- retry a + // few times before giving up + v.logger.WithError(err).Printf("retry after error reading %s", blockdirpath) + continue + } else { + return err + } + } + + for _, dirent := range dirents { + fileInfo, err := dirent.Info() + if os.IsNotExist(err) { + // File disappeared between ReadDir() and now + continue } else if err != nil { - v.logger.WithError(err).Errorf("error reading %q", blockdirpath) - lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err) - break + v.logger.WithError(err).Errorf("error getting FileInfo for %q in %q", dirent.Name(), blockdirpath) + return err } - name := fileInfo[0].Name() + name := fileInfo.Name() if !strings.HasPrefix(name, prefix) { continue } @@ -402,16 +416,15 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { } _, err = fmt.Fprint(w, name, - "+", fileInfo[0].Size(), - " ", fileInfo[0].ModTime().UnixNano(), + "+", fileInfo.Size(), + " ", fileInfo.ModTime().UnixNano(), "\n") if err != nil { - blockdir.Close() return fmt.Errorf("error writing: %s", err) } } - blockdir.Close() } + return nil } // Trash trashes the block data from the unix storage @@ -686,10 +699,20 @@ func (v *UnixVolume) EmptyTrash() { err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error { if err != nil { v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path) + // Don't give up -- keep walking other + // files/dirs return nil + } else if !info.Mode().IsDir() { + todo <- dirent{path, info} + return nil + } else if path == v.Root || blockDirRe.MatchString(info.Name()) { + // Descend into a directory that we might have + // put trash in. + return nil + } else { + // Don't descend into other dirs. + return filepath.SkipDir } - todo <- dirent{path, info} - return nil }) close(todo) wg.Wait()