X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c8b119d10b41cd507a6677d4feab7974362a153e..da83807d6bcef1c1f0bb78479c5ec17f150f5eda:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index 1706473cc8..dee4bdc1c1 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "context" @@ -135,6 +135,7 @@ func (v *UnixVolume) GetDeviceID() string { if err != nil { return giveup("opening %q: %s", udir, err) } + defer d.Close() uuids, err := d.Readdirnames(0) if err != nil { return giveup("reading %q: %s", udir, err) @@ -274,29 +275,25 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return fmt.Errorf("error creating directory %s: %s", bdir, err) } - tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc) - if tmperr != nil { - return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr) - } - bpath := v.blockPath(loc) + tmpfile, err := v.os.TempFile(bdir, "tmp"+loc) + if err != nil { + return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err) + } + defer v.os.Remove(tmpfile.Name()) + defer tmpfile.Close() - if err := v.lock(ctx); err != nil { + if err = v.lock(ctx); err != nil { return err } defer v.unlock() n, err := io.Copy(tmpfile, rdr) v.os.stats.TickOutBytes(uint64(n)) if err != nil { - err = fmt.Errorf("error writing %s: %s", bpath, err) - tmpfile.Close() - v.os.Remove(tmpfile.Name()) - return err + return fmt.Errorf("error writing %s: %s", bpath, err) } - if err := tmpfile.Close(); err != nil { - err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err) - v.os.Remove(tmpfile.Name()) - return err + if err = tmpfile.Close(); err != nil { + return fmt.Errorf("error closing %s: %s", tmpfile.Name(), err) } // ext4 uses a low-precision clock and effectively backdates // files by up to 10 ms, sometimes across a 1-second boundary, @@ -307,28 +304,28 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) v.os.stats.TickOps("utimes") v.os.stats.Tick(&v.os.stats.UtimesOps) if err = os.Chtimes(tmpfile.Name(), ts, ts); err != nil { - err = fmt.Errorf("error setting timestamps on %s: %s", tmpfile.Name(), err) - v.os.Remove(tmpfile.Name()) - return err + return fmt.Errorf("error setting timestamps on %s: %s", tmpfile.Name(), err) } - if err := v.os.Rename(tmpfile.Name(), bpath); err != nil { - err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err) - v.os.Remove(tmpfile.Name()) - return err + if err = v.os.Rename(tmpfile.Name(), bpath); err != nil { + return fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err) } return nil } // Status returns a VolumeStatus struct describing the volume's // current state, or nil if an error occurs. -// func (v *UnixVolume) Status() *VolumeStatus { fi, err := v.os.Stat(v.Root) if err != nil { v.logger.WithError(err).Error("stat failed") return nil } - devnum := fi.Sys().(*syscall.Stat_t).Dev + // uint64() cast here supports GOOS=darwin where Dev is + // int32. If the device number is negative, the unsigned + // devnum won't be the real device number any more, but that's + // fine -- all we care about is getting the same number each + // time. + devnum := uint64(fi.Sys().(*syscall.Stat_t).Dev) var fs syscall.Statfs_t if err := syscall.Statfs(v.Root, &fs); err != nil { @@ -357,56 +354,63 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) // // Each block is given in the format // -// locator+size modification-time {newline} +// locator+size modification-time {newline} // // e.g.: // -// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 -// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 -// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 -// +// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 +// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 +// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { - var lastErr error rootdir, err := v.os.Open(v.Root) if err != nil { return err } - defer rootdir.Close() v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) - for { - names, err := rootdir.Readdirnames(1) - if err == io.EOF { - return lastErr - } else if err != nil { - return err - } - if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) { + subdirs, err := rootdir.Readdirnames(-1) + rootdir.Close() + if err != nil { + return err + } + for _, subdir := range subdirs { + if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) { // prefix excludes all blocks stored in this dir continue } - if !blockDirRe.MatchString(names[0]) { + if !blockDirRe.MatchString(subdir) { continue } - blockdirpath := filepath.Join(v.Root, names[0]) - blockdir, err := v.os.Open(blockdirpath) - if err != nil { - v.logger.WithError(err).Errorf("error reading %q", blockdirpath) - lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err) - continue - } - v.os.stats.TickOps("readdir") - v.os.stats.Tick(&v.os.stats.ReaddirOps) - for { - fileInfo, err := blockdir.Readdir(1) - if err == io.EOF { + blockdirpath := filepath.Join(v.Root, subdir) + + var dirents []os.DirEntry + for attempt := 0; ; attempt++ { + v.os.stats.TickOps("readdir") + v.os.stats.Tick(&v.os.stats.ReaddirOps) + dirents, err = os.ReadDir(blockdirpath) + if err == nil { break + } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") { + // EBADCOOKIE (NFS stopped accepting + // our readdirent cookie) -- retry a + // few times before giving up + v.logger.WithError(err).Printf("retry after error reading %s", blockdirpath) + continue + } else { + return err + } + } + + for _, dirent := range dirents { + fileInfo, err := dirent.Info() + if os.IsNotExist(err) { + // File disappeared between ReadDir() and now + continue } else if err != nil { - v.logger.WithError(err).Errorf("error reading %q", blockdirpath) - lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err) - break + v.logger.WithError(err).Errorf("error getting FileInfo for %q in %q", dirent.Name(), blockdirpath) + return err } - name := fileInfo[0].Name() + name := fileInfo.Name() if !strings.HasPrefix(name, prefix) { continue } @@ -415,16 +419,15 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { } _, err = fmt.Fprint(w, name, - "+", fileInfo[0].Size(), - " ", fileInfo[0].ModTime().UnixNano(), + "+", fileInfo.Size(), + " ", fileInfo.ModTime().UnixNano(), "\n") if err != nil { - blockdir.Close() return fmt.Errorf("error writing: %s", err) } } - blockdir.Close() } + return nil } // Trash trashes the block data from the unix storage @@ -439,8 +442,7 @@ func (v *UnixVolume) Trash(loc string) error { // be re-written), or (b) Touch() will update the file's timestamp and // Trash() will read the correct up-to-date timestamp and choose not to // trash the file. - - if v.volume.ReadOnly || !v.cluster.Collections.BlobTrash { + if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly { return MethodDisabledError } if err := v.lock(context.TODO()); err != nil { @@ -527,7 +529,6 @@ func (v *UnixVolume) blockPath(loc string) string { // IsFull returns true if the free space on the volume is less than // MinFreeKilobytes. -// func (v *UnixVolume) IsFull() (isFull bool) { fullSymlink := v.Root + "/full" @@ -558,7 +559,6 @@ func (v *UnixVolume) IsFull() (isFull bool) { // FreeDiskSpace returns the number of unused 1k blocks available on // the volume. -// func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { var fs syscall.Statfs_t err = syscall.Statfs(v.Root, &fs) @@ -646,10 +646,6 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // EmptyTrash walks hierarchy looking for {hash}.trash.* // and deletes those with deadline < now. func (v *UnixVolume) EmptyTrash() { - if v.cluster.Collections.BlobDeleteConcurrency < 1 { - return - } - var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64