7241: Add -azure-storage-replication flag.
[arvados.git] / services / keepstore / volume_unix.go
index 4db2a5338400af7aa8c31089df63cf8cf71c502e..98c31d1eab6d0c18f3d242daf898c3d25345e490 100644 (file)
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
 package main
 
 import (
+       "bufio"
+       "errors"
+       "flag"
        "fmt"
+       "io"
        "io/ioutil"
        "log"
        "os"
        "path/filepath"
+       "regexp"
        "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
 
-// IORequests are encapsulated Get or Put requests.  They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
-       KeepGet IOMethod = iota
-       KeepPut
-)
-
-type IORequest struct {
-       method IOMethod
-       loc    string
-       data   []byte
-       reply  chan *IOResponse
-}
-
-type IOResponse struct {
-       data []byte
-       err  error
-}
-
-// A UnixVolume has the following properties:
-//
-//   root
-//       the path to the volume's root directory
-//   queue
-//       A channel of IORequests. If non-nil, all I/O requests for
-//       this volume should be queued on this channel; the result
-//       will be delivered on the IOResponse channel supplied in the
-//       request.
-//
-type UnixVolume struct {
-       root  string // path to this volume
-       queue chan *IORequest
+type unixVolumeAdder struct {
+       *volumeSet
 }
 
-func (v *UnixVolume) IOHandler() {
-       for req := range v.queue {
-               var result IOResponse
-               switch req.method {
-               case KeepGet:
-                       result.data, result.err = v.Read(req.loc)
-               case KeepPut:
-                       result.err = v.Write(req.loc, req.data)
+func (vs *unixVolumeAdder) Set(value string) error {
+       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+               log.Print("DEPRECATED: using comma-separated volume list.")
+               for _, dir := range dirs {
+                       if err := vs.Set(dir); err != nil {
+                               return err
+                       }
                }
-               req.reply <- &result
+               return nil
+       }
+       if len(value) == 0 || value[0] != '/' {
+               return errors.New("Invalid volume: must begin with '/'.")
+       }
+       if _, err := os.Stat(value); err != nil {
+               return err
+       }
+       var locker sync.Locker
+       if flagSerializeIO {
+               locker = &sync.Mutex{}
        }
+       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+               root:     value,
+               locker:   locker,
+               readonly: flagReadonly,
+       })
+       return nil
 }
 
-func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
-       if serialize {
-               v = UnixVolume{root, make(chan *IORequest)}
-               go v.IOHandler()
-       } else {
-               v = UnixVolume{root, nil}
-       }
-       return
+func init() {
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volumes",
+               "Deprecated synonym for -volume.")
+       flag.Var(
+               &unixVolumeAdder{&volumes},
+               "volume",
+               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
 }
 
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
-       if v.queue == nil {
-               return v.Read(loc)
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+       added := 0
+       f, err := os.Open(ProcMounts)
+       if err != nil {
+               log.Fatalf("opening %s: %s", ProcMounts, err)
+       }
+       scanner := bufio.NewScanner(f)
+       for scanner.Scan() {
+               args := strings.Fields(scanner.Text())
+               if err := scanner.Err(); err != nil {
+                       log.Fatalf("reading %s: %s", ProcMounts, err)
+               }
+               dev, mount := args[0], args[1]
+               if mount == "/" {
+                       continue
+               }
+               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+                       continue
+               }
+               keepdir := mount + "/keep"
+               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+                       continue
+               }
+               // Set the -readonly flag (but only for this volume)
+               // if the filesystem is mounted readonly.
+               flagReadonlyWas := flagReadonly
+               for _, fsopt := range strings.Split(args[3], ",") {
+                       if fsopt == "ro" {
+                               flagReadonly = true
+                               break
+                       }
+                       if fsopt == "rw" {
+                               break
+                       }
+               }
+               if err := vs.Set(keepdir); err != nil {
+                       log.Printf("adding %q: %s", keepdir, err)
+               } else {
+                       added++
+               }
+               flagReadonly = flagReadonlyWas
        }
