9068: Move buffer allocation from volumes to GetBlockHandler.
authorTom Clegg <tom@curoverse.com>
Fri, 29 Apr 2016 16:55:24 +0000 (12:55 -0400)
committerTom Clegg <tom@curoverse.com>
Wed, 4 May 2016 15:10:22 +0000 (11:10 -0400)
This makes the Volume interface more idiomatic: Get() accepts a buffer
to read into, and returns a number of bytes read, much like the Read()
method of an io.Reader.

It also makes it possible for GetBlockHandler to notice, while waiting
for a buffer, that the client has disconnected: In this case, it
releases the network socket and never asks any volumes to do any work.

16 files changed:
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/handlers_with_generic_volume_test.go
services/keepstore/keepstore.go
services/keepstore/keepstore_test.go
services/keepstore/logging_router.go
services/keepstore/logging_router_test.go [new file with mode: 0644]
services/keepstore/s3_volume.go
services/keepstore/trash_worker_test.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

index f08cebff63c65dc5cbbd941407602c262779708a..a6b98bd43aa9ce9ee691820695905ca27901d90f 100644 (file)
@@ -139,11 +139,11 @@ func (v *AzureBlobVolume) Check() error {
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
        var deadline time.Time
        haveDeadline := false
-       buf, err := v.get(loc)
-       for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
+       size, err := v.get(loc, buf)
+       for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
                // Seeing a brand new empty block probably means we're
                // in a race with CreateBlob, which under the hood
                // (apparently) does "CreateEmpty" and "CommitData"
@@ -163,34 +163,32 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
                } else if time.Now().After(deadline) {
                        break
                }
-               bufs.Put(buf)
                time.Sleep(azureWriteRacePollTime)
-               buf, err = v.get(loc)
+               size, err = v.get(loc, buf)
        }
        if haveDeadline {
-               log.Printf("Race ended with len(buf)==%d", len(buf))
+               log.Printf("Race ended with size==%d", size)
        }
-       return buf, err
+       return size, err
 }
 
-func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
-       expectSize := BlockSize
+func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+       expectSize := len(buf)
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
                // is expected to be, so we have to ask Azure.
                props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
                if err != nil {
-                       return nil, v.translateError(err)
+                       return 0, v.translateError(err)
                }
                if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
-                       return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+                       return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
                }
                expectSize = int(props.ContentLength)
        }
 
-       buf := bufs.Get(expectSize)
        if expectSize == 0 {
-               return buf, nil
+               return 0, nil
        }
 
        // We'll update this actualSize if/when we get the last piece.
@@ -235,11 +233,10 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
        wg.Wait()
        for _, err := range errors {
                if err != nil {
-                       bufs.Put(buf)
-                       return nil, v.translateError(err)
+                       return 0, v.translateError(err)
                }
        }
-       return buf[:actualSize], nil
+       return actualSize, nil
 }
 
 // Compare the given data with existing stored data.
index 439b40221465ada53c805c7b7afb47ba974652a9..e3c0e27083245f2d7cbead33f915f9365774cc02 100644 (file)
@@ -425,13 +425,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
                if err != nil {
                        t.Error(err)
                }
-               gotData, err := v.Get(hash)
+               gotData := make([]byte, len(data))
+               gotLen, err := v.Get(hash, gotData)
                if err != nil {
                        t.Error(err)
                }
                gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
-               gotLen := len(gotData)
-               bufs.Put(gotData)
                if gotLen != size {
                        t.Error("length mismatch: got %d != %d", gotLen, size)
                }
@@ -477,11 +476,10 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
        // Wait for the stub's Put to create the empty blob
        v.azHandler.race <- continuePut
        go func() {
-               buf, err := v.Get(TestHash)
+               buf := make([]byte, len(TestBlock))
+               _, err := v.Get(TestHash, buf)
                if err != nil {
                        t.Error(err)
-               } else {
-                       bufs.Put(buf)
                }
                close(allDone)
        }()
