Merge branch '9857-cwl-acceptlist-re' refs #9857
[arvados.git] / services / keepstore / volume_unix.go
index a7ad6f9e499c80439c27cb1beed33060674ed776..5982fb0484eae0a37ef09e24b9526492c5b0459f 100644 (file)
@@ -1,8 +1,9 @@
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
 package main
 
 import (
+       "bufio"
+       "errors"
+       "flag"
        "fmt"
        "io"
        "io/ioutil"
@@ -17,14 +18,108 @@ import (
        "time"
 )
 
+type unixVolumeAdder struct {
+       *volumeSet
+}
+
+func (vs *unixVolumeAdder) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
+       }
+       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
+       })
+       return nil
+}
+
+func init() {
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volumes",
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+       added := 0
+       f, err := os.Open(ProcMounts)
+       if err != nil {
+               log.Fatalf("opening %s: %s", ProcMounts, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               if err := vs.Set(keepdir); err != nil {
+                       log.Printf("adding %q: %s", keepdir, err)
+               } else {
+                       added++
+               }
+               flagReadonly = flagReadonlyWas
+       }
+       return added
+}
+
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root      string // path to the volume's root directory
-       serialize bool
-       readonly  bool
-       mutex     sync.Mutex
+       // path to the volume's root directory
+       root string
+       // something to lock during IO, typically a sync.Mutex (or nil
+       // to skip locking)
+       locker   sync.Locker
+       readonly bool
 }
 
+// Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -35,62 +130,87 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if e := lockfile(f); e != nil {
                return e
        }
        defer unlockfile(f)
-       now := time.Now().Unix()
-       utime := syscall.Utimbuf{now, now}
-       return syscall.Utime(p, &utime)
+       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       if fi, err := os.Stat(p); err != nil {
+       fi, err := os.Stat(p)
+       if err != nil {
                return time.Time{}, err
-       } else {
-               return fi.ModTime(), nil
        }
+       return fi.ModTime(), nil
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
-       path := v.blockPath(loc)
-       stat, err := os.Stat(path)
-       if err != nil {
-               return nil, err
-       }
-       if stat.Size() < 0 {
-               return nil, os.ErrInvalid
-       } else if stat.Size() == 0 {
-               return bufs.Get(0), nil
-       } else if stat.Size() > BLOCKSIZE {
-               return nil, TooLongError
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        f, err := os.Open(path)
        if err != nil {
-               return nil, err
+               return err
        }
        defer f.Close()
-       buf := bufs.Get(int(stat.Size()))
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+       stat, err := os.Stat(path)
+       if err == nil {
+               if stat.Size() < 0 {
+                       err = os.ErrInvalid
+               } else if stat.Size() > BlockSize {
+                       err = TooLongError
+               }
        }
-       _, err = io.ReadFull(f, buf)
+       return stat, err
+}
+
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+       path := v.blockPath(loc)
+       stat, err := v.stat(path)
        if err != nil {
-               bufs.Put(buf)
-               return nil, err
+               return 0, v.translateError(err)
+       }
+       if stat.Size() > int64(len(buf)) {
+               return 0, TooLongError
+       }
+       var read int
+       size := int(stat.Size())
+       err = v.getFunc(path, func(rdr io.Reader) error {
+               read, err = io.ReadFull(rdr, buf[:size])
+               return err
+       })
+       return read, err
+}
+
+// Compare returns nil if Get(loc) would return the same content as
+// expect. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
+       path := v.blockPath(loc)
+       if _, err := v.stat(path); err != nil {
+               return v.translateError(err)
        }
-       return buf, nil
+       return v.getFunc(path, func(rdr io.Reader) error {
+               return compareReaderWithBuf(rdr, expect, loc[:32])
+       })
 }
 
 // Put stores a block of data identified by the locator string
@@ -118,9 +238,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -169,6 +289,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IndexTo writes (to the given Writer) a list of blocks found on this
 // volume which begin with the specified prefix. If the prefix is an
@@ -185,7 +306,7 @@ var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error = nil
+       var lastErr error
        rootdir, err := os.Open(v.root)
        if err != nil {
                return err
@@ -225,31 +346,38 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
+                       if !blockFileRe.MatchString(name) {
+                               continue
+                       }
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().Unix(),
+                               " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                }
                blockdir.Close()
        }
 }
 
