//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
+ "context"
"sync"
"sync/atomic"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
+var bufferPoolBlockSize = BlockSize // modified by tests
+
type bufferPool struct {
+ log logrus.FieldLogger
// limiter has a "true" placeholder for each in-use buffer.
limiter chan bool
// allocated is the number of bytes currently allocated to buffers.
sync.Pool
}
-func newBufferPool(count int, bufSize int) *bufferPool {
- p := bufferPool{}
- p.New = func() interface{} {
- atomic.AddUint64(&p.allocated, uint64(bufSize))
- return make([]byte, bufSize)
+func newBufferPool(log logrus.FieldLogger, count int, reg *prometheus.Registry) *bufferPool {
+ p := bufferPool{log: log}
+ p.Pool.New = func() interface{} {
+ atomic.AddUint64(&p.allocated, uint64(bufferPoolBlockSize))
+ return make([]byte, bufferPoolBlockSize)
}
p.limiter = make(chan bool, count)
+ if reg != nil {
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "bufferpool_allocated_bytes",
+ Help: "Number of bytes allocated to buffers",
+ },
+ func() float64 { return float64(p.Alloc()) },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "bufferpool_max_buffers",
+ Help: "Maximum number of buffers allowed",
+ },
+ func() float64 { return float64(p.Cap()) },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "bufferpool_inuse_buffers",
+ Help: "Number of buffers in use",
+ },
+ func() float64 { return float64(p.Len()) },
+ ))
+ }
return &p
}
-func (p *bufferPool) Get(size int) []byte {
+// GetContext gets a buffer from the pool -- but gives up and returns
+// ctx.Err() if ctx ends before a buffer is available.
+func (p *bufferPool) GetContext(ctx context.Context) ([]byte, error) {
+ bufReady := make(chan []byte)
+ go func() {
+ bufReady <- p.Get()
+ }()
+ select {
+ case buf := <-bufReady:
+ return buf, nil
+ case <-ctx.Done():
+ go func() {
+ // Even if closeNotifier happened first, we
+ // need to keep waiting for our buf so we can
+ // return it to the pool.
+ p.Put(<-bufReady)
+ }()
+ return nil, ctx.Err()
+ }
+}
+
+func (p *bufferPool) Get() []byte {
select {
case p.limiter <- true:
default:
t0 := time.Now()
- log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
+ p.log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
p.limiter <- true
- log.Printf("waited %v for a buffer", time.Since(t0))
+ p.log.Printf("waited %v for a buffer", time.Since(t0))
}
buf := p.Pool.Get().([]byte)
- if cap(buf) < size {
- log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
+ if len(buf) < bufferPoolBlockSize {
+ p.log.Fatalf("bufferPoolBlockSize=%d but cap(buf)=%d", bufferPoolBlockSize, len(buf))
}
- return buf[:size]
+ return buf
}
func (p *bufferPool) Put(buf []byte) {
- p.Pool.Put(buf)
+ p.Pool.Put(buf[:cap(buf)])
<-p.limiter
}