@@ -521,15 +519,15 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        allDone := make(chan struct{})
        go func() {
                defer close(allDone)
-               buf, err := v.Get(TestHash)
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash, buf)
                if err != nil {
                        t.Error(err)
                        return
                }
-               if len(buf) != 0 {
-                       t.Errorf("Got %+q, expected empty buf", buf)
+               if n != 0 {
+                       t.Errorf("Got %+q, expected empty buf", buf[:n])
                }
-               bufs.Put(buf)
        }()
        select {
        case <-allDone:
index 33d585ae1e69f18868f7eb57278637c7f855761b..7c17424ba568227790469e4e32867f33fea8ff4e 100644 (file)
@@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has been deleted
-       _, err := vols[0].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := vols[0].Get(TestHash, buf)
        var blockDeleted = os.IsNotExist(err)
        if !blockDeleted {
                t.Error("superuserExistingBlockReq: block not deleted")
@@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) {
                        expectedDc, responseDc)
        }
        // Confirm the block has NOT been deleted.
-       _, err = vols[0].Get(TestHash)
+       _, err = vols[0].Get(TestHash, buf)
        if err != nil {
                t.Errorf("testing delete on new block: %s\n", err)
        }
@@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
        }
 }
 
+type notifyingResponseRecorder struct {
+       *httptest.ResponseRecorder
+       closer chan bool
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
+       return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) {
+       defer func(was bool) {
+               enforcePermissions = was
+       }(enforcePermissions)
+       enforcePermissions = false
+
+       defer func(orig *bufferPool) {
+               bufs = orig
+       }(bufs)
+       bufs = newBufferPool(1, BlockSize)
+       defer bufs.Put(bufs.Get(BlockSize))
+
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+
+       if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+               t.Error(err)
+       }
+
+       resp := &notifyingResponseRecorder{
+               ResponseRecorder: httptest.NewRecorder(),
+               closer:           make(chan bool, 1),
+       }
+       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
+               t.Fatal("notifyingResponseRecorder is broken")
+       }
+       // If anyone asks, the client has disconnected.
+       resp.closer <- true
+
+       ok := make(chan struct{})
+       go func() {
+               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+               ok <- struct{}{}
+       }()
+
+       select {
+       case <-time.After(20 * time.Second):
+               t.Fatal("request took >20s, close notifier must be broken")
+       case <-ok:
+       }
+
+       ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+       for i, v := range KeepVM.AllWritable() {
+               if calls := v.(*MockVolume).called["GET"]; calls != 0 {
+                       t.Errorf("volume %d got %d calls, expected 0", i, calls)
+               }
+       }
+}
+
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
 func TestGetHandlerNoBufferleak(t *testing.T) {
index a188c47c53451ae9d3f7c3f9dbf64f6b03377d92..f698982415aae5bd7d8a341428acb2d8bdb57317 100644 (file)
@@ -79,26 +79,39 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       block, err := GetBlock(mux.Vars(req)["hash"])
+       // TODO: Probe volumes to check whether the block _might_
+       // exist. Some volumes/types could support a quick existence
+       // check without causing other operations to suffer. If all
+       // volumes support that, and assure us the block definitely
+       // isn't here, we can return 404 now instead of waiting for a
+       // buffer.
+
+       buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
        if err != nil {
-               // This type assertion is safe because the only errors
-               // GetBlock can return are DiskHashError or NotFoundError.
-               http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
+               http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
-       defer bufs.Put(block)
+       defer bufs.Put(buf)
 
-       resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+       size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+       if err != nil {
+               code := http.StatusInternalServerError
+               if err, ok := err.(*KeepError); ok {
+                       code = err.HTTPCode
+               }
+               http.Error(resp, err.Error(), code)
+               return
+       }
+
+       resp.Header().Set("Content-Length", strconv.Itoa(size))
        resp.Header().Set("Content-Type", "application/octet-stream")
-       resp.Write(block)
+       resp.Write(buf[:size])
 }
 
-var errClientDisconnected = fmt.Errorf("client disconnected")
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if resp implements http.CloseNotifier and tells us that the
 // client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, error) {
+func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
        var closeNotifier <-chan bool
        if resp, ok := resp.(http.CloseNotifier); ok {
                closeNotifier = resp.CloseNotify()
@@ -119,7 +132,7 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte,
                        // return it to the pool.
                        bufs.Put(<-bufReady)
                }()
-               return nil, errClientDisconnected
+               return nil, ErrClientDisconnect
        }
 }
 
