2960: Buffer reads when serialize enabled on unix volume.
[arvados.git] / services / keepstore / unix_volume.go
index f076ccf18419675499e12eed0e3d017824af8e57..f652a500238d29f1505866abd02c8c7c21b909f6 100644 (file)
@@ -28,20 +28,27 @@ import (
 )
 
 func init() {
-       driver["Directory"] = newDirectoryVolume
+       driver["Directory"] = newUnixVolume
 }
 
-func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       v := &UnixVolume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
-       err := json.Unmarshal(volume.DriverParameters, &v)
+func newUnixVolume(params newVolumeParams) (volume, error) {
+       v := &unixVolume{
+               uuid:       params.UUID,
+               cluster:    params.Cluster,
+               volume:     params.ConfigVolume,
+               logger:     params.Logger,
+               metrics:    params.MetricsVecs,
+               bufferPool: params.BufferPool,
+       }
+       err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
        if err != nil {
                return nil, err
        }
-       v.logger = v.logger.WithField("Volume", v.String())
+       v.logger = v.logger.WithField("Volume", v.DeviceID())
        return v, v.check()
 }
 
-func (v *UnixVolume) check() error {
+func (v *unixVolume) check() error {
        if v.Root == "" {
                return errors.New("DriverParameters.Root was not provided")
        }
@@ -53,22 +60,24 @@ func (v *UnixVolume) check() error {
        }
 
        // Set up prometheus metrics
-       lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
        v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
 
        _, err := v.os.Stat(v.Root)
        return err
 }
 
-// A UnixVolume stores and retrieves blocks in a local directory.
-type UnixVolume struct {
+// A unixVolume stores and retrieves blocks in a local directory.
+type unixVolume struct {
        Root      string // path to the volume's root directory
        Serialize bool
 
-       cluster *arvados.Cluster
-       volume  arvados.Volume
-       logger  logrus.FieldLogger
-       metrics *volumeMetricsVecs
+       uuid       string
+       cluster    *arvados.Cluster
+       volume     arvados.Volume
+       logger     logrus.FieldLogger
+       metrics    *volumeMetricsVecs
+       bufferPool *bufferPool
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -77,15 +86,16 @@ type UnixVolume struct {
        os osWithStats
 }
 
-// GetDeviceID returns a globally unique ID for the volume's root
+// DeviceID returns a globally unique ID for the volume's root
 // directory, consisting of the filesystem's UUID and the path from
 // filesystem root to storage directory, joined by "/". For example,
 // the device ID for a local directory "/mnt/xvda1/keep" might be
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) GetDeviceID() string {
+func (v *unixVolume) DeviceID() string {
        giveup := func(f string, args ...interface{}) string {
-               v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
-               return ""
+               v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
+               host, _ := os.Hostname()
+               return host + ":" + v.Root
        }
        buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
        if err != nil {
@@ -154,12 +164,9 @@ func (v *UnixVolume) GetDeviceID() string {
        return giveup("could not find entry in %q matching %q", udir, dev)
 }
 
-// Touch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) Touch(loc string) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       p := v.blockPath(loc)
+// BlockTouch sets the timestamp for the given locator to the current time
+func (v *unixVolume) BlockTouch(hash string) error {
+       p := v.blockPath(hash)
        f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
@@ -182,7 +189,7 @@ func (v *UnixVolume) Touch(loc string) error {
 }
 
 // Mtime returns the stored timestamp for the given locator.
-func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
+func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
        fi, err := v.os.Stat(p)
        if err != nil {
@@ -193,7 +200,7 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
+func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
        if err := v.lock(ctx); err != nil {
                return err
        }
@@ -203,82 +210,75 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader
                return err
        }
        defer f.Close()
-       return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
+       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
 }
 
 // stat is os.Stat() with some extra sanity checks.
-func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
                } else if stat.Size() > BlockSize {
-                       err = TooLongError
+                       err = errTooLarge
                }
        }
        return stat, err
 }
 
-// Get retrieves a block, copies it to the given slice, and returns
-// the number of bytes copied.
-func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-       return getWithPipe(ctx, loc, buf, v)
-}
-
-// ReadBlock implements BlockReader.
-func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
-       path := v.blockPath(loc)
+// BlockRead reads a block from the volume.
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+       path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return v.translateError(err)
+               return 0, v.translateError(err)
        }
