Merge branch '21121-cluster-activity' refs #21121
[arvados.git] / sdk / go / dispatch / throttle.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package dispatch
6
7 import (
8         "sync"
9         "time"
10 )
11
// throttleEnt is the per-uuid record kept in throttle.seen.
type throttleEnt struct {
	last time.Time // last attempt that was allowed
}
15
// throttle rate-limits attempts on a per-uuid basis: Check allows an
// attempt for a given uuid only if the previous allowed attempt for
// that uuid was at least hold ago. A zero hold disables throttling.
//
// The map and the background cleanup goroutine are created lazily by
// setup on the first Check call that actually needs them.
type throttle struct {
	// hold is the minimum time between allowed attempts for the
	// same uuid. It is also used as the cleanup ticker interval.
	hold time.Duration
	// seen maps uuid to its last allowed attempt. It is allocated
	// by setup and accessed with mtx held.
	seen map[string]*throttleEnt
	// updated is broadcast at the end of every Check call that
	// consults seen; the cleanup goroutine waits on it between
	// sweeps. Its Locker is set to &mtx by setup.
	updated sync.Cond
	// setupOnce ensures setup runs only once.
	setupOnce sync.Once
	// mtx guards seen and serves as the Locker for updated.
	mtx sync.Mutex
}
23
24 // Check checks whether there have been too many recent attempts with
25 // the given uuid, and returns true if it's OK to attempt [again] now.
26 func (t *throttle) Check(uuid string) bool {
27         if t.hold == 0 {
28                 return true
29         }
30         t.setupOnce.Do(t.setup)
31         t.mtx.Lock()
32         defer t.updated.Broadcast()
33         defer t.mtx.Unlock()
34         ent, ok := t.seen[uuid]
35         if !ok {
36                 t.seen[uuid] = &throttleEnt{last: time.Now()}
37                 return true
38         }
39         if time.Since(ent.last) < t.hold {
40                 return false
41         }
42         ent.last = time.Now()
43         return true
44 }
45
46 func (t *throttle) setup() {
47         t.seen = make(map[string]*throttleEnt)
48         t.updated.L = &t.mtx
49         go func() {
50                 for range time.NewTicker(t.hold).C {
51                         t.mtx.Lock()
52                         for uuid, ent := range t.seen {
53                                 if time.Since(ent.last) >= t.hold {
54                                         delete(t.seen, uuid)
55                                 }
56                         }
57                         // don't bother cleaning again until the next update
58                         t.updated.Wait()
59                         t.mtx.Unlock()
60                 }
61         }()
62 }