-       reply := make(chan *IOResponse)
-       v.queue <- &IORequest{KeepGet, loc, nil, reply}
-       response := <-reply
-       return response.data, response.err
+       return added
 }
 
-func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.queue == nil {
-               return v.Write(loc, block)
-       }
-       reply := make(chan *IOResponse)
-       v.queue <- &IORequest{KeepPut, loc, block, reply}
-       response := <-reply
-       return response.err
+// A UnixVolume stores and retrieves blocks in a local directory.
+type UnixVolume struct {
+       // path to the volume's root directory
+       root string
+       // something to lock during IO, typically a sync.Mutex (or nil
+       // to skip locking)
+       locker   sync.Locker
+       readonly bool
 }
 
+// Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
        }
        defer f.Close()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
        if e := lockfile(f); e != nil {
                return e
        }
@@ -113,37 +143,87 @@ func (v *UnixVolume) Touch(loc string) error {
        return syscall.Utime(p, &utime)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       if fi, err := os.Stat(p); err != nil {
+       fi, err := os.Stat(p)
+       if err != nil {
                return time.Time{}, err
-       } else {
-               return fi.ModTime(), nil
        }
+       return fi.ModTime(), nil
 }
 
-// Read retrieves a block identified by the locator string "loc", and
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
+       f, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+       stat, err := os.Stat(path)
+       if err == nil {
+               if stat.Size() < 0 {
+                       err = os.ErrInvalid
+               } else if stat.Size() > BlockSize {
+                       err = TooLongError
+               }
+       }
+       return stat, err
+}
+
+// Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError.  It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
-       buf, err := ioutil.ReadFile(v.blockPath(loc))
-       return buf, err
+// Get returns a nil buffer IFF it returns a non-nil error.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+       path := v.blockPath(loc)
+       stat, err := v.stat(path)
+       if err != nil {
+               return nil, err
+       }
+       buf := bufs.Get(int(stat.Size()))
+       err = v.getFunc(path, func(rdr io.Reader) error {
+               _, err = io.ReadFull(rdr, buf)
+               return err
+       })
+       if err != nil {
+               bufs.Put(buf)
+               return nil, err
+       }
+       return buf, nil
 }
 
-// Write stores a block of data identified by the locator string
+// Compare returns nil if Get(loc) would return the same content as
+// expect. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
+       path := v.blockPath(loc)
+       if _, err := v.stat(path); err != nil {
+               return err
+       }
+       return v.getFunc(path, func(rdr io.Reader) error {
+               return compareReaderWithBuf(rdr, expect, loc[:32])
+       })
+}
+
+// Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
        if v.IsFull() {
                return FullError
        }
@@ -161,8 +241,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+               tmpfile.Close()
+               os.Remove(tmpfile.Name())
                return err
        }
        if err := tmpfile.Close(); err != nil {
@@ -179,7 +265,7 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
 }
 
 // Status returns a VolumeStatus struct describing the volume's
