1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "github.com/prometheus/client_golang/prometheus"
14 "github.com/sirupsen/logrus"
// bufferPoolBlockSize is the length in bytes of every buffer the pool
// allocates, and the minimum length Get accepts for a buffer taken
// from the pool. It defaults to BlockSize.
17 var bufferPoolBlockSize = BlockSize // modified by tests
// bufferPool hands out []byte buffers of bufferPoolBlockSize bytes and
// limits how many of them can be in use at the same time.
19 type bufferPool struct {
// log receives the pool's "waiting"/"waited" messages and its fatal
// size-mismatch error (see Get).
20 log logrus.FieldLogger
21 // limiter has a "true" placeholder for each in-use buffer.
// Its capacity is the maximum number of buffers allowed (see Cap).
23 // allocated is the number of bytes currently allocated to buffers.
// It is updated and read with sync/atomic operations.
25 // Pool has unused buffers.
// Its New func (set in newBufferPool) allocates a fresh
// bufferPoolBlockSize-byte buffer.
// newBufferPool returns a bufferPool that logs to log and allows at
// most count buffers to be in use at once.
//
// It also registers three gauges with reg, all in the "keepstore"
// subsystem: bufferpool_allocated_bytes, bufferpool_max_buffers and
// bufferpool_inuse_buffers. Registration goes through MustRegister, so
// no registration error is returned to the caller.
29 func newBufferPool(log logrus.FieldLogger, count int, reg *prometheus.Registry) *bufferPool {
30 p := bufferPool{log: log}
// Every buffer created by the pool's New func is counted in
// p.allocated before it is handed out.
31 p.Pool.New = func() interface{} {
32 atomic.AddUint64(&p.allocated, uint64(bufferPoolBlockSize))
33 return make([]byte, bufferPoolBlockSize)
// The channel capacity is the buffer limit: each in-use buffer holds
// one slot (see Get).
35 p.limiter = make(chan bool, count)
// Gauge: bytes allocated to buffers, as reported by p.Alloc.
37 reg.MustRegister(prometheus.NewGaugeFunc(
40 Subsystem: "keepstore",
41 Name: "bufferpool_allocated_bytes",
42 Help: "Number of bytes allocated to buffers",
44 func() float64 { return float64(p.Alloc()) },
// Gauge: configured buffer limit, as reported by p.Cap.
46 reg.MustRegister(prometheus.NewGaugeFunc(
49 Subsystem: "keepstore",
50 Name: "bufferpool_max_buffers",
51 Help: "Maximum number of buffers allowed",
53 func() float64 { return float64(p.Cap()) },
// Gauge: buffers currently in use, as reported by p.Len.
55 reg.MustRegister(prometheus.NewGaugeFunc(
58 Subsystem: "keepstore",
59 Name: "bufferpool_inuse_buffers",
60 Help: "Number of buffers in use",
62 func() float64 { return float64(p.Len()) },
68 // GetContext gets a buffer from the pool -- but gives up and returns
69 // ctx.Err() if ctx ends before a buffer is available.
70 func (p *bufferPool) GetContext(ctx context.Context) ([]byte, error) {
// bufReady is unbuffered; the requested buffer is received from it
// in the case below.
71 bufReady := make(chan []byte)
76 case buf := <-bufReady:
80 // Even if ctx ended first, we
81 // need to keep waiting for our buf so we can
82 // return it to the pool.
// Get takes a buffer from the pool after claiming a slot in
// p.limiter. When no slot is immediately free it logs that the
// maximum was reached, and afterwards logs how long the wait took.
//
// A buffer shorter than bufferPoolBlockSize is treated as a fatal
// error and reported through p.log.Fatalf.
89 func (p *bufferPool) Get() []byte {
// Claim one in-use slot in the limiter.
91 case p.limiter <- true:
94 p.log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
96 p.log.Printf("waited %v for a buffer", time.Since(t0))
// Pool holds []byte values (see the New func in newBufferPool), so
// the type assertion is expected to succeed.
98 buf := p.Pool.Get().([]byte)
99 if len(buf) < bufferPoolBlockSize {
// NOTE(review): the message labels the value "cap(buf)" but the
// argument passed is len(buf) -- confirm which is intended.
100 p.log.Fatalf("bufferPoolBlockSize=%d but cap(buf)=%d", bufferPoolBlockSize, len(buf))
// Put hands buf back to the pool of unused buffers.
105 func (p *bufferPool) Put(buf []byte) {
// Re-extend buf to its full capacity first, so a caller that
// shortened the slice does not leave a short buffer in the pool
// (Get rejects buffers shorter than bufferPoolBlockSize).
106 p.Pool.Put(buf[:cap(buf)])
110 // Alloc returns the number of bytes allocated to buffers.
// The counter is read with an atomic load, matching the atomic add
// performed by the pool's New func. The same value backs the
// bufferpool_allocated_bytes gauge.
111 func (p *bufferPool) Alloc() uint64 {
112 return atomic.LoadUint64(&p.allocated)
115 // Cap returns the maximum number of buffers allowed.
// This is the capacity of the limiter channel, i.e. the count passed
// to newBufferPool. The same value backs the bufferpool_max_buffers
// gauge.
116 func (p *bufferPool) Cap() int {
117 return cap(p.limiter)
120 // Len returns the number of buffers in use right now.
// This is the number of placeholders currently held in the limiter
// channel. The same value backs the bufferpool_inuse_buffers gauge.
121 func (p *bufferPool) Len() int {
122 return len(p.limiter)