X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/54377a7bacc182ace0bb8b55a812e0a9fee5ced8..2c6557f613fcf6cdcebb08c321a5d061aeb780c6:/services/keepstore/bufferpool.go

NOTE(review): line structure restored from a whitespace-collapsed copy of
this patch; indentation is assumed to be gofmt-style tabs. Relative to the
linked upstream diff, the added lines differ only in comments and in the
label of one Fatalf message, so the post-image blob id on the "index" line
below no longer matches the result of applying this patch.

diff --git a/services/keepstore/bufferpool.go b/services/keepstore/bufferpool.go
index b4cc5d38e1..811715b191 100644
--- a/services/keepstore/bufferpool.go
+++ b/services/keepstore/bufferpool.go
@@ -5,13 +5,17 @@
 package keepstore
 
 import (
+	"context"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 )
 
+var bufferPoolBlockSize = BlockSize // modified by tests
+
 type bufferPool struct {
 	log logrus.FieldLogger
 	// limiter has a "true" placeholder for each in-use buffer.
@@ -22,17 +26,74 @@ type bufferPool struct {
 	sync.Pool
 }
 
-func newBufferPool(log logrus.FieldLogger, count int, bufSize int) *bufferPool {
+// newBufferPool returns a pool that allows at most count buffers of
+// bufferPoolBlockSize bytes to be in use at once. If reg is non-nil,
+// gauges for allocated bytes, maximum buffers, and buffers in use are
+// registered with it.
+func newBufferPool(log logrus.FieldLogger, count int, reg *prometheus.Registry) *bufferPool {
 	p := bufferPool{log: log}
 	p.Pool.New = func() interface{} {
-		atomic.AddUint64(&p.allocated, uint64(bufSize))
-		return make([]byte, bufSize)
+		atomic.AddUint64(&p.allocated, uint64(bufferPoolBlockSize))
+		return make([]byte, bufferPoolBlockSize)
 	}
 	p.limiter = make(chan bool, count)
+	if reg != nil {
+		reg.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      "bufferpool_allocated_bytes",
+				Help:      "Number of bytes allocated to buffers",
+			},
+			func() float64 { return float64(p.Alloc()) },
+		))
+		reg.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      "bufferpool_max_buffers",
+				Help:      "Maximum number of buffers allowed",
+			},
+			func() float64 { return float64(p.Cap()) },
+		))
+		reg.MustRegister(prometheus.NewGaugeFunc(
+			prometheus.GaugeOpts{
+				Namespace: "arvados",
+				Subsystem: "keepstore",
+				Name:      "bufferpool_inuse_buffers",
+				Help:      "Number of buffers in use",
+			},
+			func() float64 { return float64(p.Len()) },
+		))
+	}
 	return &p
 }
 
-func (p *bufferPool) Get(size int) []byte {
+// GetContext gets a buffer from the pool -- but gives up and returns
+// ctx.Err() if ctx ends before a buffer is available.
+func (p *bufferPool) GetContext(ctx context.Context) ([]byte, error) {
+	bufReady := make(chan []byte)
+	go func() {
+		bufReady <- p.Get()
+	}()
+	select {
+	case buf := <-bufReady:
+		return buf, nil
+	case <-ctx.Done():
+		go func() {
+			// Even if ctx ended first, we need to keep
+			// waiting for our buf so we can return it to
+			// the pool.
+			p.Put(<-bufReady)
+		}()
+		return nil, ctx.Err()
+	}
+}
+
+// Get claims a slot in the limiter and returns a buffer from the
+// pool. It calls log.Fatalf if that buffer is shorter than
+// bufferPoolBlockSize.
+func (p *bufferPool) Get() []byte {
 	select {
 	case p.limiter <- true:
 	default:
@@ -42,14 +103,16 @@ func (p *bufferPool) Get(size int) []byte {
 		p.log.Printf("waited %v for a buffer", time.Since(t0))
 	}
 	buf := p.Pool.Get().([]byte)
-	if cap(buf) < size {
-		p.log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
+	if len(buf) < bufferPoolBlockSize {
+		p.log.Fatalf("bufferPoolBlockSize=%d but len(buf)=%d", bufferPoolBlockSize, len(buf))
 	}
-	return buf[:size]
+	return buf
 }
 
+// Put returns buf to the pool, restored to its full capacity, and
+// releases its slot in the limiter.
 func (p *bufferPool) Put(buf []byte) {
-	p.Pool.Put(buf)
+	p.Pool.Put(buf[:cap(buf)])
 	<-p.limiter
 }
 