Merge branch '21211-pysdk-annotations'
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 3abcba6c7365766cf0c7d38315d47dc1292a03e1..13c369d0c65113015cb4297375a0d11d815d9ad9 100644 (file)
@@ -82,6 +82,9 @@ const (
        // instances have been shut down.
        quotaErrorTTL = time.Minute
 
+       // Time to wait after a capacity error before trying again
+       capacityErrorTTL = time.Minute
+
        // Time between "X failed because rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
 )
@@ -106,6 +109,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                newExecutor:                    newExecutor,
                cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
+               instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:                  cluster.InstanceTypes,
@@ -149,6 +153,7 @@ type Pool struct {
        newExecutor                    func(cloud.Instance) Executor
        cluster                        *arvados.Cluster
        bootProbeCommand               string
+       instanceInitCommand            cloud.InitCommand
        runnerSource                   string
        imageID                        cloud.ImageID
        instanceTypes                  map[string]arvados.InstanceType
@@ -171,19 +176,21 @@ type Pool struct {
        runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
-       subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
-       workers      map[cloud.InstanceID]*worker
-       loaded       bool                 // loaded list of instances from InstanceSet at least once
-       exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
-       atQuotaUntil time.Time
-       atQuotaErr   cloud.QuotaError
-       stop         chan bool
-       mtx          sync.RWMutex
-       setupOnce    sync.Once
-       runnerData   []byte
-       runnerMD5    [md5.Size]byte
-       runnerCmd    string
+       subscribers                map[<-chan struct{}]chan<- struct{}
+       creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
+       workers                    map[cloud.InstanceID]*worker
+       loaded                     bool                 // loaded list of instances from InstanceSet at least once
+       exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
+       atQuotaUntilFewerInstances int
+       atQuotaUntil               time.Time
+       atQuotaErr                 cloud.QuotaError
+       atCapacityUntil            map[string]time.Time
+       stop                       chan bool
+       mtx                        sync.RWMutex
+       setupOnce                  sync.Once
+       runnerData                 []byte
+       runnerMD5                  [md5.Size]byte
+       runnerCmd                  string
 
        mContainersRunning        prometheus.Gauge
        mInstances                *prometheus.GaugeVec
@@ -197,6 +204,8 @@ type Pool struct {
        mTimeFromShutdownToGone   prometheus.Summary
        mTimeFromQueueToCrunchRun prometheus.Summary
        mRunProbeDuration         *prometheus.SummaryVec
+       mProbeAgeMax              prometheus.Gauge
+       mProbeAgeMedian           prometheus.Gauge
 }
 
 type createCall struct {
@@ -315,13 +324,11 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                // Boot probe is certain to fail.
                return false
        }
-       wp.mtx.Lock()
-       defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) ||
-               wp.instanceSet.throttleCreate.Error() != nil ||
-               (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
+       if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
                return false
        }
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
@@ -345,7 +352,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
-               initCmd := TagVerifier{nil, secret, nil}.InitCommand()
+               initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
@@ -356,8 +363,37 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
-                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
-                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                               n := len(wp.workers) + len(wp.creating) - 1
+                               if n < 1 {
+                                       // Quota error with no
+                                       // instances running --
+                                       // nothing to do but wait
+                                       wp.atQuotaUntilFewerInstances = 0
+                                       wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                                       time.AfterFunc(quotaErrorTTL, wp.notify)
+                                       logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
+                               } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
+                                       // Quota error with N
+                                       // instances running -- report
+                                       // AtQuota until some
+                                       // instances shut down
+                                       wp.atQuotaUntilFewerInstances = n
+                                       wp.atQuotaUntil = time.Time{}
+                                       logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
+                               }
+                       }
+                       if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
+                               capKey := it.ProviderType
+                               if !err.IsInstanceTypeSpecific() {
+                                       // set capacity flag for all
+                                       // instance types
+                                       capKey = ""
+                               }
+                               if wp.atCapacityUntil == nil {
+                                       wp.atCapacityUntil = map[string]time.Time{}
+                               }
+                               wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
+                               time.AfterFunc(capacityErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
@@ -371,13 +407,31 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        return true
 }
 
+// AtCapacity returns true if Create() is currently expected to fail
+// for the given instance type (an unexpired capacity error applies).
+func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
+               // unexpired capacity error for this provider type
+               return true
+       }
+       if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
+               // unexpired capacity error under "" (all instance types)
+               return true
+       }
+       return false
+}
+
 // AtQuota returns true if Create is not expected to work at the
 // moment: atQuotaUntilFewerInstances is set, atQuotaUntil has not
 // passed, or a nonzero maxInstances is reached (workers + creating).
 func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
+       return wp.atQuotaUntilFewerInstances > 0 ||
+               time.Now().Before(wp.atQuotaUntil) ||
+               (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
 }
 
 // SetIdleBehavior determines how the indicated instance will behave
@@ -397,10 +451,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       wkr := wp.workers[inst.ID()]
+       wkr, ok := wp.workers[inst.ID()]
+       if !ok {
+               // race: inst was removed from the pool
+               return
+       }
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
-               // the node is not in booting state (can happen if a-d-c is restarted) OR
-               // this is not the first SSH connection
+               // the node is not in booting state (can happen if
+               // a-d-c is restarted) OR this is not the first SSH
+               // connection
                return
        }
 
@@ -621,6 +680,20 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
+       wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "probe_age_seconds_max",
+               Help:      "Maximum number of seconds since an instance's most recent successful probe.",
+       })
+       reg.MustRegister(wp.mProbeAgeMax)
+       wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "probe_age_seconds_median",
+               Help:      "Median number of seconds since an instance's most recent successful probe.",
+       })
+       reg.MustRegister(wp.mProbeAgeMedian)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -733,6 +806,8 @@ func (wp *Pool) updateMetrics() {
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
+       now := time.Now()
+       var probed []time.Time
        for _, wkr := range wp.workers {
                var cat string
                switch {
@@ -752,6 +827,7 @@ func (wp *Pool) updateMetrics() {
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
+               probed = append(probed, wkr.probed)
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
@@ -768,6 +844,15 @@ func (wp *Pool) updateMetrics() {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
+
+       if len(probed) == 0 {
+               wp.mProbeAgeMax.Set(0)
+               wp.mProbeAgeMedian.Set(0)
+       } else {
+               sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
+               wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
+               wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
+       }
 }
 
 func (wp *Pool) runProbes() {
@@ -873,6 +958,9 @@ func (wp *Pool) Instances() []InstanceView {
 // KillInstance destroys a cloud VM instance. It returns an error if
 // the given instance does not exist.
 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
@@ -994,6 +1082,14 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                notify = true
        }
 
+       if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
+               // After syncing, there are fewer instances (including
+               // pending creates) than there were last time we saw a
+               // quota error.  This might mean it's now possible to
+               // create new instances.  Reset our "at quota" state.
+               wp.atQuotaUntilFewerInstances = 0
+       }
+
        if !wp.loaded {
                notify = true
                wp.loaded = true