@@ -146,7 +159,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       buf, err := getBufferForResponseWriter(resp, int(req.ContentLength))
+       buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -516,7 +529,6 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
        }
 }
 
-// ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
 // Once the handler has determined that system policy permits the
@@ -527,24 +539,21 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // should be the only part of the code that cares about which volume a
 // block is stored on, so it should be responsible for figuring out
 // which volume to check for fetching blocks, storing blocks, etc.
-// ==============================
 
-// GetBlock fetches and returns the block identified by "hash".
-//
-// On success, GetBlock returns a byte slice with the block data, and
-// a nil error.
+// GetBlock fetches the block identified by "hash" into the provided
+// buf, and returns the data size.
 //
 // If the block cannot be found on any volume, returns NotFoundError.
 //
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(hash string) ([]byte, error) {
+func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
        for _, vol := range KeepVM.AllReadable() {
-               buf, err := vol.Get(hash)
+               size, err := vol.Get(hash, buf)
                if err != nil {
                        // IsNotExist is an expected error and may be
                        // ignored. All other errors are logged. In
@@ -558,23 +567,22 @@ func GetBlock(hash string) ([]byte, error) {
                }
                // Check the file checksum.
                //
-               filehash := fmt.Sprintf("%x", md5.Sum(buf))
+               filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
                        log.Printf("%s: checksum mismatch for request %s (actual %s)",
                                vol, hash, filehash)
                        errorToCaller = DiskHashError
-                       bufs.Put(buf)
                        continue
                }
                if errorToCaller == DiskHashError {
                        log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
                                vol, hash)
                }
-               return buf, nil
+               return size, nil
        }
-       return nil, errorToCaller
+       return 0, errorToCaller
 }
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
index c5349d399c32ebc5692d0a39d4cc8c9c3ad83e1a..dda7edcec3509e683465a93d5eb775bf18f16d19 100644 (file)
@@ -45,12 +45,13 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
        testableVolumes[1].PutRaw(testHash, testBlock)
 
        // Get should pass
-       buf, err := GetBlock(testHash)
+       buf := make([]byte, len(testBlock))
+       n, err := GetBlock(testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error while getting block %s", err)
        }
-       if bytes.Compare(buf, testBlock) != 0 {
-               t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock)
+       if bytes.Compare(buf[:n], testBlock) != 0 {
+               t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock)
        }
 }
 
@@ -64,9 +65,10 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
        testableVolumes[1].PutRaw(testHash, badData)
 
        // Get should fail
-       _, err := GetBlock(testHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testHash, buf, nil)
        if err == nil {
-               t.Fatalf("Expected error while getting corrupt block %v", testHash)
+               t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
        }
 }
 
@@ -85,11 +87,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
        }
 
        // Check that PutBlock stored the data as expected
-       buf, err := GetBlock(testHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
-       } else if bytes.Compare(buf, testBlock) != 0 {
-               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+       } else if bytes.Compare(buf[:size], testBlock) != 0 {
+               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
        }
 }
 
@@ -109,10 +112,11 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
 
        // Put succeeded and overwrote the badData in one volume,
        // and Get should return the testBlock now, ignoring the bad data.
-       buf, err := GetBlock(testHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testHash, buf, nil)
        if err != nil {
                t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
-       } else if bytes.Compare(buf, testBlock) != 0 {
-               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+       } else if bytes.Compare(buf[:size], testBlock) != 0 {
+               t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
        }
 }
index 93ee43c446cf96624a09a0ff7660d198cacdd3cd..80d867010568090592283e14a8bff28a35b4dda4 100644 (file)
@@ -91,6 +91,7 @@ var (
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
        ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+       ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
index 2a1c3d243ab922855b2bf6344f69631a78272662..c0adbc0bd74dad7d115dfe70a3374b2815704c56 100644 (file)
@@ -66,12 +66,13 @@ func TestGetBlock(t *testing.T) {
        }
 
        // Check that GetBlock returns success.
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != nil {
                t.Errorf("GetBlock error: %s", err)
        }
-       if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
-               t.Errorf("expected %s, got %s", TestBlock, result)
+       if bytes.Compare(buf[:size], TestBlock) != 0 {
+               t.Errorf("got %v, expected %v", buf[:size], TestBlock)
        }
 }
 
@@ -86,9 +87,10 @@ func TestGetBlockMissing(t *testing.T) {
        defer KeepVM.Close()
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != NotFoundError {
-               t.Errorf("Expected NotFoundError, got %v", result)
+               t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
        }
 }
 
@@ -107,9 +109,10 @@ func TestGetBlockCorrupt(t *testing.T) {
        vols[0].Put(TestHash, BadBlock)
 
        // Check that GetBlock returns failure.
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != DiskHashError {
-               t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
+               t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
        }
 }
 
@@ -133,13 +136,14 @@ func TestPutBlockOK(t *testing.T) {
        }
 
        vols := KeepVM.AllReadable()
-       result, err := vols[1].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := vols[1].Get(TestHash, buf)
        if err != nil {
                t.Fatalf("Volume #0 Get returned error: %v", err)
        }
-       if string(result) != string(TestBlock) {
+       if string(buf[:n]) != string(TestBlock) {
                t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
-                       string(TestBlock), string(result))
+                       string(TestBlock), string(buf[:n]))
        }
 }
 
@@ -162,14 +166,14 @@ func TestPutBlockOneVol(t *testing.T) {
                t.Fatalf("PutBlock: n %d err %v", n, err)
        }
 
-       result, err := GetBlock(TestHash)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(TestHash, buf, nil)
        if err != nil {
                t.Fatalf("GetBlock: %v", err)
        }
-       if string(result) != string(TestBlock) {
-               t.Error("PutBlock/GetBlock mismatch")
-               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
-                       string(TestBlock), string(result))
+       if bytes.Compare(buf[:size], TestBlock) != 0 {
+               t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
+                       TestBlock, buf[:size])
        }
 }
 
@@ -191,7 +195,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
        }
 
        // Confirm that GetBlock fails to return anything.
-       if result, err := GetBlock(TestHash); err != NotFoundError {
+       if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
                t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
                        string(result), err)
        }
@@ -216,10 +220,11 @@ func TestPutBlockCorrupt(t *testing.T) {
        }
 
        // The block on disk should now match TestBlock.
-       if block, err := GetBlock(TestHash); err != nil {
+       buf := make([]byte, BlockSize)
+       if size, err := GetBlock(TestHash, buf, nil); err != nil {
                t.Errorf("GetBlock: %v", err)
-       } else if bytes.Compare(block, TestBlock) != 0 {
-               t.Errorf("GetBlock returned: '%s'", string(block))
+       } else if bytes.Compare(buf[:size], TestBlock) != 0 {
+               t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
        }
 }
 
@@ -290,12 +295,13 @@ func TestPutBlockTouchFails(t *testing.T) {
                t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
                        oldMtime, newMtime)
        }
-       result, err := vols[1].Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := vols[1].Get(TestHash, buf)
        if err != nil {
                t.Fatalf("vols[1]: %v", err)
        }
-       if bytes.Compare(result, TestBlock) != 0 {
-               t.Errorf("new block does not match test block\nnew block = %v\n", result)
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
        }
 }
 
index a93b72cf611cfc4dfa6283b8bd249c02db52ca19..8f547a4d4c83af76e9e32d13ed51e4feb9492542 100644 (file)
@@ -19,6 +19,19 @@ type LoggingResponseWriter struct {
        sentHdr      time.Time
 }
 
