5748: Use a buffer pool instead of calling runtime.GC() during each GET.
author: Tom Clegg <tom@curoverse.com>
Thu, 7 May 2015 14:40:44 +0000 (10:40 -0400)
committer: Tom Clegg <tom@curoverse.com>
Fri, 8 May 2015 18:07:46 +0000 (14:07 -0400)
services/crunchstat/.gitignore [new file with mode: 0644]
services/keepproxy/.gitignore [new file with mode: 0644]
services/keepstore/.gitignore [new file with mode: 0644]
services/keepstore/bufferpool.go [new file with mode: 0644]
services/keepstore/bufferpool_test.go [new file with mode: 0644]
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go

diff --git a/services/crunchstat/.gitignore b/services/crunchstat/.gitignore
new file mode 100644 (file)
index 0000000..c26270a
--- /dev/null
@@ -0,0 +1 @@
+crunchstat
diff --git a/services/keepproxy/.gitignore b/services/keepproxy/.gitignore
new file mode 100644 (file)
index 0000000..a4c8ad9
--- /dev/null
@@ -0,0 +1 @@
+keepproxy
diff --git a/services/keepstore/.gitignore b/services/keepstore/.gitignore
new file mode 100644 (file)
index 0000000..c195c4a
--- /dev/null
@@ -0,0 +1 @@
+keepstore
diff --git a/services/keepstore/bufferpool.go b/services/keepstore/bufferpool.go
new file mode 100644 (file)
index 0000000..373bfc7
--- /dev/null
@@ -0,0 +1,44 @@
+package main
+
+import (
+       "log"
+       "sync"
+       "time"
+)
+
+type bufferPool struct {
+       // limiter has a "true" placeholder for each in-use buffer; its capacity bounds how many buffers may be checked out at once.
+       limiter chan bool
+       // Embedded sync.Pool holds buffers that have been returned via Put and are currently unused.
+       sync.Pool
+}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+       p := &bufferPool{limiter: make(chan bool, count)}
+       p.New = func() interface{} {
+               // Invoked by sync.Pool when no idle buffer is available.
+               return make([]byte, bufSize)
+       }
+       return p
+}
+
+func (p *bufferPool) Get(size int) []byte { // Get returns a buffer with len == size; blocks while all buffers are in use
+       select {
+       case p.limiter <- true: // fast path: a slot was free
+       default: // all slots busy: log, then block until a Put releases one
+               t0 := time.Now()
+               log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
+               p.limiter <- true
+               log.Printf("waited %v for a buffer", time.Since(t0))
+       }
+       buf := p.Pool.Get().([]byte) // reuse an idle buffer, or allocate a fresh one via New
+       if cap(buf) < size {
+               log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf)) // programmer error: size exceeds bufSize — note this kills the whole process
+       }
+       return buf[:size]
+}
+
+func (p *bufferPool) Put(buf []byte) { // Put returns a buffer obtained from Get to the pool
+       p.Pool.Put(buf)
+       <-p.limiter // release the in-use slot, waking one blocked Get if any
+}
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
new file mode 100644 (file)
index 0000000..b2f63b1
--- /dev/null
@@ -0,0 +1,85 @@
+package main
+
+import (
+       . "gopkg.in/check.v1"
+       "testing"
+       "time"
+)
+
+// Gocheck boilerplate
+func TestBufferPool(t *testing.T) {
+       TestingT(t) // run all gocheck suites registered in this package
+}
+var _ = Suite(&BufferPoolSuite{}) // register the suite with gocheck
+type BufferPoolSuite struct {} // no fixtures needed; each test builds its own pool
+
+// Initialize a default-sized buffer pool for the benefit of test
+// suites that don't run main().
+func init() {
+       bufs = newBufferPool(maxBuffers, BLOCKSIZE) // same pool main() would create
+}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+       bufs := newBufferPool(2, 10)
+       b1 := bufs.Get(1)
+       bufs.Get(2) // occupy the second slot
+       bufs.Put(b1)
+       b3 := bufs.Get(3) // reuses returned storage, resliced to the requested length
+       c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+       bufs := newBufferPool(3, 10)
+       b1 := bufs.Get(10)
+       bufs.Get(10) // two of three slots in use; one still free
+       testBufferPoolRace(c, bufs, b1, "Get") // Get should win: it need not wait for the Put
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+       bufs := newBufferPool(2, 10)
+       b1 := bufs.Get(10)
+       bufs.Get(10) // both slots in use: pool is exhausted
+       testBufferPoolRace(c, bufs, b1, "Put") // Put must win: Get has to block until a slot frees
+}
+
+func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) { // asserts which of Get/Put finishes first for the given pool state
+       race := make(chan string)
+       go func() {
+               bufs.Get(10) // blocks if the pool is already at its limit
+               time.Sleep(time.Millisecond)
+               race <- "Get"
+       }()
+       go func() {
+               time.Sleep(10*time.Millisecond) // give Get a head start so only blocking can delay it
+               bufs.Put(unused) // frees a slot, unblocking a waiting Get
+               race <- "Put"
+       }()
+       c.Check(<-race, Equals, expectWin)
+       c.Check(<-race, Not(Equals), expectWin)
+       close(race)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
+       bufs := newBufferPool(2, 10)
+       bufs.Get(10)
+       last := bufs.Get(10)
+       // The buffer pool is allowed to throw away unused buffers
+       // (e.g., during sync.Pool's garbage collection hook, in the
+       // current implementation). However, if unused buffers are
+       // getting thrown away and reallocated more than {arbitrary
+       // frequency threshold} during a busy loop, it's not acting
+       // much like a buffer pool.
+       allocs := 1000
+       reuses := 0
+       for i := 0; i < allocs; i++ {
+               bufs.Put(last)
+               next := bufs.Get(10)
+               copy(last, []byte("last"))
+               copy(next, []byte("next"))
+               if last[0] == 'n' { // 'n' means next aliases last, i.e. the buffer was reused
+                       reuses++
+               }
+               last = next
+       }
+       c.Check(reuses > allocs * 95/100, Equals, true)
+}
index 75b56eb827da7880d24fd43befb98e551914b7d5..e0e3c6174ada43a2e60c09a01b319831d6ff0d70 100644 (file)
@@ -112,25 +112,17 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        }
 
        block, err := GetBlock(hash, false)
