21720: removed redundant redirect-to test
[arvados.git] / services / keepstore / bufferpool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "sync"
10         "sync/atomic"
11         "time"
12
13         "github.com/prometheus/client_golang/prometheus"
14         "github.com/sirupsen/logrus"
15 )
16
17 var bufferPoolBlockSize = BlockSize // modified by tests
18
19 type bufferPool struct {
20         log logrus.FieldLogger
21         // limiter has a "true" placeholder for each in-use buffer.
22         limiter chan bool
23         // allocated is the number of bytes currently allocated to buffers.
24         allocated uint64
25         // Pool has unused buffers.
26         sync.Pool
27 }
28
29 func newBufferPool(log logrus.FieldLogger, count int, reg *prometheus.Registry) *bufferPool {
30         p := bufferPool{log: log}
31         p.Pool.New = func() interface{} {
32                 atomic.AddUint64(&p.allocated, uint64(bufferPoolBlockSize))
33                 return make([]byte, bufferPoolBlockSize)
34         }
35         p.limiter = make(chan bool, count)
36         if reg != nil {
37                 reg.MustRegister(prometheus.NewGaugeFunc(
38                         prometheus.GaugeOpts{
39                                 Namespace: "arvados",
40                                 Subsystem: "keepstore",
41                                 Name:      "bufferpool_allocated_bytes",
42                                 Help:      "Number of bytes allocated to buffers",
43                         },
44                         func() float64 { return float64(p.Alloc()) },
45                 ))
46                 reg.MustRegister(prometheus.NewGaugeFunc(
47                         prometheus.GaugeOpts{
48                                 Namespace: "arvados",
49                                 Subsystem: "keepstore",
50                                 Name:      "bufferpool_max_buffers",
51                                 Help:      "Maximum number of buffers allowed",
52                         },
53                         func() float64 { return float64(p.Cap()) },
54                 ))
55                 reg.MustRegister(prometheus.NewGaugeFunc(
56                         prometheus.GaugeOpts{
57                                 Namespace: "arvados",
58                                 Subsystem: "keepstore",
59                                 Name:      "bufferpool_inuse_buffers",
60                                 Help:      "Number of buffers in use",
61                         },
62                         func() float64 { return float64(p.Len()) },
63                 ))
64         }
65         return &p
66 }
67
68 // GetContext gets a buffer from the pool -- but gives up and returns
69 // ctx.Err() if ctx ends before a buffer is available.
70 func (p *bufferPool) GetContext(ctx context.Context) ([]byte, error) {
71         bufReady := make(chan []byte)
72         go func() {
73                 bufReady <- p.Get()
74         }()
75         select {
76         case buf := <-bufReady:
77                 return buf, nil
78         case <-ctx.Done():
79                 go func() {
80                         // Even if closeNotifier happened first, we
81                         // need to keep waiting for our buf so we can
82                         // return it to the pool.
83                         p.Put(<-bufReady)
84                 }()
85                 return nil, ctx.Err()
86         }
87 }
88
89 func (p *bufferPool) Get() []byte {
90         select {
91         case p.limiter <- true:
92         default:
93                 t0 := time.Now()
94                 p.log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
95                 p.limiter <- true
96                 p.log.Printf("waited %v for a buffer", time.Since(t0))
97         }
98         buf := p.Pool.Get().([]byte)
99         if len(buf) < bufferPoolBlockSize {
100                 p.log.Fatalf("bufferPoolBlockSize=%d but cap(buf)=%d", bufferPoolBlockSize, len(buf))
101         }
102         return buf
103 }
104
105 func (p *bufferPool) Put(buf []byte) {
106         p.Pool.Put(buf[:cap(buf)])
107         <-p.limiter
108 }
109
110 // Alloc returns the number of bytes allocated to buffers.
111 func (p *bufferPool) Alloc() uint64 {
112         return atomic.LoadUint64(&p.allocated)
113 }
114
115 // Cap returns the maximum number of buffers allowed.
116 func (p *bufferPool) Cap() int {
117         return cap(p.limiter)
118 }
119
120 // Len returns the number of buffers in use right now.
121 func (p *bufferPool) Len() int {
122         return len(p.limiter)
123 }