7179: Most of golint suggested updates are made. Some names such as "never_delete...
[arvados.git] / services / keepstore / volume_unix.go
index 2ffa8faa39e2193b6e23540e6897c254a6262aa7..f4b49710e9192deabb2119d72ccbb389edcbe7bc 100644 (file)
@@ -1,5 +1,3 @@
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
 package main
 
 import (
@@ -20,12 +18,15 @@ import (
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root      string // path to the volume's root directory
-       serialize bool
-       readonly  bool
-       mutex     sync.Mutex
+       // path to the volume's root directory
+       root string
+       // something to lock during IO, typically a sync.Mutex (or nil
+       // to skip locking)
+       locker   sync.Locker
+       readonly bool
 }
 
+// Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -36,9 +37,9 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if e := lockfile(f); e != nil {
                return e
@@ -49,27 +50,28 @@ func (v *UnixVolume) Touch(loc string) error {
        return syscall.Utime(p, &utime)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       if fi, err := os.Stat(p); err != nil {
+       fi, err := os.Stat(p)
+       if err != nil {
                return time.Time{}, err
-       } else {
-               return fi.ModTime(), nil
        }
+       return fi.ModTime(), nil
 }
 
-// Open the given file, apply the serialize lock if enabled, and call
-// the given function if and when the file is ready to read.
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
        f, err := os.Open(path)
        if err != nil {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
-       }
        return fn(f)
 }
 
@@ -79,7 +81,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
-               } else if stat.Size() > BLOCKSIZE {
+               } else if stat.Size() > BlockSize {
                        err = TooLongError
                }
        }
@@ -109,14 +111,9 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 }
 
 // Compare returns nil if Get(loc) would return the same content as
-// cmp. It is functionally equivalent to Get() followed by
+// expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-//
-// TODO(TC): Before returning CollisionError, compute the MD5 digest
-// of the data on disk (i.e., known-to-be-equal data in cmp +
-// remaining data on disk) and return DiskHashError instead of
-// CollisionError if it doesn't equal loc[:32].
-func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
@@ -126,6 +123,7 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
        if int64(bufLen) > stat.Size() {
                bufLen = int(stat.Size())
        }
+       cmp := expect
        buf := make([]byte, bufLen)
        return v.getFunc(path, func(rdr io.Reader) error {
                // Loop invariants: all data read so far matched what
@@ -134,17 +132,13 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
                // reader.
                for {
                        n, err := rdr.Read(buf)
-                       if n > len(cmp) {
-                               // file on disk is too long
-                               return CollisionError
-                       } else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
-                               return CollisionError
+                       if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+                               return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
                        }
                        cmp = cmp[n:]
                        if err == io.EOF {
                                if len(cmp) != 0 {
-                                       // file on disk is too short
-                                       return CollisionError
+                                       return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
                                }
                                return nil
                        } else if err != nil {
@@ -179,9 +173,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -296,6 +290,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        }
 }
 
+// Delete deletes the block data from the unix storage
 func (v *UnixVolume) Delete(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -308,9 +303,9 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
@@ -351,7 +346,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 }
 
 // IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
@@ -367,7 +362,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
+               isFull = avail < MinFreeKilobytes
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
@@ -399,6 +394,7 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }