Merge branch '16723-kill-vs-requeue'
[arvados.git] / services / keepstore / bufferpool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11
12         "github.com/sirupsen/logrus"
13 )
14
15 type bufferPool struct {
16         log logrus.FieldLogger
17         // limiter has a "true" placeholder for each in-use buffer.
18         limiter chan bool
19         // allocated is the number of bytes currently allocated to buffers.
20         allocated uint64
21         // Pool has unused buffers.
22         sync.Pool
23 }
24
25 func newBufferPool(log logrus.FieldLogger, count int, bufSize int) *bufferPool {
26         p := bufferPool{log: log}
27         p.Pool.New = func() interface{} {
28                 atomic.AddUint64(&p.allocated, uint64(bufSize))
29                 return make([]byte, bufSize)
30         }
31         p.limiter = make(chan bool, count)
32         return &p
33 }
34
35 func (p *bufferPool) Get(size int) []byte {
36         select {
37         case p.limiter <- true:
38         default:
39                 t0 := time.Now()
40                 p.log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
41                 p.limiter <- true
42                 p.log.Printf("waited %v for a buffer", time.Since(t0))
43         }
44         buf := p.Pool.Get().([]byte)
45         if cap(buf) < size {
46                 p.log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
47         }
48         return buf[:size]
49 }
50
51 func (p *bufferPool) Put(buf []byte) {
52         p.Pool.Put(buf)
53         <-p.limiter
54 }
55
56 // Alloc returns the number of bytes allocated to buffers.
57 func (p *bufferPool) Alloc() uint64 {
58         return atomic.LoadUint64(&p.allocated)
59 }
60
61 // Cap returns the maximum number of buffers allowed.
62 func (p *bufferPool) Cap() int {
63         return cap(p.limiter)
64 }
65
66 // Len returns the number of buffers in use right now.
67 func (p *bufferPool) Len() int {
68         return len(p.limiter)
69 }