7241: Add AzureBlobVolume
[arvados.git] / services / keepstore / volume_unix.go
index 368ddc55f07a9c8f4b78c09ba0cd3917bbdf091a..6c0f5c4e978d995b5c9308f52d5d2414045d64fc 100644 (file)
@@ -18,12 +18,15 @@ import (
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root      string // path to the volume's root directory
-       serialize bool
-       readonly  bool
-       mutex     sync.Mutex
+       // path to the volume's root directory
+       root string
+       // something to lock during IO, typically a sync.Mutex (or nil
+       // to skip locking)
+       locker   sync.Locker
+       readonly bool
 }
 
+// Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -34,9 +37,9 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if e := lockfile(f); e != nil {
                return e
@@ -47,27 +50,28 @@ func (v *UnixVolume) Touch(loc string) error {
        return syscall.Utime(p, &utime)
 }
 
+// Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
-       if fi, err := os.Stat(p); err != nil {
+       fi, err := os.Stat(p)
+       if err != nil {
                return time.Time{}, err
-       } else {
-               return fi.ModTime(), nil
        }
+       return fi.ModTime(), nil
 }
 
-// Open the given file, apply the serialize lock if enabled, and call
-// the given function if and when the file is ready to read.
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
+       }
        f, err := os.Open(path)
        if err != nil {
                return err
        }
        defer f.Close()
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
-       }
        return fn(f)
 }
 
@@ -77,7 +81,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        if err == nil {
                if stat.Size() < 0 {
                        err = os.ErrInvalid
-               } else if stat.Size() > BLOCKSIZE {
+               } else if stat.Size() > BlockSize {
                        err = TooLongError
                }
        }
@@ -107,7 +111,7 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 }
 
 // Compare returns nil if Get(loc) would return the same content as
-// cmp. It is functionally equivalent to Get() followed by
+// expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
 func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
@@ -118,6 +122,13 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
        bufLen := 1 << 20
        if int64(bufLen) > stat.Size() {
                bufLen = int(stat.Size())
+               if bufLen < 1 {
+                       // len(buf)==0 would prevent us from handling
+                       // empty files the same way as non-empty
+                       // files, because reading 0 bytes at a time
+                       // never reaches EOF.
+                       bufLen = 1
+               }
        }
        cmp := expect
        buf := make([]byte, bufLen)
@@ -169,9 +180,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -286,6 +297,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        }
 }
 
+// Delete deletes the block data from the unix storage
 func (v *UnixVolume) Delete(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
@@ -298,9 +310,9 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
+       if v.locker != nil {
+               v.locker.Lock()
+               defer v.locker.Unlock()
        }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
@@ -313,7 +325,7 @@ func (v *UnixVolume) Delete(loc string) error {
        }
        defer unlockfile(f)
 
-       // If the block has been PUT in the last blob_signature_ttl
+       // If the block has been PUT in the last blobSignatureTTL
        // seconds, return success without removing the block. This
        // protects data from garbage collection until it is no longer
        // possible for clients to retrieve the unreferenced blocks
@@ -321,7 +333,7 @@ func (v *UnixVolume) Delete(loc string) error {
        if fi, err := os.Stat(p); err != nil {
                return err
        } else {
-               if time.Since(fi.ModTime()) < blob_signature_ttl {
+               if time.Since(fi.ModTime()) < blobSignatureTTL {
                        return nil
                }
        }
@@ -341,7 +353,7 @@ func (v *UnixVolume) blockPath(loc string) string {
 }
 
 // IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
 //
 func (v *UnixVolume) IsFull() (isFull bool) {
        fullSymlink := v.root + "/full"
@@ -357,7 +369,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        }
 
        if avail, err := v.FreeDiskSpace(); err == nil {
-               isFull = avail < MIN_FREE_KILOBYTES
+               isFull = avail < MinFreeKilobytes
        } else {
                log.Printf("%s: FreeDiskSpace: %s\n", v, err)
                isFull = false
@@ -389,6 +401,7 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }