-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
package main
import (
+ "bytes"
"fmt"
"io"
"io/ioutil"
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to the volume's root directory
- serialize bool
- readonly bool
- mutex sync.Mutex
+ // path to the volume's root directory
+ root string
+ // something to lock during IO, typically a sync.Mutex (or nil
+ // to skip locking)
+ locker sync.Locker
+ readonly bool
}
+// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if e := lockfile(f); e != nil {
return e
return syscall.Utime(p, &utime)
}
+// Mtime returns the stored timestamp for the given locator.
func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
p := v.blockPath(loc)
- if fi, err := os.Stat(p); err != nil {
+ fi, err := os.Stat(p)
+ if err != nil {
return time.Time{}, err
- } else {
- return fi.ModTime(), nil
}
+ return fi.ModTime(), nil
+}
+
+// Lock the locker (if one is in use), open the file for reading, and
+// call the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
+ }
+ f, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+ stat, err := os.Stat(path)
+ if err == nil {
+ if stat.Size() < 0 {
+ err = os.ErrInvalid
+ } else if stat.Size() > BlockSize {
+ err = TooLongError
+ }
+ }
+ return stat, err
}
// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- stat, err := os.Stat(path)
+ stat, err := v.stat(path)
if err != nil {
return nil, err
}
- if stat.Size() < 0 {
- return nil, os.ErrInvalid
- } else if stat.Size() == 0 {
- return bufs.Get(0), nil
- } else if stat.Size() > BLOCKSIZE {
- return nil, TooLongError
- }
- f, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- defer f.Close()
buf := bufs.Get(int(stat.Size()))
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
- }
- _, err = io.ReadFull(f, buf)
+ err = v.getFunc(path, func(rdr io.Reader) error {
+ _, err = io.ReadFull(rdr, buf)
+ return err
+ })
if err != nil {
bufs.Put(buf)
return nil, err
return buf, nil
}
+// Compare returns nil if Get(loc) would return the same content as
+// expect. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
+ path := v.blockPath(loc)
+ stat, err := v.stat(path)
+ if err != nil {
+ return err
+ }
+ bufLen := 1 << 20
+ if int64(bufLen) > stat.Size() {
+ bufLen = int(stat.Size())
+ if bufLen < 1 {
+ // len(buf)==0 would prevent us from handling
+ // empty files the same way as non-empty
+ // files, because reading 0 bytes at a time
+ // never reaches EOF.
+ bufLen = 1
+ }
+ }
+ cmp := expect
+ buf := make([]byte, bufLen)
+ return v.getFunc(path, func(rdr io.Reader) error {
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // reader.
+ for {
+ n, err := rdr.Read(buf)
+ if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+ return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+ })
+}
+
// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
}
bpath := v.blockPath(loc)
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
}
}
+// Delete deletes the block data from the unix storage
func (v *UnixVolume) Delete(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
if v.readonly {
return MethodDisabledError
}
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
}
defer unlockfile(f)
- // If the block has been PUT in the last blob_signature_ttl
+ // If the block has been PUT in the last blobSignatureTTL
// seconds, return success without removing the block. This
// protects data from garbage collection until it is no longer
// possible for clients to retrieve the unreferenced blocks
if fi, err := os.Stat(p); err != nil {
return err
} else {
- if time.Since(fi.ModTime()) < blob_signature_ttl {
+ if time.Since(fi.ModTime()) < blobSignatureTTL {
return nil
}
}
}
// IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
//
func (v *UnixVolume) IsFull() (isFull bool) {
fullSymlink := v.root + "/full"
}
if avail, err := v.FreeDiskSpace(); err == nil {
- isFull = avail < MIN_FREE_KILOBYTES
+ isFull = avail < MinFreeKilobytes
} else {
log.Printf("%s: FreeDiskSpace: %s\n", v, err)
isFull = false
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
+// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
func (v *UnixVolume) Writable() bool {
return !v.readonly
}