10467: Abort S3 request and release buffer if caller disconnects while server is...
[arvados.git] / services / keepstore / volume_unix.go
index da1d390279b3c0afe94a388c002275b0cf44790e..02f0f9f3d1ba00fb17425a5b393094d4c85ea251 100644 (file)
@@ -2,7 +2,7 @@ package main
 
 import (
        "bufio"
-       "errors"
+       "context"
        "flag"
        "fmt"
        "io"
@@ -19,11 +19,16 @@ import (
 )
 
 type unixVolumeAdder struct {
-       *volumeSet
+       *Config
 }
 
-func (vs *unixVolumeAdder) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value
+func (s *unixVolumeAdder) String() string {
+       return "-"
+}
+
+func (vs *unixVolumeAdder) Set(path string) error {
+       if dirs := strings.Split(path, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
                        if err := vs.Set(dir); err != nil {
@@ -32,33 +37,19 @@ func (vs *unixVolumeAdder) Set(value string) error {
                }
                return nil
        }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
+       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+               Root:      path,
+               ReadOnly:  deprecated.flagReadonly,
+               Serialize: deprecated.flagSerializeIO,
        })
        return nil
 }
 
 func init() {
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
 }
 
 // Discover adds a UnixVolume for every directory named "keep" that is
@@ -89,10 +80,10 @@ func (vs *unixVolumeAdder) Discover() int {
                }
                // Set the -readonly flag (but only for this volume)
                // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
+               flagReadonlyWas := deprecated.flagReadonly
                for _, fsopt := range strings.Split(args[3], ",") {
                        if fsopt == "ro" {
-                               flagReadonly = true
+                               deprecated.flagReadonly = true
                                break
                        }
                        if fsopt == "rw" {
@@ -104,24 +95,62 @@ func (vs *unixVolumeAdder) Discover() int {
                } else {
                        added++
                }
-               flagReadonly = flagReadonlyWas
+               deprecated.flagReadonly = flagReadonlyWas
        }
        return added
 }
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       // path to the volume's root directory
-       root string
+       Root                 string // path to the volume's root directory
+       ReadOnly             bool
+       Serialize            bool
+       DirectoryReplication int
+
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
-       locker   sync.Locker
-       readonly bool
+       locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples.
+func (*UnixVolume) Examples() []Volume {
+       return []Volume{
+               &UnixVolume{
+                       Root:                 "/mnt/local-disk",
+                       Serialize:            true,
+                       DirectoryReplication: 1,
+               },
+               &UnixVolume{
+                       Root:                 "/mnt/network-disk",
+                       Serialize:            false,
+                       DirectoryReplication: 2,
+               },
+       }
+}
+
+// Type implements Volume
+func (v *UnixVolume) Type() string {
+       return "Directory"
+}
+
+// Start implements Volume
+func (v *UnixVolume) Start() error {
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+       }
+       if v.DirectoryReplication == 0 {
+               v.DirectoryReplication = 1
+       }
+       _, err := os.Stat(v.Root)
+       return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        p := v.blockPath(loc)
@@ -138,9 +167,8 @@ func (v *UnixVolume) Touch(loc string) error {
                return e
        }
        defer unlockfile(f)
-       now := time.Now().Unix()
-       utime := syscall.Utimbuf{now, now}
-       return syscall.Utime(p, &utime)
+       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
 }
 
 // Mtime returns the stored timestamp for the given locator.
@@ -181,26 +209,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
        }
-       buf := bufs.Get(int(stat.Size()))
+       if stat.Size() > int64(len(buf)) {
+               return 0, TooLongError
+       }
+       var read int
+       size := int(stat.Size())
        err = v.getFunc(path, func(rdr io.Reader) error {
-               _, err = io.ReadFull(rdr, buf)
+               read, err = io.ReadFull(rdr, buf[:size])
                return err
        })
