3448: check block timestamp before DELETE
authorTim Pierce <twp@curoverse.com>
Thu, 21 Aug 2014 17:50:18 +0000 (13:50 -0400)
committerTim Pierce <twp@curoverse.com>
Thu, 21 Aug 2014 17:55:17 +0000 (13:55 -0400)
volume.Delete locks the target file and checks the timestamp before
proceeding. If the file is newer than permission_ttl specifies, return a
PermissionError. This way, a block that has been marked for recycling by
Data Manager but subsequently was re-added by a user will not be
prematurely deleted.

PutBlock now updates the timestamp on the target block if it already
exists on disk, to prevent DELETE from recycling old blocks that have
just been refreshed.

To update block timestamps, the blob server uses the new volume.Touch
method.  MockVolume and UnixVolume have been updated appropriately.

Refs #3448.

services/keepstore/handlers.go
services/keepstore/volume.go
services/keepstore/volume_unix.go

index 9838694ac83b5d5b1db75e2778f8718fead226af..468eec43cfc5334461e67f195932cf96925dfea3 100644 (file)
@@ -397,6 +397,17 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
+// GetBlock, PutBlock and TouchBlock implement lower-level code for
+// handling blocks by rooting through volumes connected to the local
+// machine.  Once the handler has determined that system policy permits
+// the request, it calls these methods to perform the actual operation.
+//
+// TODO(twp): this code would probably be better located in the
+// VolumeManager interface. As an abstraction, the VolumeManager
+// should be the only part of the code that cares about which volume a
+// block is stored on, so it should be responsible for figuring out
+// which volume to check for fetching blocks, storing blocks, etc.
+
 func GetBlock(hash string) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        error_to_caller := NotFoundError
@@ -484,7 +495,16 @@ func PutBlock(block []byte, hash string) error {
        // so there is nothing special to do if err != nil.
        if oldblock, err := GetBlock(hash); err == nil {
                if bytes.Compare(block, oldblock) == 0 {
-                       return nil
+                       // The block already exists; update the timestamp and
+                       // return.
+                       // Note that TouchBlock will fail (and therefore
+                       // so will PutBlock) if the block exists but is found on a
+                       // read-only volume. That is intentional: if the user has
+                       // requested N replicas of a block, we want to be sure that
+                       // there are at least N *writable* replicas, so a block
+                       // that cannot be written to should not count toward the
+                       // replication total.
+                       return TouchBlock(block)
                } else {
                        return CollisionError
                }
@@ -520,6 +540,22 @@ func PutBlock(block []byte, hash string) error {
        }
 }
 
+// TouchBlock finds the block identified by hash and updates its
+// filesystem modification time with the current time.
+func TouchBlock(hash string) error {
+       for _, vol := range KeepVM.Volumes() {
+               err := vol.Touch(hash)
+               if os.IsNotExist(err) {
+                       continue
+               }
+               // either err is nil (meaning success) or some error other
+               // than "file does not exist" (which we should return upward).
+               return err
+       }
+       // If we got here, the block was not found on any volume.
+       return os.ErrNotExist
+}
+
 // IsValidLocator
 //     Return true if the specified string is a valid Keep locator.
 //     When Keep is extended to support hash types other than MD5,
index b15ef23a85a403c83128bfde5ce18037682f86ca..7d791f2be5638dc70c46d28aab4f2ce4984ef862 100644 (file)
@@ -9,11 +9,13 @@ import (
        "fmt"
        "os"
        "strings"
+       "time"
 )
 
 type Volume interface {
        Get(loc string) ([]byte, error)
        Put(loc string, block []byte) error
+       Touch(loc string) error
        Index(prefix string) string
        Delete(loc string) error
        Status() *VolumeStatus
@@ -26,12 +28,17 @@ type Volume interface {
 // on all writes and puts.
 //
 type MockVolume struct {
-       Store map[string][]byte
-       Bad   bool
+       Store      map[string][]byte
+       Timestamps map[string]time.Time
+       Bad        bool
 }
 
 func CreateMockVolume() *MockVolume {
-       return &MockVolume{make(map[string][]byte), false}
+       return &MockVolume{
+               make(map[string][]byte),
+               make(map[string]time.Time),
+               false,
+       }
 }
 
 func (v *MockVolume) Get(loc string) ([]byte, error) {
@@ -48,6 +55,14 @@ func (v *MockVolume) Put(loc string, block []byte) error {
                return errors.New("Bad volume")
        }
        v.Store[loc] = block
+       return Touch(loc)
+}
+
+func (v *MockVolume) Touch(loc string) error {
+       if v.Bad {
+               return errors.New("Bad volume")
+       }
+       v.Timestamps[loc] = time.Now()
        return nil
 }
 
index ddddd5abf901d81a86d5bf0a74cab8bc57081a08..e137045ef683459909f86f16044dfb885158c562 100644 (file)
@@ -97,6 +97,19 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
        return response.err
 }
 
+func (v *UnixVolume) Touch(loc string) error {
+       p := v.blockPath(loc)
+       f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+       if err != nil {
+               return err
+       }
+       lockfile(f)
+       defer unlockfile(f)
+       now := time.Now().Unix()
+       utime := syscall.Utimbuf{now, now}
+       return syscall.Utime(p, &utime)
+}
+
 // Read retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
@@ -230,7 +243,27 @@ func (v *UnixVolume) Index(prefix string) (output string) {
 }
 
 func (v *UnixVolume) Delete(loc string) error {
-       return os.Remove(v.blockPath(loc))
+       p := v.blockPath(loc)
+       f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+       if err != nil {
+               return err
+       }
+       lockfile(f)
+       defer unlockfile(f)
+
+       // Return PermissionError if the block has been PUT more recently
+       // than -permission_ttl.  This guards against a race condition
+       // where a block is old enough that Data Manager has added it to
+       // the trash list, but the user submitted a PUT for the block
+       // since then.
+       if fi, err := os.Stat(p); err != nil {
+               return err
+       } else {
+               if time.Since(fi.ModTime()) < permission_ttl {
+                       return PermissionError
+               }
+       }
+       return os.Remove(p)
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -293,3 +326,12 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
+
+// lockfile and unlockfile use flock(2) to manage kernel file locks.
+func lockfile(f os.File) error {
+       return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+}
+
+func unlockfile(f os.File) error {
+       return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+}