Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / volume_unix.go
index 561eb41a45427e1a620c8a23ba302027a94b35fb..da9b110c56e80f1c4572aceb09b65c6dd29e8578 100644 (file)
@@ -1,14 +1,18 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
        "bufio"
-       "errors"
+       "context"
        "flag"
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
+       "os/exec"
        "path/filepath"
        "regexp"
        "strconv"
@@ -16,14 +20,21 @@ import (
        "sync"
        "syscall"
        "time"
+
+       log "github.com/Sirupsen/logrus"
 )
 
 type unixVolumeAdder struct {
-       *volumeSet
+       *Config
+}
+
+// String implements flag.Value
+func (s *unixVolumeAdder) String() string {
+       return "-"
 }
 
-func (vs *unixVolumeAdder) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
+func (vs *unixVolumeAdder) Set(path string) error {
+       if dirs := strings.Split(path, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
                        if err := vs.Set(dir); err != nil {
@@ -32,33 +43,19 @@ func (vs *unixVolumeAdder) Set(value string) error {
                }
                return nil
        }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
+       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+               Root:      path,
+               ReadOnly:  deprecated.flagReadonly,
+               Serialize: deprecated.flagSerializeIO,
        })
        return nil
 }
 
 func init() {
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &unixVolumeAdder{&volumes},
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
 }
 
 // Discover adds a UnixVolume for every directory named "keep" that is
@@ -89,10 +86,10 @@ func (vs *unixVolumeAdder) Discover() int {
                }
                // Set the -readonly flag (but only for this volume)
                // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
+               flagReadonlyWas := deprecated.flagReadonly
                for _, fsopt := range strings.Split(args[3], ",") {
                        if fsopt == "ro" {
-                               flagReadonly = true
+                               deprecated.flagReadonly = true
                                break
                        }
                        if fsopt == "rw" {
@@ -104,49 +101,167 @@ func (vs *unixVolumeAdder) Discover() int {
                } else {
                        added++
                }
-               flagReadonly = flagReadonlyWas
+               deprecated.flagReadonly = flagReadonlyWas
        }
        return added
 }
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       // path to the volume's root directory
-       root string
+       Root                 string // path to the volume's root directory
+       ReadOnly             bool
+       Serialize            bool
+       DirectoryReplication int
+
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
-       locker   sync.Locker
-       readonly bool
+       locker sync.Locker
+
+       os osWithStats
+}
+
+// DeviceID returns a globally unique ID for the volume's root
+// directory, consisting of the filesystem's UUID and the path from
+// filesystem root to storage directory, joined by "/". For example,
+// the DeviceID for a local directory "/mnt/xvda1/keep" might be
+// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
+func (v *UnixVolume) DeviceID() string {
+       giveup := func(f string, args ...interface{}) string {
+               log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+               return ""
+       }
+       buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
+       if err != nil {
+               return giveup("findmnt: %s (%q)", err, buf)
+       }
+       findmnt := strings.Fields(string(buf))
+       if len(findmnt) < 2 {
+               return giveup("could not parse findmnt output: %q", buf)
+       }
+       fsRoot, dev := findmnt[0], findmnt[1]
+
+       absRoot, err := filepath.Abs(v.Root)
+       if err != nil {
+               return giveup("resolving relative path %q: %s", v.Root, err)
+       }
+       realRoot, err := filepath.EvalSymlinks(absRoot)
+       if err != nil {
+               return giveup("resolving symlinks in %q: %s", absRoot, err)
+       }
+
+       // Find path from filesystem root to realRoot
+       var fsPath string
+       if strings.HasPrefix(realRoot, fsRoot+"/") {
+               fsPath = realRoot[len(fsRoot):]
+       } else if fsRoot == "/" {
+               fsPath = realRoot
+       } else if fsRoot == realRoot {
+               fsPath = ""
+       } else {
+               return giveup("findmnt reports mount point %q which is not a prefix of volume root %q", fsRoot, realRoot)
+       }
+
+       if !strings.HasPrefix(dev, "/") {
+               return giveup("mount %q device %q is not a path", fsRoot, dev)
+       }
+
+       fi, err := os.Stat(dev)
+       if err != nil {
+               return giveup("stat %q: %s\n", dev, err)
+       }
+       ino := fi.Sys().(*syscall.Stat_t).Ino
+
+       // Find a symlink in /dev/disk/by-uuid/ whose target is (i.e.,
+       // has the same inode as) the mounted device
+       udir := "/dev/disk/by-uuid"
+       d, err := os.Open(udir)
+       if err != nil {
+               return giveup("opening %q: %s", udir, err)
+       }
+       uuids, err := d.Readdirnames(0)
+       if err != nil {
+               return giveup("reading %q: %s", udir, err)
+       }
+       for _, uuid := range uuids {
+               link := filepath.Join(udir, uuid)
+               fi, err = os.Stat(link)
+               if err != nil {
+                       log.Printf("error: stat %q: %s", link, err)
+                       continue
+               }
+               if fi.Sys().(*syscall.Stat_t).Ino == ino {
+                       return uuid + fsPath
+               }
+       }
+       return giveup("could not find entry in %q matching %q", udir, dev)
+}
+
+// Examples implements VolumeWithExamples.
+func (*UnixVolume) Examples() []Volume {
+       return []Volume{
+               &UnixVolume{
+                       Root:                 "/mnt/local-disk",
+                       Serialize:            true,
+                       DirectoryReplication: 1,
+               },
+               &UnixVolume{
+                       Root:                 "/mnt/network-disk",
+                       Serialize:            false,
+                       DirectoryReplication: 2,
+               },
+       }
+}
+
+// Type implements Volume
+func (v *UnixVolume) Type() string {
+       return "Directory"
+}
+
+// Start implements Volume
+func (v *UnixVolume) Start() error {
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+       }
+       if v.DirectoryReplication == 0 {
+               v.DirectoryReplication = 1
+       }
+       _, err := v.os.Stat(v.Root)
+       return err
 }
 
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        p := v.blockPath(loc)
-       f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+       f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
        }
        defer f.Close()
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+       if err := v.lock(context.TODO()); err != nil {
+               return err
        }
