10467: Abort S3 request and release buffer if caller disconnects while server is...
[arvados.git] / services / keepstore / volume_unix.go
index edec048dfe5836701f821beefdc3d7b131777982..02f0f9f3d1ba00fb17425a5b393094d4c85ea251 100644 (file)
@@ -2,7 +2,7 @@ package main
 
 import (
        "bufio"
-       "errors"
+       "context"
        "flag"
        "fmt"
        "io"
@@ -19,11 +19,16 @@ import (
 )
 
 type unixVolumeAdder struct {
-       *volumeSet
+       *Config
 }
 
-func (vs *unixVolumeAdder) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value
+func (s *unixVolumeAdder) String() string {
+       return "-"
+}
+
+func (vs *unixVolumeAdder) Set(path string) error {
+       if dirs := strings.Split(path, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
                        if err := vs.Set(dir); err != nil {
@@ -32,33 +37,19 @@ func (vs *unixVolumeAdder) Set(value string) error {
                }
                return nil
        }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
+       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+               Root:      path,
+               ReadOnly:  deprecated.flagReadonly,
+               Serialize: deprecated.flagSerializeIO,
        })
        return nil
 }
 
 func init() {
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
 }
 
 // Discover adds a UnixVolume for every directory named "keep" that is
@@ -89,10 +80,10 @@ func (vs *unixVolumeAdder) Discover() int {
                }
                // Set the -readonly flag (but only for this volume)
                // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
+               flagReadonlyWas := deprecated.flagReadonly
                for _, fsopt := range strings.Split(args[3], ",") {
                        if fsopt == "ro" {
-                               flagReadonly = true
+                               deprecated.flagReadonly = true
                                break
                        }
                        if fsopt == "rw" {
@@ -104,24 +95,62 @@ func (vs *unixVolumeAdder) Discover() int {
                } else {
                        added++
                }
-               flagReadonly = flagReadonlyWas
+               deprecated.flagReadonly = flagReadonlyWas
        }
        return added
 }
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       // path to the volume's root directory
-       root string
+       Root                 string // path to the volume's root directory
+       ReadOnly             bool
+       Serialize            bool
+       DirectoryReplication int
+
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
-       locker   sync.Locker
-       readonly bool
+       locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples.
+func (*UnixVolume) Examples() []Volume {
+       return []Volume{
+               &UnixVolume{
+                       Root:                 "/mnt/local-disk",
+                       Serialize:            true,
+                       DirectoryReplication: 1,
+               },
+               &UnixVolume{
+                       Root:                 "/mnt/network-disk",
+                       Serialize:            false,
+                       DirectoryReplication: 2,
+               },
+       }
+}
+
+// Type implements Volume
+func (v *UnixVolume) Type() string {
+       return "Directory"
+}
+
+// Start implements Volume
+func (v *UnixVolume) Start() error {
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+       }
+       if v.DirectoryReplication == 0 {
+               v.DirectoryReplication = 1
+       }
+       _, err := os.Stat(v.Root)
+       return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        p := v.blockPath(loc)
@@ -138,9 +167,8 @@ func (v *UnixVolume) Touch(loc string) error {
                return e
        }
        defer unlockfile(f)
-       now := time.Now().Unix()
-       utime := syscall.Utimbuf{now, now}
-       return syscall.Utime(p, &utime)
+       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
 }
 
 // Mtime returns the stored timestamp for the given locator.
@@ -183,7 +211,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
@@ -219,7 +247,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.IsFull() {
@@ -269,14 +297,14 @@ func (v *UnixVolume) Status() *VolumeStatus {
        var fs syscall.Statfs_t
        var devnum uint64
 
-       if fi, err := os.Stat(v.root); err == nil {
+       if fi, err := os.Stat(v.Root); err == nil {
                devnum = fi.Sys().(*syscall.Stat_t).Dev
        } else {
                log.Printf("%s: os.Stat: %s\n", v, err)
                return nil
        }
 
-       err := syscall.Statfs(v.root, &fs)
+       err := syscall.Statfs(v.Root, &fs)
        if err != nil {
                log.Printf("%s: statfs: %s\n", v, err)
                return nil
@@ -286,7 +314,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.root, devnum, free, used}
+       return &VolumeStatus{v.Root, devnum, free, used}
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -307,8 +335,8 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error = nil
-       rootdir, err := os.Open(v.root)
+       var lastErr error
+       rootdir, err := os.Open(v.Root)
        if err != nil {
                return err
        }
@@ -327,7 +355,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                if !blockDirRe.MatchString(names[0]) {
                        continue
                }
-               blockdirpath := filepath.Join(v.root, names[0])
+               blockdirpath := filepath.Join(v.Root, names[0])
                blockdir, err := os.Open(blockdirpath)
                if err != nil {
                        log.Print("Error reading ", blockdirpath, ": ", err)
@@ -353,7 +381,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().Unix(),
+                               " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                }
                blockdir.Close()
@@ -361,9 +389,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
 // Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -373,7 +401,7 @@ func (v *UnixVolume) Trash(loc string) error {
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
 
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.locker != nil {
@@ -398,23 +426,21 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
-       } else {
-               if time.Since(fi.ModTime()) < blobSignatureTTL {
-                       return nil
-               }
+       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
+               return nil
        }
 
-       if trashLifetime == 0 {
+       if theConfig.TrashLifetime == 0 {
                return os.Remove(p)
        }
-       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
 func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
 
@@ -449,7 +475,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
-       return filepath.Join(v.root, loc[0:3])
+       return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
@@ -462,7 +488,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 // MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
-       fullSymlink := v.root + "/full"
+       fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
        if link, err := os.Readlink(fullSymlink); err == nil {
@@ -494,7 +520,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 //
 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
-       err = syscall.Statfs(v.root, &fs)
+       err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
@@ -504,16 +530,19 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.root)
+       return fmt.Sprintf("[UnixVolume %s]", v.Root)
 }
 
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *UnixVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
+// Replication returns the number of replicas promised by the
+// underlying device (as specified in configuration).
 func (v *UnixVolume) Replication() int {
-       return 1
+       return v.DirectoryReplication
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
@@ -538,7 +567,7 @@ func (v *UnixVolume) translateError(err error) error {
        }
 }
 
-var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
@@ -546,7 +575,7 @@ func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int
 
-       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
                        log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
                        return nil
@@ -554,7 +583,7 @@ func (v *UnixVolume) EmptyTrash() {
                if info.Mode().IsDir() {
                        return nil
                }
-               matches := trashLocRegexp.FindStringSubmatch(path)
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
                        return nil
                }