-// current state.
+// current state, or nil if an error occurs.
 //
 func (v *UnixVolume) Status() *VolumeStatus {
        var fs syscall.Statfs_t
@@ -205,14 +291,15 @@ func (v *UnixVolume) Status() *VolumeStatus {
        return &VolumeStatus{v.root, devnum, free, used}
 }
 
-// Index returns a list of blocks found on this volume which begin with
-// the specified prefix. If the prefix is an empty string, Index returns
-// a complete list of blocks.
+var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+
+// IndexTo writes (to the given Writer) a list of blocks found on this
+// volume which begin with the specified prefix. If the prefix is an
+// empty string, IndexTo writes a complete list of blocks.
 //
-// The return value is a multiline string (separated by
-// newlines). Each line is in the format
+// Each block is given in the format
 //
-//     locator+size modification-time
+//     locator+size modification-time {newline}
 //
 // e.g.:
 //
@@ -220,41 +307,74 @@ func (v *UnixVolume) Status() *VolumeStatus {
 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
-func (v *UnixVolume) Index(prefix string) (output string) {
-       filepath.Walk(v.root,
-               func(path string, info os.FileInfo, err error) error {
-                       // This WalkFunc inspects each path in the volume
-                       // and prints an index line for all files that begin
-                       // with prefix.
-                       if err != nil {
-                               log.Printf("IndexHandler: %s: walking to %s: %s",
-                                       v, path, err)
-                               return nil
-                       }
-                       locator := filepath.Base(path)
-                       // Skip directories that do not match prefix.
-                       // We know there is nothing interesting inside.
-                       if info.IsDir() &&
-                               !strings.HasPrefix(locator, prefix) &&
-                               !strings.HasPrefix(prefix, locator) {
-                               return filepath.SkipDir
-                       }
-                       // Skip any file that is not apparently a locator, e.g. .meta files
-                       if !IsValidLocator(locator) {
-                               return nil
+func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
+       var lastErr error = nil
+       rootdir, err := os.Open(v.root)
+       if err != nil {
+               return err
+       }
+       defer rootdir.Close()
+       for {
+               names, err := rootdir.Readdirnames(1)
+               if err == io.EOF {
+                       return lastErr
+               } else if err != nil {
+                       return err
+               }
+               if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
+                       // prefix excludes all blocks stored in this dir
+                       continue
+               }
+               if !blockDirRe.MatchString(names[0]) {
+                       continue
+               }
+               blockdirpath := filepath.Join(v.root, names[0])
+               blockdir, err := os.Open(blockdirpath)
+               if err != nil {
+                       log.Print("Error reading ", blockdirpath, ": ", err)
+                       lastErr = err
+                       continue
+               }
+               for {
+                       fileInfo, err := blockdir.Readdir(1)
+                       if err == io.EOF {
+                               break
+                       } else if err != nil {
+                               log.Print("Error reading ", blockdirpath, ": ", err)
+                               lastErr = err
+                               break
                        }
-                       // Print filenames beginning with prefix
-                       if !info.IsDir() && strings.HasPrefix(locator, prefix) {
-                               output = output + fmt.Sprintf(
-                                       "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+                       name := fileInfo[0].Name()
+                       if !strings.HasPrefix(name, prefix) {
+                               continue
                        }
-                       return nil
-               })
-
-       return
+                       _, err = fmt.Fprint(w,
+                               name,
+                               "+", fileInfo[0].Size(),
+                               " ", fileInfo[0].ModTime().Unix(),
+                               "\n")
+               }
+               blockdir.Close()
+       }
 }
 
+// Delete deletes the block data from the unix storage
 func (v *UnixVolume) Delete(loc string) error {
+       // Touch() must be called before calling Write() on a block.  Touch()
+       // also uses lockfile().  This avoids a race condition between Write()
+       // and Delete() because either (a) the file will be deleted and Touch()
+       // will signal to the caller that the file is not present (and needs to
+       // be re-written), or (b) Touch() will update the file's timestamp and
+       // Delete() will read the correct up-to-date timestamp and choose not to
+       // delete the file.
+
+       if v.readonly {
+               return MethodDisabledError
+       }
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
@@ -266,15 +386,15 @@ func (v *UnixVolume) Delete(loc string) error {
        }
        defer unlockfile(f)
 
-       // If the block has been PUT more recently than -permission_ttl,
-       // return success without removing the block.  This guards against
-       // a race condition where a block is old enough that Data Manager
-       // has added it to the trash list, but the user submitted a PUT
-       // for the block since then.
+       // If the block has been PUT in the last blobSignatureTTL
+       // seconds, return success without removing the block. This
+       // protects data from garbage collection until it is no longer
+       // possible for clients to retrieve the unreferenced blocks
+       // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
        } else {
-               if time.Since(fi.ModTime()) < permission_ttl {
+               if time.Since(fi.ModTime()) < blobSignatureTTL {
                        return nil
                }
        }
@@ -294,7 +414,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 }
 
 // IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
@@ -310,7 +430,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
+               isFull = avail < MinFreeKilobytes
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
@@ -342,6 +462,15 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+func (v *UnixVolume) Writable() bool {
+       return !v.readonly
+}
+
+func (v *UnixVolume) Replication() int {
+       return 1
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)