+func (w *LoggingResponseWriter) CloseNotify() <-chan bool {
+       wrapped, ok := w.ResponseWriter.(http.CloseNotifier)
+       if !ok {
+               // If upstream doesn't implement CloseNotifier, we can
+               // satisfy the interface by returning a channel that
+               // never sends anything (the interface doesn't
+               // guarantee that anything will ever be sent on the
+               // channel even if the client disconnects).
+               return nil
+       }
+       return wrapped.CloseNotify()
+}
+
 // WriteHeader writes header to ResponseWriter
 func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
        if loggingWriter.sentHdr == zeroTime {
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
new file mode 100644 (file)
index 0000000..aa88556
--- /dev/null
@@ -0,0 +1,10 @@
+package main
+
+import (
+       "net/http"
+       "testing"
+)
+
+func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
+       http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
+}
index 79a680d58a3efebab11467ca2f3a474d2e0d0feb..d068b2a6e5da0601eb7f9bb4c6a0ec8e4b891774 100644 (file)
@@ -153,20 +153,18 @@ func (v *S3Volume) Check() error {
        return nil
 }
 
-func (v *S3Volume) Get(loc string) ([]byte, error) {
+func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
        rdr, err := v.Bucket.GetReader(loc)
        if err != nil {
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
        }
        defer rdr.Close()
-       buf := bufs.Get(BlockSize)
        n, err := io.ReadFull(rdr, buf)
        switch err {
        case nil, io.EOF, io.ErrUnexpectedEOF:
-               return buf[:n], nil
+               return n, nil
        default:
-               bufs.Put(buf)
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
        }
 }
 