-       if err != nil {
-               bufs.Put(buf)
-               return nil, err
-       }
-       return buf, nil
+       return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
@@ -221,7 +247,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.IsFull() {
@@ -271,14 +297,14 @@ func (v *UnixVolume) Status() *VolumeStatus {
        var fs syscall.Statfs_t
        var devnum uint64
 
-       if fi, err := os.Stat(v.root); err == nil {
+       if fi, err := os.Stat(v.Root); err == nil {
                devnum = fi.Sys().(*syscall.Stat_t).Dev
        } else {
                log.Printf("%s: os.Stat: %s\n", v, err)
                return nil
        }
 
-       err := syscall.Statfs(v.root, &fs)
+       err := syscall.Statfs(v.Root, &fs)
        if err != nil {
                log.Printf("%s: statfs: %s\n", v, err)
                return nil
@@ -288,7 +314,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.root, devnum, free, used}
+       return &VolumeStatus{v.Root, devnum, free, used}
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -309,8 +335,8 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error = nil
-       rootdir, err := os.Open(v.root)
+       var lastErr error
+       rootdir, err := os.Open(v.Root)
        if err != nil {
                return err
        }
@@ -329,7 +355,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                if !blockDirRe.MatchString(names[0]) {
                        continue
                }
-               blockdirpath := filepath.Join(v.root, names[0])
+               blockdirpath := filepath.Join(v.Root, names[0])
                blockdir, err := os.Open(blockdirpath)
                if err != nil {
                        log.Print("Error reading ", blockdirpath, ": ", err)
@@ -355,24 +381,27 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().Unix(),
+                               " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                }
                blockdir.Close()
        }
 }
 
-// Delete deletes the block data from the unix storage
+// Trash trashes the block data from the unix storage
+// If TrashLifetime == 0, the block is deleted
+// Else, the block is renamed as path/{loc}.trash.{deadline},
+// where deadline = now + TrashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
-       // and Delete() because either (a) the file will be deleted and Touch()
+       // and Trash() because either (a) the file will be trashed and Touch()
        // will signal to the caller that the file is not present (and needs to
        // be re-written), or (b) Touch() will update the file's timestamp and
-       // Delete() will read the correct up-to-date timestamp and choose not to
-       // delete the file.
+       // Trash() will read the correct up-to-date timestamp and choose not to
+       // trash the file.
 
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.locker != nil {
@@ -397,24 +426,56 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
-       } else {
-               if time.Since(fi.ModTime()) < blobSignatureTTL {
-                       return nil
-               }
+       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
+               return nil
+       }
+
+       if theConfig.TrashLifetime == 0 {
+               return os.Remove(p)
        }
-       return os.Remove(p)
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves block from trash back into store
-// TBD
-func (v *UnixVolume) Untrash(loc string) error {
-       return nil
+// Look for path/{loc}.trash.{deadline} in storage,
+// and rename the first such file as path/{loc}
+func (v *UnixVolume) Untrash(loc string) (err error) {
+       if v.ReadOnly {
+               return MethodDisabledError
+       }
+
+       files, err := ioutil.ReadDir(v.blockDir(loc))
+       if err != nil {
+               return err
+       }
+
+       if len(files) == 0 {
+               return os.ErrNotExist
+       }
+
+       foundTrash := false
+       prefix := fmt.Sprintf("%v.trash.", loc)
+       for _, f := range files {
+               if strings.HasPrefix(f.Name(), prefix) {
+                       foundTrash = true
+                       err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       if err == nil {
+                               break
+                       }
+               }
+       }
+
+       if foundTrash == false {
+               return os.ErrNotExist
+       }
+
+       return
 }
 
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
-       return filepath.Join(v.root, loc[0:3])
+       return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
@@ -427,7 +488,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 // MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
-       fullSymlink := v.root + "/full"
+       fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
        if link, err := os.Readlink(fullSymlink); err == nil {
@@ -459,7 +520,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 //
 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
-       err = syscall.Statfs(v.root, &fs)
+       err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
@@ -469,16 +530,19 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.root)
+       return fmt.Sprintf("[UnixVolume %s]", v.Root)
 }
 
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *UnixVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
+// Replication returns the number of replicas promised by the
+// underlying device (as specified in configuration).
 func (v *UnixVolume) Replication() int {
-       return 1
+       return v.DirectoryReplication
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
@@ -502,3 +566,50 @@ func (v *UnixVolume) translateError(err error) error {
                return err
        }
 }
+
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+
+// EmptyTrash walks hierarchy looking for {hash}.trash.*
+// and deletes those with deadline < now.
+func (v *UnixVolume) EmptyTrash() {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       return nil
+               }
+               if info.Mode().IsDir() {
+                       return nil
+               }
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
+               if len(matches) != 3 {
+                       return nil
+               }
+               deadline, err := strconv.ParseInt(matches[2], 10, 64)
+               if err != nil {
+                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       return nil
+               }
+               bytesInTrash += info.Size()
+               blocksInTrash++
+               if deadline > time.Now().Unix() {
+                       return nil
+               }
+               err = os.Remove(path)
+               if err != nil {
+                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return nil
+               }
+               bytesDeleted += info.Size()
+               blocksDeleted++
+               return nil
+       })
+
+       if err != nil {
+               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+       }
+
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}