7159: Omit non-Keep blobs from index
[arvados.git] / services / keepstore / volume_unix.go
index 368ddc55f07a9c8f4b78c09ba0cd3917bbdf091a..910cc25d613cb7690f944b418aebf5c205c7aced 100644 (file)
@@ -1,7 +1,9 @@
 package main
 
 import (
 package main
 
 import (
-       "bytes"
+       "bufio"
+       "errors"
+       "flag"
        "fmt"
        "io"
        "io/ioutil"
        "fmt"
        "io"
        "io/ioutil"
@@ -16,14 +18,108 @@ import (
        "time"
 )
 
        "time"
 )
 
+type unixVolumeAdder struct {
+       *volumeSet
+}
+
+func (vs *unixVolumeAdder) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
+       }
+       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
+       })
+       return nil
+}
+
+func init() {
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volumes",
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+       added := 0
+       f, err := os.Open(ProcMounts)
+       if err != nil {
+               log.Fatalf("opening %s: %s", ProcMounts, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               if err := vs.Set(keepdir); err != nil {
+                       log.Printf("adding %q: %s", keepdir, err)
+               } else {
+                       added++
+               }
+               flagReadonly = flagReadonlyWas
+       }
+       return added
+}
+
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root      string // path to the volume's root directory
-       serialize bool
-       readonly  bool
-       mutex     sync.Mutex
+       // path to the volume's root directory
+       root string
+       // something to lock during IO, typically a sync.Mutex (or nil
+       // to skip locking)
+       locker   sync.Locker
+       readonly bool
 }
 
 }
 
+// Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
 func (v *UnixVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -34,9 +130,9 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if e := lockfile(f); e != nil {
                return e
        }
        if e := lockfile(f); e != nil {
                return e
@@ -47,27 +143,28 @@ func (v *UnixVolume) Touch(loc string) error {
        return syscall.Utime(p, &utime)
 }
 
        return syscall.Utime(p, &utime)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       if fi, err := os.Stat(p); err != nil {
+       fi, err := os.Stat(p)
+       if err != nil {
                return time.Time{}, err
                return time.Time{}, err
-       } else {
-               return fi.ModTime(), nil
        }
        }
+       return fi.ModTime(), nil
 }
 
 }
 
-// Open the given file, apply the serialize lock if enabled, and call
-// the given function if and when the file is ready to read.
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
        f, err := os.Open(path)
        if err != nil {
                return err
        }
        defer f.Close()
        f, err := os.Open(path)
        if err != nil {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
-       }
        return fn(f)
 }
 
        return fn(f)
 }
 
@@ -77,7 +174,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
-               } else if stat.Size() > BLOCKSIZE {
+               } else if stat.Size() > BlockSize {
                        err = TooLongError
                }
        }
                        err = TooLongError
                }
        }
@@ -92,7 +189,7 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, err
+               return nil, v.translateError(err)
        }
        buf := bufs.Get(int(stat.Size()))
        err = v.getFunc(path, func(rdr io.Reader) error {
        }
        buf := bufs.Get(int(stat.Size()))
        err = v.getFunc(path, func(rdr io.Reader) error {
@@ -107,40 +204,15 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 }
 
 // Compare returns nil if Get(loc) would return the same content as
 }
 
 // Compare returns nil if Get(loc) would return the same content as
-// cmp. It is functionally equivalent to Get() followed by
+// expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
-       stat, err := v.stat(path)
-       if err != nil {
-               return err
-       }
-       bufLen := 1 << 20
-       if int64(bufLen) > stat.Size() {
-               bufLen = int(stat.Size())
+       if _, err := v.stat(path); err != nil {
+               return v.translateError(err)
        }
        }
-       cmp := expect
-       buf := make([]byte, bufLen)
        return v.getFunc(path, func(rdr io.Reader) error {
        return v.getFunc(path, func(rdr io.Reader) error {
-               // Loop invariants: all data read so far matched what
-               // we expected, and the first N bytes of cmp are
-               // expected to equal the next N bytes read from
-               // reader.
-               for {
-                       n, err := rdr.Read(buf)
-                       if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
-                               return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
-                       }
-                       cmp = cmp[n:]
-                       if err == io.EOF {
-                               if len(cmp) != 0 {
-                                       return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
-                               }
-                               return nil
-                       } else if err != nil {
-                               return err
-                       }
-               }
+               return compareReaderWithBuf(rdr, expect, loc[:32])
        })
 }
 
        })
 }
 
@@ -169,9 +241,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
        }
        bpath := v.blockPath(loc)
 
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
        }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -220,6 +292,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IndexTo writes (to the given Writer) a list of blocks found on this
 // volume which begin with the specified prefix. If the prefix is an
 
 // IndexTo writes (to the given Writer) a list of blocks found on this
 // volume which begin with the specified prefix. If the prefix is an
@@ -276,6 +349,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
+                       if !blockFileRe.MatchString(name) {
+                               continue
+                       }
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
@@ -286,6 +362,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        }
 }
 
        }
 }
 
+// Delete deletes the block data from the unix storage
 func (v *UnixVolume) Delete(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
 func (v *UnixVolume) Delete(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -298,9 +375,9 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
        if v.readonly {
                return MethodDisabledError
        }
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
@@ -313,7 +390,7 @@ func (v *UnixVolume) Delete(loc string) error {
        }
        defer unlockfile(f)
 
        }
        defer unlockfile(f)
 
-       // If the block has been PUT in the last blob_signature_ttl
+       // If the block has been PUT in the last blobSignatureTTL
        // seconds, return success without removing the block. This
        // protects data from garbage collection until it is no longer
        // possible for clients to retrieve the unreferenced blocks
        // seconds, return success without removing the block. This
        // protects data from garbage collection until it is no longer
        // possible for clients to retrieve the unreferenced blocks
@@ -321,7 +398,7 @@ func (v *UnixVolume) Delete(loc string) error {
        if fi, err := os.Stat(p); err != nil {
                return err
        } else {
        if fi, err := os.Stat(p); err != nil {
                return err
        } else {
-               if time.Since(fi.ModTime()) < blob_signature_ttl {
+               if time.Since(fi.ModTime()) < blobSignatureTTL {
                        return nil
                }
        }
                        return nil
                }
        }
@@ -341,7 +418,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 }
 
 // IsFull returns true if the free space on the volume is less than
 }
 
 // IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
@@ -357,7 +434,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
+               isFull = avail < MinFreeKilobytes
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
@@ -389,10 +466,15 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
+func (v *UnixVolume) Replication() int {
+       return 1
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
@@ -401,3 +483,16 @@ func lockfile(f *os.File) error {
 func unlockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
 }
 func unlockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
 }
+
+// Where appropriate, translate a more specific filesystem error to an
+// error recognized by handlers, like os.ErrNotExist.
+func (v *UnixVolume) translateError(err error) error {
+       switch err.(type) {
+       case *os.PathError:
+               // stat() returns a PathError if the parent directory
+               // (not just the file itself) is missing
+               return os.ErrNotExist
+       default:
+               return err
+       }
+}