7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume...
authorTom Clegg <tom@curoverse.com>
Thu, 3 Sep 2015 04:54:52 +0000 (00:54 -0400)
committerTom Clegg <tom@curoverse.com>
Mon, 7 Sep 2015 20:42:36 +0000 (16:42 -0400)
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore_test.go
services/keepstore/trash_worker_test.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

index a656ecfaff037bacec05e535c31b3518bc4b2079..a9bf91e842f7178fff900e40fb0c2b75fa4e9fba 100644 (file)
@@ -231,6 +231,9 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
                        uri:          "/" + TEST_HASH,
                        request_body: TEST_BLOCK,
                })
+       defer func(orig bool) {
+               never_delete = orig
+       }(never_delete)
        never_delete = false
        IssueRequest(
                &RequestTester{
@@ -246,10 +249,13 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
        }
        for _, e := range []expect{
                {0, "Get", 0},
+               {0, "Compare", 0},
                {0, "Touch", 0},
                {0, "Put", 0},
                {0, "Delete", 0},
-               {1, "Get", 1},
+               {1, "Get", 0},
+               {1, "Compare", 1},
+               {1, "Touch", 1},
                {1, "Put", 1},
                {1, "Delete", 1},
        } {
index a86bb6a5b552887836e24cb858191bcbe920e479..8cc7d8b25cffa7d21565a450e5c6a4de1475b240 100644 (file)
@@ -8,7 +8,6 @@ package main
 // StatusHandler   (GET /status.json)
 
 import (
-       "bytes"
        "container/list"
        "crypto/md5"
        "encoding/json"
@@ -74,7 +73,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       block, err := GetBlock(mux.Vars(req)["hash"], false)
+       block, err := GetBlock(mux.Vars(req)["hash"])
        if err != nil {
                // This type assertion is safe because the only errors
                // GetBlock can return are DiskHashError or NotFoundError.
@@ -442,10 +441,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // which volume to check for fetching blocks, storing blocks, etc.
 
 // ==============================
-// GetBlock fetches and returns the block identified by "hash".  If
-// the update_timestamp argument is true, GetBlock also updates the
-// block's file modification time (for the sake of PutBlock, which
-// must update the file's timestamp when the block already exists).
+// GetBlock fetches and returns the block identified by "hash".
 //
 // On success, GetBlock returns a byte slice with the block data, and
 // a nil error.
@@ -456,22 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 
-func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
+func GetBlock(hash string) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        error_to_caller := NotFoundError
 
-       var vols []Volume
-       if update_timestamp {
-               // Pointless to find the block on an unwritable volume
-               // because Touch() will fail -- this is as good as
-               // "not found" for purposes of callers who need to
-               // update_timestamp.
-               vols = KeepVM.AllWritable()
-       } else {
-               vols = KeepVM.AllReadable()
-       }
-
-       for _, vol := range vols {
+       for _, vol := range KeepVM.AllReadable() {
                buf, err := vol.Get(hash)
                if err != nil {
                        // IsNotExist is an expected error and may be
@@ -500,15 +485,6 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
                        log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
                                vol, hash)
                }
-               if update_timestamp {
-                       if err := vol.Touch(hash); err != nil {
-                               error_to_caller = GenericError
-                               log.Printf("%s: Touch %s failed: %s",
-                                       vol, hash, error_to_caller)
-                               bufs.Put(buf)
-                               continue
-                       }
-               }
                return buf, nil
        }
        return nil, error_to_caller
@@ -548,21 +524,11 @@ func PutBlock(block []byte, hash string) error {
                return RequestHashError
        }
 
-       // If we already have a block on disk under this identifier, return
-       // success (but check for MD5 collisions).  While fetching the block,
-       // update its timestamp.
-       // The only errors that GetBlock can return are DiskHashError and NotFoundError.
-       // In either case, we want to write our new (good) block to disk,
-       // so there is nothing special to do if err != nil.
-       //
-       if oldblock, err := GetBlock(hash, true); err == nil {
-               defer bufs.Put(oldblock)
-               if bytes.Compare(block, oldblock) == 0 {
-                       // The block already exists; return success.
-                       return nil
-               } else {
-                       return CollisionError
-               }
+       // If we already have this data, it's intact on disk, and we
+       // can update its timestamp, return success. If we have
+       // different data with the same hash, return failure.
+       if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+               return err
        }
 
        // Choose a Keep volume to write to.
@@ -603,6 +569,35 @@ func PutBlock(block []byte, hash string) error {
        }
 }
 
+// CompareAndTouch returns nil if one of the volumes already has the
+// given content and it successfully updates the relevant block's
+// modification time in order to protect it from premature garbage
+// collection.
+func CompareAndTouch(hash string, buf []byte) error {
+       var bestErr error = NotFoundError
+       for _, vol := range KeepVM.AllWritable() {
+               if err := vol.Compare(hash, buf); err == CollisionError {
+                       // Stop if we have a block with same hash but
+                       // different content. (It will be impossible
+                       // to tell which one is wanted if we have
+                       // both, so there's no point writing it even
+                       // on a different volume.)
+                       return err
+               } else if err != nil {
+                       // Couldn't find, couldn't open, etc.: try next volume.
+                       continue
+               }
+               if err := vol.Touch(hash); err != nil {
+                       log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+                       bestErr = err
+                       continue
+               }
+               // Compare and Touch both worked --> done.
+               return nil
+       }
+       return bestErr
+}
+
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IsValidLocator
index e01b01363d4e2de2f77b854ea5789fb996276234..b89925f5bd20568137b178e9861570c173c8d2f0 100644 (file)
@@ -60,7 +60,7 @@ func TestGetBlock(t *testing.T) {
        }
 
        // Check that GetBlock returns success.
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TEST_HASH)
        if err != nil {
                t.Errorf("GetBlock error: %s", err)
        }
@@ -80,7 +80,7 @@ func TestGetBlockMissing(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TEST_HASH)
        if err != NotFoundError {
                t.Errorf("Expected NotFoundError, got %v", result)
        }
@@ -101,7 +101,7 @@ func TestGetBlockCorrupt(t *testing.T) {
        vols[0].Put(TEST_HASH, BAD_BLOCK)
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TEST_HASH)
        if err != DiskHashError {
                t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
        }
@@ -156,7 +156,7 @@ func TestPutBlockOneVol(t *testing.T) {
                t.Fatalf("PutBlock: %v", err)
        }
 
-       result, err := GetBlock(TEST_HASH, false)
+       result, err := GetBlock(TEST_HASH)
        if err != nil {
                t.Fatalf("GetBlock: %v", err)
        }
@@ -185,7 +185,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
        }
 
        // Confirm that GetBlock fails to return anything.
-       if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
+       if result, err := GetBlock(TEST_HASH); err != NotFoundError {
                t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
                        string(result), err)
        }
