Merge branch 'master' into 6569-smarter-jobs-image
[arvados.git] / services / keepstore / bufferpool.go
1 package main
2
3 import (
4         "log"
5         "sync"
6         "sync/atomic"
7         "time"
8 )
9
10 type bufferPool struct {
11         // limiter has a "true" placeholder for each in-use buffer.
12         limiter chan bool
13         // allocated is the number of bytes currently allocated to buffers.
14         allocated uint64
15         // Pool has unused buffers.
16         sync.Pool
17 }
18
19 func newBufferPool(count int, bufSize int) *bufferPool {
20         p := bufferPool{}
21         p.New = func() interface{} {
22                 atomic.AddUint64(&p.allocated, uint64(bufSize))
23                 return make([]byte, bufSize)
24         }
25         p.limiter = make(chan bool, count)
26         return &p
27 }
28
29 func (p *bufferPool) Get(size int) []byte {
30         select {
31         case p.limiter <- true:
32         default:
33                 t0 := time.Now()
34                 log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
35                 p.limiter <- true
36                 log.Printf("waited %v for a buffer", time.Since(t0))
37         }
38         buf := p.Pool.Get().([]byte)
39         if cap(buf) < size {
40                 log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
41         }
42         return buf[:size]
43 }
44
45 func (p *bufferPool) Put(buf []byte) {
46         p.Pool.Put(buf)
47         <-p.limiter
48 }
49
50 // Alloc returns the number of bytes allocated to buffers.
51 func (p *bufferPool) Alloc() uint64 {
52         return atomic.LoadUint64(&p.allocated)
53 }
54
55 // Cap returns the maximum number of buffers allowed.
56 func (p *bufferPool) Cap() int {
57         return cap(p.limiter)
58 }
59
60 // Len returns the number of buffers in use right now.
61 func (p *bufferPool) Len() int {
62         return len(p.limiter)
63 }