13383: Implement EmptyTrashWorkers in azure and filesystem backends.
[arvados.git] / services / keepstore / volume_unix.go
index 681095d3a268e0ed9270dd979204d06becdd0b91..23d675359244942097072d88e1bd98daf9d46c6c 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -8,15 +12,15 @@ import (
        "io"
        "io/ioutil"
        "os"
+       "os/exec"
        "path/filepath"
        "regexp"
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "syscall"
        "time"
-
-       log "github.com/Sirupsen/logrus"
 )
 
 type unixVolumeAdder struct {
@@ -107,6 +111,7 @@ type UnixVolume struct {
        ReadOnly             bool
        Serialize            bool
        DirectoryReplication int
+       StorageClasses       []string
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -115,6 +120,82 @@ type UnixVolume struct {
        os osWithStats
 }
 
+// DeviceID returns a globally unique ID for the volume's root
+// directory, consisting of the filesystem's UUID and the path from
+// filesystem root to storage directory, joined by "/". For example,
+// the DeviceID for a local directory "/mnt/xvda1/keep" might be
+// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
+func (v *UnixVolume) DeviceID() string {
+       giveup := func(f string, args ...interface{}) string {
+               log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+               return ""
+       }
+       buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
+       if err != nil {
+               return giveup("findmnt: %s (%q)", err, buf)
+       }
+       findmnt := strings.Fields(string(buf))
+       if len(findmnt) < 2 {
+               return giveup("could not parse findmnt output: %q", buf)
+       }
+       fsRoot, dev := findmnt[0], findmnt[1]
+
+       absRoot, err := filepath.Abs(v.Root)
+       if err != nil {
+               return giveup("resolving relative path %q: %s", v.Root, err)
+       }
+       realRoot, err := filepath.EvalSymlinks(absRoot)
+       if err != nil {
+               return giveup("resolving symlinks in %q: %s", absRoot, err)
+       }
+
+       // Find path from filesystem root to realRoot
+       var fsPath string
+       if strings.HasPrefix(realRoot, fsRoot+"/") {
+               fsPath = realRoot[len(fsRoot):]
+       } else if fsRoot == "/" {
+               fsPath = realRoot
+       } else if fsRoot == realRoot {
+               fsPath = ""
+       } else {
+               return giveup("findmnt reports mount point %q which is not a prefix of volume root %q", fsRoot, realRoot)
+       }
+
+       if !strings.HasPrefix(dev, "/") {
+               return giveup("mount %q device %q is not a path", fsRoot, dev)
+       }
+
+       fi, err := os.Stat(dev)
+       if err != nil {
+               return giveup("stat %q: %s\n", dev, err)
+       }
+       ino := fi.Sys().(*syscall.Stat_t).Ino
+
+       // Find a symlink in /dev/disk/by-uuid/ whose target is (i.e.,
+       // has the same inode as) the mounted device
+       udir := "/dev/disk/by-uuid"
+       d, err := os.Open(udir)
+       if err != nil {
+               return giveup("opening %q: %s", udir, err)
+       }
+       uuids, err := d.Readdirnames(0)
+       if err != nil {
+               return giveup("reading %q: %s", udir, err)
+       }
+       for _, uuid := range uuids {
+               link := filepath.Join(udir, uuid)
+               fi, err = os.Stat(link)
+               if err != nil {
+                       log.Printf("error: stat %q: %s", link, err)
+                       continue
+               }
+               if fi.Sys().(*syscall.Stat_t).Ino == ino {
+                       return uuid + fsPath
+               }
+       }
+       return giveup("could not find entry in %q matching %q", udir, dev)
+}
+
 // Examples implements VolumeWithExamples.
 func (*UnixVolume) Examples() []Volume {
        return []Volume{
@@ -358,6 +439,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                return err
        }
        defer rootdir.Close()
+       v.os.stats.Tick(&v.os.stats.ReaddirOps)
        for {
                names, err := rootdir.Readdirnames(1)
                if err == io.EOF {
@@ -379,6 +461,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        lastErr = err
                        continue
                }
+               v.os.stats.Tick(&v.os.stats.ReaddirOps)
                for {
                        fileInfo, err := blockdir.Readdir(1)
                        if err == io.EOF {
@@ -400,6 +483,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                                "+", fileInfo[0].Size(),
                                " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
+                       if err != nil {
+                               log.Print("Error writing : ", err)
+                               lastErr = err
+                               break
+                       }
                }
                blockdir.Close()
        }
@@ -461,6 +549,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return MethodDisabledError
        }
 
+       v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(loc))
        if err != nil {
                return err
@@ -562,6 +651,11 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
@@ -632,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
-       var blocksDeleted, blocksInTrash int
+       var blocksDeleted, blocksInTrash int64
 
-       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
-               if err != nil {
-                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
-                       return nil
-               }
+       doFile := func(path string, info os.FileInfo) {
                if info.Mode().IsDir() {
-                       return nil
+                       return
                }
                matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
-                       return nil
+                       return
                }
                deadline, err := strconv.ParseInt(matches[2], 10, 64)
                if err != nil {
                        log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
-                       return nil
+                       return
                }
-               bytesInTrash += info.Size()
-               blocksInTrash++
+               atomic.AddInt64(&bytesInTrash, info.Size())
+               atomic.AddInt64(&blocksInTrash, 1)
                if deadline > time.Now().Unix() {
-                       return nil
+                       return
                }
                err = v.os.Remove(path)
                if err != nil {
                        log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return
+               }
+               atomic.AddInt64(&bytesDeleted, info.Size())
+               atomic.AddInt64(&blocksDeleted, 1)
+       }
+
+       type dirent struct {
+               path string
+               info os.FileInfo
+       }
+       var wg sync.WaitGroup
+       todo := make(chan dirent, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for e := range todo {
+                               doFile(e.path, e.info)
+                       }
+               }()
+       }
+
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
                        return nil
                }
-               bytesDeleted += info.Size()
-               blocksDeleted++
+               todo <- dirent{path, info}
                return nil
        })
+       close(todo)
+       wg.Wait()
 
        if err != nil {
                log.Printf("EmptyTrash error for %v: %v", v.String(), err)