-       return v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err := io.Copy(w, rdr)
+       var streamer *streamWriterAt
+       if v.locker != nil {
+               buf, err := v.bufferPool.GetContext(ctx)
+               if err != nil {
+                       return 0, err
+               }
+               defer v.bufferPool.Put(buf)
+               streamer = newStreamWriterAt(w, 65536, buf)
+               defer streamer.Close()
+               w = streamer
+       }
+       var n int64
+       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
+               n, err = io.Copy(w, rdr)
                if err == nil && n != stat.Size() {
                        err = io.ErrUnexpectedEOF
                }
                return err
        })
-}
-
-// Compare returns nil if Get(loc) would return the same content as
-// expect. It is functionally equivalent to Get() followed by
-// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
-       path := v.blockPath(loc)
-       if _, err := v.stat(path); err != nil {
-               return v.translateError(err)
+       if streamer != nil {
+               // If we're using the streamer (and there's no error
+               // so far) flush any remaining buffered data now that
+               // getFunc has released the serialize lock.
+               if err == nil {
+                       err = streamer.Close()
+               }
+               return streamer.WroteAt(), err
        }
-       return v.getFunc(ctx, path, func(rdr io.Reader) error {
-               return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
-       })
+       return int(n), err
 }
 
-// Put stores a block of data identified by the locator string
-// "loc".  It returns nil on success.  If the volume is full, it
-// returns a FullError.  If the write fails due to some other error,
-// that error is returned.
-func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
-       return putWithPipe(ctx, loc, block, v)
-}
-
-// WriteBlock implements BlockWriter.
-func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       if v.IsFull() {
-               return FullError
+// BlockWrite stores a block on the volume. If it already exists, its
+// timestamp is updated.
+func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+       if v.isFull() {
+               return errFull
        }
-       bdir := v.blockDir(loc)
+       bdir := v.blockDir(hash)
        if err := os.MkdirAll(bdir, 0755); err != nil {
                return fmt.Errorf("error creating directory %s: %s", bdir, err)
        }
 
-       bpath := v.blockPath(loc)
-       tmpfile, err := v.os.TempFile(bdir, "tmp"+loc)
+       bpath := v.blockPath(hash)
+       tmpfile, err := v.os.TempFile(bdir, "tmp"+hash)
        if err != nil {
-               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err)
+               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, hash, err)
        }
        defer v.os.Remove(tmpfile.Name())
        defer tmpfile.Close()
@@ -287,7 +287,7 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
                return err
        }
        defer v.unlock()
-       n, err := io.Copy(tmpfile, rdr)
+       n, err := tmpfile.Write(data)
        v.os.stats.TickOutBytes(uint64(n))
        if err != nil {
                return fmt.Errorf("error writing %s: %s", bpath, err)
@@ -312,94 +312,67 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        return nil
 }
 
-// Status returns a VolumeStatus struct describing the volume's
-// current state, or nil if an error occurs.
-//
-func (v *UnixVolume) Status() *VolumeStatus {
-       fi, err := v.os.Stat(v.Root)
-       if err != nil {
-               v.logger.WithError(err).Error("stat failed")
-               return nil
-       }
-       devnum := fi.Sys().(*syscall.Stat_t).Dev
-
-       var fs syscall.Statfs_t
-       if err := syscall.Statfs(v.Root, &fs); err != nil {
-               v.logger.WithError(err).Error("statfs failed")
-               return nil
-       }
-       // These calculations match the way df calculates disk usage:
-       // "free" space is measured by fs.Bavail, but "used" space
-       // uses fs.Blocks - fs.Bfree.
-       free := fs.Bavail * uint64(fs.Bsize)
-       used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{
-               MountPoint: v.Root,
-               DeviceNum:  devnum,
-               BytesFree:  free,
-               BytesUsed:  used,
-       }
-}
-
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-// IndexTo writes (to the given Writer) a list of blocks found on this
-// volume which begin with the specified prefix. If the prefix is an
-// empty string, IndexTo writes a complete list of blocks.
-//
-// Each block is given in the format
-//
-//     locator+size modification-time {newline}
-//
-// e.g.:
-//
-//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error
+func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
        rootdir, err := v.os.Open(v.Root)
        if err != nil {
                return err
        }
