20984: Handle "instance type not available" condition better.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         mathrand "math/rand"
17         "sort"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         "golang.org/x/crypto/ssh"
27 )
28
29 const (
30         tagKeyInstanceType   = "InstanceType"
31         tagKeyIdleBehavior   = "IdleBehavior"
32         tagKeyInstanceSecret = "InstanceSecret"
33         tagKeyInstanceSetID  = "InstanceSetID"
34 )
35
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38         Instance             cloud.InstanceID `json:"instance"`
39         Address              string           `json:"address"`
40         Price                float64          `json:"price"`
41         ArvadosInstanceType  string           `json:"arvados_instance_type"`
42         ProviderInstanceType string           `json:"provider_instance_type"`
43         LastContainerUUID    string           `json:"last_container_uuid"`
44         LastBusy             time.Time        `json:"last_busy"`
45         WorkerState          string           `json:"worker_state"`
46         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
47 }
48
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51         // Run cmd on the current target.
52         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
53
54         // Use the given target for subsequent operations. The new
55         // target is the same host as the previous target, but it
56         // might return a different address and verify a different
57         // host key.
58         //
59         // SetTarget is called frequently, and in most cases the new
60         // target will behave exactly the same as the old one. An
61         // implementation should optimize accordingly.
62         //
63         // SetTarget must not block on concurrent Execute calls.
64         SetTarget(cloud.ExecutorTarget)
65
66         Close()
67 }
68
69 const (
70         defaultSyncInterval        = time.Minute
71         defaultProbeInterval       = time.Second * 10
72         defaultMaxProbesPerSecond  = 10
73         defaultTimeoutIdle         = time.Minute
74         defaultTimeoutBooting      = time.Minute * 10
75         defaultTimeoutProbe        = time.Minute * 10
76         defaultTimeoutShutdown     = time.Second * 10
77         defaultTimeoutTERM         = time.Minute * 2
78         defaultTimeoutSignal       = time.Second * 5
79         defaultTimeoutStaleRunLock = time.Second * 5
80
81         // Time after a quota error to try again anyway, even if no
82         // instances have been shutdown.
83         quotaErrorTTL = time.Minute
84
85         // Time after a capacity error to try again
86         capacityErrorTTL = time.Minute
87
88         // Time between "X failed because rate limiting" messages
89         logRateLimitErrorInterval = time.Second * 10
90 )
91
92 func duration(conf arvados.Duration, def time.Duration) time.Duration {
93         if conf > 0 {
94                 return time.Duration(conf)
95         }
96         return def
97 }
98
99 // NewPool creates a Pool of workers backed by instanceSet.
100 //
101 // New instances are configured and set up according to the given
102 // cluster configuration.
103 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
104         wp := &Pool{
105                 logger:                         logger,
106                 arvClient:                      arvClient,
107                 instanceSetID:                  instanceSetID,
108                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
109                 newExecutor:                    newExecutor,
110                 cluster:                        cluster,
111                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
112                 instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
113                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
114                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
115                 instanceTypes:                  cluster.InstanceTypes,
116                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
117                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
118                 maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
119                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
120                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
121                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
122                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
123                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
124                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
125                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
126                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
127                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
128                 systemRootToken:                cluster.SystemRootToken,
129                 installPublicKey:               installPublicKey,
130                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
131                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
132                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
133                 stop:                           make(chan bool),
134         }
135         wp.registerMetrics(reg)
136         go func() {
137                 wp.setupOnce.Do(wp.setup)
138                 go wp.runMetrics()
139                 go wp.runProbes()
140                 go wp.runSync()
141         }()
142         return wp
143 }
144
145 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
146 // zero Pool should not be used. Call NewPool to create a new Pool.
147 type Pool struct {
148         // configuration
149         logger                         logrus.FieldLogger
150         arvClient                      *arvados.Client
151         instanceSetID                  cloud.InstanceSetID
152         instanceSet                    *throttledInstanceSet
153         newExecutor                    func(cloud.Instance) Executor
154         cluster                        *arvados.Cluster
155         bootProbeCommand               string
156         instanceInitCommand            cloud.InitCommand
157         runnerSource                   string
158         imageID                        cloud.ImageID
159         instanceTypes                  map[string]arvados.InstanceType
160         syncInterval                   time.Duration
161         probeInterval                  time.Duration
162         maxProbesPerSecond             int
163         maxConcurrentInstanceCreateOps int
164         maxInstances                   int
165         timeoutIdle                    time.Duration
166         timeoutBooting                 time.Duration
167         timeoutProbe                   time.Duration
168         timeoutShutdown                time.Duration
169         timeoutTERM                    time.Duration
170         timeoutSignal                  time.Duration
171         timeoutStaleRunLock            time.Duration
172         systemRootToken                string
173         installPublicKey               ssh.PublicKey
174         tagKeyPrefix                   string
175         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
176         runnerArgs                     []string // extra args passed to crunch-run
177
178         // private state
179         subscribers                map[<-chan struct{}]chan<- struct{}
180         creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
181         workers                    map[cloud.InstanceID]*worker
182         loaded                     bool                 // loaded list of instances from InstanceSet at least once
183         exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
184         atQuotaUntilFewerInstances int
185         atQuotaUntil               time.Time
186         atQuotaErr                 cloud.QuotaError
187         atCapacityUntil            map[string]time.Time
188         stop                       chan bool
189         mtx                        sync.RWMutex
190         setupOnce                  sync.Once
191         runnerData                 []byte
192         runnerMD5                  [md5.Size]byte
193         runnerCmd                  string
194
195         mContainersRunning        prometheus.Gauge
196         mInstances                *prometheus.GaugeVec
197         mInstancesPrice           *prometheus.GaugeVec
198         mVCPUs                    *prometheus.GaugeVec
199         mMemory                   *prometheus.GaugeVec
200         mBootOutcomes             *prometheus.CounterVec
201         mDisappearances           *prometheus.CounterVec
202         mTimeToSSH                prometheus.Summary
203         mTimeToReadyForContainer  prometheus.Summary
204         mTimeFromShutdownToGone   prometheus.Summary
205         mTimeFromQueueToCrunchRun prometheus.Summary
206         mRunProbeDuration         *prometheus.SummaryVec
207         mProbeAgeMax              prometheus.Gauge
208         mProbeAgeMedian           prometheus.Gauge
209 }
210
211 type createCall struct {
212         time         time.Time
213         instanceType arvados.InstanceType
214 }
215
216 func (wp *Pool) CheckHealth() error {
217         wp.setupOnce.Do(wp.setup)
218         if err := wp.loadRunnerData(); err != nil {
219                 return fmt.Errorf("error loading runner binary: %s", err)
220         }
221         return nil
222 }
223
224 // Subscribe returns a buffered channel that becomes ready after any
225 // change to the pool's state that could have scheduling implications:
226 // a worker's state changes, a new worker appears, the cloud
227 // provider's API rate limiting period ends, etc.
228 //
229 // Additional events that occur while the channel is already ready
230 // will be dropped, so it is OK if the caller services the channel
231 // slowly.
232 //
233 // Example:
234 //
235 //      ch := wp.Subscribe()
236 //      defer wp.Unsubscribe(ch)
237 //      for range ch {
238 //              tryScheduling(wp)
239 //              if done {
240 //                      break
241 //              }
242 //      }
243 func (wp *Pool) Subscribe() <-chan struct{} {
244         wp.setupOnce.Do(wp.setup)
245         wp.mtx.Lock()
246         defer wp.mtx.Unlock()
247         ch := make(chan struct{}, 1)
248         wp.subscribers[ch] = ch
249         return ch
250 }
251
252 // Unsubscribe stops sending updates to the given channel.
253 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
254         wp.setupOnce.Do(wp.setup)
255         wp.mtx.Lock()
256         defer wp.mtx.Unlock()
257         delete(wp.subscribers, ch)
258 }
259
260 // Unallocated returns the number of unallocated (creating + booting +
261 // idle + unknown) workers for each instance type.  Workers in
262 // hold/drain mode are not included.
263 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
264         wp.setupOnce.Do(wp.setup)
265         wp.mtx.RLock()
266         defer wp.mtx.RUnlock()
267         unalloc := map[arvados.InstanceType]int{}
268         creating := map[arvados.InstanceType]int{}
269         oldestCreate := map[arvados.InstanceType]time.Time{}
270         for _, cc := range wp.creating {
271                 it := cc.instanceType
272                 creating[it]++
273                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
274                         oldestCreate[it] = cc.time
275                 }
276         }
277         for _, wkr := range wp.workers {
278                 // Skip workers that are not expected to become
279                 // available soon. Note len(wkr.running)>0 is not
280                 // redundant here: it can be true even in
281                 // StateUnknown.
282                 if wkr.state == StateShutdown ||
283                         wkr.state == StateRunning ||
284                         wkr.idleBehavior != IdleBehaviorRun ||
285                         len(wkr.running) > 0 {
286                         continue
287                 }
288                 it := wkr.instType
289                 unalloc[it]++
290                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
291                         // If up to N new workers appear in
292                         // Instances() while we are waiting for N
293                         // Create() calls to complete, we assume we're
294                         // just seeing a race between Instances() and
295                         // Create() responses.
296                         //
297                         // The other common reason why nodes have
298                         // state==Unknown is that they appeared at
299                         // startup, before any Create calls. They
300                         // don't match the above timing condition, so
301                         // we never mistakenly attribute them to
302                         // pending Create calls.
303                         creating[it]--
304                 }
305         }
306         for it, c := range creating {
307                 unalloc[it] += c
308         }
309         return unalloc
310 }
311
312 // Create a new instance with the given type, and add it to the worker
313 // pool. The worker is added immediately; instance creation runs in
314 // the background.
315 //
316 // Create returns false if a pre-existing error or a configuration
317 // setting prevents it from even attempting to create a new
318 // instance. Those errors are logged by the Pool, so the caller does
319 // not need to log anything in such cases.
320 func (wp *Pool) Create(it arvados.InstanceType) bool {
321         logger := wp.logger.WithField("InstanceType", it.Name)
322         wp.setupOnce.Do(wp.setup)
323         if wp.loadRunnerData() != nil {
324                 // Boot probe is certain to fail.
325                 return false
326         }
327         if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
328                 return false
329         }
330         wp.mtx.Lock()
331         defer wp.mtx.Unlock()
332         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
333         // requests in flight. It was added to work around a limitation in Azure's
334         // managed disks, which support no more than 20 concurrent node creation
335         // requests from a single disk image (cf.
336         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
337         // The code assumes that node creation, from Azure's perspective, means the
338         // period until the instance appears in the "get all instances" list.
339         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
340                 logger.Info("reached MaxConcurrentInstanceCreateOps")
341                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
342                 return false
343         }
344         now := time.Now()
345         secret := randomHex(instanceSecretLength)
346         wp.creating[secret] = createCall{time: now, instanceType: it}
347         go func() {
348                 defer wp.notify()
349                 tags := cloud.InstanceTags{
350                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
351                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
352                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
353                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
354                 }
355                 initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
356                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
357                 wp.mtx.Lock()
358                 defer wp.mtx.Unlock()
359                 // delete() is deferred so the updateWorker() call
360                 // below knows to use StateBooting when adding a new
361                 // worker.
362                 defer delete(wp.creating, secret)
363                 if err != nil {
364                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
365                                 wp.atQuotaErr = err
366                                 n := len(wp.workers) + len(wp.creating) - 1
367                                 if n < 1 {
368                                         // Quota error with no
369                                         // instances running --
370                                         // nothing to do but wait
371                                         wp.atQuotaUntilFewerInstances = 0
372                                         wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
373                                         time.AfterFunc(quotaErrorTTL, wp.notify)
374                                         logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
375                                 } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
376                                         // Quota error with N
377                                         // instances running -- report
378                                         // AtQuota until some
379                                         // instances shut down
380                                         wp.atQuotaUntilFewerInstances = n
381                                         wp.atQuotaUntil = time.Time{}
382                                         logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
383                                 }
384                         }
385                         if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
386                                 capKey := it.ProviderType
387                                 if !err.IsInstanceTypeSpecific() {
388                                         // set capacity flag for all
389                                         // instance types
390                                         capKey = ""
391                                 }
392                                 if wp.atCapacityUntil == nil {
393                                         wp.atCapacityUntil = map[string]time.Time{}
394                                 }
395                                 wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
396                         }
397                         logger.WithError(err).Error("create failed")
398                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
399                         return
400                 }
401                 wp.updateWorker(inst, it)
402         }()
403         if len(wp.creating)+len(wp.workers) == wp.maxInstances {
404                 logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
405         }
406         return true
407 }
408
409 // AtCapacity returns true if Create() is currently expected to fail
410 // for the given instance type.
411 func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
412         wp.mtx.Lock()
413         defer wp.mtx.Unlock()
414         if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
415                 // at capacity for this instance type
416                 return true
417         }
418         if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
419                 // at capacity for all instance types
420                 return true
421         }
422         return false
423 }
424
425 // AtQuota returns true if Create is not expected to work at the
426 // moment (e.g., cloud provider has reported quota errors, or we are
427 // already at our own configured quota).
428 func (wp *Pool) AtQuota() bool {
429         wp.mtx.Lock()
430         defer wp.mtx.Unlock()
431         return wp.atQuotaUntilFewerInstances > 0 ||
432                 time.Now().Before(wp.atQuotaUntil) ||
433                 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
434 }
435
436 // SetIdleBehavior determines how the indicated instance will behave
437 // when it has no containers running.
438 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
439         wp.mtx.Lock()
440         defer wp.mtx.Unlock()
441         wkr, ok := wp.workers[id]
442         if !ok {
443                 return errors.New("requested instance does not exist")
444         }
445         wkr.setIdleBehavior(idleBehavior)
446         return nil
447 }
448
449 // Successful connection to the SSH daemon, update the mTimeToSSH metric
450 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
451         wp.mtx.Lock()
452         defer wp.mtx.Unlock()
453         wkr, ok := wp.workers[inst.ID()]
454         if !ok {
455                 // race: inst was removed from the pool
456                 return
457         }
458         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
459                 // the node is not in booting state (can happen if
460                 // a-d-c is restarted) OR this is not the first SSH
461                 // connection
462                 return
463         }
464
465         wkr.firstSSHConnection = time.Now()
466         if wp.mTimeToSSH != nil {
467                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
468         }
469 }
470
471 // Add or update worker attached to the given instance.
472 //
473 // The second return value is true if a new worker is created.
474 //
475 // A newly added instance has state=StateBooting if its tags match an
476 // entry in wp.creating, otherwise StateUnknown.
477 //
478 // Caller must have lock.
479 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
480         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
481         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
482         id := inst.ID()
483         if wkr := wp.workers[id]; wkr != nil {
484                 wkr.executor.SetTarget(inst)
485                 wkr.instance = inst
486                 wkr.updated = time.Now()
487                 wkr.saveTags()
488                 return wkr, false
489         }
490
491         state := StateUnknown
492         if _, ok := wp.creating[secret]; ok {
493                 state = StateBooting
494         }
495
496         // If an instance has a valid IdleBehavior tag when it first
497         // appears, initialize the new worker accordingly (this is how
498         // we restore IdleBehavior that was set by a prior dispatch
499         // process); otherwise, default to "run". After this,
500         // wkr.idleBehavior is the source of truth, and will only be
501         // changed via SetIdleBehavior().
502         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
503         if !validIdleBehavior[idleBehavior] {
504                 idleBehavior = IdleBehaviorRun
505         }
506
507         logger := wp.logger.WithFields(logrus.Fields{
508                 "InstanceType": it.Name,
509                 "Instance":     inst.ID(),
510                 "Address":      inst.Address(),
511         })
512         logger.WithFields(logrus.Fields{
513                 "State":        state,
514                 "IdleBehavior": idleBehavior,
515         }).Infof("instance appeared in cloud")
516         now := time.Now()
517         wkr := &worker{
518                 mtx:          &wp.mtx,
519                 wp:           wp,
520                 logger:       logger,
521                 executor:     wp.newExecutor(inst),
522                 state:        state,
523                 idleBehavior: idleBehavior,
524                 instance:     inst,
525                 instType:     it,
526                 appeared:     now,
527                 probed:       now,
528                 busy:         now,
529                 updated:      now,
530                 running:      make(map[string]*remoteRunner),
531                 starting:     make(map[string]*remoteRunner),
532                 probing:      make(chan struct{}, 1),
533         }
534         wp.workers[id] = wkr
535         return wkr, true
536 }
537
538 // Shutdown shuts down a worker with the given type, or returns false
539 // if all workers with the given type are busy.
540 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
541         wp.setupOnce.Do(wp.setup)
542         wp.mtx.Lock()
543         defer wp.mtx.Unlock()
544         logger := wp.logger.WithField("InstanceType", it.Name)
545         logger.Info("shutdown requested")
546         for _, tryState := range []State{StateBooting, StateIdle} {
547                 // TODO: shutdown the worker with the longest idle
548                 // time (Idle) or the earliest create time (Booting)
549                 for _, wkr := range wp.workers {
550                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
551                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
552                                 wkr.reportBootOutcome(BootOutcomeAborted)
553                                 wkr.shutdown()
554                                 return true
555                         }
556                 }
557         }
558         return false
559 }
560
561 // CountWorkers returns the current number of workers in each state.
562 //
563 // CountWorkers blocks, if necessary, until the initial instance list
564 // has been loaded from the cloud provider.
565 func (wp *Pool) CountWorkers() map[State]int {
566         wp.setupOnce.Do(wp.setup)
567         wp.waitUntilLoaded()
568         wp.mtx.Lock()
569         defer wp.mtx.Unlock()
570         r := map[State]int{}
571         for _, w := range wp.workers {
572                 r[w.state]++
573         }
574         return r
575 }
576
577 // Running returns the container UUIDs being prepared/run on workers.
578 //
579 // In the returned map, the time value indicates when the Pool
580 // observed that the container process had exited. A container that
581 // has not yet exited has a zero time value. The caller should use
582 // ForgetContainer() to garbage-collect the entries for exited
583 // containers.
584 func (wp *Pool) Running() map[string]time.Time {
585         wp.setupOnce.Do(wp.setup)
586         wp.mtx.Lock()
587         defer wp.mtx.Unlock()
588         r := map[string]time.Time{}
589         for _, wkr := range wp.workers {
590                 for uuid := range wkr.running {
591                         r[uuid] = time.Time{}
592                 }
593                 for uuid := range wkr.starting {
594                         r[uuid] = time.Time{}
595                 }
596         }
597         for uuid, exited := range wp.exited {
598                 r[uuid] = exited
599         }
600         return r
601 }
602
603 // StartContainer starts a container on an idle worker immediately if
604 // possible, otherwise returns false.
605 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
606         wp.setupOnce.Do(wp.setup)
607         wp.mtx.Lock()
608         defer wp.mtx.Unlock()
609         var wkr *worker
610         for _, w := range wp.workers {
611                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
612                         if wkr == nil || w.busy.After(wkr.busy) {
613                                 wkr = w
614                         }
615                 }
616         }
617         if wkr == nil {
618                 return false
619         }
620         wkr.startContainer(ctr)
621         return true
622 }
623
624 // KillContainer kills the crunch-run process for the given container
625 // UUID, if it's running on any worker.
626 //
627 // KillContainer returns immediately; the act of killing the container
628 // takes some time, and runs in the background.
629 //
630 // KillContainer returns false if the container has already ended.
631 func (wp *Pool) KillContainer(uuid string, reason string) bool {
632         wp.mtx.Lock()
633         defer wp.mtx.Unlock()
634         logger := wp.logger.WithFields(logrus.Fields{
635                 "ContainerUUID": uuid,
636                 "Reason":        reason,
637         })
638         for _, wkr := range wp.workers {
639                 rr := wkr.running[uuid]
640                 if rr == nil {
641                         rr = wkr.starting[uuid]
642                 }
643                 if rr != nil {
644                         rr.Kill(reason)
645                         return true
646                 }
647         }
648         logger.Debug("cannot kill: already disappeared")
649         return false
650 }
651
652 // ForgetContainer clears the placeholder for the given exited
653 // container, so it isn't returned by subsequent calls to Running().
654 //
655 // ForgetContainer has no effect if the container has not yet exited.
656 //
657 // The "container exited at time T" placeholder (which necessitates
658 // ForgetContainer) exists to make it easier for the caller
659 // (scheduler) to distinguish a container that exited without
660 // finalizing its state from a container that exited too recently for
661 // its final state to have appeared in the scheduler's queue cache.
662 func (wp *Pool) ForgetContainer(uuid string) {
663         wp.mtx.Lock()
664         defer wp.mtx.Unlock()
665         if _, ok := wp.exited[uuid]; ok {
666                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
667                 delete(wp.exited, uuid)
668         }
669 }
670
671 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
672         if reg == nil {
673                 reg = prometheus.NewRegistry()
674         }
675         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
676                 Namespace: "arvados",
677                 Subsystem: "dispatchcloud",
678                 Name:      "containers_running",
679                 Help:      "Number of containers reported running by cloud VMs.",
680         })
681         reg.MustRegister(wp.mContainersRunning)
682         wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
683                 Namespace: "arvados",
684                 Subsystem: "dispatchcloud",
685                 Name:      "probe_age_seconds_max",
686                 Help:      "Maximum number of seconds since an instance's most recent successful probe.",
687         })
688         reg.MustRegister(wp.mProbeAgeMax)
689         wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
690                 Namespace: "arvados",
691                 Subsystem: "dispatchcloud",
692                 Name:      "probe_age_seconds_median",
693                 Help:      "Median number of seconds since an instance's most recent successful probe.",
694         })
695         reg.MustRegister(wp.mProbeAgeMedian)
696         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
697                 Namespace: "arvados",
698                 Subsystem: "dispatchcloud",
699                 Name:      "instances_total",
700                 Help:      "Number of cloud VMs.",
701         }, []string{"category", "instance_type"})
702         reg.MustRegister(wp.mInstances)
703         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
704                 Namespace: "arvados",
705                 Subsystem: "dispatchcloud",
706                 Name:      "instances_price",
707                 Help:      "Price of cloud VMs.",
708         }, []string{"category"})
709         reg.MustRegister(wp.mInstancesPrice)
710         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
711                 Namespace: "arvados",
712                 Subsystem: "dispatchcloud",
713                 Name:      "vcpus_total",
714                 Help:      "Total VCPUs on all cloud VMs.",
715         }, []string{"category"})
716         reg.MustRegister(wp.mVCPUs)
717         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
718                 Namespace: "arvados",
719                 Subsystem: "dispatchcloud",
720                 Name:      "memory_bytes_total",
721                 Help:      "Total memory on all cloud VMs.",
722         }, []string{"category"})
723         reg.MustRegister(wp.mMemory)
724         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
725                 Namespace: "arvados",
726                 Subsystem: "dispatchcloud",
727                 Name:      "boot_outcomes",
728                 Help:      "Boot outcomes by type.",
729         }, []string{"outcome"})
730         for k := range validBootOutcomes {
731                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
732         }
733         reg.MustRegister(wp.mBootOutcomes)
734         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
735                 Namespace: "arvados",
736                 Subsystem: "dispatchcloud",
737                 Name:      "instances_disappeared",
738                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
739         }, []string{"state"})
740         for _, v := range stateString {
741                 wp.mDisappearances.WithLabelValues(v).Add(0)
742         }
743         reg.MustRegister(wp.mDisappearances)
744         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
745                 Namespace:  "arvados",
746                 Subsystem:  "dispatchcloud",
747                 Name:       "instances_time_to_ssh_seconds",
748                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
749                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
750         })
751         reg.MustRegister(wp.mTimeToSSH)
752         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
753                 Namespace:  "arvados",
754                 Subsystem:  "dispatchcloud",
755                 Name:       "instances_time_to_ready_for_container_seconds",
756                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
757                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
758         })
759         reg.MustRegister(wp.mTimeToReadyForContainer)
760         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
761                 Namespace:  "arvados",
762                 Subsystem:  "dispatchcloud",
763                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
764                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
765                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
766         })
767         reg.MustRegister(wp.mTimeFromShutdownToGone)
768         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
769                 Namespace:  "arvados",
770                 Subsystem:  "dispatchcloud",
771                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
772                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
773                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
774         })
775         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
776         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
777                 Namespace:  "arvados",
778                 Subsystem:  "dispatchcloud",
779                 Name:       "instances_run_probe_duration_seconds",
780                 Help:       "Number of seconds per runProbe call.",
781                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
782         }, []string{"outcome"})
783         reg.MustRegister(wp.mRunProbeDuration)
784 }
785
786 func (wp *Pool) runMetrics() {
787         ch := wp.Subscribe()
788         defer wp.Unsubscribe(ch)
789         wp.updateMetrics()
790         for range ch {
791                 wp.updateMetrics()
792         }
793 }
794
795 func (wp *Pool) updateMetrics() {
796         wp.mtx.RLock()
797         defer wp.mtx.RUnlock()
798
799         type entKey struct {
800                 cat      string
801                 instType string
802         }
803         instances := map[entKey]int64{}
804         price := map[string]float64{}
805         cpu := map[string]int64{}
806         mem := map[string]int64{}
807         var running int64
808         now := time.Now()
809         var probed []time.Time
810         for _, wkr := range wp.workers {
811                 var cat string
812                 switch {
813                 case len(wkr.running)+len(wkr.starting) > 0:
814                         cat = "inuse"
815                 case wkr.idleBehavior == IdleBehaviorHold:
816                         cat = "hold"
817                 case wkr.state == StateBooting:
818                         cat = "booting"
819                 case wkr.state == StateUnknown:
820                         cat = "unknown"
821                 default:
822                         cat = "idle"
823                 }
824                 instances[entKey{cat, wkr.instType.Name}]++
825                 price[cat] += wkr.instType.Price
826                 cpu[cat] += int64(wkr.instType.VCPUs)
827                 mem[cat] += int64(wkr.instType.RAM)
828                 running += int64(len(wkr.running) + len(wkr.starting))
829                 probed = append(probed, wkr.probed)
830         }
831         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
832                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
833                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
834                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
835                 // make sure to reset gauges for non-existing category/nodetype combinations
836                 for _, it := range wp.instanceTypes {
837                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
838                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
839                         }
840                 }
841         }
842         for k, v := range instances {
843                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
844         }
845         wp.mContainersRunning.Set(float64(running))
846
847         if len(probed) == 0 {
848                 wp.mProbeAgeMax.Set(0)
849                 wp.mProbeAgeMedian.Set(0)
850         } else {
851                 sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
852                 wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
853                 wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
854         }
855 }
856
857 func (wp *Pool) runProbes() {
858         maxPPS := wp.maxProbesPerSecond
859         if maxPPS < 1 {
860                 maxPPS = defaultMaxProbesPerSecond
861         }
862         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
863         defer limitticker.Stop()
864
865         probeticker := time.NewTicker(wp.probeInterval)
866         defer probeticker.Stop()
867
868         workers := []cloud.InstanceID{}
869         for range probeticker.C {
870                 // Add some jitter. Without this, if probeInterval is
871                 // a multiple of syncInterval and sync is
872                 // instantaneous (as with the loopback driver), the
873                 // first few probes race with sync operations and
874                 // don't update the workers.
875                 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
876
877                 workers = workers[:0]
878                 wp.mtx.Lock()
879                 for id, wkr := range wp.workers {
880                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
881                                 continue
882                         }
883                         workers = append(workers, id)
884                 }
885                 wp.mtx.Unlock()
886
887                 for _, id := range workers {
888                         wp.mtx.Lock()
889                         wkr, ok := wp.workers[id]
890                         wp.mtx.Unlock()
891                         if !ok {
892                                 // Deleted while we were probing
893                                 // others
894                                 continue
895                         }
896                         go wkr.ProbeAndUpdate()
897                         select {
898                         case <-wp.stop:
899                                 return
900                         case <-limitticker.C:
901                         }
902                 }
903         }
904 }
905
906 func (wp *Pool) runSync() {
907         // sync once immediately, then wait syncInterval, sync again,
908         // etc.
909         timer := time.NewTimer(1)
910         for {
911                 select {
912                 case <-timer.C:
913                         err := wp.getInstancesAndSync()
914                         if err != nil {
915                                 wp.logger.WithError(err).Warn("sync failed")
916                         }
917                         timer.Reset(wp.syncInterval)
918                 case <-wp.stop:
919                         wp.logger.Debug("worker.Pool stopped")
920                         return
921                 }
922         }
923 }
924
925 // Stop synchronizing with the InstanceSet.
926 func (wp *Pool) Stop() {
927         wp.setupOnce.Do(wp.setup)
928         close(wp.stop)
929 }
930
931 // Instances returns an InstanceView for each worker in the pool,
932 // summarizing its current state and recent activity.
933 func (wp *Pool) Instances() []InstanceView {
934         var r []InstanceView
935         wp.setupOnce.Do(wp.setup)
936         wp.mtx.Lock()
937         for _, w := range wp.workers {
938                 r = append(r, InstanceView{
939                         Instance:             w.instance.ID(),
940                         Address:              w.instance.Address(),
941                         Price:                w.instType.Price,
942                         ArvadosInstanceType:  w.instType.Name,
943                         ProviderInstanceType: w.instType.ProviderType,
944                         LastContainerUUID:    w.lastUUID,
945                         LastBusy:             w.busy,
946                         WorkerState:          w.state.String(),
947                         IdleBehavior:         w.idleBehavior,
948                 })
949         }
950         wp.mtx.Unlock()
951         sort.Slice(r, func(i, j int) bool {
952                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
953         })
954         return r
955 }
956
957 // KillInstance destroys a cloud VM instance. It returns an error if
958 // the given instance does not exist.
959 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
960         wp.setupOnce.Do(wp.setup)
961         wp.mtx.Lock()
962         defer wp.mtx.Unlock()
963         wkr, ok := wp.workers[id]
964         if !ok {
965                 return errors.New("instance not found")
966         }
967         wkr.logger.WithField("Reason", reason).Info("shutting down")
968         wkr.reportBootOutcome(BootOutcomeAborted)
969         wkr.shutdown()
970         return nil
971 }
972
973 func (wp *Pool) setup() {
974         wp.creating = map[string]createCall{}
975         wp.exited = map[string]time.Time{}
976         wp.workers = map[cloud.InstanceID]*worker{}
977         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
978         wp.loadRunnerData()
979 }
980
981 // Load the runner program to be deployed on worker nodes into
982 // wp.runnerData, if necessary. Errors are logged.
983 //
984 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
985 //
986 // Caller must not have lock.
987 func (wp *Pool) loadRunnerData() error {
988         wp.mtx.Lock()
989         defer wp.mtx.Unlock()
990         if wp.runnerData != nil {
991                 return nil
992         } else if wp.runnerSource == "" {
993                 wp.runnerCmd = wp.runnerCmdDefault
994                 wp.runnerData = []byte{}
995                 return nil
996         }
997         logger := wp.logger.WithField("source", wp.runnerSource)
998         logger.Debug("loading runner")
999         buf, err := ioutil.ReadFile(wp.runnerSource)
1000         if err != nil {
1001                 logger.WithError(err).Error("failed to load runner program")
1002                 return err
1003         }
1004         wp.runnerData = buf
1005         wp.runnerMD5 = md5.Sum(buf)
1006         wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
1007         return nil
1008 }
1009
1010 func (wp *Pool) notify() {
1011         wp.mtx.RLock()
1012         defer wp.mtx.RUnlock()
1013         for _, send := range wp.subscribers {
1014                 select {
1015                 case send <- struct{}{}:
1016                 default:
1017                 }
1018         }
1019 }
1020
1021 func (wp *Pool) getInstancesAndSync() error {
1022         wp.setupOnce.Do(wp.setup)
1023         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
1024                 return err
1025         }
1026         wp.logger.Debug("getting instance list")
1027         threshold := time.Now()
1028         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
1029         if err != nil {
1030                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
1031                 return err
1032         }
1033         wp.sync(threshold, instances)
1034         wp.logger.Debug("sync done")
1035         return nil
1036 }
1037
1038 // Add/remove/update workers based on instances, which was obtained
1039 // from the instanceSet. However, don't clobber any other updates that
1040 // already happened after threshold.
1041 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
1042         wp.mtx.Lock()
1043         defer wp.mtx.Unlock()
1044         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
1045         notify := false
1046
1047         for _, inst := range instances {
1048                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
1049                 it, ok := wp.instanceTypes[itTag]
1050                 if !ok {
1051                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
1052                         continue
1053                 }
1054                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
1055                         notify = true
1056                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
1057                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
1058                         wkr.shutdown()
1059                 }
1060         }
1061
1062         for id, wkr := range wp.workers {
1063                 if wkr.updated.After(threshold) {
1064                         continue
1065                 }
1066                 logger := wp.logger.WithFields(logrus.Fields{
1067                         "Instance":    wkr.instance.ID(),
1068                         "WorkerState": wkr.state,
1069                 })
1070                 logger.Info("instance disappeared in cloud")
1071                 wkr.reportBootOutcome(BootOutcomeDisappeared)
1072                 if wp.mDisappearances != nil {
1073                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
1074                 }
1075                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
1076                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
1077                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
1078                 }
1079                 delete(wp.workers, id)
1080                 go wkr.Close()
1081                 notify = true
1082         }
1083
1084         if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
1085                 // After syncing, there are fewer instances (including
1086                 // pending creates) than there were last time we saw a
1087                 // quota error.  This might mean it's now possible to
1088                 // create new instances.  Reset our "at quota" state.
1089                 wp.atQuotaUntilFewerInstances = 0
1090         }
1091
1092         if !wp.loaded {
1093                 notify = true
1094                 wp.loaded = true
1095                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
1096         }
1097
1098         if notify {
1099                 go wp.notify()
1100         }
1101 }
1102
1103 func (wp *Pool) waitUntilLoaded() {
1104         ch := wp.Subscribe()
1105         wp.mtx.RLock()
1106         defer wp.mtx.RUnlock()
1107         for !wp.loaded {
1108                 wp.mtx.RUnlock()
1109                 <-ch
1110                 wp.mtx.RLock()
1111         }
1112 }
1113
1114 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1115         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1116         fmt.Fprint(h, uuid)
1117         return fmt.Sprintf("%x", h.Sum(nil))
1118 }
1119
1120 // Return a random string of n hexadecimal digits (n*4 random bits). n
1121 // must be even.
1122 func randomHex(n int) string {
1123         buf := make([]byte, n/2)
1124         _, err := rand.Read(buf)
1125         if err != nil {
1126                 panic(err)
1127         }
1128         return fmt.Sprintf("%x", buf)
1129 }