// Copyright (C) The Arvados Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package keepstore import ( "context" "sync" "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) var bufferPoolBlockSize = BlockSize // modified by tests type bufferPool struct { log logrus.FieldLogger // limiter has a "true" placeholder for each in-use buffer. limiter chan bool // allocated is the number of bytes currently allocated to buffers. allocated uint64 // Pool has unused buffers. sync.Pool } func newBufferPool(log logrus.FieldLogger, count int, reg *prometheus.Registry) *bufferPool { p := bufferPool{log: log} p.Pool.New = func() interface{} { atomic.AddUint64(&p.allocated, uint64(bufferPoolBlockSize)) return make([]byte, bufferPoolBlockSize) } p.limiter = make(chan bool, count) if reg != nil { reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepstore", Name: "bufferpool_allocated_bytes", Help: "Number of bytes allocated to buffers", }, func() float64 { return float64(p.Alloc()) }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepstore", Name: "bufferpool_max_buffers", Help: "Maximum number of buffers allowed", }, func() float64 { return float64(p.Cap()) }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "keepstore", Name: "bufferpool_inuse_buffers", Help: "Number of buffers in use", }, func() float64 { return float64(p.Len()) }, )) } return &p } // GetContext gets a buffer from the pool -- but gives up and returns // ctx.Err() if ctx ends before a buffer is available. func (p *bufferPool) GetContext(ctx context.Context) ([]byte, error) { bufReady := make(chan []byte) go func() { bufReady <- p.Get() }() select { case buf := <-bufReady: return buf, nil case <-ctx.Done(): go func() { // Even if closeNotifier happened first, we // need to keep waiting for our buf so we can // return it to the pool. p.Put(<-bufReady) }() return nil, ctx.Err() } } func (p *bufferPool) Get() []byte { select { case p.limiter <- true: default: t0 := time.Now() p.log.Printf("reached max buffers (%d), waiting", cap(p.limiter)) p.limiter <- true p.log.Printf("waited %v for a buffer", time.Since(t0)) } buf := p.Pool.Get().([]byte) if len(buf) < bufferPoolBlockSize { p.log.Fatalf("bufferPoolBlockSize=%d but cap(buf)=%d", bufferPoolBlockSize, len(buf)) } return buf } func (p *bufferPool) Put(buf []byte) { p.Pool.Put(buf[:cap(buf)]) <-p.limiter } // Alloc returns the number of bytes allocated to buffers. func (p *bufferPool) Alloc() uint64 { return atomic.LoadUint64(&p.allocated) } // Cap returns the maximum number of buffers allowed. func (p *bufferPool) Cap() int { return cap(p.limiter) } // Len returns the number of buffers in use right now. func (p *bufferPool) Len() int { return len(p.limiter) }