-       defer rootdir.Close()
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
-       for {
-               names, err := rootdir.Readdirnames(1)
-               if err == io.EOF {
-                       return lastErr
-               } else if err != nil {
-                       return err
+       subdirs, err := rootdir.Readdirnames(-1)
+       rootdir.Close()
+       if err != nil {
+               return err
+       }
+       for _, subdir := range subdirs {
+               if ctx.Err() != nil {
+                       return ctx.Err()
                }
-               if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
+               if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) {
                        // prefix excludes all blocks stored in this dir
                        continue
                }
-               if !blockDirRe.MatchString(names[0]) {
-                       continue
-               }
-               blockdirpath := filepath.Join(v.Root, names[0])
-               blockdir, err := v.os.Open(blockdirpath)
-               if err != nil {
-                       v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
-                       lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
+               if !blockDirRe.MatchString(subdir) {
                        continue
                }
-               v.os.stats.TickOps("readdir")
-               v.os.stats.Tick(&v.os.stats.ReaddirOps)
-               for {
-                       fileInfo, err := blockdir.Readdir(1)
-                       if err == io.EOF {
+               blockdirpath := filepath.Join(v.Root, subdir)
+
+               var dirents []os.DirEntry
+               for attempt := 0; ; attempt++ {
+                       v.os.stats.TickOps("readdir")
+                       v.os.stats.Tick(&v.os.stats.ReaddirOps)
+                       dirents, err = os.ReadDir(blockdirpath)
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       } else if err == nil {
                                break
+                       } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") {
+                               // EBADCOOKIE (NFS stopped accepting
+                               // our readdirent cookie) -- retry a
+                               // few times before giving up
+                               v.logger.WithError(err).Printf("retry after error reading %s", blockdirpath)
+                               continue
+                       } else {
+                               return err
+                       }
+               }
+
+               for _, dirent := range dirents {
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       }
+                       fileInfo, err := dirent.Info()
+                       if os.IsNotExist(err) {
+                               // File disappeared between ReadDir() and now
+                               continue
                        } else if err != nil {
-                               v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
-                               lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
-                               break
+                               v.logger.WithError(err).Errorf("error getting FileInfo for %q in %q", dirent.Name(), blockdirpath)
+                               return err
                        }
-                       name := fileInfo[0].Name()
+                       name := fileInfo.Name()
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
@@ -408,23 +381,22 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        }
                        _, err = fmt.Fprint(w,
                                name,
-                               "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().UnixNano(),
+                               "+", fileInfo.Size(),
+                               " ", fileInfo.ModTime().UnixNano(),
                                "\n")
                        if err != nil {
-                               blockdir.Close()
                                return fmt.Errorf("error writing: %s", err)
                        }
                }
-               blockdir.Close()
        }
+       return nil
 }
 
-// Trash trashes the block data from the unix storage
-// If BlobTrashLifetime == 0, the block is deleted
-// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + BlobTrashLifetime
-func (v *UnixVolume) Trash(loc string) error {
+// BlockTrash trashes the block data from the unix storage.  If
+// BlobTrashLifetime == 0, the block is deleted; otherwise, the block
+// is renamed as path/{loc}.trash.{deadline}, where deadline = now +
+// BlobTrashLifetime.
+func (v *unixVolume) BlockTrash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Trash() because either (a) the file will be trashed and Touch()
@@ -432,10 +404,6 @@ func (v *UnixVolume) Trash(loc string) error {
        // be re-written), or (b) Touch() will update the file's timestamp and
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
-
-       if v.volume.ReadOnly || !v.cluster.Collections.BlobTrash {
-               return MethodDisabledError
-       }
        if err := v.lock(context.TODO()); err != nil {
                return err
        }
@@ -468,17 +436,13 @@ func (v *UnixVolume) Trash(loc string) error {
        return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()))
 }
 
