2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / unix_volume.go
index dee4bdc1c1ed1d59badb076dedac2615eef3f2d7..98edfae14d7e602d79e055a77d698dc8a6b466d2 100644 (file)
@@ -28,16 +28,22 @@ import (
 )
 
 func init() {
-       driver["Directory"] = newDirectoryVolume
+       driver["Directory"] = newUnixVolume
 }
 
-func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       v := &UnixVolume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
-       err := json.Unmarshal(volume.DriverParameters, &v)
+func newUnixVolume(params newVolumeParams) (volume, error) {
+       v := &UnixVolume{
+               uuid:    params.UUID,
+               cluster: params.Cluster,
+               volume:  params.ConfigVolume,
+               logger:  params.Logger,
+               metrics: params.MetricsVecs,
+       }
+       err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
        if err != nil {
                return nil, err
        }
-       v.logger = v.logger.WithField("Volume", v.String())
+       v.logger = v.logger.WithField("Volume", v.DeviceID())
        return v, v.check()
 }
 
@@ -53,7 +59,7 @@ func (v *UnixVolume) check() error {
        }
 
        // Set up prometheus metrics
-       lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
        v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
 
        _, err := v.os.Stat(v.Root)
@@ -65,6 +71,7 @@ type UnixVolume struct {
        Root      string // path to the volume's root directory
        Serialize bool
 
+       uuid    string
        cluster *arvados.Cluster
        volume  arvados.Volume
        logger  logrus.FieldLogger
@@ -77,15 +84,16 @@ type UnixVolume struct {
        os osWithStats
 }
 
-// GetDeviceID returns a globally unique ID for the volume's root
+// DeviceID returns a globally unique ID for the volume's root
 // directory, consisting of the filesystem's UUID and the path from
 // filesystem root to storage directory, joined by "/". For example,
 // the device ID for a local directory "/mnt/xvda1/keep" might be
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) GetDeviceID() string {
+func (v *UnixVolume) DeviceID() string {
        giveup := func(f string, args ...interface{}) string {
-               v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
-               return ""
+               v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
+               host, _ := os.Hostname()
+               return host + ":" + v.Root
        }
        buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
        if err != nil {
@@ -154,12 +162,9 @@ func (v *UnixVolume) GetDeviceID() string {
        return giveup("could not find entry in %q matching %q", udir, dev)
 }
 
-// Touch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) Touch(loc string) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       p := v.blockPath(loc)
+// BlockTouch sets the timestamp for the given locator to the current time
+func (v *UnixVolume) BlockTouch(hash string) error {
+       p := v.blockPath(hash)
        f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
@@ -203,7 +208,7 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader
                return err
        }
        defer f.Close()
-       return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
+       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
 }
 
 // stat is os.Stat() with some extra sanity checks.
@@ -213,72 +218,45 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
                } else if stat.Size() > BlockSize {
-                       err = TooLongError
+                       err = errTooLarge
                }
        }
        return stat, err
 }
 
-// Get retrieves a block, copies it to the given slice, and returns
-// the number of bytes copied.
-func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-       return getWithPipe(ctx, loc, buf, v)
-}
-
-// ReadBlock implements BlockReader.
-func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
-       path := v.blockPath(loc)
+// BlockRead reads a block from the volume.
+func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+       path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return v.translateError(err)
+               return 0, v.translateError(err)
        }
-       return v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err := io.Copy(w, rdr)
+       var n int64
+       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
+               n, err = io.Copy(w, rdr)
                if err == nil && n != stat.Size() {
                        err = io.ErrUnexpectedEOF
                }
                return err
        })
+       return int(n), err
 }
 
-// Compare returns nil if Get(loc) would return the same content as
-// expect. It is functionally equivalent to Get() followed by
-// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
-       path := v.blockPath(loc)
-       if _, err := v.stat(path); err != nil {
-               return v.translateError(err)
-       }
-       return v.getFunc(ctx, path, func(rdr io.Reader) error {
-               return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
-       })
-}
-
-// Put stores a block of data identified by the locator string
-// "loc".  It returns nil on success.  If the volume is full, it
-// returns a FullError.  If the write fails due to some other error,
-// that error is returned.
-func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
-       return putWithPipe(ctx, loc, block, v)
-}
-
-// WriteBlock implements BlockWriter.
-func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       if v.IsFull() {
-               return FullError
+// BlockWrite stores a block on the volume. If it already exists, its
+// timestamp is updated.
+func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+       if v.isFull() {
+               return errFull
        }
-       bdir := v.blockDir(loc)
+       bdir := v.blockDir(hash)
        if err := os.MkdirAll(bdir, 0755); err != nil {
                return fmt.Errorf("error creating directory %s: %s", bdir, err)
        }
 
-       bpath := v.blockPath(loc)
-       tmpfile, err := v.os.TempFile(bdir, "tmp"+loc)
+       bpath := v.blockPath(hash)
+       tmpfile, err := v.os.TempFile(bdir, "tmp"+hash)
        if err != nil {
-               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err)
+               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, hash, err)
        }
        defer v.os.Remove(tmpfile.Name())
        defer tmpfile.Close()
@@ -287,7 +265,7 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
                return err
        }
        defer v.unlock()