index ac9406178c00ffaffc024f61ffc981479fc9d4e9..d111caeac8e5b571202502e0aea63f07816365ba 100644 (file)
@@ -290,26 +290,27 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
-       data, _ := GetBlock(testData.Locator1)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(testData.Locator1, buf, nil)
        if testData.ExpectLocator1 {
-               if len(data) == 0 {
+               if size == 0 || err != nil {
                        t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
                }
        } else {
-               if len(data) > 0 {
+               if size > 0 || err == nil {
                        t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
                }
        }
 
        // Verify Locator2 to be un/deleted as expected
        if testData.Locator1 != testData.Locator2 {
-               data, _ = GetBlock(testData.Locator2)
+               size, err = GetBlock(testData.Locator2, buf, nil)
                if testData.ExpectLocator2 {
-                       if len(data) == 0 {
+                       if size == 0 || err != nil {
                                t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
                        }
                } else {
-                       if len(data) > 0 {
+                       if size > 0 || err == nil {
                                t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
                        }
                }
@@ -321,7 +322,8 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        if testData.DifferentMtimes {
                locatorFoundIn := 0
                for _, volume := range KeepVM.AllReadable() {
-                       if _, err := volume.Get(testData.Locator1); err == nil {
+                       buf := make([]byte, BlockSize)
+                       if _, err := volume.Get(testData.Locator1, buf); err == nil {
                                locatorFoundIn = locatorFoundIn + 1
                        }
                }
index 17da54fdadbca571cae93fde18342d2776b4e3a7..8ae6660fd477fa90365a019c837a121a08cc9595 100644 (file)
@@ -10,17 +10,14 @@ import (
 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
 // etc.
 type Volume interface {
-       // Get a block. IFF the returned error is nil, the caller must
-       // put the returned slice back into the buffer pool when it's
-       // finished with it. (Otherwise, the buffer pool will be
-       // depleted and eventually -- when all available buffers are
-       // used and not returned -- operations will reach deadlock.)
+       // Get a block: copy the block data into buf, and return the
+       // number of bytes copied.
        //
        // loc is guaranteed to consist of 32 or more lowercase hex
        // digits.
        //
-       // Get should not verify the integrity of the returned data:
-       // it should just return whatever was found in its backing
+       // Get should not verify the integrity of the data: it should
+       // just return whatever was found in its backing
        // store. (Integrity checking is the caller's responsibility.)
        //
        // If an error is encountered that prevents it from
@@ -36,10 +33,12 @@ type Volume interface {
        // access log if the block is not found on any other volumes
        // either).
        //
-       // If the data in the backing store is bigger than BlockSize,
-       // Get is permitted to return an error without reading any of
-       // the data.
-       Get(loc string) ([]byte, error)
+       // If the data in the backing store is bigger than len(buf),
+       // then Get is permitted to return an error without reading
+       // any of the data.
+       //
+       // len(buf) will not exceed BlockSize.
+       Get(loc string, buf []byte) (int, error)
 
        // Compare the given data with the stored data (i.e., what Get
        // would return). If equal, return nil. If not, return
index 95166c252f004bef5a1ba1f583f4e13f54117f47..105795c146e2d932f3066ee2f26948dd639d9436 100644 (file)
@@ -89,14 +89,13 @@ func testGet(t TB, factory TestableVolumeFactory) {
 
        v.PutRaw(TestHash, TestBlock)
 
-       buf, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
 
-       bufs.Put(buf)
-
-       if bytes.Compare(buf, TestBlock) != 0 {
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
                t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
        }
 }
@@ -107,7 +106,8 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
-       if _, err := v.Get(TestHash2); err == nil {
+       buf := make([]byte, BlockSize)
+       if _, err := v.Get(TestHash2, buf); err == nil {
                t.Errorf("Expected error while getting non-existing block %v", TestHash2)
        }
 }
@@ -208,24 +208,22 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
        v.PutRaw(testHash, testDataA)
 
        putErr := v.Put(testHash, testDataB)
-       buf, getErr := v.Get(testHash)
+       buf := make([]byte, BlockSize)
+       n, getErr := v.Get(testHash, buf)
        if putErr == nil {
                // Put must not return a nil error unless it has
                // overwritten the existing data.
-               if bytes.Compare(buf, testDataB) != 0 {
-                       t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
+               if bytes.Compare(buf[:n], testDataB) != 0 {
+                       t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
                }
        } else {
                // It is permissible for Put to fail, but it must
                // leave us with either the original data, the new
                // data, or nothing at all.
-               if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
-                       t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
+               if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
+                       t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
                }
        }
-       if getErr == nil {
-               bufs.Put(buf)
-       }
 }
 
 // Put and get multiple blocks
@@ -253,34 +251,32 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
                t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
        }
 
-       data, err := v.Get(TestHash)
+       data := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, data)
        if err != nil {
                t.Error(err)
        } else {
-               if bytes.Compare(data, TestBlock) != 0 {
-                       t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+               if bytes.Compare(data[:n], TestBlock) != 0 {
+                       t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
                }
-               bufs.Put(data)
        }
 
-       data, err = v.Get(TestHash2)
+       n, err = v.Get(TestHash2, data)
        if err != nil {
                t.Error(err)
        } else {
-               if bytes.Compare(data, TestBlock2) != 0 {
-                       t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+               if bytes.Compare(data[:n], TestBlock2) != 0 {
+                       t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
                }
-               bufs.Put(data)
        }
 
-       data, err = v.Get(TestHash3)
+       n, err = v.Get(TestHash3, data)
        if err != nil {
                t.Error(err)
        } else {
-               if bytes.Compare(data, TestBlock3) != 0 {
-                       t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+               if bytes.Compare(data[:n], TestBlock3) != 0 {
+                       t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
                }
-               bufs.Put(data)
        }
 }
 
@@ -426,14 +422,12 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
-       data, err := v.Get(TestHash)
+       data := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, data)
        if err != nil {
                t.Error(err)
-       } else {
-               if bytes.Compare(data, TestBlock) != 0 {
-                       t.Errorf("Got data %+q, expected %+q", data, TestBlock)
-               }
-               bufs.Put(data)
+       } else if bytes.Compare(data[:n], TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
        }
 }
 
@@ -455,7 +449,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
-       if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+       data := make([]byte, BlockSize)
+       if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
                t.Errorf("os.IsNotExist(%v) should have been true", err)
        }
 }
@@ -514,9 +509,10 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        }
 
        v.PutRaw(TestHash, TestBlock)
+       buf := make([]byte, BlockSize)
 
        // Get from read-only volume should succeed
-       _, err := v.Get(TestHash)
+       _, err := v.Get(TestHash, buf)
        if err != nil {
                t.Errorf("got err %v, expected nil", err)
        }
@@ -526,7 +522,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        if err == nil {
                t.Errorf("Expected error when putting block in a read-only volume")
        }
-       _, err = v.Get(TestHash2)
+       _, err = v.Get(TestHash2, buf)
        if err == nil {
                t.Errorf("Expected error when getting block whose put in read-only volume failed")
        }
@@ -561,45 +557,45 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
        v.PutRaw(TestHash3, TestBlock3)
 
        sem := make(chan int)
-       go func(sem chan int) {
-               buf, err := v.Get(TestHash)
+       go func() {
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash, buf)
                if err != nil {
                        t.Errorf("err1: %v", err)
                }
-               bufs.Put(buf)
-               if bytes.Compare(buf, TestBlock) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+               if bytes.Compare(buf[:n], TestBlock) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
                }
                sem <- 1
-       }(sem)
+       }()
 
-       go func(sem chan int) {
-               buf, err := v.Get(TestHash2)
+       go func() {
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash2, buf)
                if err != nil {
                        t.Errorf("err2: %v", err)
                }
-               bufs.Put(buf)
-               if bytes.Compare(buf, TestBlock2) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+               if bytes.Compare(buf[:n], TestBlock2) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
                }
                sem <- 1
-       }(sem)
+       }()
 
