14325: Document Running() return value.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index fc3301d8636593cfd79aee008061c9cfafab577a..1665a1e43def02e14c7cf915c7f5410131f6c4c8 100644 (file)
@@ -5,6 +5,7 @@
 package worker
 
 import (
+       "errors"
        "io"
        "sort"
        "strings"
@@ -19,24 +20,25 @@ import (
 
 const (
        tagKeyInstanceType = "InstanceType"
-       tagKeyHold         = "Hold"
+       tagKeyIdleBehavior = "IdleBehavior"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
-       Instance             string
-       Price                float64
-       ArvadosInstanceType  string
-       ProviderInstanceType string
-       LastContainerUUID    string
-       LastBusy             time.Time
-       WorkerState          string
+       Instance             cloud.InstanceID `json:"instance"`
+       Price                float64          `json:"price"`
+       ArvadosInstanceType  string           `json:"arvados_instance_type"`
+       ProviderInstanceType string           `json:"provider_instance_type"`
+       LastContainerUUID    string           `json:"last_container_uuid"`
+       LastBusy             time.Time        `json:"last_busy"`
+       WorkerState          string           `json:"worker_state"`
+       IdleBehavior         IdleBehavior     `json:"idle_behavior"`
 }
 
 // An Executor executes shell commands on a remote host.
 type Executor interface {
        // Run cmd on the current target.
-       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+       Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
@@ -61,6 +63,13 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+
+       // Time after a quota error to try again anyway, even if no
+       // instances have been shutdown.
+       quotaErrorTTL = time.Minute
+
+       // Time between "X failed because rate limiting" messages
+       logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -75,10 +84,11 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
-               instanceSet:        instanceSet,
+               arvClient:          arvClient,
+               instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -107,7 +117,8 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
        // configuration
        logger             logrus.FieldLogger
-       instanceSet        cloud.InstanceSet
+       arvClient          *arvados.Client
+       instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
@@ -132,7 +143,11 @@ type Pool struct {
        mtx          sync.RWMutex
        setupOnce    sync.Once
 
+       throttleCreate    throttle
+       throttleInstances throttle
+
        mInstances         prometheus.Gauge
+       mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
@@ -140,15 +155,21 @@ type Pool struct {
        mMemoryInuse       prometheus.Gauge
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
@@ -171,7 +192,8 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type.  Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
@@ -182,7 +204,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
-               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
                        continue
                }
                it := wkr.instType
@@ -212,15 +234,23 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) {
-               return wp.atQuotaErr
+       if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+               return false
+       }
+       tags := cloud.InstanceTags{
+               tagKeyInstanceType: it.Name,
+               tagKeyIdleBehavior: string(IdleBehaviorRun),
        }
-       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
@@ -236,17 +266,19 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                                break
                        }
                }
-               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-                       wp.atQuotaErr = err
-                       wp.atQuotaUntil = time.Now().Add(time.Minute)
-               }
                if err != nil {
+                       if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                               wp.atQuotaErr = err
+                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                       }
                        logger.WithError(err).Error("create failed")
+                       wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
-       return nil
+       return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -257,6 +289,21 @@ func (wp *Pool) AtQuota() bool {
        return time.Now().Before(wp.atQuotaUntil)
 }
 
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("requested instance does not exist")
+       }
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+       return nil
+}
+
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created.
 //
@@ -272,32 +319,46 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                if initialState == StateBooting && wkr.state == StateUnknown {
                        wkr.state = StateBooting
                }
+               wkr.saveTags()
                return wkr, false
        }
-       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-               initialState = StateHold
+
+       // If an instance has a valid IdleBehavior tag when it first
+       // appears, initialize the new worker accordingly (this is how
+       // we restore IdleBehavior that was set by a prior dispatch
+       // process); otherwise, default to "run". After this,
+       // wkr.idleBehavior is the source of truth, and will only be
+       // changed via SetIdleBehavior().
+       idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+       if !validIdleBehavior[idleBehavior] {
+               idleBehavior = IdleBehaviorRun
        }
+
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
        })
-       logger.WithField("State", initialState).Infof("instance appeared in cloud")
+       logger.WithFields(logrus.Fields{
+               "State":        initialState,
+               "IdleBehavior": idleBehavior,
+       }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
-               mtx:      &wp.mtx,
-               wp:       wp,
-               logger:   logger,
-               executor: wp.newExecutor(inst),
-               state:    initialState,
-               instance: inst,
-               instType: it,
-               appeared: now,
-               probed:   now,
-               busy:     now,
-               updated:  now,
-               running:  make(map[string]struct{}),
-               starting: make(map[string]struct{}),
-               probing:  make(chan struct{}, 1),
+               mtx:          &wp.mtx,
+               wp:           wp,
+               logger:       logger,
+               executor:     wp.newExecutor(inst),
+               state:        initialState,
+               idleBehavior: idleBehavior,
+               instance:     inst,
+               instType:     it,
+               appeared:     now,
+               probed:       now,
+               busy:         now,
+               updated:      now,
+               running:      make(map[string]struct{}),
+               starting:     make(map[string]struct{}),
+               probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
@@ -320,7 +381,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
-                       if wkr.state == tryState && wkr.instType == it {
+                       if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
@@ -343,6 +404,12 @@ func (wp *Pool) CountWorkers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
@@ -411,7 +478,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+       stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
@@ -444,6 +511,13 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price_total",
+               Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
+       })
+       reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -494,8 +568,10 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
+       var price float64
        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
+               price += wkr.instType.Price
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
@@ -506,6 +582,7 @@ func (wp *Pool) updateMetrics() {
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
+       wp.mInstancesPrice.Set(price)
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
@@ -588,18 +665,19 @@ func (wp *Pool) Instances() []InstanceView {
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
-                       Instance:             w.instance.String(),
+                       Instance:             w.instance.ID(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
+                       IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
-               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+               return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
 }
@@ -624,10 +702,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
+       if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+               return err
+       }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
+               wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)