@@ -210,7 +210,7 @@ func TestPutBlockCorrupt(t *testing.T) {
        }
 
        // The block on disk should now match TEST_BLOCK.
-       if block, err := GetBlock(TEST_HASH, false); err != nil {
+       if block, err := GetBlock(TEST_HASH); err != nil {
                t.Errorf("GetBlock: %v", err)
        } else if bytes.Compare(block, TEST_BLOCK) != 0 {
                t.Errorf("GetBlock returned: '%s'", string(block))
index 40b291e6f3a0d268eef374d43a6f489701d02ab9..a626d9be9b67aff2369d6e4399f90f81b806d99b 100644 (file)
@@ -290,7 +290,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
-       data, _ := GetBlock(testData.Locator1, false)
+       data, _ := GetBlock(testData.Locator1)
        if testData.ExpectLocator1 {
                if len(data) == 0 {
                        t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -303,7 +303,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
        // Verify Locator2 to be un/deleted as expected
        if testData.Locator1 != testData.Locator2 {
-               data, _ = GetBlock(testData.Locator2, false)
+               data, _ = GetBlock(testData.Locator2)
                if testData.ExpectLocator2 {
                        if len(data) == 0 {
                                t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
index 64fea34bfe1c32ad9b6b6b33a74c82f8b9f0252f..d3616d0812d4081bf3cdd49be3082759661c4ec4 100644 (file)
@@ -15,7 +15,12 @@ type Volume interface {
        // put the returned slice back into the buffer pool when it's
        // finished with it.
        Get(loc string) ([]byte, error)
-       Put(loc string, block []byte) error
+       // Confirm Get() would return buf. If so, return nil. If not,
+       // return CollisionError or DiskHashError (depending on
+       // whether the data on disk matches the expected hash), or
+       // whatever error was encountered opening/reading the file.
+       Compare(loc string, data []byte) error
+       Put(loc string, data []byte) error
        Touch(loc string) error
        Mtime(loc string) (time.Time, error)
        IndexTo(prefix string, writer io.Writer) error
index c5a7491b3d542951983ad67e7eec29baa972f966..cbf6fb881679f92a4e25dd9cfa08bd26c1ae3df2 100644 (file)
@@ -1,6 +1,8 @@
 package main
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
        "fmt"
        "io"
@@ -71,6 +73,24 @@ func (v *MockVolume) gotCall(method string) {
        }
 }
 
+func (v *MockVolume) Compare(loc string, buf []byte) error {
+       v.gotCall("Compare")
+       <-v.Gate
+       if v.Bad {
+               return errors.New("Bad volume")
+       } else if block, ok := v.Store[loc]; ok {
+               if fmt.Sprintf("%x", md5.Sum(block)) != loc {
+                       return DiskHashError
+               }
+               if bytes.Compare(buf, block) != 0 {
+                       return CollisionError
+               }
+               return nil
+       } else {
+               return NotFoundError
+       }
+}
+
 func (v *MockVolume) Get(loc string) ([]byte, error) {
        v.gotCall("Get")
        <-v.Gate
index a7ad6f9e499c80439c27cb1beed33060674ed776..2ffa8faa39e2193b6e23540e6897c254a6262aa7 100644 (file)
@@ -3,6 +3,7 @@
 package main
 
 import (
+       "bytes"
        "fmt"
        "io"
        "io/ioutil"
@@ -57,35 +58,49 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        }
 }
 
+// Open the given file, apply the serialize lock if enabled, and call
+// the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+       f, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
+       return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+       stat, err := os.Stat(path)
+       if err == nil {
+               if stat.Size() < 0 {
+                       err = os.ErrInvalid
+               } else if stat.Size() > BLOCKSIZE {
+                       err = TooLongError
+               }
+       }
+       return stat, err
+}
+
 // Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
        path := v.blockPath(loc)
-       stat, err := os.Stat(path)
+       stat, err := v.stat(path)
        if err != nil {
                return nil, err
        }
-       if stat.Size() < 0 {
-               return nil, os.ErrInvalid
-       } else if stat.Size() == 0 {
-               return bufs.Get(0), nil
-       } else if stat.Size() > BLOCKSIZE {
-               return nil, TooLongError
-       }
-       f, err := os.Open(path)
-       if err != nil {
-               return nil, err
-       }
-       defer f.Close()
        buf := bufs.Get(int(stat.Size()))
-       if v.serialize {
-               v.mutex.Lock()
-               defer v.mutex.Unlock()
-       }
-       _, err = io.ReadFull(f, buf)
+       err = v.getFunc(path, func(rdr io.Reader) error {
+               _, err = io.ReadFull(rdr, buf)
+               return err
+       })
        if err != nil {
                bufs.Put(buf)
                return nil, err
@@ -93,6 +108,52 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
        return buf, nil
 }
 
+// Compare returns nil if Get(loc) would return the same content as
+// cmp. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+//
+// TODO(TC): Before returning CollisionError, compute the MD5 digest
+// of the data on disk (i.e., known-to-be-equal data in cmp +
+// remaining data on disk) and return DiskHashError instead of
+// CollisionError if it doesn't equal loc[:32].
+func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+       path := v.blockPath(loc)
+       stat, err := v.stat(path)
+       if err != nil {
+               return err
+       }
+       bufLen := 1 << 20
+       if int64(bufLen) > stat.Size() {
+               bufLen = int(stat.Size())
+       }
+       buf := make([]byte, bufLen)
+       return v.getFunc(path, func(rdr io.Reader) error {
+               // Loop invariants: all data read so far matched what
+               // we expected, and the first N bytes of cmp are
+               // expected to equal the next N bytes read from
+               // reader.
+               for {
+                       n, err := rdr.Read(buf)
+                       if n > len(cmp) {
+                               // file on disk is too long
+                               return CollisionError
+                       } else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
+                               return CollisionError
+                       }
+                       cmp = cmp[n:]
+                       if err == io.EOF {
+                               if len(cmp) != 0 {
+                                       // file on disk is too short
+                                       return CollisionError
+                               }
+                               return nil
+                       } else if err != nil {
+                               return err
+                       }
+               }
+       })
+}
+
 // Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
index ebb8421d9e1d3dbf6b8fce1f3fca0953568de931..6ccc865b113d703581dc631ed7810dbeaf973bd9 100644 (file)
@@ -2,7 +2,9 @@ package main
 
 import (
        "bytes"
+       "errors"
        "fmt"
+       "io"
        "io/ioutil"
        "os"
        "regexp"
@@ -385,3 +387,83 @@ func TestNodeStatus(t *testing.T) {
                t.Errorf("uninitialized bytes_used in %v", volinfo)
        }
 }
+
+func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       v.Put(TEST_HASH, TEST_BLOCK)
+       mockErr := errors.New("Mock error")
+       err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+               return mockErr
+       })
+       if err != mockErr {
+               t.Errorf("Got %v, expected %v", err, mockErr)
+       }
+}
+
+func TestUnixVolumeGetFuncFileError(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       funcCalled := false
+       err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+               funcCalled = true
+               return nil
+       })
+       if err == nil {
+               t.Errorf("Expected error opening non-existent file")
+       }
+       if funcCalled {
+               t.Errorf("Worker func should not have been called")
+       }
+}
+
+func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
+       v := TempUnixVolume(t, true, false)
+       defer _teardown(v)
+
+       v.mutex.Lock()
+       locked := true
+       go func() {
+               // TODO(TC): Don't rely on Sleep. Mock the mutex instead?
+               time.Sleep(10 * time.Millisecond)
+               locked = false
+               v.mutex.Unlock()
+       }()
+       v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+               if locked {
+                       t.Errorf("Worker func called before serialize lock was obtained")
+               }
+               return nil
+       })
+}
+
+func TestUnixVolumeCompare(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       v.Put(TEST_HASH, TEST_BLOCK)
+       err := v.Compare(TEST_HASH, TEST_BLOCK)
+       if err != nil {
+               t.Errorf("Got err %q, expected nil", err)
+       }
+
+       err = v.Compare(TEST_HASH, []byte("baddata"))
+       if err != CollisionError {
+               t.Errorf("Got err %q, expected %q", err, CollisionError)
+       }
+
+       _store(t, v, TEST_HASH, []byte("baddata"))
+       err = v.Compare(TEST_HASH, TEST_BLOCK)
+       if err != DiskHashError {
+               t.Errorf("Got err %q, expected %q", err, DiskHashError)
+       }
+
+       p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+       os.Chmod(p, 000)
+       err = v.Compare(TEST_HASH, TEST_BLOCK)
+       if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
+               t.Errorf("Got err %q, expected %q", err, "permission denied")
+       }
+}