-// Untrash moves block from trash back into store
+// BlockUntrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
-func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-
+func (v *unixVolume) BlockUntrash(hash string) error {
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
-       files, err := ioutil.ReadDir(v.blockDir(loc))
+       files, err := ioutil.ReadDir(v.blockDir(hash))
        if err != nil {
                return err
        }
@@ -488,11 +452,11 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
        }
 
        foundTrash := false
-       prefix := fmt.Sprintf("%v.trash.", loc)
+       prefix := fmt.Sprintf("%v.trash.", hash)
        for _, f := range files {
                if strings.HasPrefix(f.Name(), prefix) {
                        foundTrash = true
-                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(hash))
                        if err == nil {
                                break
                        }
@@ -503,25 +467,24 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return os.ErrNotExist
        }
 
-       return
+       return nil
 }
 
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
-func (v *UnixVolume) blockDir(loc string) string {
+func (v *unixVolume) blockDir(loc string) string {
        return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
 // on this volume.
-func (v *UnixVolume) blockPath(loc string) string {
+func (v *unixVolume) blockPath(loc string) string {
        return filepath.Join(v.blockDir(loc), loc)
 }
 
-// IsFull returns true if the free space on the volume is less than
+// isFull returns true if the free space on the volume is less than
 // MinFreeKilobytes.
-//
-func (v *UnixVolume) IsFull() (isFull bool) {
+func (v *unixVolume) isFull() (isFull bool) {
        fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
@@ -535,9 +498,9 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MinFreeKilobytes
+               isFull = avail < BlockSize
        } else {
-               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
+               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v.DeviceID())
                isFull = false
        }
 
@@ -551,31 +514,26 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 
 // FreeDiskSpace returns the number of unused 1k blocks available on
 // the volume.
-//
-func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+func (v *unixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
        err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
-               free = fs.Bavail * uint64(fs.Bsize) / 1024
+               free = fs.Bavail * uint64(fs.Bsize)
        }
        return
 }
 
-func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.Root)
-}
-
 // InternalStats returns I/O and filesystem ops counters.
-func (v *UnixVolume) InternalStats() interface{} {
+func (v *unixVolume) InternalStats() interface{} {
        return &v.os.stats
 }
 
 // lock acquires the serialize lock, if one is in use. If ctx is done
 // before the lock is acquired, lock returns ctx.Err() instead of
 // acquiring the lock.
-func (v *UnixVolume) lock(ctx context.Context) error {
+func (v *unixVolume) lock(ctx context.Context) error {
        if v.locker == nil {
                return nil
        }
@@ -599,7 +557,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
 }
 
 // unlock releases the serialize lock, if one is in use.
-func (v *UnixVolume) unlock() {
+func (v *unixVolume) unlock() {
        if v.locker == nil {
                return
        }
@@ -607,7 +565,7 @@ func (v *UnixVolume) unlock() {
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
-func (v *UnixVolume) lockfile(f *os.File) error {
+func (v *unixVolume) lockfile(f *os.File) error {
        v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
@@ -615,7 +573,7 @@ func (v *UnixVolume) lockfile(f *os.File) error {
        return err
 }
 
-func (v *UnixVolume) unlockfile(f *os.File) error {
+func (v *unixVolume) unlockfile(f *os.File) error {
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
        v.os.stats.TickErr(err)
        return err
@@ -623,7 +581,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error {
 
 // Where appropriate, translate a more specific filesystem error to an
 // error recognized by handlers, like os.ErrNotExist.
-func (v *UnixVolume) translateError(err error) error {
+func (v *unixVolume) translateError(err error) error {
        switch err.(type) {
        case *os.PathError:
                // stat() returns a PathError if the parent directory
@@ -638,11 +596,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
-func (v *UnixVolume) EmptyTrash() {
-       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
-               return
-       }
-
+func (v *unixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64