Merge branch '7491-keepclient-bugs' refs #7491
[arvados.git] / services / keepstore / volume_unix.go
index bc6a6e08ce175b2eab67b3beeefbb7636bcb8a14..98c31d1eab6d0c18f3d242daf898c3d25345e490 100644 (file)
@@ -1,7 +1,9 @@
 package main
 
 import (
-       "bytes"
+       "bufio"
+       "errors"
+       "flag"
        "fmt"
        "io"
        "io/ioutil"
@@ -16,6 +18,97 @@ import (
        "time"
 )
 
+type unixVolumeAdder struct {
+       *volumeSet
+}
+
+func (vs *unixVolumeAdder) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
+       }
+       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
+       })
+       return nil
+}
+
+func init() {
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volumes",
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+       added := 0
+       f, err := os.Open(ProcMounts)
+       if err != nil {
+               log.Fatalf("opening %s: %s", ProcMounts, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               if err := vs.Set(keepdir); err != nil {
+                       log.Printf("adding %q: %s", keepdir, err)
+               } else {
+                       added++
+               }
+               flagReadonly = flagReadonlyWas
+       }
+       return added
+}
+
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
        // path to the volume's root directory
@@ -115,41 +208,11 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
-       stat, err := v.stat(path)
-       if err != nil {
+       if _, err := v.stat(path); err != nil {
                return err
        }
-       bufLen := 1 << 20
-       if int64(bufLen) > stat.Size() {
-               bufLen = int(stat.Size())
-       }
-       cmp := expect
-       buf := make([]byte, bufLen)
        return v.getFunc(path, func(rdr io.Reader) error {
-               // Loop invariants: all data read so far matched what
-               // we expected, and the first N bytes of cmp are
-               // expected to equal the next N bytes read from
-               // reader.
-               for {
-                       n, err := rdr.Read(buf)
-                       if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
-                               return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
-                       }
-                       cmp = cmp[n:]
-                       if err == io.EOF {
-                               if len(cmp) != 0 {
-                                       return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
-                               }
-                               return nil
-                       } else if err != nil {
-                               return err
-                       } else if len(expect) == 0 && n == 0 && bytes.Compare(buf, expect) == 0 {
-                               // When reading an empty block, EOF is not returned. Probably a Go issue.
-                               // This is resulting in an infinite loop resulting in #7329
-                               // Handle empty block scenario explicitly until the EOF issue is resolved.
-                               return nil
-                       }
-               }
+               return compareReaderWithBuf(rdr, expect, loc[:32])
        })
 }
 
@@ -404,6 +467,10 @@ func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
+func (v *UnixVolume) Replication() int {
+       return 1
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)