}
}
+// GetBlock, PutBlock and TouchBlock implement lower-level code for
+// handling blocks by rooting through volumes connected to the local
+// machine. Once the handler has determined that system policy permits
+// the request, it calls these methods to perform the actual operation.
+//
+// TODO(twp): this code would probably be better located in the
+// VolumeManager interface. As an abstraction, the VolumeManager
+// should be the only part of the code that cares about which volume a
+// block is stored on, so it should be responsible for figuring out
+// which volume to check for fetching blocks, storing blocks, etc.
+
func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
// so there is nothing special to do if err != nil.
if oldblock, err := GetBlock(hash); err == nil {
if bytes.Compare(block, oldblock) == 0 {
- return nil
+ // The block already exists; update the timestamp and
+ // return.
+ // Note that TouchBlock will fail (and therefore
+ // so will PutBlock) if the block exists but is found on a
+ // read-only volume. That is intentional: if the user has
+ // requested N replicas of a block, we want to be sure that
+ // there are at least N *writable* replicas, so a block
+ // that cannot be written to should not count toward the
+ // replication total.
+ return TouchBlock(block)
} else {
return CollisionError
}
}
}
+// TouchBlock finds the block identified by hash and updates its
+// filesystem modification time with the current time.
+func TouchBlock(hash string) error {
+ for _, vol := range KeepVM.Volumes() {
+ err := vol.Touch(hash)
+ if os.IsNotExist(err) {
+ continue
+ }
+ // either err is nil (meaning success) or some error other
+ // than "file does not exist" (which we should return upward).
+ return err
+ }
+ // If we got here, the block was not found on any volume.
+ return os.ErrNotExist
+}
+
// IsValidLocator
// Return true if the specified string is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
"fmt"
"os"
"strings"
+ "time"
)
type Volume interface {
Get(loc string) ([]byte, error)
Put(loc string, block []byte) error
+ Touch(loc string) error
Index(prefix string) string
Delete(loc string) error
Status() *VolumeStatus
// on all writes and puts.
//
type MockVolume struct {
- Store map[string][]byte
- Bad bool
+ Store map[string][]byte
+ Timestamps map[string]time.Time
+ Bad bool
}
func CreateMockVolume() *MockVolume {
- return &MockVolume{make(map[string][]byte), false}
+ return &MockVolume{
+ make(map[string][]byte),
+ make(map[string]time.Time),
+ false,
+ }
}
func (v *MockVolume) Get(loc string) ([]byte, error) {
return errors.New("Bad volume")
}
v.Store[loc] = block
+ return Touch(loc)
+}
+
+func (v *MockVolume) Touch(loc string) error {
+ if v.Bad {
+ return errors.New("Bad volume")
+ }
+ v.Timestamps[loc] = time.Now()
return nil
}
return response.err
}
+func (v *UnixVolume) Touch(loc string) error {
+ p := v.blockPath(loc)
+ f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ return err
+ }
+ lockfile(f)
+ defer unlockfile(f)
+ now := time.Now().Unix()
+ utime := syscall.Utimbuf{now, now}
+ return syscall.Utime(p, &utime)
+}
+
// Read retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
}
func (v *UnixVolume) Delete(loc string) error {
- return os.Remove(v.blockPath(loc))
+ p := v.blockPath(loc)
+ f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+ if err != nil {
+ return err
+ }
+ lockfile(f)
+ defer unlockfile(f)
+
+ // Return PermissionError if the block has been PUT more recently
+ // than -permission_ttl. This guards against a race condition
+ // where a block is old enough that Data Manager has added it to
+ // the trash list, but the user submitted a PUT for the block
+ // since then.
+ if fi, err := os.Stat(p); err != nil {
+ return err
+ } else {
+ if time.Since(fi.ModTime()) < permission_ttl {
+ return PermissionError
+ }
+ }
+ return os.Remove(p)
}
// blockDir returns the fully qualified directory name for the directory
func (v *UnixVolume) String() string {
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
+
+// lockfile and unlockfile use flock(2) to manage kernel file locks.
+func lockfile(f os.File) error {
+ return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+}
+
+func unlockfile(f os.File) error {
+ return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+}