refactor as procedural
[arvados.git] / services / keepstore / bufferpool.go
1 package main
2
3 import (
4         "sync"
5         "sync/atomic"
6         "time"
7
8         log "github.com/Sirupsen/logrus"
9 )
10
11 type bufferPool struct {
12         // limiter has a "true" placeholder for each in-use buffer.
13         limiter chan bool
14         // allocated is the number of bytes currently allocated to buffers.
15         allocated uint64
16         // Pool has unused buffers.
17         sync.Pool
18 }
19
20 func newBufferPool(count int, bufSize int) *bufferPool {
21         p := bufferPool{}
22         p.New = func() interface{} {
23                 atomic.AddUint64(&p.allocated, uint64(bufSize))
24                 return make([]byte, bufSize)
25         }
26         p.limiter = make(chan bool, count)
27         return &p
28 }
29
30 func (p *bufferPool) Get(size int) []byte {
31         select {
32         case p.limiter <- true:
33         default:
34                 t0 := time.Now()
35                 log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
36                 p.limiter <- true
37                 log.Printf("waited %v for a buffer", time.Since(t0))
38         }
39         buf := p.Pool.Get().([]byte)
40         if cap(buf) < size {
41                 log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
42         }
43         return buf[:size]
44 }
45
46 func (p *bufferPool) Put(buf []byte) {
47         p.Pool.Put(buf)
48         <-p.limiter
49 }
50
51 // Alloc returns the number of bytes allocated to buffers.
52 func (p *bufferPool) Alloc() uint64 {
53         return atomic.LoadUint64(&p.allocated)
54 }
55
56 // Cap returns the maximum number of buffers allowed.
57 func (p *bufferPool) Cap() int {
58         return cap(p.limiter)
59 }
60
61 // Len returns the number of buffers in use right now.
62 func (p *bufferPool) Len() int {
63         return len(p.limiter)
64 }