-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
package main
import (
+ "bytes"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path/filepath"
+ "regexp"
"strconv"
"strings"
"sync"
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to the volume's root directory
- serialize bool
- readonly bool
- mutex sync.Mutex
+ // path to the volume's root directory
+ root string
+ // something to lock during IO, typically a sync.Mutex (or nil
+ // to skip locking)
+ locker sync.Locker
+ // when true, write operations such as Touch and Delete return
+ // MethodDisabledError instead of modifying the volume
+ readonly bool
}
+// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if e := lockfile(f); e != nil {
return e
return syscall.Utime(p, &utime)
}
+// Mtime returns the modification time of the block file for the given
+// locator. If os.Stat fails (for example because the file does not
+// exist), the zero time.Time and the os.Stat error are returned.
func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
p := v.blockPath(loc)
- if fi, err := os.Stat(p); err != nil {
+ fi, err := os.Stat(p)
+ if err != nil {
return time.Time{}, err
- } else {
- return fi.ModTime(), nil
}
+ return fi.ModTime(), nil
+}
+
+// getFunc opens path for reading and hands the open file to fn. When
+// the volume has a locker, it is held for the whole open+fn sequence so
+// that IO on this volume is serialized. The file is closed (and the
+// locker released) before getFunc returns. An error from os.Open, or
+// whatever fn returns, is passed back unchanged.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+ if lk := v.locker; lk != nil {
+ lk.Lock()
+ defer lk.Unlock()
+ }
+ file, openErr := os.Open(path)
+ if openErr != nil {
+ return openErr
+ }
+ defer file.Close()
+ return fn(file)
+}
+
+// stat is os.Stat with extra sanity checks on the reported size: a
+// negative size yields os.ErrInvalid and a size above BlockSize yields
+// TooLongError. The FileInfo obtained from os.Stat is returned along
+// with any error, so callers must check the error first.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+ fi, err := os.Stat(path)
+ if err != nil {
+ return fi, err
+ }
+ switch size := fi.Size(); {
+ case size < 0:
+ return fi, os.ErrInvalid
+ case size > BlockSize:
+ return fi, TooLongError
+ }
+ return fi, nil
}
// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- stat, err := os.Stat(path)
+ stat, err := v.stat(path)
if err != nil {
return nil, err
}
- if stat.Size() < 0 {
- return nil, os.ErrInvalid
- } else if stat.Size() == 0 {
- return bufs.Get(0), nil
- } else if stat.Size() > BLOCKSIZE {
- return nil, TooLongError
- }
- f, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- defer f.Close()
buf := bufs.Get(int(stat.Size()))
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
- }
- _, err = io.ReadFull(f, buf)
+ err = v.getFunc(path, func(rdr io.Reader) error {
+ _, err = io.ReadFull(rdr, buf)
+ return err
+ })
if err != nil {
bufs.Put(buf)
return nil, err
return buf, nil
}
+// Compare returns nil if Get(loc) would return the same content as
+// expect. It is functionally equivalent to Get() followed by a byte
+// comparison with expect, but uses less memory: the block file is
+// streamed through a buffer of at most 1 MiB instead of being read
+// into memory in full.
+//
+// On a content mismatch the result of collisionOrCorrupt is returned.
+// Errors from stat'ing, opening, or reading the block file are
+// returned unchanged.
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
+ path := v.blockPath(loc)
+ stat, err := v.stat(path)
+ if err != nil {
+ return err
+ }
+ bufLen := 1 << 20
+ if int64(bufLen) > stat.Size() {
+ bufLen = int(stat.Size())
+ if bufLen < 1 {
+ // len(buf)==0 would prevent us from handling
+ // empty files the same way as non-empty
+ // files, because reading 0 bytes at a time
+ // never reaches EOF.
+ bufLen = 1
+ }
+ }
+ // collisionOrCorrupt is given the first 32 characters of loc
+ // (presumably the hash part of the locator -- confirm). Clamp
+ // the slice so a shorter loc cannot panic with an out-of-range
+ // slice expression.
+ hash := loc
+ if len(hash) > 32 {
+ hash = hash[:32]
+ }
+ cmp := expect
+ buf := make([]byte, bufLen)
+ return v.getFunc(path, func(rdr io.Reader) error {
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // reader.
+ for {
+ n, err := rdr.Read(buf)
+ // Process any bytes returned before looking at err,
+ // as the io.Reader contract requires.
+ if n > len(cmp) || !bytes.Equal(cmp[:n], buf[:n]) {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ // File is shorter than expect.
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+ })
+}
+
// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
}
bpath := v.blockPath(loc)
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
return &VolumeStatus{v.root, devnum, free, used}
}
+// blockDirRe matches the names of volume-root subdirectories that
+// IndexTo scans for blocks: one or more lowercase hex digits.
+var blockDirRe = regexp.MustCompile("^[0-9a-f]+$")
+
// IndexTo writes (to the given Writer) a list of blocks found on this
// volume which begin with the specified prefix. If the prefix is an
// empty string, IndexTo writes a complete list of blocks.
// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
- return filepath.Walk(v.root,
- func(path string, info os.FileInfo, err error) error {
- if err != nil {
- log.Printf("%s: IndexTo Walk error at %s: %s",
- v, path, err)
- return nil
- }
- basename := filepath.Base(path)
- if info.IsDir() &&
- !strings.HasPrefix(basename, prefix) &&
- !strings.HasPrefix(prefix, basename) {
- // Skip directories that do not match
- // prefix. We know there is nothing
- // interesting inside.
- return filepath.SkipDir
+ // lastErr remembers the most recent error reading a block
+ // directory, so the scan can continue past unreadable
+ // directories and still report a failure at the end.
+ var lastErr error
+ rootdir, err := os.Open(v.root)
+ if err != nil {
+ return err
+ }
+ defer rootdir.Close()
+ for {
+ names, err := rootdir.Readdirnames(1)
+ if err == io.EOF {
+ return lastErr
+ } else if err != nil {
+ return err
+ }
+ if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
+ // prefix excludes all blocks stored in this dir
+ continue
+ }
+ if !blockDirRe.MatchString(names[0]) {
+ // not a block directory (e.g. the "full" symlink)
+ continue
+ }
+ blockdirpath := filepath.Join(v.root, names[0])
+ blockdir, err := os.Open(blockdirpath)
+ if err != nil {
+ log.Print("Error reading ", blockdirpath, ": ", err)
+ lastErr = err
+ continue
+ }
+ for {
+ fileInfo, err := blockdir.Readdir(1)
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ log.Print("Error reading ", blockdirpath, ": ", err)
+ lastErr = err
+ break
}
- if info.IsDir() ||
- !IsValidLocator(basename) ||
- !strings.HasPrefix(basename, prefix) {
- return nil
+ name := fileInfo[0].Name()
+ // Skip entries that are not block files (presumably
+ // e.g. Put's temporary files -- confirm), as the
+ // previous filepath.Walk version did via IsValidLocator.
+ if !strings.HasPrefix(name, prefix) || !IsValidLocator(name) {
+ continue
}
- _, err = fmt.Fprintf(w, "%s+%d %d\n",
- basename, info.Size(), info.ModTime().Unix())
- return err
- })
+ _, err = fmt.Fprint(w,
+ name,
+ "+", fileInfo[0].Size(),
+ " ", fileInfo[0].ModTime().Unix(),
+ "\n")
+ if err != nil {
+ // The caller's writer failed: stop instead of
+ // silently producing a truncated index.
+ blockdir.Close()
+ return err
+ }
+ }
+ blockdir.Close()
+ }
}
+// Delete deletes the block data from the unix storage
func (v *UnixVolume) Delete(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
if v.readonly {
return MethodDisabledError
}
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
}
defer unlockfile(f)
- // If the block has been PUT in the last blob_signature_ttl
+ // If the block has been PUT in the last blobSignatureTTL
// seconds, return success without removing the block. This
// protects data from garbage collection until it is no longer
// possible for clients to retrieve the unreferenced blocks
if fi, err := os.Stat(p); err != nil {
return err
} else {
- if time.Since(fi.ModTime()) < blob_signature_ttl {
+ if time.Since(fi.ModTime()) < blobSignatureTTL {
return nil
}
}
}
// IsFull returns true if the free space on the volume is less than
-// MIN_FREE_KILOBYTES.
+// MinFreeKilobytes.
//
func (v *UnixVolume) IsFull() (isFull bool) {
fullSymlink := v.root + "/full"
}
if avail, err := v.FreeDiskSpace(); err == nil {
- isFull = avail < MIN_FREE_KILOBYTES
+ isFull = avail < MinFreeKilobytes
} else {
log.Printf("%s: FreeDiskSpace: %s\n", v, err)
isFull = false
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
+// Writable returns false if all future Put, Touch, and Delete calls are
+// expected to fail, i.e. when the volume is read-only.
func (v *UnixVolume) Writable() bool {
return !v.readonly
}