-
-       // Garbage collect after each GET. Fixes #2865.
-       // TODO(twp): review Keep memory usage and see if there's
-       // a better way to do this than blindly garbage collecting
-       // after every block.
-       defer runtime.GC()
-
        if err != nil {
                // This type assertion is safe because the only errors
                // GetBlock can return are DiskHashError or NotFoundError.
                http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
                return
        }
+       defer bufs.Put(block)
 
-       resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
-       _, err = resp.Write(block)
-
-       return
+       resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+       resp.Header().Set("Content-Type", "application/octet-stream")
+       resp.Write(block)
 }
 
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
@@ -159,17 +151,17 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       buf := make([]byte, req.ContentLength)
-       nread, err := io.ReadFull(req.Body, buf)
+       buf := bufs.Get(int(req.ContentLength))
+       _, err := io.ReadFull(req.Body, buf)
        if err != nil {
                http.Error(resp, err.Error(), 500)
-               return
-       } else if int64(nread) < req.ContentLength {
-               http.Error(resp, "request truncated", 500)
+               bufs.Put(buf)
                return
        }
 
        err = PutBlock(buf, hash)
+       bufs.Put(buf)
+
        if err != nil {
                ke := err.(*KeepError)
                http.Error(resp, ke.Error(), ke.HTTPCode)
@@ -178,7 +170,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
        // Success; add a size hint, sign the locator if possible, and
        // return it to the client.
-       return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
+       return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
        api_token := GetApiToken(req)
        if PermissionSecret != nil && api_token != "" {
                expiry := time.Now().Add(blob_signature_ttl)
index 71e577fe54c7c5bb64c459d61ce78a8adf8951ca..c0b8aec7975d978ae0f980b416840d9226a87c04 100644 (file)
@@ -56,6 +56,9 @@ var data_manager_token string
 // actually deleting anything.
 var never_delete = false
 
+var maxBuffers = 128
+var bufs *bufferPool
+
 // ==========
 // Error types.
 //
@@ -276,9 +279,19 @@ func main() {
                "pid",
                "",
                "Path to write pid file")
+       flag.IntVar(
+               &maxBuffers,
+               "max-buffers",
+               maxBuffers,
+               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
 
        flag.Parse()
 
+       if maxBuffers <= 0 { // 0 would create a zero-capacity pool: every GET/PUT blocks forever
+               log.Fatal("-max-buffers must be greater than zero.")
+       }
+       bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
        if len(volumes) == 0 {
                if volumes.Discover() == 0 {
                        log.Fatal("No volumes found.")
index 1b5294952820b5cb2f0be2d53c37eb84f2cefed5..64fea34bfe1c32ad9b6b6b33a74c82f8b9f0252f 100644 (file)
@@ -11,6 +11,9 @@ import (
 )
 
 type Volume interface {
+       // Get a block. IFF the returned error is nil, the caller must
+       // put the returned slice back into the buffer pool when it's
+       // finished with it.
        Get(loc string) ([]byte, error)
        Put(loc string, block []byte) error
        Touch(loc string) error
index 66e0810c972a86c59603f3ab40498e264081072a..261501992f8080110062cc7be7a1828052f24014 100644 (file)
@@ -65,7 +65,9 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
        if v.Bad {
                return nil, errors.New("Bad volume")
        } else if block, ok := v.Store[loc]; ok {
-               return block, nil
+               buf := bufs.Get(len(block)) // per Volume.Get contract the caller owns (and must Put) the returned slice
+               copy(buf, block)
+               return buf, nil // copy, so the caller's Put cannot alias v.Store's data
        }
        return nil, os.ErrNotExist
 }
index bcf57c1647ff54b2e5d2efd1432d54ab15be0559..61a98b5c736a8d3821fd0975df0d5dd5a0977a21 100644 (file)
@@ -63,15 +63,33 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 // slice and whatever non-nil error was returned by Stat or ReadFile.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
        path := v.blockPath(loc)
-       if _, err := os.Stat(path); err != nil {
+       stat, err := os.Stat(path)
+       if err != nil {
+               return nil, err
+       }
+       if stat.Size() < 0 {
+               return nil, os.ErrInvalid
+       } else if stat.Size() == 0 {
+               return bufs.Get(0), nil // zero-length block; caller still owns the buffer and must Put it
+       } else if stat.Size() > BLOCKSIZE {
+               return nil, TooLongError
+       }
+       f, err := os.Open(path)
+       if err != nil {
                return nil, err
        }
+       defer f.Close()
+       buf := bufs.Get(int(stat.Size())) // may block until a pool slot is free
        if v.serialize {
                v.mutex.Lock()
                defer v.mutex.Unlock()
        }
-       buf, err := ioutil.ReadFile(path)
-       return buf, err
+       _, err = io.ReadFull(f, buf) // NOTE(review): file could change size between Stat and here; truncation surfaces as ErrUnexpectedEOF — confirm acceptable
+       if err != nil {
+               bufs.Put(buf) // return the buffer to the pool before reporting the read error
+               return nil, err
+       }
+       return buf, nil
 }
 
 // Put stores a block of data identified by the locator string