2960: Buffer reads when serialize enabled on unix volume.
[arvados.git] / services / keepstore / unix_volume.go
index 4d9e798ac67c71c2a81f51abeb2128b340a6cda6..f652a500238d29f1505866abd02c8c7c21b909f6 100644 (file)
@@ -2,12 +2,12 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
-       "bufio"
        "context"
-       "flag"
+       "encoding/json"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -22,98 +22,62 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
-type unixVolumeAdder struct {
-       *Config
+func init() {
+       driver["Directory"] = newUnixVolume
 }
 
-// String implements flag.Value
-func (vs *unixVolumeAdder) String() string {
-       return "-"
+func newUnixVolume(params newVolumeParams) (volume, error) {
+       v := &unixVolume{
+               uuid:       params.UUID,
+               cluster:    params.Cluster,
+               volume:     params.ConfigVolume,
+               logger:     params.Logger,
+               metrics:    params.MetricsVecs,
+               bufferPool: params.BufferPool,
+       }
+       err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
+       if err != nil {
+               return nil, err
+       }
+       v.logger = v.logger.WithField("Volume", v.DeviceID())
+       return v, v.check()
 }
 
-func (vs *unixVolumeAdder) Set(path string) error {
-       if dirs := strings.Split(path, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
+func (v *unixVolume) check() error {
+       if v.Root == "" {
+               return errors.New("DriverParameters.Root was not provided")
+       }
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("DriverParameters.Root %q does not start with '/'", v.Root)
        }
-       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
-               Root:      path,
-               ReadOnly:  deprecated.flagReadonly,
-               Serialize: deprecated.flagSerializeIO,
-       })
-       return nil
-}
 
-func init() {
-       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
 
-       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
-       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
+       _, err := v.os.Stat(v.Root)
+       return err
 }
 
-// Discover adds a UnixVolume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *unixVolumeAdder) Discover() int {
-       added := 0
-       f, err := os.Open(ProcMounts)
-       if err != nil {
-               log.Fatalf("opening %s: %s", ProcMounts, err)
-       }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", ProcMounts, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := deprecated.flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               deprecated.flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               if err := vs.Set(keepdir); err != nil {
-                       log.Printf("adding %q: %s", keepdir, err)
-               } else {
-                       added++
-               }
-               deprecated.flagReadonly = flagReadonlyWas
-       }
-       return added
-}
+// A unixVolume stores and retrieves blocks in a local directory.
+type unixVolume struct {
+       Root      string // path to the volume's root directory
+       Serialize bool
 
-// A UnixVolume stores and retrieves blocks in a local directory.
-type UnixVolume struct {
-       Root                 string // path to the volume's root directory
-       ReadOnly             bool
-       Serialize            bool
-       DirectoryReplication int
-       StorageClasses       []string
+       uuid       string
+       cluster    *arvados.Cluster
+       volume     arvados.Volume
+       logger     logrus.FieldLogger
+       metrics    *volumeMetricsVecs
+       bufferPool *bufferPool
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -125,12 +89,13 @@ type UnixVolume struct {
 // DeviceID returns a globally unique ID for the volume's root
 // directory, consisting of the filesystem's UUID and the path from
 // filesystem root to storage directory, joined by "/". For example,
-// the DeviceID for a local directory "/mnt/xvda1/keep" might be
+// the device ID for a local directory "/mnt/xvda1/keep" might be
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) DeviceID() string {
+func (v *unixVolume) DeviceID() string {
        giveup := func(f string, args ...interface{}) string {
-               log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
-               return ""
+               v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
+               host, _ := os.Hostname()
+               return host + ":" + v.Root
        }
        buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
        if err != nil {
@@ -180,6 +145,7 @@ func (v *UnixVolume) DeviceID() string {
        if err != nil {
                return giveup("opening %q: %s", udir, err)
        }
+       defer d.Close()
        uuids, err := d.Readdirnames(0)
        if err != nil {
                return giveup("reading %q: %s", udir, err)
@@ -188,7 +154,7 @@ func (v *UnixVolume) DeviceID() string {
                link := filepath.Join(udir, uuid)
                fi, err = os.Stat(link)
                if err != nil {
-                       log.Printf("error: stat %q: %s", link, err)
+                       v.logger.WithError(err).Errorf("stat(%q) failed", link)
                        continue
                }
                if fi.Sys().(*syscall.Stat_t).Ino == ino {
@@ -198,53 +164,9 @@ func (v *UnixVolume) DeviceID() string {
        return giveup("could not find entry in %q matching %q", udir, dev)
 }
 
-// Examples implements VolumeWithExamples.
-func (*UnixVolume) Examples() []Volume {
-       return []Volume{
-               &UnixVolume{
-                       Root:                 "/mnt/local-disk",
-                       Serialize:            true,
-                       DirectoryReplication: 1,
-               },
-               &UnixVolume{
-                       Root:                 "/mnt/network-disk",
-                       Serialize:            false,
-                       DirectoryReplication: 2,
-               },
-       }
-}
-
-// Type implements Volume
-func (v *UnixVolume) Type() string {
-       return "Directory"
-}
-
-// Start implements Volume
-func (v *UnixVolume) Start(vm *volumeMetricsVecs) error {
-       if v.Serialize {
-               v.locker = &sync.Mutex{}
-       }
-       if !strings.HasPrefix(v.Root, "/") {
-               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
-       }
-       if v.DirectoryReplication == 0 {
-               v.DirectoryReplication = 1
-       }
-       // Set up prometheus metrics
-       lbls := prometheus.Labels{"device_id": v.DeviceID()}
-       v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = vm.getCounterVecsFor(lbls)
-
-       _, err := v.os.Stat(v.Root)
-
-       return err
-}
-
-// Touch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) Touch(loc string) error {
-       if v.ReadOnly {
-               return MethodDisabledError
-       }
-       p := v.blockPath(loc)
+// BlockTouch sets the timestamp for the given locator to the current time
+func (v *unixVolume) BlockTouch(hash string) error {
+       p := v.blockPath(hash)
        f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
@@ -258,16 +180,16 @@ func (v *UnixVolume) Touch(loc string) error {
                return e
        }
        defer v.unlockfile(f)
-       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       ts := time.Now()
        v.os.stats.TickOps("utimes")
        v.os.stats.Tick(&v.os.stats.UtimesOps)
-       err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
+       err = os.Chtimes(p, ts, ts)
        v.os.stats.TickErr(err)
        return err
 }
 
 // Mtime returns the stored timestamp for the given locator.
-func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
+func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
        fi, err := v.os.Stat(p)
        if err != nil {
@@ -278,7 +200,7 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
+func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
        if err := v.lock(ctx); err != nil {
                return err
        }
@@ -288,200 +210,169 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader
                return err
        }
        defer f.Close()
-       return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
+       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
 }
 
 // stat is os.Stat() with some extra sanity checks.
-func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
                } else if stat.Size() > BlockSize {
-                       err = TooLongError
+                       err = errTooLarge
                }
        }
        return stat, err
 }
 
-// Get retrieves a block, copies it to the given slice, and returns
-// the number of bytes copied.
-func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-       return getWithPipe(ctx, loc, buf, v)
-}
-
-// ReadBlock implements BlockReader.
-func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
-       path := v.blockPath(loc)
+// BlockRead reads a block from the volume.
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+       path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return v.translateError(err)
+               return 0, v.translateError(err)
        }
-       return v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err := io.Copy(w, rdr)
+       var streamer *streamWriterAt
+       if v.locker != nil {
+               buf, err := v.bufferPool.GetContext(ctx)
+               if err != nil {
+                       return 0, err
+               }
+               defer v.bufferPool.Put(buf)
+               streamer = newStreamWriterAt(w, 65536, buf)
+               defer streamer.Close()
+               w = streamer
+       }
+       var n int64
+       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
+               n, err = io.Copy(w, rdr)
                if err == nil && n != stat.Size() {
                        err = io.ErrUnexpectedEOF
                }
                return err
        })
-}
-
-// Compare returns nil if Get(loc) would return the same content as
-// expect. It is functionally equivalent to Get() followed by
-// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
-       path := v.blockPath(loc)
-       if _, err := v.stat(path); err != nil {
-               return v.translateError(err)
+       if streamer != nil {
+               // If we're using the streamer (and there's no error
+               // so far) flush any remaining buffered data now that
+               // getFunc has released the serialize lock.
+               if err == nil {
+                       err = streamer.Close()
+               }
+               return streamer.WroteAt(), err
        }
-       return v.getFunc(ctx, path, func(rdr io.Reader) error {
-               return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
-       })
-}
-
-// Put stores a block of data identified by the locator string
-// "loc".  It returns nil on success.  If the volume is full, it
-// returns a FullError.  If the write fails due to some other error,
-// that error is returned.
-func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
-       return putWithPipe(ctx, loc, block, v)
+       return int(n), err
 }
 