-       n, err := io.Copy(tmpfile, rdr)
+       n, err := tmpfile.Write(data)
        v.os.stats.TickOutBytes(uint64(n))
        if err != nil {
                return fmt.Errorf("error writing %s: %s", bpath, err)
@@ -312,56 +290,10 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        return nil
 }
 
-// Status returns a VolumeStatus struct describing the volume's
-// current state, or nil if an error occurs.
-func (v *UnixVolume) Status() *VolumeStatus {
-       fi, err := v.os.Stat(v.Root)
-       if err != nil {
-               v.logger.WithError(err).Error("stat failed")
-               return nil
-       }
-       // uint64() cast here supports GOOS=darwin where Dev is
-       // int32. If the device number is negative, the unsigned
-       // devnum won't be the real device number any more, but that's
-       // fine -- all we care about is getting the same number each
-       // time.
-       devnum := uint64(fi.Sys().(*syscall.Stat_t).Dev)
-
-       var fs syscall.Statfs_t
-       if err := syscall.Statfs(v.Root, &fs); err != nil {
-               v.logger.WithError(err).Error("statfs failed")
-               return nil
-       }
-       // These calculations match the way df calculates disk usage:
-       // "free" space is measured by fs.Bavail, but "used" space
-       // uses fs.Blocks - fs.Bfree.
-       free := fs.Bavail * uint64(fs.Bsize)
-       used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{
-               MountPoint: v.Root,
-               DeviceNum:  devnum,
-               BytesFree:  free,
-               BytesUsed:  used,
-       }
-}
-
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-// IndexTo writes (to the given Writer) a list of blocks found on this
-// volume which begin with the specified prefix. If the prefix is an
-// empty string, IndexTo writes a complete list of blocks.
-//
-// Each block is given in the format
-//
-//     locator+size modification-time {newline}
-//
-// e.g.:
-//
-//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
+func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
        rootdir, err := v.os.Open(v.Root)
        if err != nil {
                return err
@@ -374,6 +306,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                return err
        }
        for _, subdir := range subdirs {
+               if ctx.Err() != nil {
+                       return ctx.Err()
+               }
                if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) {
                        // prefix excludes all blocks stored in this dir
                        continue
@@ -388,7 +323,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        v.os.stats.TickOps("readdir")
                        v.os.stats.Tick(&v.os.stats.ReaddirOps)
                        dirents, err = os.ReadDir(blockdirpath)
-                       if err == nil {
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       } else if err == nil {
                                break
                        } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") {
                                // EBADCOOKIE (NFS stopped accepting
@@ -402,6 +339,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                }
 
                for _, dirent := range dirents {
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       }
                        fileInfo, err := dirent.Info()
                        if os.IsNotExist(err) {
                                // File disappeared between ReadDir() and now
@@ -430,11 +370,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        return nil
 }
 
-// Trash trashes the block data from the unix storage
-// If BlobTrashLifetime == 0, the block is deleted
-// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + BlobTrashLifetime
-func (v *UnixVolume) Trash(loc string) error {
+// BlockTrash trashes the block data from the unix storage.  If
+// BlobTrashLifetime == 0, the block is deleted; otherwise, the block
+// is renamed as path/{loc}.trash.{deadline}, where deadline = now +
+// BlobTrashLifetime.
+func (v *UnixVolume) BlockTrash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Trash() because either (a) the file will be trashed and Touch()
@@ -442,9 +382,6 @@ func (v *UnixVolume) Trash(loc string) error {
        // be re-written), or (b) Touch() will update the file's timestamp and
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
-       if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
-               return MethodDisabledError
-       }
        if err := v.lock(context.TODO()); err != nil {
                return err
        }
@@ -477,17 +414,13 @@ func (v *UnixVolume) Trash(loc string) error {
        return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()))
 }
 
-// Untrash moves block from trash back into store
+// BlockUntrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
-func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-
+func (v *UnixVolume) BlockUntrash(hash string) error {
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
-       files, err := ioutil.ReadDir(v.blockDir(loc))
+       files, err := ioutil.ReadDir(v.blockDir(hash))
        if err != nil {
                return err
        }
@@ -497,11 +430,11 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
        }
 
        foundTrash := false
-       prefix := fmt.Sprintf("%v.trash.", loc)
+       prefix := fmt.Sprintf("%v.trash.", hash)
        for _, f := range files {
                if strings.HasPrefix(f.Name(), prefix) {
                        foundTrash = true
-                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(hash))
                        if err == nil {
                                break
                        }
@@ -512,7 +445,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return os.ErrNotExist
        }
 
-       return
+       return nil
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -527,9 +460,9 @@ func (v *UnixVolume) blockPath(loc string) string {
        return filepath.Join(v.blockDir(loc), loc)
 }
 
-// IsFull returns true if the free space on the volume is less than
+// isFull returns true if the free space on the volume is less than
 // MinFreeKilobytes.
-func (v *UnixVolume) IsFull() (isFull bool) {
+func (v *UnixVolume) isFull() (isFull bool) {
        fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
@@ -543,9 +476,9 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MinFreeKilobytes
+               isFull = avail < BlockSize
        } else {
-               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
+               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v.DeviceID())
                isFull = false
        }
 
@@ -565,15 +498,11 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
-               free = fs.Bavail * uint64(fs.Bsize) / 1024
+               free = fs.Bavail * uint64(fs.Bsize)
        }
        return
 }
 
-func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.Root)
-}
-
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats