if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
import (
"context"
+ "crypto/tls"
"encoding/json"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
+ "net/url"
"os"
"sync"
"time"
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
- ctx context.Context
- cancel context.CancelFunc
- cluster *arvados.Cluster
- stubDriver *test.StubDriver
- disp *dispatcher
+ ctx context.Context
+ cancel context.CancelFunc
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
+ error503Server *httptest.Server
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
arvClient, err := arvados.NewClientFromConfig(s.cluster)
c.Check(err, check.IsNil)
+ s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
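+ // Route API requests through arvClientProxy (below) so "/503"
+ // reaches error503Server; the httptest TLS server uses a
+ // self-signed certificate, hence InsecureSkipVerify.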
+ arvClient.Client = &http.Client{
+ Transport: &http.Transport{
+ Proxy: s.arvClientProxy(c),
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}}
+
s.disp = &dispatcher{
Cluster: s.cluster,
Context: s.ctx,
func (s *DispatcherSuite) TearDownTest(c *check.C) {
s.cancel()
s.disp.Close()
+ s.error503Server.Close()
+}
+
+// Intercept outgoing API requests for "/503" and respond with HTTP
+// 503. This lets us force (*arvados.Client)Last503() to return a
+// non-zero time.
+func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
+ return func(req *http.Request) (*url.URL, error) {
+ if req.URL.Path == "/503" {
+ return url.Parse(s.error503Server.URL)
+ } else {
+ return nil, nil
+ }
+ }
}
// DispatchToStubDriver checks that the dispatcher wires everything
return
}
delete(waiting, ctr.UUID)
+ if len(waiting) == 100 {
+ // trigger scheduler maxConcurrency limit
+ s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
+ }
if len(waiting) == 0 {
close(done)
}
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
- c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
"github.com/sirupsen/logrus"
)
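+// quietAfter503 is how long runQueue waits after receiving a 503
+// error before raising maxConcurrency again.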
+var quietAfter503 = time.Minute
+
func (sch *Scheduler) runQueue() {
unsorted, _ := sch.queue.Entries()
sorted := make([]container.QueueEnt, 0, len(unsorted))
running := sch.pool.Running()
unalloc := sch.pool.Unallocated()
+ if t := sch.client.Last503(); t.After(sch.last503time) {
+ // API has sent an HTTP 503 response since last time
+ // we checked. Use current #containers - 1 as
+ // maxConcurrency, i.e., try to stay just below the
+ // level where we see 503s.
+ sch.last503time = t
+ if newlimit := len(running) - 1; newlimit < 1 {
+ sch.maxConcurrency = 1
+ } else {
+ sch.maxConcurrency = newlimit
+ }
+ } else if sch.maxConcurrency > 0 && time.Since(sch.last503time) > quietAfter503 {
+ // If we haven't seen any 503 errors lately, raise
+ // limit to ~10% beyond the current workload.
+ //
+ // As we use the added 10% to schedule more
+ // containers, len(running) will increase and we'll
+ // push the limit up further. Soon enough,
+ // maxConcurrency will get high enough to schedule the
+ // entire queue, hit pool quota, or get 503s again.
+ max := len(running)*11/10 + 1
+ if sch.maxConcurrency < max {
+ sch.maxConcurrency = max
+ }
+ }
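+ // Example: a 503 arriving while 50 containers are running
+ // drops maxConcurrency to 49; after a quiet minute with 49
+ // still running, it is raised to 49*11/10+1 == 54, and keeps
+ // ratcheting up as more containers start.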
+ if sch.last503time.IsZero() {
+ sch.mLast503Time.Set(0)
+ } else {
+ sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+ }
+ sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
+
sch.logger.WithFields(logrus.Fields{
- "Containers": len(sorted),
- "Processes": len(running),
+ "Containers": len(sorted),
+ "Processes": len(running),
+ "maxConcurrency": sch.maxConcurrency,
}).Debug("runQueue")
dontstart := map[arvados.InstanceType]bool{}
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
var containerAllocatedWorkerBootingCount int
+ // trying is #containers running + #containers we're trying to
+ // start. We stop trying to start more containers if this
+ // reaches the dynamic maxConcurrency limit.
+ trying := len(running)
+
tryrun:
for i, ctr := range sorted {
ctr, it := ctr.Container, ctr.InstanceType
}
switch ctr.State {
case arvados.ContainerStateQueued:
+ if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+ logger.Tracef("not locking: already at maxConcurrency %d", sch.maxConcurrency)
+ overquota = sorted[i:]
+ break tryrun
+ }
+ trying++
if unalloc[it] < 1 && sch.pool.AtQuota() {
- logger.Debug("not locking: AtQuota and no unalloc workers")
+ logger.Trace("not locking: AtQuota and no unalloc workers")
overquota = sorted[i:]
break tryrun
}
go sch.lockContainer(logger, ctr.UUID)
unalloc[it]--
case arvados.ContainerStateLocked:
+ if sch.maxConcurrency > 0 && trying >= sch.maxConcurrency {
+ logger.Debugf("not starting: already at maxConcurrency %d", sch.maxConcurrency)
+ overquota = sorted[i:]
+ break tryrun
+ }
+ trying++
if unalloc[it] > 0 {
unalloc[it]--
} else if sch.pool.AtQuota() {
} else if sch.pool.KillContainer(ctr.UUID, "about to start") {
logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
} else if sch.pool.StartContainer(it, ctr) {
+ logger.Trace("StartContainer => true")
// Success.
} else {
+ logger.Trace("StartContainer => false")
containerAllocatedWorkerBootingCount += 1
dontstart[it] = true
}
if len(overquota) > 0 {
// Unlock any containers that are unmappable while
- // we're at quota.
+ // we're at quota (but if they have already been
+ // scheduled and they're loading docker images etc.,
+ // let them run).
for _, ctr := range overquota {
ctr := ctr.Container
- if ctr.State == arvados.ContainerStateLocked {
+ _, toolate := running[ctr.UUID]
+ if ctr.State == arvados.ContainerStateLocked && !toolate {
logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
logger.Debug("unlock because pool capacity is used by higher priority containers")
err := sch.queue.Unlock(ctr.UUID)
func (p *stubPool) AtQuota() bool {
p.Lock()
defer p.Unlock()
- return len(p.unalloc)+len(p.running)+len(p.unknown) >= p.quota
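+ // Sum the per-type instance counts (map values) instead of
+ // counting instance types (map keys).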
+ n := len(p.running)
+ for _, nn := range p.unalloc {
+ n += nn
+ }
+ for _, nn := range p.unknown {
+ n += nn
+ }
+ return n >= p.quota
}
func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
func (p *stubPool) Unsubscribe(<-chan struct{}) {}
running: map[string]time.Time{},
canCreate: 0,
}
- New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
// call Create().
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
- for quota := 1; quota < 3; quota++ {
+ for quota := 1; quota <= 3; quota++ {
c.Logf("quota=%d", quota)
- shouldCreate := []arvados.InstanceType{}
- for i := 1; i < quota; i++ {
- shouldCreate = append(shouldCreate, test.InstanceType(3))
- }
queue := test.Queue{
ChooseType: chooseType,
Containers: []arvados.Container{
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
- sch.runQueue()
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.sync()
sch.runQueue()
sch.sync()
- c.Check(pool.creates, check.DeepEquals, shouldCreate)
- if len(shouldCreate) == 0 {
- c.Check(pool.starts, check.DeepEquals, []string{})
- } else {
+ switch quota {
+ case 1, 2:
+ // Can't create a type3 node for ctr3, so we shut
+ // down an unallocated node (type2), and unlock both
+ // containers.
+ c.Check(pool.starts, check.HasLen, 0)
+ c.Check(pool.shutdowns, check.Equals, 1)
+ c.Check(pool.creates, check.HasLen, 0)
+ c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
+ {UUID: test.ContainerUUID(3), From: "Locked", To: "Queued"},
+ {UUID: test.ContainerUUID(2), From: "Locked", To: "Queued"},
+ })
+ case 3:
+ // Creating a type3 instance works, so we
+ // start ctr2 on a type2 instance, and leave
+ // ctr3 locked while we wait for the new
+ // instance to come up.
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
+ c.Check(pool.shutdowns, check.Equals, 0)
+ c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(3)})
+ c.Check(queue.StateChanges(), check.HasLen, 0)
+ default:
+ panic("test not written for quota>3")
}
- c.Check(pool.shutdowns, check.Equals, 3-quota)
- c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
- {UUID: "zzzzz-dz642-000000000000003", From: "Locked", To: "Queued"},
- {UUID: "zzzzz-dz642-000000000000002", From: "Locked", To: "Queued"},
- })
}
}
pool := stubPool{
quota: 2,
unalloc: map[arvados.InstanceType]int{
- test.InstanceType(3): 1,
+ test.InstanceType(3): 2,
},
idle: map[arvados.InstanceType]int{
- test.InstanceType(3): 1,
+ test.InstanceType(3): 2,
},
running: map[string]time.Time{},
creates: []arvados.InstanceType{},
starts: []string{},
- canCreate: 1,
+ canCreate: 0,
}
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
for i := 0; i < 30; i++ {
sch.runQueue()
sch.sync()
time.Sleep(time.Millisecond)
}
c.Check(pool.shutdowns, check.Equals, 0)
- c.Check(pool.starts, check.HasLen, 1)
+ c.Check(pool.starts, check.HasLen, 2)
unlocked := map[string]int{}
for _, chg := range queue.StateChanges() {
if chg.To == arvados.ContainerStateQueued {
},
}
queue.Update()
- New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
},
}
queue.Update()
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
c.Check(pool.running, check.HasLen, 1)
sch.sync()
for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
pool := stubPool{
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
}
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.updateMetrics()
// 'over quota' metric will be 1 because no workers are available and canCreate defaults
// to zero.
pool = stubPool{}
- sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.updateMetrics()
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
running: map[string]time.Time{},
}
- sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.updateMetrics()
// shuts down idle workers, in case they are consuming quota.
type Scheduler struct {
logger logrus.FieldLogger
+ client *arvados.Client
queue ContainerQueue
pool WorkerPool
reg *prometheus.Registry
stop chan struct{}
stopped chan struct{}
+ last503time time.Time // last time API responded 503
+ maxConcurrency int // dynamic container limit (0 = unlimited), see runQueue()
+
mContainersAllocatedNotStarted prometheus.Gauge
mContainersNotAllocatedOverQuota prometheus.Gauge
mLongestWaitTimeSinceQueue prometheus.Gauge
+ mLast503Time prometheus.Gauge
+ mMaxContainerConcurrency prometheus.Gauge
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, client *arvados.Client, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
+ client: client,
queue: queue,
pool: pool,
reg: reg,
Help: "Current longest wait time of any container since queuing, and before the start of crunch-run.",
})
reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
+ sch.mLast503Time = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "last_503_time",
+ Help: "Time of most recent 503 error received from API.",
+ })
+ reg.MustRegister(sch.mLast503Time)
+ sch.mMaxContainerConcurrency = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "max_concurrent_containers",
+ Help: "Dynamically assigned limit on number of containers scheduled concurrency, set after receiving 503 errors from API.",
+ })
+ reg.MustRegister(sch.mMaxContainerConcurrency)
}
func (sch *Scheduler) updateMetrics() {
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.sync()
ents, _ = queue.Entries()
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
// Sync shouldn't cancel the container because it might be
// running on the VM with state=="unknown".
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wkr := wp.workers[inst.ID()]
+ wkr, ok := wp.workers[inst.ID()]
+ if !ok {
+ // race: inst was removed from the pool
+ return
+ }
if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
- // the node is not in booting state (can happen if a-d-c is restarted) OR
- // this is not the first SSH connection
+ // the node is not in booting state (can happen if
+ // a-d-c is restarted) OR this is not the first SSH
+ // connection
return
}
"os"
"regexp"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/httpserver"
// APIHost and AuthToken were loaded from ARVADOS_* env vars
// (used to customize "no host/token" error messages)
loadedFromEnv bool
+
+ // Track/limit concurrent outgoing API calls. Note this
+ // differs from an outgoing connection limit (a feature
+ // provided by http.Transport) when concurrent calls are
+ // multiplexed on a single http2 connection.
+ requestLimiter requestLimiter
+
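+ // Time of most recent HTTP 503 response, if any; see Last503().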
+ last503 atomic.Value
}
// InsecureHTTPClient is the default http.Client used by a Client with
var reqIDGen = httpserver.IDGenerator{Prefix: "req-"}
-// Do adds Authorization and X-Request-Id headers and then calls
+// Do adds Authorization and X-Request-Id headers, delays in order to
+// comply with rate-limiting restrictions, and then calls
// (*http.Client)Do().
func (c *Client) Do(req *http.Request) (*http.Response, error) {
- if auth, _ := req.Context().Value(contextKeyAuthorization{}).(string); auth != "" {
+ ctx := req.Context()
+ if auth, _ := ctx.Value(contextKeyAuthorization{}).(string); auth != "" {
req.Header.Add("Authorization", auth)
} else if c.AuthToken != "" {
req.Header.Add("Authorization", "OAuth2 "+c.AuthToken)
if req.Header.Get("X-Request-Id") == "" {
var reqid string
- if ctxreqid, _ := req.Context().Value(contextKeyRequestID{}).(string); ctxreqid != "" {
+ if ctxreqid, _ := ctx.Value(contextKeyRequestID{}).(string); ctxreqid != "" {
reqid = ctxreqid
} else if c.defaultRequestID != "" {
reqid = c.defaultRequestID
}
var cancel context.CancelFunc
if c.Timeout > 0 {
- ctx := req.Context()
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(c.Timeout))
req = req.WithContext(ctx)
+ } else {
+ cancel = context.CancelFunc(func() {})
+ }
+
+ c.requestLimiter.Acquire(ctx)
+ if ctx.Err() != nil {
+ c.requestLimiter.Release()
+ return nil, ctx.Err()
+ }
+
+ // Attach Release() to cancel func, see cancelOnClose below.
+ cancelOrig := cancel
+ cancel = func() {
+ c.requestLimiter.Release()
+ cancelOrig()
}
+
resp, err := c.httpClient().Do(req)
- if err == nil && cancel != nil {
+ if c.requestLimiter.Report(resp, err) {
+ c.last503.Store(time.Now())
+ }
+ if err == nil {
// We need to call cancel() eventually, but we can't
// use "defer cancel()" because the context has to
// stay alive until the caller has finished reading
// the response body.
resp.Body = cancelOnClose{ReadCloser: resp.Body, cancel: cancel}
- } else if cancel != nil {
+ } else {
cancel()
}
return resp, err
}
+// Last503 returns the time of the most recent HTTP 503 (Service
+// Unavailable) response. Zero time indicates never.
+func (c *Client) Last503() time.Time {
+ t, _ := c.last503.Load().(time.Time)
+ return t
+}
+
// cancelOnClose calls a provided CancelFunc when its wrapped
// ReadCloser's Close() method is called.
type cancelOnClose struct {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "net/url"
+ "sync"
+ "time"
+)
+
+var requestLimiterQuietPeriod = time.Second
+
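+// requestLimiter caps the number of concurrent outgoing requests.
+// Report() halves the limit each time a 503 is received (at most
+// once per quiet period) and raises it ~10% after each successful
+// response; a limit of zero means unlimited until the first 503.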
+type requestLimiter struct {
+ current int64 // number of slots acquired and not yet released
+ limit int64 // max concurrent requests; 0 == unlimited
+ lock sync.Mutex
+ cond *sync.Cond // initialized on first Acquire
+ quietUntil time.Time // Acquire waits, and limit is not reduced again, until this time
+}
+
+// Acquire reserves one request slot, waiting if necessary.
+//
+// Acquire returns early if ctx cancels before a slot is available. It
+// is assumed in this case the caller will immediately notice
+// ctx.Err() != nil and call Release().
+func (rl *requestLimiter) Acquire(ctx context.Context) {
+ rl.lock.Lock()
+ if rl.cond == nil {
+ // First use of requestLimiter. Initialize.
+ rl.cond = sync.NewCond(&rl.lock)
+ }
+ // Wait out the quiet period(s) immediately following a 503.
+ for ctx.Err() == nil {
+ delay := rl.quietUntil.Sub(time.Now())
+ if delay < 0 {
+ break
+ }
+ // Wait for the end of the quiet period, which started
+ // when we last received a 503 response.
+ rl.lock.Unlock()
+ timer := time.NewTimer(delay)
+ select {
+ case <-timer.C:
+ case <-ctx.Done():
+ timer.Stop()
+ }
+ rl.lock.Lock()
+ }
+ ready := make(chan struct{})
+ go func() {
+ // close ready when a slot is available _or_ we wake
+ // up and find ctx has been canceled (meaning Acquire
+ // has already returned, or is about to).
+ for rl.limit > 0 && rl.limit <= rl.current && ctx.Err() == nil {
+ rl.cond.Wait()
+ }
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ // Wait() returned, so we have the lock.
+ rl.current++
+ rl.lock.Unlock()
+ case <-ctx.Done():
+ // When Wait() returns the lock to our goroutine
+ // (which might have already happened) we need to
+ // release it (if we don't do this now, the following
+ // Lock() can deadlock).
+ go func() {
+ <-ready
+ rl.lock.Unlock()
+ }()
+ // Note we may have current > limit until the caller
+ // calls Release().
+ rl.lock.Lock()
+ rl.current++
+ rl.lock.Unlock()
+ }
+}
+
+// Release releases a slot that has been reserved with Acquire.
+func (rl *requestLimiter) Release() {
+ rl.lock.Lock()
+ rl.current--
+ rl.lock.Unlock()
+ rl.cond.Signal()
+}
+
+// Report uses the return values from (*http.Client)Do() to adjust the
+// outgoing request limit (increase on success, decrease on 503).
+//
+// Return value is true if the response was a 503.
+func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
+ rl.lock.Lock()
+ defer rl.lock.Unlock()
+ is503 := false
+ if err != nil {
+ uerr := &url.Error{}
+ if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
+ // This is how http.Client reports 503 from proxy server
+ is503 = true
+ } else {
+ return false
+ }
+ } else {
+ is503 = resp.StatusCode == http.StatusServiceUnavailable
+ }
+ if is503 {
+ if rl.limit == 0 {
+ // Concurrency was unlimited until now.
+ // Calculate new limit based on actual
+ // concurrency instead of previous limit.
+ rl.limit = rl.current
+ }
+ if time.Now().After(rl.quietUntil) {
+ // Reduce concurrency limit by half.
+ rl.limit = (rl.limit + 1) / 2
+ // Don't start any new calls (or reduce the
+ // limit even further on additional 503s) for
+ // a second.
+ rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
+ }
+ return true
+ }
+ if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+ // After each non-server-error response, increase
+ // concurrency limit by at least 10% -- but not beyond
+ // 2x the highest concurrency level we've seen without
+ // a failure.
+ increase := rl.limit / 10
+ if increase < 1 {
+ increase = 1
+ }
+ rl.limit += increase
+ if max := rl.current * 2; max < rl.limit {
+ rl.limit = max
+ }
+ rl.cond.Broadcast()
+ }
+ return false
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "context"
+ "errors"
+ "net/http"
+ "sync"
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&limiterSuite{})
+
+type limiterSuite struct{}
+
+func (*limiterSuite) TestUnlimitedBeforeFirstReport(c *C) {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ rl := requestLimiter{}
+
+ var wg sync.WaitGroup
+ wg.Add(1000)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ rl.Acquire(ctx)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ c.Check(rl.current, Equals, int64(1000))
+ wg.Add(1000)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ rl.Release()
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ c.Check(rl.current, Equals, int64(0))
+}
+
+func (*limiterSuite) TestCancelWhileWaitingForAcquire(c *C) {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ rl := requestLimiter{}
+
+ rl.limit = 1
+ rl.Acquire(ctx)
+ ctxShort, cancel := context.WithDeadline(ctx, time.Now().Add(time.Millisecond))
+ defer cancel()
+ rl.Acquire(ctxShort)
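+ // Acquire still increments current when it returns early due
+ // to context cancellation; the caller is expected to call
+ // Release() anyway.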
+ c.Check(rl.current, Equals, int64(2))
+ c.Check(ctxShort.Err(), NotNil)
+ rl.Release()
+ rl.Release()
+ c.Check(rl.current, Equals, int64(0))
+}
+
+func (*limiterSuite) TestReducedLimitAndQuietPeriod(c *C) {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+ rl := requestLimiter{}
+
+ // Use a short quiet period to make tests faster
+ defer func(orig time.Duration) { requestLimiterQuietPeriod = orig }(requestLimiterQuietPeriod)
+ requestLimiterQuietPeriod = time.Second / 10
+
+ for i := 0; i < 5; i++ {
+ rl.Acquire(ctx)
+ }
+ rl.Report(&http.Response{StatusCode: http.StatusServiceUnavailable}, nil)
+ c.Check(rl.limit, Equals, int64(3))
+ for i := 0; i < 5; i++ {
+ rl.Release()
+ }
+
+ // Even with all slots released, we can't Acquire in the quiet
+ // period.
+
+ // (a) If our context expires before the end of the quiet
+ // period, we get back DeadlineExceeded -- without waiting for
+ // the end of the quiet period.
+ acquire := time.Now()
+ ctxShort, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod/10))
+ defer cancel()
+ rl.Acquire(ctxShort)
+ c.Check(ctxShort.Err(), Equals, context.DeadlineExceeded)
+ c.Check(time.Since(acquire) < requestLimiterQuietPeriod/2, Equals, true)
+ c.Check(rl.quietUntil.Sub(time.Now()) > requestLimiterQuietPeriod/2, Equals, true)
+ rl.Release()
+
+ // (b) If our context does not expire first, Acquire waits for
+ // the end of the quiet period.
+ ctxLong, cancel := context.WithDeadline(ctx, time.Now().Add(requestLimiterQuietPeriod*2))
+ defer cancel()
+ acquire = time.Now()
+ rl.Acquire(ctxLong)
+ c.Check(time.Since(acquire) > requestLimiterQuietPeriod/10, Equals, true)
+ c.Check(time.Since(acquire) < requestLimiterQuietPeriod, Equals, true)
+ c.Check(ctxLong.Err(), IsNil)
+ rl.Release()
+
+ // OK to call Report() with nil Response and non-nil error.
+ rl.Report(nil, errors.New("network error"))
+}