-       if e := lockfile(f); e != nil {
+       defer v.unlock()
+       if e := v.lockfile(f); e != nil {
                return e
        }
-       defer unlockfile(f)
-       now := time.Now().Unix()
-       utime := syscall.Utimbuf{now, now}
-       return syscall.Utime(p, &utime)
+       defer v.unlockfile(f)
+       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       v.os.stats.Tick(&v.os.stats.UtimesOps)
+       err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
+       v.os.stats.TickErr(err)
+       return err
 }
 
 // Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       fi, err := os.Stat(p)
+       fi, err := v.os.Stat(p)
        if err != nil {
                return time.Time{}, err
        }
@@ -155,22 +270,22 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
+       if err := v.lock(ctx); err != nil {
+               return err
        }
-       f, err := os.Open(path)
+       defer v.unlock()
+       f, err := v.os.Open(path)
        if err != nil {
                return err
        }
        defer f.Close()
-       return fn(f)
+       return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
 }
 
 // stat is os.Stat() with some extra sanity checks.
 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
-       stat, err := os.Stat(path)
+       stat, err := v.os.Stat(path)
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
@@ -181,38 +296,38 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+       return getWithPipe(ctx, loc, buf, v)
+}
+
+// ReadBlock implements BlockReader.
+func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, v.translateError(err)
+               return v.translateError(err)
        }
-       buf := bufs.Get(int(stat.Size()))
-       err = v.getFunc(path, func(rdr io.Reader) error {
-               _, err = io.ReadFull(rdr, buf)
+       return v.getFunc(ctx, path, func(rdr io.Reader) error {
+               n, err := io.Copy(w, rdr)
+               if err == nil && n != stat.Size() {
+                       err = io.ErrUnexpectedEOF
+               }
                return err
        })
-       if err != nil {
-               bufs.Put(buf)
-               return nil, err
-       }
-       return buf, nil
 }
 
 // Compare returns nil if Get(loc) would return the same content as
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
        path := v.blockPath(loc)
        if _, err := v.stat(path); err != nil {
                return v.translateError(err)
        }
-       return v.getFunc(path, func(rdr io.Reader) error {
-               return compareReaderWithBuf(rdr, expect, loc[:32])
+       return v.getFunc(ctx, path, func(rdr io.Reader) error {
+               return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
        })
 }
 
@@ -220,8 +335,13 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
+func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+       return putWithPipe(ctx, loc, block, v)
+}
+
+// ReadBlock implements BlockWriter.
+func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
+       if v.ReadOnly {
                return MethodDisabledError
        }
        if v.IsFull() {
@@ -234,32 +354,34 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
                return err
        }
 
-       tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
+       tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
        if tmperr != nil {
                log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
                return tmperr
        }
+
        bpath := v.blockPath(loc)
 
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+       if err := v.lock(ctx); err != nil {
+               return err
        }
-       if _, err := tmpfile.Write(block); err != nil {
+       defer v.unlock()
+       n, err := io.Copy(tmpfile, rdr)
+       v.os.stats.TickOutBytes(uint64(n))
+       if err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
                tmpfile.Close()
-               os.Remove(tmpfile.Name())
+               v.os.Remove(tmpfile.Name())
                return err
        }
        if err := tmpfile.Close(); err != nil {
                log.Printf("closing %s: %s\n", tmpfile.Name(), err)
-               os.Remove(tmpfile.Name())
+               v.os.Remove(tmpfile.Name())
                return err
        }
-       if err := os.Rename(tmpfile.Name(), bpath); err != nil {
+       if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
                log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
-               os.Remove(tmpfile.Name())
-               return err
+               return v.os.Remove(tmpfile.Name())
        }
        return nil
 }
