From: Tom Clegg Date: Fri, 29 Apr 2016 16:55:24 +0000 (-0400) Subject: 9068: Move buffer allocation from volumes to GetBlockHandler. X-Git-Tag: 1.1.0~950^2~2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/959f37498c5f1610612452ab227ba4680b30e8e6 9068: Move buffer allocation from volumes to GetBlockHandler. This makes the Volume interface more idiomatic: Get() accepts a buffer to read into, and returns a number of bytes read, much like the Read() method of an io.Reader. It also makes it possible for GetBlockHandler to notice, while waiting for a buffer, that the client has disconnected: In this case, it releases the network socket and never asks any volumes to do any work. --- diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index f08cebff63..a6b98bd43a 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -139,11 +139,11 @@ func (v *AzureBlobVolume) Check() error { // If the block is younger than azureWriteRaceInterval and is // unexpectedly empty, assume a PutBlob operation is in progress, and // wait for it to finish writing. -func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { +func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) { var deadline time.Time haveDeadline := false - buf, err := v.get(loc) - for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" { + size, err := v.get(loc, buf) + for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" { // Seeing a brand new empty block probably means we're // in a race with CreateBlob, which under the hood // (apparently) does "CreateEmpty" and "CommitData" @@ -163,34 +163,32 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) { } else if time.Now().After(deadline) { break } - bufs.Put(buf) time.Sleep(azureWriteRacePollTime) - buf, err = v.get(loc) + size, err = v.get(loc, buf) } if haveDeadline { - log.Printf("Race ended with len(buf)==%d", len(buf)) + log.Printf("Race ended with size==%d", size) } - return buf, err + return size, err } -func (v *AzureBlobVolume) get(loc string) ([]byte, error) { - expectSize := BlockSize +func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) { + expectSize := len(buf) if azureMaxGetBytes < BlockSize { // Unfortunately the handler doesn't tell us how long the blob // is expected to be, so we have to ask Azure. props, err := v.bsClient.GetBlobProperties(v.containerName, loc) if err != nil { - return nil, v.translateError(err) + return 0, v.translateError(err) } if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 { - return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize) + return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize) } expectSize = int(props.ContentLength) } - buf := bufs.Get(expectSize) if expectSize == 0 { - return buf, nil + return 0, nil } // We'll update this actualSize if/when we get the last piece. @@ -235,11 +233,10 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) { wg.Wait() for _, err := range errors { if err != nil { - bufs.Put(buf) - return nil, v.translateError(err) + return 0, v.translateError(err) } } - return buf[:actualSize], nil + return actualSize, nil } // Compare the given data with existing stored data. diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 439b402214..e3c0e27083 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -425,13 +425,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) { if err != nil { t.Error(err) } - gotData, err := v.Get(hash) + gotData := make([]byte, len(data)) + gotLen, err := v.Get(hash, gotData) if err != nil { t.Error(err) } gotHash := fmt.Sprintf("%x", md5.Sum(gotData)) - gotLen := len(gotData) - bufs.Put(gotData) if gotLen != size { t.Error("length mismatch: got %d != %d", gotLen, size) } @@ -477,11 +476,10 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) { // Wait for the stub's Put to create the empty blob v.azHandler.race <- continuePut go func() { - buf, err := v.Get(TestHash) + buf := make([]byte, len(TestBlock)) + _, err := v.Get(TestHash, buf) if err != nil { t.Error(err) - } else { - bufs.Put(buf) } close(allDone) }() @@ -521,15 +519,15 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) { allDone := make(chan struct{}) go func() { defer close(allDone) - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Error(err) return } - if len(buf) != 0 { - t.Errorf("Got %+q, expected empty buf", buf) + if n != 0 { + t.Errorf("Got %+q, expected empty buf", buf[:n]) } - bufs.Put(buf) }() select { case <-allDone: diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 33d585ae1e..7c17424ba5 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) { expectedDc, responseDc) } // Confirm the block has been deleted - _, err := vols[0].Get(TestHash) + buf := make([]byte, BlockSize) + _, err := vols[0].Get(TestHash, buf) var blockDeleted = os.IsNotExist(err) if !blockDeleted { t.Error("superuserExistingBlockReq: block not deleted") @@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) { expectedDc, responseDc) } // Confirm the block has NOT been deleted. - _, err = vols[0].Get(TestHash) + _, err = vols[0].Get(TestHash, buf) if err != nil { t.Errorf("testing delete on new block: %s\n", err) } @@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) { } } +type notifyingResponseRecorder struct { + *httptest.ResponseRecorder + closer chan bool +} + +func (r *notifyingResponseRecorder) CloseNotify() <-chan bool { + return r.closer +} + +func TestGetHandlerClientDisconnect(t *testing.T) { + defer func(was bool) { + enforcePermissions = was + }(enforcePermissions) + enforcePermissions = false + + defer func(orig *bufferPool) { + bufs = orig + }(bufs) + bufs = newBufferPool(1, BlockSize) + defer bufs.Put(bufs.Get(BlockSize)) + + KeepVM = MakeTestVolumeManager(2) + defer KeepVM.Close() + + if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil { + t.Error(err) + } + + resp := ¬ifyingResponseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + closer: make(chan bool, 1), + } + if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok { + t.Fatal("notifyingResponseRecorder is broken") + } + // If anyone asks, the client has disconnected. + resp.closer <- true + + ok := make(chan struct{}) + go func() { + req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) + (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req) + ok <- struct{}{} + }() + + select { + case <-time.After(20 * time.Second): + t.Fatal("request took >20s, close notifier must be broken") + case <-ok: + } + + ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder) + for i, v := range KeepVM.AllWritable() { + if calls := v.(*MockVolume).called["GET"]; calls != 0 { + t.Errorf("volume %d got %d calls, expected 0", i, calls) + } + } +} + // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource // leak. func TestGetHandlerNoBufferleak(t *testing.T) { diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index a188c47c53..f698982415 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -79,26 +79,39 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) { } } - block, err := GetBlock(mux.Vars(req)["hash"]) + // TODO: Probe volumes to check whether the block _might_ + // exist. Some volumes/types could support a quick existence + // check without causing other operations to suffer. If all + // volumes support that, and assure us the block definitely + // isn't here, we can return 404 now instead of waiting for a + // buffer. + + buf, err := getBufferForResponseWriter(resp, bufs, BlockSize) if err != nil { - // This type assertion is safe because the only errors - // GetBlock can return are DiskHashError or NotFoundError. - http.Error(resp, err.Error(), err.(*KeepError).HTTPCode) + http.Error(resp, err.Error(), http.StatusServiceUnavailable) return } - defer bufs.Put(block) + defer bufs.Put(buf) - resp.Header().Set("Content-Length", strconv.Itoa(len(block))) + size, err := GetBlock(mux.Vars(req)["hash"], buf, resp) + if err != nil { + code := http.StatusInternalServerError + if err, ok := err.(*KeepError); ok { + code = err.HTTPCode + } + http.Error(resp, err.Error(), code) + return + } + + resp.Header().Set("Content-Length", strconv.Itoa(size)) resp.Header().Set("Content-Type", "application/octet-stream") - resp.Write(block) + resp.Write(buf[:size]) } -var errClientDisconnected = fmt.Errorf("client disconnected") - // Get a buffer from the pool -- but give up and return a non-nil // error if resp implements http.CloseNotifier and tells us that the // client has disconnected before we get a buffer. -func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, error) { +func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) { var closeNotifier <-chan bool if resp, ok := resp.(http.CloseNotifier); ok { closeNotifier = resp.CloseNotify() @@ -119,7 +132,7 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, // return it to the pool. bufs.Put(<-bufReady) }() - return nil, errClientDisconnected + return nil, ErrClientDisconnect } } @@ -146,7 +159,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) { return } - buf, err := getBufferForResponseWriter(resp, int(req.ContentLength)) + buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength)) if err != nil { http.Error(resp, err.Error(), http.StatusServiceUnavailable) return @@ -516,7 +529,6 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) { } } -// ============================== // GetBlock and PutBlock implement lower-level code for handling // blocks by rooting through volumes connected to the local machine. // Once the handler has determined that system policy permits the @@ -527,24 +539,21 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) { // should be the only part of the code that cares about which volume a // block is stored on, so it should be responsible for figuring out // which volume to check for fetching blocks, storing blocks, etc. -// ============================== -// GetBlock fetches and returns the block identified by "hash". -// -// On success, GetBlock returns a byte slice with the block data, and -// a nil error. +// GetBlock fetches the block identified by "hash" into the provided +// buf, and returns the data size. // // If the block cannot be found on any volume, returns NotFoundError. // // If the block found does not have the correct MD5 hash, returns // DiskHashError. // -func GetBlock(hash string) ([]byte, error) { +func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) { // Attempt to read the requested hash from a keep volume. errorToCaller := NotFoundError for _, vol := range KeepVM.AllReadable() { - buf, err := vol.Get(hash) + size, err := vol.Get(hash, buf) if err != nil { // IsNotExist is an expected error and may be // ignored. All other errors are logged. In @@ -558,23 +567,22 @@ func GetBlock(hash string) ([]byte, error) { } // Check the file checksum. // - filehash := fmt.Sprintf("%x", md5.Sum(buf)) + filehash := fmt.Sprintf("%x", md5.Sum(buf[:size])) if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. log.Printf("%s: checksum mismatch for request %s (actual %s)", vol, hash, filehash) errorToCaller = DiskHashError - bufs.Put(buf) continue } if errorToCaller == DiskHashError { log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", vol, hash) } - return buf, nil + return size, nil } - return nil, errorToCaller + return 0, errorToCaller } // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep. diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go index c5349d399c..dda7edcec3 100644 --- a/services/keepstore/handlers_with_generic_volume_test.go +++ b/services/keepstore/handlers_with_generic_volume_test.go @@ -45,12 +45,13 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t testableVolumes[1].PutRaw(testHash, testBlock) // Get should pass - buf, err := GetBlock(testHash) + buf := make([]byte, len(testBlock)) + n, err := GetBlock(testHash, buf, nil) if err != nil { t.Fatalf("Error while getting block %s", err) } - if bytes.Compare(buf, testBlock) != 0 { - t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock) + if bytes.Compare(buf[:n], testBlock) != 0 { + t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock) } } @@ -64,9 +65,10 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory, testableVolumes[1].PutRaw(testHash, badData) // Get should fail - _, err := GetBlock(testHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(testHash, buf, nil) if err == nil { - t.Fatalf("Expected error while getting corrupt block %v", testHash) + t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash) } } @@ -85,11 +87,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t } // Check that PutBlock stored the data as expected - buf, err := GetBlock(testHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(testHash, buf, nil) if err != nil { t.Fatalf("Error during GetBlock for %q: %s", testHash, err) - } else if bytes.Compare(buf, testBlock) != 0 { - t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf) + } else if bytes.Compare(buf[:size], testBlock) != 0 { + t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size]) } } @@ -109,10 +112,11 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory, // Put succeeded and overwrote the badData in one volume, // and Get should return the testBlock now, ignoring the bad data. - buf, err := GetBlock(testHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(testHash, buf, nil) if err != nil { t.Fatalf("Error during GetBlock for %q: %s", testHash, err) - } else if bytes.Compare(buf, testBlock) != 0 { - t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf) + } else if bytes.Compare(buf[:size], testBlock) != 0 { + t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size]) } } diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 93ee43c446..80d8670105 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -91,6 +91,7 @@ var ( TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} ErrNotImplemented = &KeepError{500, "Unsupported configuration"} + ErrClientDisconnect = &KeepError{503, "Client disconnected"} ) func (e *KeepError) Error() string { diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go index 2a1c3d243a..c0adbc0bd7 100644 --- a/services/keepstore/keepstore_test.go +++ b/services/keepstore/keepstore_test.go @@ -66,12 +66,13 @@ func TestGetBlock(t *testing.T) { } // Check that GetBlock returns success. - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != nil { t.Errorf("GetBlock error: %s", err) } - if fmt.Sprint(result) != fmt.Sprint(TestBlock) { - t.Errorf("expected %s, got %s", TestBlock, result) + if bytes.Compare(buf[:size], TestBlock) != 0 { + t.Errorf("got %v, expected %v", buf[:size], TestBlock) } } @@ -86,9 +87,10 @@ func TestGetBlockMissing(t *testing.T) { defer KeepVM.Close() // Check that GetBlock returns failure. - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != NotFoundError { - t.Errorf("Expected NotFoundError, got %v", result) + t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err) } } @@ -107,9 +109,10 @@ func TestGetBlockCorrupt(t *testing.T) { vols[0].Put(TestHash, BadBlock) // Check that GetBlock returns failure. - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != DiskHashError { - t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result) + t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size]) } } @@ -133,13 +136,14 @@ func TestPutBlockOK(t *testing.T) { } vols := KeepVM.AllReadable() - result, err := vols[1].Get(TestHash) + buf := make([]byte, BlockSize) + n, err := vols[1].Get(TestHash, buf) if err != nil { t.Fatalf("Volume #0 Get returned error: %v", err) } - if string(result) != string(TestBlock) { + if string(buf[:n]) != string(TestBlock) { t.Fatalf("PutBlock stored '%s', Get retrieved '%s'", - string(TestBlock), string(result)) + string(TestBlock), string(buf[:n])) } } @@ -162,14 +166,14 @@ func TestPutBlockOneVol(t *testing.T) { t.Fatalf("PutBlock: n %d err %v", n, err) } - result, err := GetBlock(TestHash) + buf := make([]byte, BlockSize) + size, err := GetBlock(TestHash, buf, nil) if err != nil { t.Fatalf("GetBlock: %v", err) } - if string(result) != string(TestBlock) { - t.Error("PutBlock/GetBlock mismatch") - t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'", - string(TestBlock), string(result)) + if bytes.Compare(buf[:size], TestBlock) != 0 { + t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q", + TestBlock, buf[:size]) } } @@ -191,7 +195,7 @@ func TestPutBlockMD5Fail(t *testing.T) { } // Confirm that GetBlock fails to return anything. - if result, err := GetBlock(TestHash); err != NotFoundError { + if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError { t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)", string(result), err) } @@ -216,10 +220,11 @@ func TestPutBlockCorrupt(t *testing.T) { } // The block on disk should now match TestBlock. - if block, err := GetBlock(TestHash); err != nil { + buf := make([]byte, BlockSize) + if size, err := GetBlock(TestHash, buf, nil); err != nil { t.Errorf("GetBlock: %v", err) - } else if bytes.Compare(block, TestBlock) != 0 { - t.Errorf("GetBlock returned: '%s'", string(block)) + } else if bytes.Compare(buf[:size], TestBlock) != 0 { + t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock) } } @@ -290,12 +295,13 @@ func TestPutBlockTouchFails(t *testing.T) { t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n", oldMtime, newMtime) } - result, err := vols[1].Get(TestHash) + buf := make([]byte, BlockSize) + n, err := vols[1].Get(TestHash, buf) if err != nil { t.Fatalf("vols[1]: %v", err) } - if bytes.Compare(result, TestBlock) != 0 { - t.Errorf("new block does not match test block\nnew block = %v\n", result) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n]) } } diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go index a93b72cf61..8f547a4d4c 100644 --- a/services/keepstore/logging_router.go +++ b/services/keepstore/logging_router.go @@ -19,6 +19,19 @@ type LoggingResponseWriter struct { sentHdr time.Time } +func (w *LoggingResponseWriter) CloseNotify() <-chan bool { + wrapped, ok := w.ResponseWriter.(http.CloseNotifier) + if !ok { + // If upstream doesn't implement CloseNotifier, we can + // satisfy the interface by returning a channel that + // never sends anything (the interface doesn't + // guarantee that anything will ever be sent on the + // channel even if the client disconnects). + return nil + } + return wrapped.CloseNotify() +} + // WriteHeader writes header to ResponseWriter func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) { if loggingWriter.sentHdr == zeroTime { diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go new file mode 100644 index 0000000000..aa88556ae4 --- /dev/null +++ b/services/keepstore/logging_router_test.go @@ -0,0 +1,10 @@ +package main + +import ( + "net/http" + "testing" +) + +func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) { + http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify() +} diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 79a680d58a..d068b2a6e5 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -153,20 +153,18 @@ func (v *S3Volume) Check() error { return nil } -func (v *S3Volume) Get(loc string) ([]byte, error) { +func (v *S3Volume) Get(loc string, buf []byte) (int, error) { rdr, err := v.Bucket.GetReader(loc) if err != nil { - return nil, v.translateError(err) + return 0, v.translateError(err) } defer rdr.Close() - buf := bufs.Get(BlockSize) n, err := io.ReadFull(rdr, buf) switch err { case nil, io.EOF, io.ErrUnexpectedEOF: - return buf[:n], nil + return n, nil default: - bufs.Put(buf) - return nil, v.translateError(err) + return 0, v.translateError(err) } } diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go index ac9406178c..d111caeac8 100644 --- a/services/keepstore/trash_worker_test.go +++ b/services/keepstore/trash_worker_test.go @@ -290,26 +290,27 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress }) // Verify Locator1 to be un/deleted as expected - data, _ := GetBlock(testData.Locator1) + buf := make([]byte, BlockSize) + size, err := GetBlock(testData.Locator1, buf, nil) if testData.ExpectLocator1 { - if len(data) == 0 { + if size == 0 || err != nil { t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1) } } else { - if len(data) > 0 { + if size > 0 || err == nil { t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1) } } // Verify Locator2 to be un/deleted as expected if testData.Locator1 != testData.Locator2 { - data, _ = GetBlock(testData.Locator2) + size, err = GetBlock(testData.Locator2, buf, nil) if testData.ExpectLocator2 { - if len(data) == 0 { + if size == 0 || err != nil { t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2) } } else { - if len(data) > 0 { + if size > 0 || err == nil { t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2) } } @@ -321,7 +322,8 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { if testData.DifferentMtimes { locatorFoundIn := 0 for _, volume := range KeepVM.AllReadable() { - if _, err := volume.Get(testData.Locator1); err == nil { + buf := make([]byte, BlockSize) + if _, err := volume.Get(testData.Locator1, buf); err == nil { locatorFoundIn = locatorFoundIn + 1 } } diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 17da54fdad..8ae6660fd4 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -10,17 +10,14 @@ import ( // for example, a single mounted disk, a RAID array, an Amazon S3 volume, // etc. type Volume interface { - // Get a block. IFF the returned error is nil, the caller must - // put the returned slice back into the buffer pool when it's - // finished with it. (Otherwise, the buffer pool will be - // depleted and eventually -- when all available buffers are - // used and not returned -- operations will reach deadlock.) + // Get a block: copy the block data into buf, and return the + // number of bytes copied. // // loc is guaranteed to consist of 32 or more lowercase hex // digits. // - // Get should not verify the integrity of the returned data: - // it should just return whatever was found in its backing + // Get should not verify the integrity of the data: it should + // just return whatever was found in its backing // store. (Integrity checking is the caller's responsibility.) // // If an error is encountered that prevents it from @@ -36,10 +33,12 @@ type Volume interface { // access log if the block is not found on any other volumes // either). // - // If the data in the backing store is bigger than BlockSize, - // Get is permitted to return an error without reading any of - // the data. - Get(loc string) ([]byte, error) + // If the data in the backing store is bigger than len(buf), + // then Get is permitted to return an error without reading + // any of the data. + // + // len(buf) will not exceed BlockSize. + Get(loc string, buf []byte) (int, error) // Compare the given data with the stored data (i.e., what Get // would return). If equal, return nil. If not, return diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go index 95166c252f..105795c146 100644 --- a/services/keepstore/volume_generic_test.go +++ b/services/keepstore/volume_generic_test.go @@ -89,14 +89,13 @@ func testGet(t TB, factory TestableVolumeFactory) { v.PutRaw(TestHash, TestBlock) - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Fatal(err) } - bufs.Put(buf) - - if bytes.Compare(buf, TestBlock) != 0 { + if bytes.Compare(buf[:n], TestBlock) != 0 { t.Errorf("expected %s, got %s", string(TestBlock), string(buf)) } } @@ -107,7 +106,8 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) { v := factory(t) defer v.Teardown() - if _, err := v.Get(TestHash2); err == nil { + buf := make([]byte, BlockSize) + if _, err := v.Get(TestHash2, buf); err == nil { t.Errorf("Expected error while getting non-existing block %v", TestHash2) } } @@ -208,24 +208,22 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH v.PutRaw(testHash, testDataA) putErr := v.Put(testHash, testDataB) - buf, getErr := v.Get(testHash) + buf := make([]byte, BlockSize) + n, getErr := v.Get(testHash, buf) if putErr == nil { // Put must not return a nil error unless it has // overwritten the existing data. - if bytes.Compare(buf, testDataB) != 0 { - t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB) + if bytes.Compare(buf[:n], testDataB) != 0 { + t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB) } } else { // It is permissible for Put to fail, but it must // leave us with either the original data, the new // data, or nothing at all. - if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 { - t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB) + if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 { + t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB) } } - if getErr == nil { - bufs.Put(buf) - } } // Put and get multiple blocks @@ -253,34 +251,32 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) { t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err) } - data, err := v.Get(TestHash) + data := make([]byte, BlockSize) + n, err := v.Get(TestHash, data) if err != nil { t.Error(err) } else { - if bytes.Compare(data, TestBlock) != 0 { - t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock) + if bytes.Compare(data[:n], TestBlock) != 0 { + t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock) } - bufs.Put(data) } - data, err = v.Get(TestHash2) + n, err = v.Get(TestHash2, data) if err != nil { t.Error(err) } else { - if bytes.Compare(data, TestBlock2) != 0 { - t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2) + if bytes.Compare(data[:n], TestBlock2) != 0 { + t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2) } - bufs.Put(data) } - data, err = v.Get(TestHash3) + n, err = v.Get(TestHash3, data) if err != nil { t.Error(err) } else { - if bytes.Compare(data, TestBlock3) != 0 { - t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3) + if bytes.Compare(data[:n], TestBlock3) != 0 { + t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3) } - bufs.Put(data) } } @@ -426,14 +422,12 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) { if err := v.Trash(TestHash); err != nil { t.Error(err) } - data, err := v.Get(TestHash) + data := make([]byte, BlockSize) + n, err := v.Get(TestHash, data) if err != nil { t.Error(err) - } else { - if bytes.Compare(data, TestBlock) != 0 { - t.Errorf("Got data %+q, expected %+q", data, TestBlock) - } - bufs.Put(data) + } else if bytes.Compare(data[:n], TestBlock) != 0 { + t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock) } } @@ -455,7 +449,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) { if err := v.Trash(TestHash); err != nil { t.Error(err) } - if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) { + data := make([]byte, BlockSize) + if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) { t.Errorf("os.IsNotExist(%v) should have been true", err) } } @@ -514,9 +509,10 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) { } v.PutRaw(TestHash, TestBlock) + buf := make([]byte, BlockSize) // Get from read-only volume should succeed - _, err := v.Get(TestHash) + _, err := v.Get(TestHash, buf) if err != nil { t.Errorf("got err %v, expected nil", err) } @@ -526,7 +522,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) { if err == nil { t.Errorf("Expected error when putting block in a read-only volume") } - _, err = v.Get(TestHash2) + _, err = v.Get(TestHash2, buf) if err == nil { t.Errorf("Expected error when getting block whose put in read-only volume failed") } @@ -561,45 +557,45 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) { v.PutRaw(TestHash3, TestBlock3) sem := make(chan int) - go func(sem chan int) { - buf, err := v.Get(TestHash) + go func() { + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Errorf("err1: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf)) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n])) } sem <- 1 - }(sem) + }() - go func(sem chan int) { - buf, err := v.Get(TestHash2) + go func() { + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash2, buf) if err != nil { t.Errorf("err2: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock2) != 0 { - t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf)) + if bytes.Compare(buf[:n], TestBlock2) != 0 { + t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n])) } sem <- 1 - }(sem) + }() - go func(sem chan int) { - buf, err := v.Get(TestHash3) + go func() { + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash3, buf) if err != nil { t.Errorf("err3: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock3) != 0 { - t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf)) + if bytes.Compare(buf[:n], TestBlock3) != 0 { + t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n])) } sem <- 1 - }(sem) + }() // Wait for all goroutines to finish - for done := 0; done < 3; { - done += <-sem + for done := 0; done < 3; done++ { + <-sem } } @@ -639,36 +635,34 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) { }(sem) // Wait for all goroutines to finish - for done := 0; done < 3; { - done += <-sem + for done := 0; done < 3; done++ { + <-sem } // Double check that we actually wrote the blocks we expected to write. - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Errorf("Get #1: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf)) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n])) } - buf, err = v.Get(TestHash2) + n, err = v.Get(TestHash2, buf) if err != nil { t.Errorf("Get #2: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock2) != 0 { - t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf)) + if bytes.Compare(buf[:n], TestBlock2) != 0 { + t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n])) } - buf, err = v.Get(TestHash3) + n, err = v.Get(TestHash3, buf) if err != nil { t.Errorf("Get #3: %v", err) } - bufs.Put(buf) - if bytes.Compare(buf, TestBlock3) != 0 { - t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf)) + if bytes.Compare(buf[:n], TestBlock3) != 0 { + t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n])) } } @@ -689,14 +683,13 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) { if err != nil { t.Fatal(err) } - rdata, err := v.Get(hash) + buf := make([]byte, BlockSize) + n, err := v.Get(hash, buf) if err != nil { t.Error(err) - } else { - defer bufs.Put(rdata) } - if bytes.Compare(rdata, wdata) != 0 { - t.Error("rdata != wdata") + if bytes.Compare(buf[:n], wdata) != 0 { + t.Error("buf %+q != wdata %+q", buf[:n], wdata) } } @@ -717,14 +710,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) { v.PutRaw(TestHash, TestBlock) v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL)) - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { t.Fatal(err) } - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("Got data %+q, expected %+q", buf, TestBlock) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock) } - bufs.Put(buf) // Trash err = v.Trash(TestHash) @@ -737,7 +730,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) { t.Error(err) } } else { - _, err = v.Get(TestHash) + _, err = v.Get(TestHash, buf) if err == nil || !os.IsNotExist(err) { t.Errorf("os.IsNotExist(%v) should have been true", err) } @@ -750,14 +743,13 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) { } // Get the block - after trash and untrash sequence - buf, err = v.Get(TestHash) + n, err = v.Get(TestHash, buf) if err != nil { t.Fatal(err) } - if bytes.Compare(buf, TestBlock) != 0 { - t.Errorf("Got data %+q, expected %+q", buf, TestBlock) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock) } - bufs.Put(buf) } func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) { @@ -768,14 +760,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) { }(trashLifetime) checkGet := func() error { - buf, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash, buf) if err != nil { return err } - if bytes.Compare(buf, TestBlock) != 0 { - t.Fatalf("Got data %+q, expected %+q", buf, TestBlock) + if bytes.Compare(buf[:n], TestBlock) != 0 { + t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock) } - bufs.Put(buf) return nil } diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go index e8a5a338f5..5671b8d4a9 100644 --- a/services/keepstore/volume_test.go +++ b/services/keepstore/volume_test.go @@ -113,17 +113,16 @@ func (v *MockVolume) Compare(loc string, buf []byte) error { } } -func (v *MockVolume) Get(loc string) ([]byte, error) { +func (v *MockVolume) Get(loc string, buf []byte) (int, error) { v.gotCall("Get") <-v.Gate if v.Bad { - return nil, errors.New("Bad volume") + return 0, errors.New("Bad volume") } else if block, ok := v.Store[loc]; ok { - buf := bufs.Get(len(block)) - copy(buf, block) - return buf, nil + copy(buf[:len(block)], block) + return len(block), nil } - return nil, os.ErrNotExist + return 0, os.ErrNotExist } func (v *MockVolume) Put(loc string, block []byte) error { diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 996068cf3d..edec048dfe 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) { return stat, err } -// Get retrieves a block identified by the locator string "loc", and -// returns its contents as a byte slice. -// -// Get returns a nil buffer IFF it returns a non-nil error. -func (v *UnixVolume) Get(loc string) ([]byte, error) { +// Get retrieves a block, copies it to the given slice, and returns +// the number of bytes copied. +func (v *UnixVolume) Get(loc string, buf []byte) (int, error) { path := v.blockPath(loc) stat, err := v.stat(path) if err != nil { - return nil, v.translateError(err) + return 0, v.translateError(err) + } + if stat.Size() > int64(len(buf)) { + return 0, TooLongError } - buf := bufs.Get(int(stat.Size())) + var read int + size := int(stat.Size()) err = v.getFunc(path, func(rdr io.Reader) error { - _, err = io.ReadFull(rdr, buf) + read, err = io.ReadFull(rdr, buf[:size]) return err }) - if err != nil { - bufs.Put(buf) - return nil, err - } - return buf, nil + return read, err } // Compare returns nil if Get(loc) would return the same content as diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index 0775e89ed2..c95538bc4d 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -106,12 +106,13 @@ func TestGetNotFound(t *testing.T) { defer v.Teardown() v.Put(TestHash, TestBlock) - buf, err := v.Get(TestHash2) + buf := make([]byte, BlockSize) + n, err := v.Get(TestHash2, buf) switch { case os.IsNotExist(err): break case err == nil: - t.Errorf("Read should have failed, returned %s", string(buf)) + t.Errorf("Read should have failed, returned %+q", buf[:n]) default: t.Errorf("Read expected ErrNotExist, got: %s", err) } @@ -151,7 +152,8 @@ func TestUnixVolumeReadonly(t *testing.T) { v.PutRaw(TestHash, TestBlock) - _, err := v.Get(TestHash) + buf := make([]byte, BlockSize) + _, err := v.Get(TestHash, buf) if err != nil { t.Errorf("got err %v, expected nil", err) }