2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / bufferpool.go
index 373bfc75a1ca251b0bf851bfe45dfe78fc635ae7..811715b191c7384cfe904744221cbffcd3ac1b46 100644
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 import (
-       "log"
+       "context"
        "sync"
+       "sync/atomic"
        "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
+var bufferPoolBlockSize = BlockSize // modified by tests
+
 type bufferPool struct {
+       log logrus.FieldLogger
        // limiter has a "true" placeholder for each in-use buffer.
        limiter chan bool
+       // allocated is the number of bytes currently allocated to buffers.
+       allocated uint64
        // Pool has unused buffers.
        sync.Pool
 }
 
-func newBufferPool(count int, bufSize int) *bufferPool {
-       p := bufferPool{}
-       p.New = func() interface{} {
-               return make([]byte, bufSize)
+func newBufferPool(log logrus.FieldLogger, count int, reg *prometheus.Registry) *bufferPool {
+       p := bufferPool{log: log}
+       p.Pool.New = func() interface{} {
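+               // Count every fresh allocation so the
+               // bufferpool_allocated_bytes gauge can report it.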
+               atomic.AddUint64(&p.allocated, uint64(bufferPoolBlockSize))
+               return make([]byte, bufferPoolBlockSize)
        }
        p.limiter = make(chan bool, count)
+       if reg != nil {
+               reg.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Subsystem: "keepstore",
+                               Name:      "bufferpool_allocated_bytes",
+                               Help:      "Number of bytes allocated to buffers",
+                       },
+                       func() float64 { return float64(p.Alloc()) },
+               ))
+               reg.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Subsystem: "keepstore",
+                               Name:      "bufferpool_max_buffers",
+                               Help:      "Maximum number of buffers allowed",
+                       },
+                       func() float64 { return float64(p.Cap()) },
+               ))
+               reg.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Subsystem: "keepstore",
+                               Name:      "bufferpool_inuse_buffers",
+                               Help:      "Number of buffers in use",
+                       },
+                       func() float64 { return float64(p.Len()) },
+               ))
+       }
        return &p
 }
 
-func (p *bufferPool) Get(size int) []byte {
+// GetContext gets a buffer from the pool -- but gives up and returns
+// ctx.Err() if ctx ends before a buffer is available.
+func (p *bufferPool) GetContext(ctx context.Context) ([]byte, error) {
+       bufReady := make(chan []byte)
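+       // p.Get() can block indefinitely waiting for a free slot,
+       // so call it in a goroutine and race it against ctx below.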
+       go func() {
+               bufReady <- p.Get()
+       }()
+       select {
+       case buf := <-bufReady:
+               return buf, nil
+       case <-ctx.Done():
+               go func() {
+                       // Even if ctx ended first, we
+                       // need to keep waiting for our buf so we can
+                       // return it to the pool.
+                       p.Put(<-bufReady)
+               }()
+               return nil, ctx.Err()
+       }
+}
+
+func (p *bufferPool) Get() []byte {
        select {
        case p.limiter <- true:
        default:
                t0 := time.Now()
-               log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
+               p.log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
                p.limiter <- true
-               log.Printf("waited %v for a buffer", time.Since(t0))
+               p.log.Printf("waited %v for a buffer", time.Since(t0))
        }
        buf := p.Pool.Get().([]byte)
-       if cap(buf) < size {
-               log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
+       if len(buf) < bufferPoolBlockSize {
+               p.log.Fatalf("bufferPoolBlockSize=%d but len(buf)=%d", bufferPoolBlockSize, len(buf))
        }
-       return buf[:size]
+       return buf
 }
 
 func (p *bufferPool) Put(buf []byte) {
-       p.Pool.Put(buf)
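+       // Restore the full length before returning the buffer to
+       // the pool, in case the caller shortened it.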
+       p.Pool.Put(buf[:cap(buf)])
        <-p.limiter
 }
+
+// Alloc returns the number of bytes allocated to buffers.
+func (p *bufferPool) Alloc() uint64 {
+       return atomic.LoadUint64(&p.allocated)
+}
+
+// Cap returns the maximum number of buffers allowed.
+func (p *bufferPool) Cap() int {
+       return cap(p.limiter)
+}
+
+// Len returns the number of buffers in use right now.
+func (p *bufferPool) Len() int {
+       return len(p.limiter)
+}
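
A minimal caller sketch, written as if it lived alongside this package; the buffer count, timeout, logger, and registry wiring below are illustrative assumptions, not part of this change:

package keepstore

import (
        "context"
        "fmt"
        "time"

        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

// demoBufferPool is a hypothetical caller showing the borrow/return cycle.
func demoBufferPool() {
        // Allow at most 4 buffers; export the gauges on a fresh registry.
        pool := newBufferPool(logrus.StandardLogger(), 4, prometheus.NewRegistry())

        // Give up if no buffer becomes free within one second.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        buf, err := pool.GetContext(ctx)
        if err != nil {
                fmt.Println("no buffer available:", err)
                return
        }
        defer pool.Put(buf) // always return the buffer so its limiter slot frees up

        // ... read or write block data through buf here ...
        fmt.Printf("holding a %d-byte buffer; %d of %d slots in use\n",
                len(buf), pool.Len(), pool.Cap())
}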