-       go func(sem chan int) {
-               buf, err := v.Get(TestHash3)
+       go func() {
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash3, buf)
                if err != nil {
                        t.Errorf("err3: %v", err)
                }
-               bufs.Put(buf)
-               if bytes.Compare(buf, TestBlock3) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+               if bytes.Compare(buf[:n], TestBlock3) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
                }
                sem <- 1
-       }(sem)
+       }()
 
        // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
+       for done := 0; done < 3; done++ {
+               <-sem
        }
 }
 
@@ -639,36 +635,34 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
        }(sem)
 
        // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
+       for done := 0; done < 3; done++ {
+               <-sem
        }
 
        // Double check that we actually wrote the blocks we expected to write.
-       buf, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, buf)
        if err != nil {
                t.Errorf("Get #1: %v", err)
        }
-       bufs.Put(buf)
-       if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
        }
 
-       buf, err = v.Get(TestHash2)
+       n, err = v.Get(TestHash2, buf)
        if err != nil {
                t.Errorf("Get #2: %v", err)
        }
-       bufs.Put(buf)
-       if bytes.Compare(buf, TestBlock2) != 0 {
-               t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+       if bytes.Compare(buf[:n], TestBlock2) != 0 {
+               t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
        }
 
-       buf, err = v.Get(TestHash3)
+       n, err = v.Get(TestHash3, buf)
        if err != nil {
                t.Errorf("Get #3: %v", err)
        }
-       bufs.Put(buf)
-       if bytes.Compare(buf, TestBlock3) != 0 {
-               t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+       if bytes.Compare(buf[:n], TestBlock3) != 0 {
+               t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
        }
 }
 
@@ -689,14 +683,13 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
        if err != nil {
                t.Fatal(err)
        }
-       rdata, err := v.Get(hash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(hash, buf)
        if err != nil {
                t.Error(err)
-       } else {
-               defer bufs.Put(rdata)
        }
-       if bytes.Compare(rdata, wdata) != 0 {
-               t.Error("rdata != wdata")
+       if bytes.Compare(buf[:n], wdata) != 0 {
+               t.Error("buf %+q != wdata %+q", buf[:n], wdata)
        }
 }
 
@@ -717,14 +710,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        v.PutRaw(TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-       buf, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
-       if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
        }
-       bufs.Put(buf)
 
        // Trash
        err = v.Trash(TestHash)
@@ -737,7 +730,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
                        t.Error(err)
                }
        } else {
-               _, err = v.Get(TestHash)
+               _, err = v.Get(TestHash, buf)
                if err == nil || !os.IsNotExist(err) {
                        t.Errorf("os.IsNotExist(%v) should have been true", err)
                }
@@ -750,14 +743,13 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
 
        // Get the block - after trash and untrash sequence
-       buf, err = v.Get(TestHash)
+       n, err = v.Get(TestHash, buf)
        if err != nil {
                t.Fatal(err)
        }
-       if bytes.Compare(buf, TestBlock) != 0 {
-               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+       if bytes.Compare(buf[:n], TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
        }
-       bufs.Put(buf)
 }
 
 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
@@ -768,14 +760,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
        }(trashLifetime)
 
        checkGet := func() error {
-               buf, err := v.Get(TestHash)
+               buf := make([]byte, BlockSize)
+               n, err := v.Get(TestHash, buf)
                if err != nil {
                        return err
                }
-               if bytes.Compare(buf, TestBlock) != 0 {
-                       t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+               if bytes.Compare(buf[:n], TestBlock) != 0 {
+                       t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
                }
-               bufs.Put(buf)
                return nil
        }
 
index e8a5a338f51cb25d47f419d02f6ca76a211d535c..5671b8d4a9fd7405f8ca7fd35a18fdf10289a059 100644 (file)
@@ -113,17 +113,16 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
        }
 }
 
