From: Tom Clegg Date: Thu, 3 Sep 2015 04:54:52 +0000 (-0400) Subject: 7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume... X-Git-Tag: 1.1.0~1363^2~10 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/7930d7abaabf2fd1f3432eca10f26b821e0ef94f 7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume, UnixVolume, MockVolume. --- diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index a656ecfaff..a9bf91e842 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -231,6 +231,9 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { uri: "/" + TEST_HASH, request_body: TEST_BLOCK, }) + defer func(orig bool) { + never_delete = orig + }(never_delete) never_delete = false IssueRequest( &RequestTester{ @@ -246,10 +249,13 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) { } for _, e := range []expect{ {0, "Get", 0}, + {0, "Compare", 0}, {0, "Touch", 0}, {0, "Put", 0}, {0, "Delete", 0}, - {1, "Get", 1}, + {1, "Get", 0}, + {1, "Compare", 1}, + {1, "Touch", 1}, {1, "Put", 1}, {1, "Delete", 1}, } { diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index a86bb6a5b5..8cc7d8b25c 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -8,7 +8,6 @@ package main // StatusHandler (GET /status.json) import ( - "bytes" "container/list" "crypto/md5" "encoding/json" @@ -74,7 +73,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) { } } - block, err := GetBlock(mux.Vars(req)["hash"], false) + block, err := GetBlock(mux.Vars(req)["hash"]) if err != nil { // This type assertion is safe because the only errors // GetBlock can return are DiskHashError or NotFoundError. @@ -442,10 +441,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) { // which volume to check for fetching blocks, storing blocks, etc. 
// ============================== -// GetBlock fetches and returns the block identified by "hash". If -// the update_timestamp argument is true, GetBlock also updates the -// block's file modification time (for the sake of PutBlock, which -// must update the file's timestamp when the block already exists). +// GetBlock fetches and returns the block identified by "hash". // // On success, GetBlock returns a byte slice with the block data, and // a nil error. @@ -456,22 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) { // DiskHashError. // -func GetBlock(hash string, update_timestamp bool) ([]byte, error) { +func GetBlock(hash string) ([]byte, error) { // Attempt to read the requested hash from a keep volume. error_to_caller := NotFoundError - var vols []Volume - if update_timestamp { - // Pointless to find the block on an unwritable volume - // because Touch() will fail -- this is as good as - // "not found" for purposes of callers who need to - // update_timestamp. - vols = KeepVM.AllWritable() - } else { - vols = KeepVM.AllReadable() - } - - for _, vol := range vols { + for _, vol := range KeepVM.AllReadable() { buf, err := vol.Get(hash) if err != nil { // IsNotExist is an expected error and may be @@ -500,15 +485,6 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) { log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", vol, hash) } - if update_timestamp { - if err := vol.Touch(hash); err != nil { - error_to_caller = GenericError - log.Printf("%s: Touch %s failed: %s", - vol, hash, error_to_caller) - bufs.Put(buf) - continue - } - } return buf, nil } return nil, error_to_caller @@ -548,21 +524,11 @@ func PutBlock(block []byte, hash string) error { return RequestHashError } - // If we already have a block on disk under this identifier, return - // success (but check for MD5 collisions). While fetching the block, - // update its timestamp. 
- // The only errors that GetBlock can return are DiskHashError and NotFoundError. - // In either case, we want to write our new (good) block to disk, - // so there is nothing special to do if err != nil. - // - if oldblock, err := GetBlock(hash, true); err == nil { - defer bufs.Put(oldblock) - if bytes.Compare(block, oldblock) == 0 { - // The block already exists; return success. - return nil - } else { - return CollisionError - } + // If we already have this data, it's intact on disk, and we + // can update its timestamp, return success. If we have + // different data with the same hash, return failure. + if err := CompareAndTouch(hash, block); err == nil || err == CollisionError { + return err } // Choose a Keep volume to write to. @@ -603,6 +569,35 @@ func PutBlock(block []byte, hash string) error { } } +// CompareAndTouch returns nil if one of the volumes already has the +// given content and it successfully updates the relevant block's +// modification time in order to protect it from premature garbage +// collection. +func CompareAndTouch(hash string, buf []byte) error { + var bestErr error = NotFoundError + for _, vol := range KeepVM.AllWritable() { + if err := vol.Compare(hash, buf); err == CollisionError { + // Stop if we have a block with same hash but + // different content. (It will be impossible + // to tell which one is wanted if we have + // both, so there's no point writing it even + // on a different volume.) + return err + } else if err != nil { + // Couldn't find, couldn't open, etc.: try next volume. + continue + } + if err := vol.Touch(hash); err != nil { + log.Printf("%s: Touch %s failed: %s", vol, hash, err) + bestErr = err + continue + } + // Compare and Touch both worked --> done. 
+ return nil + } + return bestErr +} + var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`) // IsValidLocator diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go index e01b01363d..b89925f5bd 100644 --- a/services/keepstore/keepstore_test.go +++ b/services/keepstore/keepstore_test.go @@ -60,7 +60,7 @@ func TestGetBlock(t *testing.T) { } // Check that GetBlock returns success. - result, err := GetBlock(TEST_HASH, false) + result, err := GetBlock(TEST_HASH) if err != nil { t.Errorf("GetBlock error: %s", err) } @@ -80,7 +80,7 @@ func TestGetBlockMissing(t *testing.T) { defer KeepVM.Close() // Check that GetBlock returns failure. - result, err := GetBlock(TEST_HASH, false) + result, err := GetBlock(TEST_HASH) if err != NotFoundError { t.Errorf("Expected NotFoundError, got %v", result) } @@ -101,7 +101,7 @@ func TestGetBlockCorrupt(t *testing.T) { vols[0].Put(TEST_HASH, BAD_BLOCK) // Check that GetBlock returns failure. - result, err := GetBlock(TEST_HASH, false) + result, err := GetBlock(TEST_HASH) if err != DiskHashError { t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result) } @@ -156,7 +156,7 @@ func TestPutBlockOneVol(t *testing.T) { t.Fatalf("PutBlock: %v", err) } - result, err := GetBlock(TEST_HASH, false) + result, err := GetBlock(TEST_HASH) if err != nil { t.Fatalf("GetBlock: %v", err) } @@ -185,7 +185,7 @@ func TestPutBlockMD5Fail(t *testing.T) { } // Confirm that GetBlock fails to return anything. - if result, err := GetBlock(TEST_HASH, false); err != NotFoundError { + if result, err := GetBlock(TEST_HASH); err != NotFoundError { t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)", string(result), err) } @@ -210,7 +210,7 @@ func TestPutBlockCorrupt(t *testing.T) { } // The block on disk should now match TEST_BLOCK. 
- if block, err := GetBlock(TEST_HASH, false); err != nil { + if block, err := GetBlock(TEST_HASH); err != nil { t.Errorf("GetBlock: %v", err) } else if bytes.Compare(block, TEST_BLOCK) != 0 { t.Errorf("GetBlock returned: '%s'", string(block)) diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go index 40b291e6f3..a626d9be9b 100644 --- a/services/keepstore/trash_worker_test.go +++ b/services/keepstore/trash_worker_test.go @@ -290,7 +290,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress }) // Verify Locator1 to be un/deleted as expected - data, _ := GetBlock(testData.Locator1, false) + data, _ := GetBlock(testData.Locator1) if testData.ExpectLocator1 { if len(data) == 0 { t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1) @@ -303,7 +303,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { // Verify Locator2 to be un/deleted as expected if testData.Locator1 != testData.Locator2 { - data, _ = GetBlock(testData.Locator2, false) + data, _ = GetBlock(testData.Locator2) if testData.ExpectLocator2 { if len(data) == 0 { t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2) diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 64fea34bfe..d3616d0812 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -15,7 +15,12 @@ type Volume interface { // put the returned slice back into the buffer pool when it's // finished with it. Get(loc string) ([]byte, error) - Put(loc string, block []byte) error + // Confirm Get() would return buf. If so, return nil. If not, + // return CollisionError or DiskHashError (depending on + // whether the data on disk matches the expected hash), or + // whatever error was encountered opening/reading the file. 
+ Compare(loc string, data []byte) error + Put(loc string, data []byte) error Touch(loc string) error Mtime(loc string) (time.Time, error) IndexTo(prefix string, writer io.Writer) error diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index c5a7491b3d..cbf6fb8816 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "crypto/md5" "errors" "fmt" "io" @@ -71,6 +73,24 @@ func (v *MockVolume) gotCall(method string) { } } +func (v *MockVolume) Compare(loc string, buf []byte) error { + v.gotCall("Compare") + <-v.Gate + if v.Bad { + return errors.New("Bad volume") + } else if block, ok := v.Store[loc]; ok { + if fmt.Sprintf("%x", md5.Sum(block)) != loc { + return DiskHashError + } + if bytes.Compare(buf, block) != 0 { + return CollisionError + } + return nil + } else { + return NotFoundError + } +} + func (v *MockVolume) Get(loc string) ([]byte, error) { v.gotCall("Get") <-v.Gate diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index a7ad6f9e49..2ffa8faa39 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -3,6 +3,7 @@ package main import ( + "bytes" "fmt" "io" "io/ioutil" @@ -57,35 +58,49 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) { } } +// Open the given file, apply the serialize lock if enabled, and call +// the given function if and when the file is ready to read. +func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + if v.serialize { + v.mutex.Lock() + defer v.mutex.Unlock() + } + return fn(f) +} + +// stat is os.Stat() with some extra sanity checks. 
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) { + stat, err := os.Stat(path) + if err == nil { + if stat.Size() < 0 { + err = os.ErrInvalid + } else if stat.Size() > BLOCKSIZE { + err = TooLongError + } + } + return stat, err +} + // Get retrieves a block identified by the locator string "loc", and // returns its contents as a byte slice. // -// If the block could not be found, opened, or read, Get returns a nil -// slice and whatever non-nil error was returned by Stat or ReadFile. +// Get returns a nil buffer IFF it returns a non-nil error. func (v *UnixVolume) Get(loc string) ([]byte, error) { path := v.blockPath(loc) - stat, err := os.Stat(path) + stat, err := v.stat(path) if err != nil { return nil, err } - if stat.Size() < 0 { - return nil, os.ErrInvalid - } else if stat.Size() == 0 { - return bufs.Get(0), nil - } else if stat.Size() > BLOCKSIZE { - return nil, TooLongError - } - f, err := os.Open(path) - if err != nil { - return nil, err - } - defer f.Close() buf := bufs.Get(int(stat.Size())) - if v.serialize { - v.mutex.Lock() - defer v.mutex.Unlock() - } - _, err = io.ReadFull(f, buf) + err = v.getFunc(path, func(rdr io.Reader) error { + _, err = io.ReadFull(rdr, buf) + return err + }) if err != nil { bufs.Put(buf) return nil, err @@ -93,6 +108,52 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) { return buf, nil } +// Compare returns nil if Get(loc) would return the same content as +// cmp. It is functionally equivalent to Get() followed by +// bytes.Compare(), but uses less memory. +// +// TODO(TC): Before returning CollisionError, compute the MD5 digest +// of the data on disk (i.e., known-to-be-equal data in cmp + +// remaining data on disk) and return DiskHashError instead of +// CollisionError if it doesn't equal loc[:32]. 
+func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+	// Returns nil when the file stored for loc has exactly the
+	// content of cmp, CollisionError when the content or length
+	// differs, and otherwise whatever error stat/open/read produced.
+	path := v.blockPath(loc)
+	stat, err := v.stat(path)
+	if err != nil {
+		return err
+	}
+	// Compare in chunks of at most 1 MiB instead of loading the
+	// whole file into memory.
+	bufLen := 1 << 20
+	if int64(bufLen) > stat.Size() {
+		bufLen = int(stat.Size())
+	}
+	if bufLen < 1 {
+		// Reading into a zero-length buffer returns (0, nil)
+		// rather than io.EOF, so with an empty file on disk
+		// the loop below would never terminate. Always ask
+		// for at least one byte so EOF can be observed.
+		bufLen = 1
+	}
+	buf := make([]byte, bufLen)
+	return v.getFunc(path, func(rdr io.Reader) error {
+		// Loop invariants: all data read so far matched what
+		// we expected, and the first N bytes of cmp are
+		// expected to equal the next N bytes read from
+		// reader.
+		for {
+			n, err := rdr.Read(buf)
+			if n > len(cmp) {
+				// file on disk is too long
+				return CollisionError
+			} else if n > 0 && !bytes.Equal(cmp[:n], buf[:n]) {
+				return CollisionError
+			}
+			// Drop the bytes that have been verified; a
+			// Read may deliver data together with io.EOF,
+			// so this must happen before the EOF check.
+			cmp = cmp[n:]
+			if err == io.EOF {
+				if len(cmp) != 0 {
+					// file on disk is too short
+					return CollisionError
+				}
+				return nil
+			} else if err != nil {
+				return err
+			}
+		}
+	})
+}
+
 // Put stores a block of data identified by the locator string
 // "loc". It returns nil on success. If the volume is full, it
 // returns a FullError.
If the write fails due to some other error, diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index ebb8421d9e..6ccc865b11 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -2,7 +2,9 @@ package main import ( "bytes" + "errors" "fmt" + "io" "io/ioutil" "os" "regexp" @@ -385,3 +387,83 @@ func TestNodeStatus(t *testing.T) { t.Errorf("uninitialized bytes_used in %v", volinfo) } } + +func TestUnixVolumeGetFuncWorkerError(t *testing.T) { + v := TempUnixVolume(t, false, false) + defer _teardown(v) + + v.Put(TEST_HASH, TEST_BLOCK) + mockErr := errors.New("Mock error") + err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error { + return mockErr + }) + if err != mockErr { + t.Errorf("Got %v, expected %v", err, mockErr) + } +} + +func TestUnixVolumeGetFuncFileError(t *testing.T) { + v := TempUnixVolume(t, false, false) + defer _teardown(v) + + funcCalled := false + err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error { + funcCalled = true + return nil + }) + if err == nil { + t.Errorf("Expected error opening non-existent file") + } + if funcCalled { + t.Errorf("Worker func should not have been called") + } +} + +func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) { + v := TempUnixVolume(t, true, false) + defer _teardown(v) + + v.mutex.Lock() + locked := true + go func() { + // TODO(TC): Don't rely on Sleep. Mock the mutex instead? 
+ time.Sleep(10 * time.Millisecond) + locked = false + v.mutex.Unlock() + }() + v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error { + if locked { + t.Errorf("Worker func called before serialize lock was obtained") + } + return nil + }) +} + +func TestUnixVolumeCompare(t *testing.T) { + v := TempUnixVolume(t, false, false) + defer _teardown(v) + + v.Put(TEST_HASH, TEST_BLOCK) + err := v.Compare(TEST_HASH, TEST_BLOCK) + if err != nil { + t.Errorf("Got err %q, expected nil", err) + } + + err = v.Compare(TEST_HASH, []byte("baddata")) + if err != CollisionError { + t.Errorf("Got err %q, expected %q", err, CollisionError) + } + + _store(t, v, TEST_HASH, []byte("baddata")) + err = v.Compare(TEST_HASH, TEST_BLOCK) + if err != DiskHashError { + t.Errorf("Got err %q, expected %q", err, DiskHashError) + } + + p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH) + os.Chmod(p, 000) + err = v.Compare(TEST_HASH, TEST_BLOCK) + if err == nil || strings.Index(err.Error(), "permission denied") < 0 { + t.Errorf("Got err %q, expected %q", err, "permission denied") + } +}