--- /dev/null
+crunchstat
--- /dev/null
+package main
+
+import (
+ "log"
+ "sync"
+ "time"
+)
+
// A bufferPool is a fixed-limit pool of equal-capacity byte buffers.
// The limiter channel caps how many buffers may be in use at once
// (Get blocks when the cap is reached), while the embedded sync.Pool
// recycles the buffer memory itself.
type bufferPool struct {
	// limiter has a "true" placeholder for each in-use buffer.
	limiter chan bool
	// Pool has unused buffers.
	sync.Pool
}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+ p := bufferPool{}
+ p.New = func() interface{} {
+ return make([]byte, bufSize)
+ }
+ p.limiter = make(chan bool, count)
+ return &p
+}
+
+func (p *bufferPool) Get(size int) []byte {
+ select {
+ case p.limiter <- true:
+ default:
+ t0 := time.Now()
+ log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
+ p.limiter <- true
+ log.Printf("waited %v for a buffer", time.Since(t0))
+ }
+ buf := p.Pool.Get().([]byte)
+ if cap(buf) < size {
+ log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
+ }
+ return buf[:size]
+}
+
// Put returns a buffer (obtained from Get on the same pool) to the
// pool, making it available to other callers.
func (p *bufferPool) Put(buf []byte) {
	// Hand the buffer back to the pool *before* releasing the
	// limiter slot, so a Get that was blocked on the limiter can
	// pick up this buffer instead of allocating a new one.
	p.Pool.Put(buf)
	<-p.limiter
}
--- /dev/null
+package main
+
+import (
+ . "gopkg.in/check.v1"
+ "testing"
+ "time"
+)
+
// Gocheck boilerplate: hook the gocheck runner into "go test".
func TestBufferPool(t *testing.T) {
	TestingT(t)
}

// Register the suite so gocheck discovers and runs its Test* methods.
var _ = Suite(&BufferPoolSuite{})
type BufferPoolSuite struct {}
+
// Initialize a default-sized buffer pool for the benefit of test
// suites that don't run main(). maxBuffers and BLOCKSIZE are
// package-level values defined elsewhere in this package.
func init() {
	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(1)
+ bufs.Get(2)
+ bufs.Put(b1)
+ b3 := bufs.Get(3)
+ c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+ bufs := newBufferPool(3, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Get")
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Put")
+}
+
// testBufferPoolRace races one goroutine calling bufs.Get against
// another calling bufs.Put (the Put side starts 10ms later), and
// checks which finishes first. If the pool has a free slot, Get wins;
// if the pool is at its limit, Get must wait for Put to win first.
func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) {
	race := make(chan string)
	go func() {
		bufs.Get(10)
		// Brief sleep so that if Get had to wait for Put, the
		// "Put" message reliably reaches the channel first.
		time.Sleep(time.Millisecond)
		race <- "Get"
	}()
	go func() {
		// Head start for the Get goroutine: it should win
		// whenever a buffer is available without waiting.
		time.Sleep(10*time.Millisecond)
		bufs.Put(unused)
		race <- "Put"
	}()
	c.Check(<-race, Equals, expectWin)
	c.Check(<-race, Not(Equals), expectWin)
	close(race)
}
+
// TestBufferPoolReuse checks that Put/Get cycles usually hand back
// the same underlying array instead of allocating a fresh one.
func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
	bufs := newBufferPool(2, 10)
	bufs.Get(10)
	last := bufs.Get(10)
	// The buffer pool is allowed to throw away unused buffers
	// (e.g., during sync.Pool's garbage collection hook, in the
	// current implementation). However, if unused buffers are
	// getting thrown away and reallocated more than {arbitrary
	// frequency threshold} during a busy loop, it's not acting
	// much like a buffer pool.
	allocs := 1000
	reuses := 0
	for i := 0; i < allocs; i++ {
		bufs.Put(last)
		next := bufs.Get(10)
		// Write distinct markers into both slices. If next
		// aliases last's backing array (i.e., the buffer was
		// reused), the second copy overwrites the first and
		// last[0] reads back as 'n'.
		copy(last, []byte("last"))
		copy(next, []byte("next"))
		if last[0] == 'n' {
			reuses++
		}
		last = next
	}
	c.Check(reuses > allocs * 95/100, Equals, true)
}
}
block, err := GetBlock(hash, false)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
return
}
+ defer bufs.Put(block)
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
-
- return
+ resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ resp.Write(block)
}
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
+ buf := bufs.Get(int(req.ContentLength))
+ _, err := io.ReadFull(req.Body, buf)
if err != nil {
http.Error(resp, err.Error(), 500)
- return
- } else if int64(nread) < req.ContentLength {
- http.Error(resp, "request truncated", 500)
+ bufs.Put(buf)
return
}
err = PutBlock(buf, hash)
+ bufs.Put(buf)
+
if err != nil {
ke := err.(*KeepError)
http.Error(resp, ke.Error(), ke.HTTPCode)
// Success; add a size hint, sign the locator if possible, and
// return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
+ return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
api_token := GetApiToken(req)
if PermissionSecret != nil && api_token != "" {
expiry := time.Now().Add(blob_signature_ttl)
// actually deleting anything.
var never_delete = false
+var maxBuffers = 128
+var bufs *bufferPool
+
// ==========
// Error types.
//
"pid",
"",
"Path to write pid file")
+ flag.IntVar(
+ &maxBuffers,
+ "max-buffers",
+ maxBuffers,
+ fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
flag.Parse()
+ if maxBuffers < 0 {
+ log.Fatal("-max-buffers must be greater than zero.")
+ }
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
if len(volumes) == 0 {
if volumes.Discover() == 0 {
log.Fatal("No volumes found.")
)
type Volume interface {
+ // Get a block. IFF the returned error is nil, the caller must
+ // put the returned slice back into the buffer pool when it's
+ // finished with it.
Get(loc string) ([]byte, error)
Put(loc string, block []byte) error
Touch(loc string) error
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
- return block, nil
+ buf := bufs.Get(len(block))
+ copy(buf, block)
+ return buf, nil
}
return nil, os.ErrNotExist
}
// slice and whatever non-nil error was returned by Stat or ReadFile.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- if _, err := os.Stat(path); err != nil {
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+ if stat.Size() < 0 {
+ return nil, os.ErrInvalid
+ } else if stat.Size() == 0 {
+ return bufs.Get(0), nil
+ } else if stat.Size() > BLOCKSIZE {
+ return nil, TooLongError
+ }
+ f, err := os.Open(path)
+ if err != nil {
return nil, err
}
+ defer f.Close()
+ buf := bufs.Get(int(stat.Size()))
if v.serialize {
v.mutex.Lock()
defer v.mutex.Unlock()
}
- buf, err := ioutil.ReadFile(path)
- return buf, err
+ _, err = io.ReadFull(f, buf)
+ if err != nil {
+ bufs.Put(buf)
+ return nil, err
+ }
+ return buf, nil
}
// Put stores a block of data identified by the locator string