16834: Drain instance if crunch-run exits but arv-mount doesn't.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
// Names of the tags the pool writes on new cloud instances (see
// Create) and reads back when instances appear (see updateWorker).
// Every use prepends Pool.tagKeyPrefix, which NewPool takes from
// cluster.Containers.CloudVMs.TagKeyPrefix.
const (
	// Identifies which dispatcher's instance set owns the instance.
	tagKeyInstanceSetID = "InstanceSetID"
	// Name of the Arvados instance type the instance was created as.
	tagKeyInstanceType = "InstanceType"
	// Persisted IdleBehavior, restored when a new dispatcher process
	// first sees the instance.
	tagKeyIdleBehavior = "IdleBehavior"
	// Random per-instance secret, used to match a new instance with
	// its pending Create call.
	tagKeyInstanceSecret = "InstanceSecret"
)
32
// An InstanceView shows a worker's current state and recent activity.
//
// The struct tags define the field names used in its JSON encoding.
// NOTE(review): the code that fills in an InstanceView is not in this
// section; the field notes below are based on names and types and
// should be confirmed against that code.
type InstanceView struct {
	// Cloud provider's ID for the instance.
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	// Presumably the name of the Arvados (cluster config) instance
	// type, as opposed to the provider's own type name below.
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	// Presumably the string form of the worker's State.
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
45
// An Executor executes shell commands on a remote host.
//
// The Pool creates one Executor per worker (via the newExecutor
// function given to NewPool) and keeps it pointed at the worker's
// instance with SetTarget.
type Executor interface {
	// Run cmd on the current target.
	//
	// NOTE(review): from the names, env presumably supplies
	// environment variables for cmd, stdin is fed to cmd, and
	// stdout/stderr are its captured output; confirm against
	// implementations.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close shuts down the executor. NOTE(review): presumably this
	// releases any connections held to the target; confirm against
	// implementations.
	Close()
}
65
// Fallback settings used when the corresponding
// cluster.Containers.CloudVMs value is not positive (durations go
// through duration(); MaxProbesPerSecond is checked in runProbes).
const (
	defaultSyncInterval        = 1 * time.Minute
	defaultProbeInterval       = 10 * time.Second
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = 1 * time.Minute
	defaultTimeoutBooting      = 10 * time.Minute
	defaultTimeoutProbe        = 10 * time.Minute
	defaultTimeoutShutdown     = 10 * time.Second
	defaultTimeoutTERM         = 2 * time.Minute
	defaultTimeoutSignal       = 5 * time.Second
	defaultTimeoutStaleRunLock = 5 * time.Second

	// quotaErrorTTL is how long Create refuses to start new
	// instances after a quota error. After this, it tries again
	// anyway, even if no instances have been shut down meanwhile.
	quotaErrorTTL = 1 * time.Minute

	// logRateLimitErrorInterval is the minimum time between
	// repeated "X failed because rate limiting" log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
85
86 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87         if conf > 0 {
88                 return time.Duration(conf)
89         } else {
90                 return def
91         }
92 }
93
94 // NewPool creates a Pool of workers backed by instanceSet.
95 //
96 // New instances are configured and set up according to the given
97 // cluster configuration.
98 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
99         wp := &Pool{
100                 logger:                         logger,
101                 arvClient:                      arvClient,
102                 instanceSetID:                  instanceSetID,
103                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
104                 newExecutor:                    newExecutor,
105                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
106                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
107                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
108                 instanceTypes:                  cluster.InstanceTypes,
109                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
110                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
111                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
112                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
113                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
114                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
115                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
116                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
117                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
118                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
119                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
120                 installPublicKey:               installPublicKey,
121                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
122                 stop:                           make(chan bool),
123         }
124         wp.registerMetrics(reg)
125         go func() {
126                 wp.setupOnce.Do(wp.setup)
127                 go wp.runMetrics()
128                 go wp.runProbes()
129                 go wp.runSync()
130         }()
131         return wp
132 }
133
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	//
	// All of these fields are populated by NewPool from its
	// arguments and the cluster config.
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	installPublicKey               ssh.PublicKey
	// Prepended to every tagKey* name when reading/writing
	// instance tags.
	tagKeyPrefix                   string

	// private state
	//
	// The methods in this file access the maps and quota fields
	// below while holding mtx.

	// Channels handed out by Subscribe. Each channel is stored as
	// both key (receive-only, as held by the subscriber) and value
	// (send-only).
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	// Keyed by cloud instance ID.
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	// Set by Create after a quota error. Create and AtQuota treat
	// the pool as "at quota" until atQuotaUntil.
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	// NOTE(review): presumably closed to stop the background
	// goroutines; confirm in the code that uses it.
	stop         chan bool
	// Guards the state above. Each worker's mtx points at this
	// same mutex (see updateWorker).
	mtx          sync.RWMutex
	setupOnce    sync.Once
	// Runner binary contents, checksum, and command. Presumably
	// filled in by loadRunnerData from runnerSource; confirm.
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	// Prometheus metrics, created in registerMetrics.
	mContainersRunning       prometheus.Gauge
	mInstances               *prometheus.GaugeVec
	mInstancesPrice          *prometheus.GaugeVec
	mVCPUs                   *prometheus.GaugeVec
	mMemory                  *prometheus.GaugeVec
	mBootOutcomes            *prometheus.CounterVec
	mDisappearances          *prometheus.CounterVec
	mTimeToSSH               prometheus.Summary
	mTimeToReadyForContainer prometheus.Summary
}
186
// createCall records a (cloud.InstanceSet)Create call started by
// Pool.Create that has not finished yet. Entries live in
// Pool.creating, keyed by the new instance's secret, and are removed
// when the call returns.
type createCall struct {
	// When Pool.Create started the call.
	time         time.Time
	// Instance type being created.
	instanceType arvados.InstanceType
}
191
192 func (wp *Pool) CheckHealth() error {
193         wp.setupOnce.Do(wp.setup)
194         if err := wp.loadRunnerData(); err != nil {
195                 return fmt.Errorf("error loading runner binary: %s", err)
196         }
197         return nil
198 }
199
200 // Subscribe returns a buffered channel that becomes ready after any
201 // change to the pool's state that could have scheduling implications:
202 // a worker's state changes, a new worker appears, the cloud
203 // provider's API rate limiting period ends, etc.
204 //
205 // Additional events that occur while the channel is already ready
206 // will be dropped, so it is OK if the caller services the channel
207 // slowly.
208 //
209 // Example:
210 //
211 //      ch := wp.Subscribe()
212 //      defer wp.Unsubscribe(ch)
213 //      for range ch {
214 //              tryScheduling(wp)
215 //              if done {
216 //                      break
217 //              }
218 //      }
219 func (wp *Pool) Subscribe() <-chan struct{} {
220         wp.setupOnce.Do(wp.setup)
221         wp.mtx.Lock()
222         defer wp.mtx.Unlock()
223         ch := make(chan struct{}, 1)
224         wp.subscribers[ch] = ch
225         return ch
226 }
227
// Unsubscribe stops sending updates to the given channel.
//
// ch should be a channel returned by Subscribe. The channel is only
// removed from the subscriber set, not closed. Removing a channel that
// is not (or is no longer) subscribed is a no-op, so calling
// Unsubscribe twice is harmless.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}
235
236 // Unallocated returns the number of unallocated (creating + booting +
237 // idle + unknown) workers for each instance type.  Workers in
238 // hold/drain mode are not included.
239 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
240         wp.setupOnce.Do(wp.setup)
241         wp.mtx.RLock()
242         defer wp.mtx.RUnlock()
243         unalloc := map[arvados.InstanceType]int{}
244         creating := map[arvados.InstanceType]int{}
245         oldestCreate := map[arvados.InstanceType]time.Time{}
246         for _, cc := range wp.creating {
247                 it := cc.instanceType
248                 creating[it]++
249                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
250                         oldestCreate[it] = cc.time
251                 }
252         }
253         for _, wkr := range wp.workers {
254                 // Skip workers that are not expected to become
255                 // available soon. Note len(wkr.running)>0 is not
256                 // redundant here: it can be true even in
257                 // StateUnknown.
258                 if wkr.state == StateShutdown ||
259                         wkr.state == StateRunning ||
260                         wkr.idleBehavior != IdleBehaviorRun ||
261                         len(wkr.running) > 0 {
262                         continue
263                 }
264                 it := wkr.instType
265                 unalloc[it]++
266                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
267                         // If up to N new workers appear in
268                         // Instances() while we are waiting for N
269                         // Create() calls to complete, we assume we're
270                         // just seeing a race between Instances() and
271                         // Create() responses.
272                         //
273                         // The other common reason why nodes have
274                         // state==Unknown is that they appeared at
275                         // startup, before any Create calls. They
276                         // don't match the above timing condition, so
277                         // we never mistakenly attribute them to
278                         // pending Create calls.
279                         creating[it]--
280                 }
281         }
282         for it, c := range creating {
283                 unalloc[it] += c
284         }
285         return unalloc
286 }
287
// Create a new instance with the given type, and add it to the worker
// pool. A pending-create entry is recorded immediately (so
// Unallocated counts it right away). The cloud Create call runs in the
// background, and the worker is added when it succeeds.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	// Refuse while a recent quota error is in effect, or while the
	// create throttle reports an error.
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		// Put the create throttle into an error state for 5
		// seconds. wp.notify is passed along, presumably to wake
		// subscribers when the throttle clears.
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	// The secret links the future instance (via its InstanceSecret
	// tag) back to this pending call. updateWorker uses it to decide
	// between StateBooting and StateUnknown.
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		// Deferred functions run in reverse order: delete, then
		// Unlock, then this notify. So subscribers are notified
		// after the lock is released.
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		// NOTE(review): presumably a boot-time command that
		// installs the secret on the instance so TagVerifier can
		// check it later. Confirm in TagVerifier.
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		// The (possibly slow) cloud API call runs without holding
		// wp.mtx.
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				// Stop trying for quotaErrorTTL, and wake
				// subscribers when that period ends.
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			// NOTE(review): this passes wp.logger rather than
			// logger, so any rate-limit message lacks the
			// InstanceType field.
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}
353
354 // AtQuota returns true if Create is not expected to work at the
355 // moment.
356 func (wp *Pool) AtQuota() bool {
357         wp.mtx.Lock()
358         defer wp.mtx.Unlock()
359         return time.Now().Before(wp.atQuotaUntil)
360 }
361
362 // SetIdleBehavior determines how the indicated instance will behave
363 // when it has no containers running.
364 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
365         wp.mtx.Lock()
366         defer wp.mtx.Unlock()
367         wkr, ok := wp.workers[id]
368         if !ok {
369                 return errors.New("requested instance does not exist")
370         }
371         wkr.setIdleBehavior(idleBehavior)
372         return nil
373 }
374
375 // Successful connection to the SSH daemon, update the mTimeToSSH metric
376 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
377         wp.mtx.Lock()
378         defer wp.mtx.Unlock()
379         wkr := wp.workers[inst.ID()]
380         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
381                 // the node is not in booting state (can happen if a-d-c is restarted) OR
382                 // this is not the first SSH connection
383                 return
384         }
385
386         wkr.firstSSHConnection = time.Now()
387         if wp.mTimeToSSH != nil {
388                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
389         }
390 }
391
// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its InstanceSecret
// tag matches a key in wp.creating (i.e., it is the result of a
// pending Create call), otherwise StateUnknown.
//
// For an instance that already has a worker, the worker's executor
// target, instance, and updated time are refreshed and its tags are
// saved. The it argument is not used in that case.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	// Read the secret from the raw instance's tags before wrapping.
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	// From here on inst is the TagVerifier wrapper. Successful
	// verification is reported to reportSSHConnected.
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	// All timestamps start at "now". The worker shares the pool's
	// mutex, and probing has room for one pending signal.
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
458
459 // Shutdown shuts down a worker with the given type, or returns false
460 // if all workers with the given type are busy.
461 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
462         wp.setupOnce.Do(wp.setup)
463         wp.mtx.Lock()
464         defer wp.mtx.Unlock()
465         logger := wp.logger.WithField("InstanceType", it.Name)
466         logger.Info("shutdown requested")
467         for _, tryState := range []State{StateBooting, StateIdle} {
468                 // TODO: shutdown the worker with the longest idle
469                 // time (Idle) or the earliest create time (Booting)
470                 for _, wkr := range wp.workers {
471                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
472                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
473                                 wkr.reportBootOutcome(BootOutcomeAborted)
474                                 wkr.shutdown()
475                                 return true
476                         }
477                 }
478         }
479         return false
480 }
481
482 // CountWorkers returns the current number of workers in each state.
483 //
484 // CountWorkers blocks, if necessary, until the initial instance list
485 // has been loaded from the cloud provider.
486 func (wp *Pool) CountWorkers() map[State]int {
487         wp.setupOnce.Do(wp.setup)
488         wp.waitUntilLoaded()
489         wp.mtx.Lock()
490         defer wp.mtx.Unlock()
491         r := map[State]int{}
492         for _, w := range wp.workers {
493                 r[w.state]++
494         }
495         return r
496 }
497
498 // Running returns the container UUIDs being prepared/run on workers.
499 //
500 // In the returned map, the time value indicates when the Pool
501 // observed that the container process had exited. A container that
502 // has not yet exited has a zero time value. The caller should use
503 // ForgetContainer() to garbage-collect the entries for exited
504 // containers.
505 func (wp *Pool) Running() map[string]time.Time {
506         wp.setupOnce.Do(wp.setup)
507         wp.mtx.Lock()
508         defer wp.mtx.Unlock()
509         r := map[string]time.Time{}
510         for _, wkr := range wp.workers {
511                 for uuid := range wkr.running {
512                         r[uuid] = time.Time{}
513                 }
514                 for uuid := range wkr.starting {
515                         r[uuid] = time.Time{}
516                 }
517         }
518         for uuid, exited := range wp.exited {
519                 r[uuid] = exited
520         }
521         return r
522 }
523
524 // StartContainer starts a container on an idle worker immediately if
525 // possible, otherwise returns false.
526 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
527         wp.setupOnce.Do(wp.setup)
528         wp.mtx.Lock()
529         defer wp.mtx.Unlock()
530         var wkr *worker
531         for _, w := range wp.workers {
532                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
533                         if wkr == nil || w.busy.After(wkr.busy) {
534                                 wkr = w
535                         }
536                 }
537         }
538         if wkr == nil {
539                 return false
540         }
541         wkr.startContainer(ctr)
542         return true
543 }
544
545 // KillContainer kills the crunch-run process for the given container
546 // UUID, if it's running on any worker.
547 //
548 // KillContainer returns immediately; the act of killing the container
549 // takes some time, and runs in the background.
550 //
551 // KillContainer returns false if the container has already ended.
552 func (wp *Pool) KillContainer(uuid string, reason string) bool {
553         wp.mtx.Lock()
554         defer wp.mtx.Unlock()
555         logger := wp.logger.WithFields(logrus.Fields{
556                 "ContainerUUID": uuid,
557                 "Reason":        reason,
558         })
559         for _, wkr := range wp.workers {
560                 rr := wkr.running[uuid]
561                 if rr == nil {
562                         rr = wkr.starting[uuid]
563                 }
564                 if rr != nil {
565                         rr.Kill(reason)
566                         return true
567                 }
568         }
569         logger.Debug("cannot kill: already disappeared")
570         return false
571 }
572
573 // ForgetContainer clears the placeholder for the given exited
574 // container, so it isn't returned by subsequent calls to Running().
575 //
576 // ForgetContainer has no effect if the container has not yet exited.
577 //
578 // The "container exited at time T" placeholder (which necessitates
579 // ForgetContainer) exists to make it easier for the caller
580 // (scheduler) to distinguish a container that exited without
581 // finalizing its state from a container that exited too recently for
582 // its final state to have appeared in the scheduler's queue cache.
583 func (wp *Pool) ForgetContainer(uuid string) {
584         wp.mtx.Lock()
585         defer wp.mtx.Unlock()
586         if _, ok := wp.exited[uuid]; ok {
587                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
588                 delete(wp.exited, uuid)
589         }
590 }
591
592 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
593         if reg == nil {
594                 reg = prometheus.NewRegistry()
595         }
596         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
597                 Namespace: "arvados",
598                 Subsystem: "dispatchcloud",
599                 Name:      "containers_running",
600                 Help:      "Number of containers reported running by cloud VMs.",
601         })
602         reg.MustRegister(wp.mContainersRunning)
603         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
604                 Namespace: "arvados",
605                 Subsystem: "dispatchcloud",
606                 Name:      "instances_total",
607                 Help:      "Number of cloud VMs.",
608         }, []string{"category", "instance_type"})
609         reg.MustRegister(wp.mInstances)
610         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
611                 Namespace: "arvados",
612                 Subsystem: "dispatchcloud",
613                 Name:      "instances_price",
614                 Help:      "Price of cloud VMs.",
615         }, []string{"category"})
616         reg.MustRegister(wp.mInstancesPrice)
617         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
618                 Namespace: "arvados",
619                 Subsystem: "dispatchcloud",
620                 Name:      "vcpus_total",
621                 Help:      "Total VCPUs on all cloud VMs.",
622         }, []string{"category"})
623         reg.MustRegister(wp.mVCPUs)
624         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
625                 Namespace: "arvados",
626                 Subsystem: "dispatchcloud",
627                 Name:      "memory_bytes_total",
628                 Help:      "Total memory on all cloud VMs.",
629         }, []string{"category"})
630         reg.MustRegister(wp.mMemory)
631         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
632                 Namespace: "arvados",
633                 Subsystem: "dispatchcloud",
634                 Name:      "boot_outcomes",
635                 Help:      "Boot outcomes by type.",
636         }, []string{"outcome"})
637         for k := range validBootOutcomes {
638                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
639         }
640         reg.MustRegister(wp.mBootOutcomes)
641         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
642                 Namespace: "arvados",
643                 Subsystem: "dispatchcloud",
644                 Name:      "instances_disappeared",
645                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
646         }, []string{"state"})
647         for _, v := range stateString {
648                 wp.mDisappearances.WithLabelValues(v).Add(0)
649         }
650         reg.MustRegister(wp.mDisappearances)
651         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
652                 Namespace:  "arvados",
653                 Subsystem:  "dispatchcloud",
654                 Name:       "instances_time_to_ssh_seconds",
655                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
656                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
657         })
658         reg.MustRegister(wp.mTimeToSSH)
659         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
660                 Namespace:  "arvados",
661                 Subsystem:  "dispatchcloud",
662                 Name:       "instances_time_to_ready_for_container_seconds",
663                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
664                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
665         })
666         reg.MustRegister(wp.mTimeToReadyForContainer)
667 }
668
669 func (wp *Pool) runMetrics() {
670         ch := wp.Subscribe()
671         defer wp.Unsubscribe(ch)
672         wp.updateMetrics()
673         for range ch {
674                 wp.updateMetrics()
675         }
676 }
677
678 func (wp *Pool) updateMetrics() {
679         wp.mtx.RLock()
680         defer wp.mtx.RUnlock()
681
682         type entKey struct {
683                 cat      string
684                 instType string
685         }
686         instances := map[entKey]int64{}
687         price := map[string]float64{}
688         cpu := map[string]int64{}
689         mem := map[string]int64{}
690         var running int64
691         for _, wkr := range wp.workers {
692                 var cat string
693                 switch {
694                 case len(wkr.running)+len(wkr.starting) > 0:
695                         cat = "inuse"
696                 case wkr.idleBehavior == IdleBehaviorHold:
697                         cat = "hold"
698                 case wkr.state == StateBooting:
699                         cat = "booting"
700                 case wkr.state == StateUnknown:
701                         cat = "unknown"
702                 default:
703                         cat = "idle"
704                 }
705                 instances[entKey{cat, wkr.instType.Name}]++
706                 price[cat] += wkr.instType.Price
707                 cpu[cat] += int64(wkr.instType.VCPUs)
708                 mem[cat] += int64(wkr.instType.RAM)
709                 running += int64(len(wkr.running) + len(wkr.starting))
710         }
711         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
712                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
713                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
714                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
715                 // make sure to reset gauges for non-existing category/nodetype combinations
716                 for _, it := range wp.instanceTypes {
717                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
718                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
719                         }
720                 }
721         }
722         for k, v := range instances {
723                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
724         }
725         wp.mContainersRunning.Set(float64(running))
726 }
727
728 func (wp *Pool) runProbes() {
729         maxPPS := wp.maxProbesPerSecond
730         if maxPPS < 1 {
731                 maxPPS = defaultMaxProbesPerSecond
732         }
733         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
734         defer limitticker.Stop()
735
736         probeticker := time.NewTicker(wp.probeInterval)
737         defer probeticker.Stop()
738
739         workers := []cloud.InstanceID{}
740         for range probeticker.C {
741                 workers = workers[:0]
742                 wp.mtx.Lock()
743                 for id, wkr := range wp.workers {
744                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
745                                 continue
746                         }
747                         workers = append(workers, id)
748                 }
749                 wp.mtx.Unlock()
750
751                 for _, id := range workers {
752                         wp.mtx.Lock()
753                         wkr, ok := wp.workers[id]
754                         wp.mtx.Unlock()
755                         if !ok {
756                                 // Deleted while we were probing
757                                 // others
758                                 continue
759                         }
760                         go wkr.ProbeAndUpdate()
761                         select {
762                         case <-wp.stop:
763                                 return
764                         case <-limitticker.C:
765                         }
766                 }
767         }
768 }
769
770 func (wp *Pool) runSync() {
771         // sync once immediately, then wait syncInterval, sync again,
772         // etc.
773         timer := time.NewTimer(1)
774         for {
775                 select {
776                 case <-timer.C:
777                         err := wp.getInstancesAndSync()
778                         if err != nil {
779                                 wp.logger.WithError(err).Warn("sync failed")
780                         }
781                         timer.Reset(wp.syncInterval)
782                 case <-wp.stop:
783                         wp.logger.Debug("worker.Pool stopped")
784                         return
785                 }
786         }
787 }
788
789 // Stop synchronizing with the InstanceSet.
790 func (wp *Pool) Stop() {
791         wp.setupOnce.Do(wp.setup)
792         close(wp.stop)
793 }
794
795 // Instances returns an InstanceView for each worker in the pool,
796 // summarizing its current state and recent activity.
797 func (wp *Pool) Instances() []InstanceView {
798         var r []InstanceView
799         wp.setupOnce.Do(wp.setup)
800         wp.mtx.Lock()
801         for _, w := range wp.workers {
802                 r = append(r, InstanceView{
803                         Instance:             w.instance.ID(),
804                         Address:              w.instance.Address(),
805                         Price:                w.instType.Price,
806                         ArvadosInstanceType:  w.instType.Name,
807                         ProviderInstanceType: w.instType.ProviderType,
808                         LastContainerUUID:    w.lastUUID,
809                         LastBusy:             w.busy,
810                         WorkerState:          w.state.String(),
811                         IdleBehavior:         w.idleBehavior,
812                 })
813         }
814         wp.mtx.Unlock()
815         sort.Slice(r, func(i, j int) bool {
816                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
817         })
818         return r
819 }
820
821 // KillInstance destroys a cloud VM instance. It returns an error if
822 // the given instance does not exist.
823 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
824         wkr, ok := wp.workers[id]
825         if !ok {
826                 return errors.New("instance not found")
827         }
828         wkr.logger.WithField("Reason", reason).Info("shutting down")
829         wkr.reportBootOutcome(BootOutcomeAborted)
830         wkr.shutdown()
831         return nil
832 }
833
834 func (wp *Pool) setup() {
835         wp.creating = map[string]createCall{}
836         wp.exited = map[string]time.Time{}
837         wp.workers = map[cloud.InstanceID]*worker{}
838         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
839         wp.loadRunnerData()
840 }
841
842 // Load the runner program to be deployed on worker nodes into
843 // wp.runnerData, if necessary. Errors are logged.
844 //
845 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
846 //
847 // Caller must not have lock.
848 func (wp *Pool) loadRunnerData() error {
849         wp.mtx.Lock()
850         defer wp.mtx.Unlock()
851         if wp.runnerData != nil {
852                 return nil
853         } else if wp.runnerSource == "" {
854                 wp.runnerCmd = "crunch-run"
855                 wp.runnerData = []byte{}
856                 return nil
857         }
858         logger := wp.logger.WithField("source", wp.runnerSource)
859         logger.Debug("loading runner")
860         buf, err := ioutil.ReadFile(wp.runnerSource)
861         if err != nil {
862                 logger.WithError(err).Error("failed to load runner program")
863                 return err
864         }
865         wp.runnerData = buf
866         wp.runnerMD5 = md5.Sum(buf)
867         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
868         return nil
869 }
870
871 func (wp *Pool) notify() {
872         wp.mtx.RLock()
873         defer wp.mtx.RUnlock()
874         for _, send := range wp.subscribers {
875                 select {
876                 case send <- struct{}{}:
877                 default:
878                 }
879         }
880 }
881
882 func (wp *Pool) getInstancesAndSync() error {
883         wp.setupOnce.Do(wp.setup)
884         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
885                 return err
886         }
887         wp.logger.Debug("getting instance list")
888         threshold := time.Now()
889         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
890         if err != nil {
891                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
892                 return err
893         }
894         wp.sync(threshold, instances)
895         wp.logger.Debug("sync done")
896         return nil
897 }
898
899 // Add/remove/update workers based on instances, which was obtained
900 // from the instanceSet. However, don't clobber any other updates that
901 // already happened after threshold.
902 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
903         wp.mtx.Lock()
904         defer wp.mtx.Unlock()
905         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
906         notify := false
907
908         for _, inst := range instances {
909                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
910                 it, ok := wp.instanceTypes[itTag]
911                 if !ok {
912                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
913                         continue
914                 }
915                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
916                         notify = true
917                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
918                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
919                         wkr.shutdown()
920                 }
921         }
922
923         for id, wkr := range wp.workers {
924                 if wkr.updated.After(threshold) {
925                         continue
926                 }
927                 logger := wp.logger.WithFields(logrus.Fields{
928                         "Instance":    wkr.instance.ID(),
929                         "WorkerState": wkr.state,
930                 })
931                 logger.Info("instance disappeared in cloud")
932                 wkr.reportBootOutcome(BootOutcomeDisappeared)
933                 if wp.mDisappearances != nil {
934                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
935                 }
936                 delete(wp.workers, id)
937                 go wkr.Close()
938                 notify = true
939         }
940
941         if !wp.loaded {
942                 notify = true
943                 wp.loaded = true
944                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
945         }
946
947         if notify {
948                 go wp.notify()
949         }
950 }
951
952 func (wp *Pool) waitUntilLoaded() {
953         ch := wp.Subscribe()
954         wp.mtx.RLock()
955         defer wp.mtx.RUnlock()
956         for !wp.loaded {
957                 wp.mtx.RUnlock()
958                 <-ch
959                 wp.mtx.RLock()
960         }
961 }
962
// randomHex returns a string of exactly n random lowercase hexadecimal
// digits (n*4 random bits) read from crypto/rand. It returns "" for
// n <= 0, and panics if the secure random source fails.
//
// Odd n is supported. Previously n had to be even: odd n silently
// produced n-1 digits, and negative n could panic in make.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Round up to whole bytes; each byte yields two digits, and the
	// surplus digit (odd n) is trimmed below.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}