-func (v *UnixVolume) Delete(loc string) error {
+// Trash trashes the block data from the unix storage
+// If trashLifetime == 0, the block is deleted
+// Else, the block is renamed as path/{loc}.trash.{deadline},
+// where deadline = now + trashLifetime
+func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
-       // and Delete() because either (a) the file will be deleted and Touch()
+       // and Trash() because either (a) the file will be trashed and Touch()
        // will signal to the caller that the file is not present (and needs to
        // be re-written), or (b) Touch() will update the file's timestamp and
-       // Delete() will read the correct up-to-date timestamp and choose not to
-       // delete the file.
+       // Trash() will read the correct up-to-date timestamp and choose not to
+       // trash the file.
 
        if v.readonly {
                return MethodDisabledError
        }
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
@@ -262,19 +390,57 @@ func (v *UnixVolume) Delete(loc string) error {
        }
        defer unlockfile(f)
 
-       // If the block has been PUT in the last blob_signature_ttl
+       // If the block has been PUT in the last blobSignatureTTL
        // seconds, return success without removing the block. This
        // protects data from garbage collection until it is no longer
        // possible for clients to retrieve the unreferenced blocks
        // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
-       } else {
-               if time.Since(fi.ModTime()) < blob_signature_ttl {
-                       return nil
+       } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+               return nil
+       }
+
+       if trashLifetime == 0 {
+               return os.Remove(p)
+       }
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+}
+
+// Untrash moves block from trash back into store
+// Look for path/{loc}.trash.{deadline} in storage,
+// and rename the first such file as path/{loc}
+func (v *UnixVolume) Untrash(loc string) (err error) {
+       if v.readonly {
+               return MethodDisabledError
+       }
+
+       files, err := ioutil.ReadDir(v.blockDir(loc))
+       if err != nil {
+               return err
+       }
+
+       if len(files) == 0 {
+               return os.ErrNotExist
+       }
+
+       foundTrash := false
+       prefix := fmt.Sprintf("%v.trash.", loc)
+       for _, f := range files {
+               if strings.HasPrefix(f.Name(), prefix) {
+                       foundTrash = true
+                       err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       if err == nil {
+                               break
+                       }
                }
        }
-       return os.Remove(p)
+
+       if foundTrash == false {
+               return os.ErrNotExist
+       }
+
+       return
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -290,7 +456,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 }
 
 // IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
@@ -306,7 +472,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
+               isFull = avail < MinFreeKilobytes
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
@@ -338,10 +504,18 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
+// Replication returns the number of replicas promised by the
+// underlying device (currently assumed to be 1).
+func (v *UnixVolume) Replication() int {
+       return 1
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
@@ -350,3 +524,63 @@ func lockfile(f *os.File) error {
 func unlockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
 }
+
+// Where appropriate, translate a more specific filesystem error to an
+// error recognized by handlers, like os.ErrNotExist.
+func (v *UnixVolume) translateError(err error) error {
+       switch err.(type) {
+       case *os.PathError:
+               // stat() returns a PathError if the parent directory
+               // (not just the file itself) is missing
+               return os.ErrNotExist
+       default:
+               return err
+       }
+}
+
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+
+// EmptyTrash walks hierarchy looking for {hash}.trash.*
+// and deletes those with deadline < now.
+func (v *UnixVolume) EmptyTrash() {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+
+       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       return nil
+               }
+               if info.Mode().IsDir() {
+                       return nil
+               }
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
+               if len(matches) != 3 {
+                       return nil
+               }
+               deadline, err := strconv.ParseInt(matches[2], 10, 64)
+               if err != nil {
+                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       return nil
+               }
+               bytesInTrash += info.Size()
+               blocksInTrash++
+               if deadline > time.Now().Unix() {
+                       return nil
+               }
+               err = os.Remove(path)
+               if err != nil {
+                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return nil
+               }
+               bytesDeleted += info.Size()
+               blocksDeleted++
+               return nil
+       })
+
+       if err != nil {
+               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+       }
+
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}