-func (v *MockVolume) Get(loc string) ([]byte, error) {
+func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
        v.gotCall("Get")
        <-v.Gate
        if v.Bad {
-               return nil, errors.New("Bad volume")
+               return 0, errors.New("Bad volume")
        } else if block, ok := v.Store[loc]; ok {
-               buf := bufs.Get(len(block))
-               copy(buf, block)
-               return buf, nil
+               copy(buf[:len(block)], block)
+               return len(block), nil
        }
-       return nil, os.ErrNotExist
+       return 0, os.ErrNotExist
 }
 
 func (v *MockVolume) Put(loc string, block []byte) error {
index 996068cf3d2438f71364b0b2c9ddafcdbd712c54..edec048dfe5836701f821beefdc3d7b131777982 100644 (file)
@@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
+       }
+       if stat.Size() > int64(len(buf)) {
+               return 0, TooLongError
        }
-       buf := bufs.Get(int(stat.Size()))
+       var read int
+       size := int(stat.Size())
        err = v.getFunc(path, func(rdr io.Reader) error {
-               _, err = io.ReadFull(rdr, buf)
+               read, err = io.ReadFull(rdr, buf[:size])
                return err
        })
-       if err != nil {
-               bufs.Put(buf)
-               return nil, err
-       }
-       return buf, nil
+       return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
index 0775e89ed275d14f7e2be510084a52e39af84472..c95538bc4da380f7af5561984d7a069324cea970 100644 (file)
@@ -106,12 +106,13 @@ func TestGetNotFound(t *testing.T) {
        defer v.Teardown()
        v.Put(TestHash, TestBlock)
 
-       buf, err := v.Get(TestHash2)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(TestHash2, buf)
        switch {
        case os.IsNotExist(err):
                break
        case err == nil:
-               t.Errorf("Read should have failed, returned %s", string(buf))
+               t.Errorf("Read should have failed, returned %+q", buf[:n])
        default:
                t.Errorf("Read expected ErrNotExist, got: %s", err)
        }
@@ -151,7 +152,8 @@ func TestUnixVolumeReadonly(t *testing.T) {
 
        v.PutRaw(TestHash, TestBlock)
 
-       _, err := v.Get(TestHash)
+       buf := make([]byte, BlockSize)
+       _, err := v.Get(TestHash, buf)
        if err != nil {
                t.Errorf("got err %v, expected nil", err)
        }