-// WriteBlock implements BlockWriter.
-func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
-       if v.ReadOnly {
-               return MethodDisabledError
+// BlockWrite stores a block on the volume. If it already exists, its
+// timestamp is updated.
+func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+       if v.isFull() {
+               return errFull
        }
-       if v.IsFull() {
-               return FullError
-       }
-       bdir := v.blockDir(loc)
+       bdir := v.blockDir(hash)
        if err := os.MkdirAll(bdir, 0755); err != nil {
-               log.Printf("%s: could not create directory %s: %s",
-                       loc, bdir, err)
-               return err
+               return fmt.Errorf("error creating directory %s: %s", bdir, err)
        }
 
-       tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
-       if tmperr != nil {
-               log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
-               return tmperr
+       bpath := v.blockPath(hash)
+       tmpfile, err := v.os.TempFile(bdir, "tmp"+hash)
+       if err != nil {
+               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, hash, err)
        }
+       defer v.os.Remove(tmpfile.Name())
+       defer tmpfile.Close()
 
-       bpath := v.blockPath(loc)
-
-       if err := v.lock(ctx); err != nil {
+       if err = v.lock(ctx); err != nil {
                return err
        }
        defer v.unlock()
-       n, err := io.Copy(tmpfile, rdr)
+       n, err := tmpfile.Write(data)
        v.os.stats.TickOutBytes(uint64(n))
        if err != nil {
-               log.Printf("%s: writing to %s: %s", v, bpath, err)
-               tmpfile.Close()
-               v.os.Remove(tmpfile.Name())
-               return err
+               return fmt.Errorf("error writing %s: %s", bpath, err)
        }
-       if err := tmpfile.Close(); err != nil {
-               log.Printf("closing %s: %s", tmpfile.Name(), err)
-               v.os.Remove(tmpfile.Name())
-               return err
-       }
-       if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
-               log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
-               return v.os.Remove(tmpfile.Name())
+       if err = tmpfile.Close(); err != nil {
+               return fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
        }
-       return nil
-}
-
-// Status returns a VolumeStatus struct describing the volume's
-// current state, or nil if an error occurs.
-//
-func (v *UnixVolume) Status() *VolumeStatus {
-       fi, err := v.os.Stat(v.Root)
-       if err != nil {
-               log.Printf("%s: os.Stat: %s", v, err)
-               return nil
-       }
-       devnum := fi.Sys().(*syscall.Stat_t).Dev
-
-       var fs syscall.Statfs_t
-       if err := syscall.Statfs(v.Root, &fs); err != nil {
-               log.Printf("%s: statfs: %s", v, err)
-               return nil
+       // ext4 uses a low-precision clock and effectively backdates
+       // files by up to 10 ms, sometimes across a 1-second boundary,
+       // which produces confusing results in logs and tests.  We
+       // avoid this by setting the output file's timestamps
+       // explicitly, using a higher resolution clock.
+       ts := time.Now()
+       v.os.stats.TickOps("utimes")
+       v.os.stats.Tick(&v.os.stats.UtimesOps)
+       if err = os.Chtimes(tmpfile.Name(), ts, ts); err != nil {
+               return fmt.Errorf("error setting timestamps on %s: %s", tmpfile.Name(), err)
        }
-       // These calculations match the way df calculates disk usage:
-       // "free" space is measured by fs.Bavail, but "used" space
-       // uses fs.Blocks - fs.Bfree.
-       free := fs.Bavail * uint64(fs.Bsize)
-       used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{
-               MountPoint: v.Root,
-               DeviceNum:  devnum,
-               BytesFree:  free,
-               BytesUsed:  used,
+       if err = v.os.Rename(tmpfile.Name(), bpath); err != nil {
+               return fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
        }
+       return nil
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-// IndexTo writes (to the given Writer) a list of blocks found on this
-// volume which begin with the specified prefix. If the prefix is an
-// empty string, IndexTo writes a complete list of blocks.
-//
-// Each block is given in the format
-//
-//     locator+size modification-time {newline}
-//
-// e.g.:
-//
-//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error
+func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
        rootdir, err := v.os.Open(v.Root)
        if err != nil {
                return err
        }
-       defer rootdir.Close()
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
-       for {
-               names, err := rootdir.Readdirnames(1)
-               if err == io.EOF {
-                       return lastErr
-               } else if err != nil {
-                       return err
+       subdirs, err := rootdir.Readdirnames(-1)
+       rootdir.Close()
+       if err != nil {
+               return err
+       }
+       for _, subdir := range subdirs {
+               if ctx.Err() != nil {
+                       return ctx.Err()
                }
-               if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
+               if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) {
                        // prefix excludes all blocks stored in this dir
                        continue
                }
-               if !blockDirRe.MatchString(names[0]) {
-                       continue
-               }
-               blockdirpath := filepath.Join(v.Root, names[0])
-               blockdir, err := v.os.Open(blockdirpath)
-               if err != nil {
-                       log.Print("Error reading ", blockdirpath, ": ", err)
-                       lastErr = err
+               if !blockDirRe.MatchString(subdir) {
                        continue
                }
-               v.os.stats.TickOps("readdir")
-               v.os.stats.Tick(&v.os.stats.ReaddirOps)
-               for {
-                       fileInfo, err := blockdir.Readdir(1)
-                       if err == io.EOF {
+               blockdirpath := filepath.Join(v.Root, subdir)
+
+               var dirents []os.DirEntry
+               for attempt := 0; ; attempt++ {
+                       v.os.stats.TickOps("readdir")
+                       v.os.stats.Tick(&v.os.stats.ReaddirOps)
+                       dirents, err = os.ReadDir(blockdirpath)
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       } else if err == nil {
                                break
+                       } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") {
+                               // EBADCOOKIE (NFS stopped accepting
+                               // our readdirent cookie) -- retry a
+                               // few times before giving up
+                               v.logger.WithError(err).Printf("retry after error reading %s", blockdirpath)
+                               continue
+                       } else {
+                               return err
+                       }
+               }
+
+               for _, dirent := range dirents {
+                       if ctx.Err() != nil {
+                               return ctx.Err()
+                       }
+                       fileInfo, err := dirent.Info()
+                       if os.IsNotExist(err) {
+                               // File disappeared between ReadDir() and now
+                               continue
                        } else if err != nil {
-                               log.Print("Error reading ", blockdirpath, ": ", err)
-                               lastErr = err
-                               break
+                               v.logger.WithError(err).Errorf("error getting FileInfo for %q in %q", dirent.Name(), blockdirpath)
+                               return err
                        }
-                       name := fileInfo[0].Name()
+                       name := fileInfo.Name()
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
@@ -490,24 +381,22 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        }
                        _, err = fmt.Fprint(w,
                                name,
-                               "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().UnixNano(),
+                               "+", fileInfo.Size(),
+                               " ", fileInfo.ModTime().UnixNano(),
                                "\n")
                        if err != nil {
-                               log.Print("Error writing : ", err)
-                               lastErr = err
-                               break
+                               return fmt.Errorf("error writing: %s", err)
                        }
                }
-               blockdir.Close()
        }
+       return nil
 }
 
-// Trash trashes the block data from the unix storage
-// If TrashLifetime == 0, the block is deleted
-// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + TrashLifetime
-func (v *UnixVolume) Trash(loc string) error {
+// BlockTrash trashes the block data from the unix storage.  If
+// BlobTrashLifetime == 0, the block is deleted; otherwise, the block
+// is renamed as path/{loc}.trash.{deadline}, where deadline = now +
+// BlobTrashLifetime.
+func (v *unixVolume) BlockTrash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Trash() because either (a) the file will be trashed and Touch()
@@ -515,10 +404,6 @@ func (v *UnixVolume) Trash(loc string) error {
        // be re-written), or (b) Touch() will update the file's timestamp and
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
-
-       if v.ReadOnly {
-               return MethodDisabledError
-       }
        if err := v.lock(context.TODO()); err != nil {
                return err
        }
@@ -541,27 +426,23 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := v.os.Stat(p); err != nil {
                return err
-       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
+       } else if time.Since(fi.ModTime()) < v.cluster.Collections.BlobSigningTTL.Duration() {
                return nil
        }
 
-       if theConfig.TrashLifetime == 0 {
+       if v.cluster.Collections.BlobTrashLifetime == 0 {
                return v.os.Remove(p)
        }
-       return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
+       return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()))
 }
 
-// Untrash moves block from trash back into store
+// BlockUntrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
-func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.ReadOnly {
-               return MethodDisabledError
-       }
-
+func (v *unixVolume) BlockUntrash(hash string) error {
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
-       files, err := ioutil.ReadDir(v.blockDir(loc))
+       files, err := ioutil.ReadDir(v.blockDir(hash))
        if err != nil {
                return err
        }
@@ -571,11 +452,11 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
        }
 
        foundTrash := false
-       prefix := fmt.Sprintf("%v.trash.", loc)
+       prefix := fmt.Sprintf("%v.trash.", hash)
        for _, f := range files {
                if strings.HasPrefix(f.Name(), prefix) {
                        foundTrash = true
-                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(hash))
                        if err == nil {
                                break
                        }
@@ -586,25 +467,24 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return os.ErrNotExist
        }
 
-       return
+       return nil
 }
 
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
-func (v *UnixVolume) blockDir(loc string) string {
+func (v *unixVolume) blockDir(loc string) string {
        return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
 // on this volume.
-func (v *UnixVolume) blockPath(loc string) string {
+func (v *unixVolume) blockPath(loc string) string {
        return filepath.Join(v.blockDir(loc), loc)
 }
 
-// IsFull returns true if the free space on the volume is less than
+// isFull returns true if the free space on the volume is less than
 // MinFreeKilobytes.
-//
-func (v *UnixVolume) IsFull() (isFull bool) {
+func (v *unixVolume) isFull() (isFull bool) {
        fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
@@ -618,9 +498,9 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MinFreeKilobytes
+               isFull = avail < BlockSize
        } else {
-               log.Printf("%s: FreeDiskSpace: %s", v, err)
+               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v.DeviceID())
                isFull = false
        }
 
@@ -634,48 +514,26 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 
 // FreeDiskSpace returns the number of unused 1k blocks available on
 // the volume.
-//
-func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+func (v *unixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
        err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
-               free = fs.Bavail * uint64(fs.Bsize) / 1024
+               free = fs.Bavail * uint64(fs.Bsize)
        }
        return
 }
 
-func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.Root)
-}
-
-// Writable returns false if all future Put, Mtime, and Delete calls
-// are expected to fail.
-func (v *UnixVolume) Writable() bool {
-       return !v.ReadOnly
-}
-
-// Replication returns the number of replicas promised by the
-// underlying device (as specified in configuration).
-func (v *UnixVolume) Replication() int {
-       return v.DirectoryReplication
-}
-
-// GetStorageClasses implements Volume
-func (v *UnixVolume) GetStorageClasses() []string {
-       return v.StorageClasses
-}
-
 // InternalStats returns I/O and filesystem ops counters.
-func (v *UnixVolume) InternalStats() interface{} {
+func (v *unixVolume) InternalStats() interface{} {
        return &v.os.stats
 }
 
 // lock acquires the serialize lock, if one is in use. If ctx is done
 // before the lock is acquired, lock returns ctx.Err() instead of
 // acquiring the lock.
-func (v *UnixVolume) lock(ctx context.Context) error {
+func (v *unixVolume) lock(ctx context.Context) error {
        if v.locker == nil {
                return nil
        }
@@ -687,7 +545,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
        }()
        select {
        case <-ctx.Done():
-               log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+               v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
                go func() {
                        <-locked
                        v.locker.Unlock()
@@ -699,7 +557,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
 }
 
 // unlock releases the serialize lock, if one is in use.
-func (v *UnixVolume) unlock() {
+func (v *unixVolume) unlock() {
        if v.locker == nil {
                return
        }
@@ -707,7 +565,7 @@ func (v *UnixVolume) unlock() {
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
-func (v *UnixVolume) lockfile(f *os.File) error {
+func (v *unixVolume) lockfile(f *os.File) error {
        v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
@@ -715,7 +573,7 @@ func (v *UnixVolume) lockfile(f *os.File) error {
        return err
 }
 
-func (v *UnixVolume) unlockfile(f *os.File) error {
+func (v *unixVolume) unlockfile(f *os.File) error {
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
        v.os.stats.TickErr(err)
        return err
@@ -723,7 +581,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error {
 
 // Where appropriate, translate a more specific filesystem error to an
 // error recognized by handlers, like os.ErrNotExist.
-func (v *UnixVolume) translateError(err error) error {
+func (v *unixVolume) translateError(err error) error {
        switch err.(type) {
        case *os.PathError:
                // stat() returns a PathError if the parent directory
@@ -738,7 +596,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
-func (v *UnixVolume) EmptyTrash() {
+func (v *unixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64
 
@@ -752,7 +610,7 @@ func (v *UnixVolume) EmptyTrash() {
                }
                deadline, err := strconv.ParseInt(matches[2], 10, 64)
                if err != nil {
-                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
                        return
                }
                atomic.AddInt64(&bytesInTrash, info.Size())
@@ -762,7 +620,7 @@ func (v *UnixVolume) EmptyTrash() {
                }
                err = v.os.Remove(path)
                if err != nil {
-                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
                        return
                }
                atomic.AddInt64(&bytesDeleted, info.Size())
@@ -774,8 +632,8 @@ func (v *UnixVolume) EmptyTrash() {
                info os.FileInfo
        }
        var wg sync.WaitGroup
-       todo := make(chan dirent, theConfig.EmptyTrashWorkers)
-       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+       todo := make(chan dirent, v.cluster.Collections.BlobDeleteConcurrency)
+       for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
@@ -787,20 +645,30 @@ func (v *UnixVolume) EmptyTrash() {
 
        err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
-                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
+                       // Don't give up -- keep walking other
+                       // files/dirs
                        return nil
+               } else if !info.Mode().IsDir() {
+                       todo <- dirent{path, info}
+                       return nil
+               } else if path == v.Root || blockDirRe.MatchString(info.Name()) {
+                       // Descend into a directory that we might have
+                       // put trash in.
+                       return nil
+               } else {
+                       // Don't descend into other dirs.
+                       return filepath.SkipDir
                }
-               todo <- dirent{path, info}
-               return nil
        })
        close(todo)
        wg.Wait()
 
        if err != nil {
-               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+               v.logger.WithError(err).Error("EmptyTrash failed")
        }
 
-       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+       v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type unixStats struct {