14199: Abandon processing sooner if client disconnects.
[arvados.git] / services / keepstore / bufferpool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "sync"
9         "sync/atomic"
10         "time"
11 )
12
13 type bufferPool struct {
14         // limiter has a "true" placeholder for each in-use buffer.
15         limiter chan bool
16         // allocated is the number of bytes currently allocated to buffers.
17         allocated uint64
18         // Pool has unused buffers.
19         sync.Pool
20 }
21
22 func newBufferPool(count int, bufSize int) *bufferPool {
23         p := bufferPool{}
24         p.New = func() interface{} {
25                 atomic.AddUint64(&p.allocated, uint64(bufSize))
26                 return make([]byte, bufSize)
27         }
28         p.limiter = make(chan bool, count)
29         return &p
30 }
31
32 func (p *bufferPool) Get(size int) []byte {
33         select {
34         case p.limiter <- true:
35         default:
36                 t0 := time.Now()
37                 log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
38                 p.limiter <- true
39                 log.Printf("waited %v for a buffer", time.Since(t0))
40         }
41         buf := p.Pool.Get().([]byte)
42         if cap(buf) < size {
43                 log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
44         }
45         return buf[:size]
46 }
47
48 func (p *bufferPool) Put(buf []byte) {
49         p.Pool.Put(buf)
50         <-p.limiter
51 }
52
53 // Alloc returns the number of bytes allocated to buffers.
54 func (p *bufferPool) Alloc() uint64 {
55         return atomic.LoadUint64(&p.allocated)
56 }
57
58 // Cap returns the maximum number of buffers allowed.
59 func (p *bufferPool) Cap() int {
60         return cap(p.limiter)
61 }
62
63 // Len returns the number of buffers in use right now.
64 func (p *bufferPool) Len() int {
65         return len(p.limiter)
66 }