Merge branch '14325-dispatch-cloud'
[arvados.git] / lib / dispatchcloud / worker / throttle.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "fmt"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/lib/cloud"
13         "github.com/sirupsen/logrus"
14 )
15
16 type throttle struct {
17         err   error
18         until time.Time
19         mtx   sync.Mutex
20 }
21
22 // CheckRateLimitError checks whether the given error is a
23 // cloud.RateLimitError, and if so, ensures Error() returns a non-nil
24 // error until the rate limiting holdoff period expires.
25 //
26 // If a notify func is given, it will be called after the holdoff
27 // period expires.
28 func (thr *throttle) CheckRateLimitError(err error, logger logrus.FieldLogger, callType string, notify func()) {
29         rle, ok := err.(cloud.RateLimitError)
30         if !ok {
31                 return
32         }
33         until := rle.EarliestRetry()
34         if !until.After(time.Now()) {
35                 return
36         }
37         dur := until.Sub(time.Now())
38         logger.WithFields(logrus.Fields{
39                 "CallType": callType,
40                 "Duration": dur,
41                 "ResumeAt": until,
42         }).Info("suspending remote calls due to rate-limit error")
43         thr.ErrorUntil(fmt.Errorf("remote calls are suspended for %s, until %s", dur, until), until, notify)
44 }
45
46 func (thr *throttle) ErrorUntil(err error, until time.Time, notify func()) {
47         thr.mtx.Lock()
48         defer thr.mtx.Unlock()
49         thr.err, thr.until = err, until
50         if notify != nil {
51                 time.AfterFunc(until.Sub(time.Now()), notify)
52         }
53 }
54
55 func (thr *throttle) Error() error {
56         thr.mtx.Lock()
57         defer thr.mtx.Unlock()
58         if thr.err != nil && time.Now().After(thr.until) {
59                 thr.err = nil
60         }
61         return thr.err
62 }
63
64 type throttledInstanceSet struct {
65         cloud.InstanceSet
66         throttleCreate    throttle
67         throttleInstances throttle
68 }