@@ -268,18 +390,15 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 // current state, or nil if an error occurs.
 //
 func (v *UnixVolume) Status() *VolumeStatus {
-       var fs syscall.Statfs_t
-       var devnum uint64
-
-       if fi, err := os.Stat(v.root); err == nil {
-               devnum = fi.Sys().(*syscall.Stat_t).Dev
-       } else {
+       fi, err := v.os.Stat(v.Root)
+       if err != nil {
                log.Printf("%s: os.Stat: %s\n", v, err)
                return nil
        }
+       devnum := fi.Sys().(*syscall.Stat_t).Dev
 
-       err := syscall.Statfs(v.root, &fs)
-       if err != nil {
+       var fs syscall.Statfs_t
+       if err := syscall.Statfs(v.Root, &fs); err != nil {
                log.Printf("%s: statfs: %s\n", v, err)
                return nil
        }
@@ -288,7 +407,12 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.root, devnum, free, used}
+       return &VolumeStatus{
+               MountPoint: v.Root,
+               DeviceNum:  devnum,
+               BytesFree:  free,
+               BytesUsed:  used,
+       }
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -309,12 +433,13 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error = nil
-       rootdir, err := os.Open(v.root)
+       var lastErr error
+       rootdir, err := v.os.Open(v.Root)
        if err != nil {
                return err
        }
        defer rootdir.Close()
+       v.os.stats.Tick(&v.os.stats.ReaddirOps)
        for {
                names, err := rootdir.Readdirnames(1)
                if err == io.EOF {
@@ -329,13 +454,14 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                if !blockDirRe.MatchString(names[0]) {
                        continue
                }
-               blockdirpath := filepath.Join(v.root, names[0])
-               blockdir, err := os.Open(blockdirpath)
+               blockdirpath := filepath.Join(v.Root, names[0])
+               blockdir, err := v.os.Open(blockdirpath)
                if err != nil {
                        log.Print("Error reading ", blockdirpath, ": ", err)
                        lastErr = err
                        continue
                }
+               v.os.stats.Tick(&v.os.stats.ReaddirOps)
                for {
                        fileInfo, err := blockdir.Readdir(1)
                        if err == io.EOF {
@@ -355,7 +481,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().Unix(),
+                               " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                }
                blockdir.Close()
@@ -363,9 +489,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
 // Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
 // Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -375,51 +501,50 @@ func (v *UnixVolume) Trash(loc string) error {
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
 
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+       if err := v.lock(context.TODO()); err != nil {
+               return err
        }
+       defer v.unlock()
        p := v.blockPath(loc)
-       f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+       f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
        }
        defer f.Close()
-       if e := lockfile(f); e != nil {
+       if e := v.lockfile(f); e != nil {
                return e
        }
-       defer unlockfile(f)
+       defer v.unlockfile(f)
 
        // If the block has been PUT in the last blobSignatureTTL
        // seconds, return success without removing the block. This
        // protects data from garbage collection until it is no longer
        // possible for clients to retrieve the unreferenced blocks
        // anyway (because the permission signatures have expired).
-       if fi, err := os.Stat(p); err != nil {
+       if fi, err := v.os.Stat(p); err != nil {
                return err
-       } else {
-               if time.Since(fi.ModTime()) < blobSignatureTTL {
-                       return nil
-               }
+       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
+               return nil
        }
 
-       if trashLifetime == 0 {
-               return os.Remove(p)
+       if theConfig.TrashLifetime == 0 {
+               return v.os.Remove(p)
        }
-       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+       return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
 func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.readonly {
+       if v.ReadOnly {
                return MethodDisabledError
        }
 
+       v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(loc))
        if err != nil {
                return err
@@ -434,7 +559,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
        for _, f := range files {
                if strings.HasPrefix(f.Name(), prefix) {
                        foundTrash = true
-                       err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
                        if err == nil {
                                break
                        }
@@ -451,7 +576,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
-       return filepath.Join(v.root, loc[0:3])
+       return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
@@ -464,7 +589,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 // MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
-       fullSymlink := v.root + "/full"
+       fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
        if link, err := os.Readlink(fullSymlink); err == nil {
@@ -496,7 +621,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 //
 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
-       err = syscall.Statfs(v.root, &fs)
+       err = syscall.Statfs(v.Root, &fs)
        if err == nil {
                // Statfs output is not guaranteed to measure free
                // space in terms of 1K blocks.
@@ -506,25 +631,70 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.root)
+       return fmt.Sprintf("[UnixVolume %s]", v.Root)
 }
 
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *UnixVolume) Writable() bool {
-       return !v.readonly
+       return !v.ReadOnly
 }
 
+// Replication returns the number of replicas promised by the
+// underlying device (as specified in configuration).
 func (v *UnixVolume) Replication() int {
-       return 1
+       return v.DirectoryReplication
+}
+
+// InternalStats returns I/O and filesystem ops counters.
+func (v *UnixVolume) InternalStats() interface{} {
+       return &v.os.stats
+}
+
+// lock acquires the serialize lock, if one is in use. If ctx is done
+// before the lock is acquired, lock returns ctx.Err() instead of
+// acquiring the lock.
+func (v *UnixVolume) lock(ctx context.Context) error {
+       if v.locker == nil {
+               return nil
+       }
+       locked := make(chan struct{})
+       go func() {
+               v.locker.Lock()
+               close(locked)
+       }()
+       select {
+       case <-ctx.Done():
+               go func() {
+                       <-locked
+                       v.locker.Unlock()
+               }()
+               return ctx.Err()
+       case <-locked:
+               return nil
+       }
+}
+
+// unlock releases the serialize lock, if one is in use.
+func (v *UnixVolume) unlock() {
+       if v.locker == nil {
+               return
+       }
+       v.locker.Unlock()
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
-func lockfile(f *os.File) error {
-       return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+func (v *UnixVolume) lockfile(f *os.File) error {
+       v.os.stats.Tick(&v.os.stats.FlockOps)
+       err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+       v.os.stats.TickErr(err)
+       return err
 }
 
-func unlockfile(f *os.File) error {
-       return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+func (v *UnixVolume) unlockfile(f *os.File) error {
+       err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+       v.os.stats.TickErr(err)
+       return err
 }
 
 // Where appropriate, translate a more specific filesystem error to an
@@ -540,7 +710,7 @@ func (v *UnixVolume) translateError(err error) error {
        }
 }
 
-var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
@@ -548,14 +718,15 @@ func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int
 
-       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
-                       return err
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       return nil
                }
                if info.Mode().IsDir() {
                        return nil
                }
-               matches := trashLocRegexp.FindStringSubmatch(path)
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
                        return nil
                }
@@ -569,7 +740,7 @@ func (v *UnixVolume) EmptyTrash() {
                if deadline > time.Now().Unix() {
                        return nil
                }
-               err = os.Remove(path)
+               err = v.os.Remove(path)
                if err != nil {
                        log.Printf("EmptyTrash: Remove %v: %v", path, err)
                        return nil
@@ -585,3 +756,68 @@ func (v *UnixVolume) EmptyTrash() {
 
        log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
+
+type unixStats struct {
+       statsTicker
+       OpenOps    uint64
+       StatOps    uint64
+       FlockOps   uint64
+       UtimesOps  uint64
+       CreateOps  uint64
+       RenameOps  uint64
+       UnlinkOps  uint64
+       ReaddirOps uint64
+}
+
+func (s *unixStats) TickErr(err error) {
+       if err == nil {
+               return
+       }
+       s.statsTicker.TickErr(err, fmt.Sprintf("%T", err))
+}
+
+type osWithStats struct {
+       stats unixStats
+}
+
+func (o *osWithStats) Open(name string) (*os.File, error) {
+       o.stats.Tick(&o.stats.OpenOps)
+       f, err := os.Open(name)
+       o.stats.TickErr(err)
+       return f, err
+}
+
+func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+       o.stats.Tick(&o.stats.OpenOps)
+       f, err := os.OpenFile(name, flag, perm)
+       o.stats.TickErr(err)
+       return f, err
+}
+
+func (o *osWithStats) Remove(path string) error {
+       o.stats.Tick(&o.stats.UnlinkOps)
+       err := os.Remove(path)
+       o.stats.TickErr(err)
+       return err
+}
+
+func (o *osWithStats) Rename(a, b string) error {
+       o.stats.Tick(&o.stats.RenameOps)
+       err := os.Rename(a, b)
+       o.stats.TickErr(err)
+       return err
+}
+
+func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+       o.stats.Tick(&o.stats.StatOps)
+       fi, err := os.Stat(path)
+       o.stats.TickErr(err)
+       return fi, err
+}
+
+func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+       o.stats.Tick(&o.stats.CreateOps)
+       f, err := ioutil.TempFile(dir, base)
+       o.stats.TickErr(err)
+       return f, err
+}