21126: Add AllowTrashWhenReadOnly flag.
[arvados.git] / services / keepstore / unix_volume.go
index 46f4db4095bfb286c82f4f07a988c16cee5ebe63..dee4bdc1c1ed1d59badb076dedac2615eef3f2d7 100644 (file)
@@ -314,14 +314,18 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 
 // Status returns a VolumeStatus struct describing the volume's
 // current state, or nil if an error occurs.
-//
 func (v *UnixVolume) Status() *VolumeStatus {
        fi, err := v.os.Stat(v.Root)
        if err != nil {
                v.logger.WithError(err).Error("stat failed")
                return nil
        }
-       devnum := fi.Sys().(*syscall.Stat_t).Dev
+       // uint64() cast here supports GOOS=darwin where Dev is
+       // int32. If the device number is negative, the unsigned
+       // devnum won't be the real device number any more, but that's
+       // fine -- all we care about is getting the same number each
+       // time.
+       devnum := uint64(fi.Sys().(*syscall.Stat_t).Dev)
 
        var fs syscall.Statfs_t
        if err := syscall.Statfs(v.Root, &fs); err != nil {
@@ -350,14 +354,13 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //
 // Each block is given in the format
 //
-//     locator+size modification-time {newline}
+//     locator+size modification-time {newline}
 //
 // e.g.:
 //
-//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
+//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        rootdir, err := v.os.Open(v.Root)
        if err != nil {
@@ -379,23 +382,25 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        continue
                }
                blockdirpath := filepath.Join(v.Root, subdir)
-               blockdir, err := v.os.Open(blockdirpath)
-               if err != nil {
-                       v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
-                       return fmt.Errorf("error reading %q: %s", blockdirpath, err)
-               }
-               v.os.stats.TickOps("readdir")
-               v.os.stats.Tick(&v.os.stats.ReaddirOps)
-               // ReadDir() (compared to Readdir(), which returns
-               // FileInfo structs) helps complete the sequence of
-               // readdirent calls as quickly as possible, reducing
-               // the likelihood of NFS EBADCOOKIE (523) errors.
-               dirents, err := blockdir.ReadDir(-1)
-               blockdir.Close()
-               if err != nil {
-                       v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
-                       return fmt.Errorf("error reading %q: %s", blockdirpath, err)
+
+               var dirents []os.DirEntry
+               for attempt := 0; ; attempt++ {
+                       v.os.stats.TickOps("readdir")
+                       v.os.stats.Tick(&v.os.stats.ReaddirOps)
+                       dirents, err = os.ReadDir(blockdirpath)
+                       if err == nil {
+                               break
+                       } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") {
+                               // EBADCOOKIE (NFS stopped accepting
+                               // our readdirent cookie) -- retry a
+                               // few times before giving up
+                               v.logger.WithError(err).Printf("retry after error reading %s", blockdirpath)
+                               continue
+                       } else {
+                               return err
+                       }
                }
+
                for _, dirent := range dirents {
                        fileInfo, err := dirent.Info()
                        if os.IsNotExist(err) {
@@ -437,8 +442,7 @@ func (v *UnixVolume) Trash(loc string) error {
        // be re-written), or (b) Touch() will update the file's timestamp and
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
-
-       if v.volume.ReadOnly || !v.cluster.Collections.BlobTrash {
+       if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
                return MethodDisabledError
        }
        if err := v.lock(context.TODO()); err != nil {
@@ -525,7 +529,6 @@ func (v *UnixVolume) blockPath(loc string) string {
 
 // IsFull returns true if the free space on the volume is less than
 // MinFreeKilobytes.
-//
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.Root + "/full"
 
@@ -556,7 +559,6 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 
 // FreeDiskSpace returns the number of unused 1k blocks available on
 // the volume.
-//
 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
        err = syscall.Statfs(v.Root, &fs)
@@ -644,10 +646,6 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
-       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
-               return
-       }
-
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64