Merge branch '16838-probe-metrics' into master
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
26 const (
27         tagKeyInstanceType   = "InstanceType"
28         tagKeyIdleBehavior   = "IdleBehavior"
29         tagKeyInstanceSecret = "InstanceSecret"
30         tagKeyInstanceSetID  = "InstanceSetID"
31 )
32
33 // An InstanceView shows a worker's current state and recent activity.
34 type InstanceView struct {
35         Instance             cloud.InstanceID `json:"instance"`
36         Address              string           `json:"address"`
37         Price                float64          `json:"price"`
38         ArvadosInstanceType  string           `json:"arvados_instance_type"`
39         ProviderInstanceType string           `json:"provider_instance_type"`
40         LastContainerUUID    string           `json:"last_container_uuid"`
41         LastBusy             time.Time        `json:"last_busy"`
42         WorkerState          string           `json:"worker_state"`
43         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
44 }
45
46 // An Executor executes shell commands on a remote host.
47 type Executor interface {
48         // Run cmd on the current target.
49         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
50
51         // Use the given target for subsequent operations. The new
52         // target is the same host as the previous target, but it
53         // might return a different address and verify a different
54         // host key.
55         //
56         // SetTarget is called frequently, and in most cases the new
57         // target will behave exactly the same as the old one. An
58         // implementation should optimize accordingly.
59         //
60         // SetTarget must not block on concurrent Execute calls.
61         SetTarget(cloud.ExecutorTarget)
62
63         Close()
64 }
65
66 const (
67         defaultSyncInterval        = time.Minute
68         defaultProbeInterval       = time.Second * 10
69         defaultMaxProbesPerSecond  = 10
70         defaultTimeoutIdle         = time.Minute
71         defaultTimeoutBooting      = time.Minute * 10
72         defaultTimeoutProbe        = time.Minute * 10
73         defaultTimeoutShutdown     = time.Second * 10
74         defaultTimeoutTERM         = time.Minute * 2
75         defaultTimeoutSignal       = time.Second * 5
76         defaultTimeoutStaleRunLock = time.Second * 5
77
78         // Time after a quota error to try again anyway, even if no
79         // instances have been shutdown.
80         quotaErrorTTL = time.Minute
81
82         // Time between "X failed because rate limiting" messages
83         logRateLimitErrorInterval = time.Second * 10
84 )
85
86 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87         if conf > 0 {
88                 return time.Duration(conf)
89         } else {
90                 return def
91         }
92 }
93
94 // NewPool creates a Pool of workers backed by instanceSet.
95 //
96 // New instances are configured and set up according to the given
97 // cluster configuration.
98 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
99         wp := &Pool{
100                 logger:                         logger,
101                 arvClient:                      arvClient,
102                 instanceSetID:                  instanceSetID,
103                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
104                 newExecutor:                    newExecutor,
105                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
106                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
107                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
108                 instanceTypes:                  cluster.InstanceTypes,
109                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
110                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
111                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
112                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
113                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
114                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
115                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
116                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
117                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
118                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
119                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
120                 installPublicKey:               installPublicKey,
121                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
122                 stop:                           make(chan bool),
123         }
124         wp.registerMetrics(reg)
125         go func() {
126                 wp.setupOnce.Do(wp.setup)
127                 go wp.runMetrics()
128                 go wp.runProbes()
129                 go wp.runSync()
130         }()
131         return wp
132 }
133
134 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
135 // zero Pool should not be used. Call NewPool to create a new Pool.
136 type Pool struct {
137         // configuration
138         logger                         logrus.FieldLogger
139         arvClient                      *arvados.Client
140         instanceSetID                  cloud.InstanceSetID
141         instanceSet                    *throttledInstanceSet
142         newExecutor                    func(cloud.Instance) Executor
143         bootProbeCommand               string
144         runnerSource                   string
145         imageID                        cloud.ImageID
146         instanceTypes                  map[string]arvados.InstanceType
147         syncInterval                   time.Duration
148         probeInterval                  time.Duration
149         maxProbesPerSecond             int
150         maxConcurrentInstanceCreateOps int
151         timeoutIdle                    time.Duration
152         timeoutBooting                 time.Duration
153         timeoutProbe                   time.Duration
154         timeoutShutdown                time.Duration
155         timeoutTERM                    time.Duration
156         timeoutSignal                  time.Duration
157         timeoutStaleRunLock            time.Duration
158         installPublicKey               ssh.PublicKey
159         tagKeyPrefix                   string
160
161         // private state
162         subscribers  map[<-chan struct{}]chan<- struct{}
163         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
164         workers      map[cloud.InstanceID]*worker
165         loaded       bool                 // loaded list of instances from InstanceSet at least once
166         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
167         atQuotaUntil time.Time
168         atQuotaErr   cloud.QuotaError
169         stop         chan bool
170         mtx          sync.RWMutex
171         setupOnce    sync.Once
172         runnerData   []byte
173         runnerMD5    [md5.Size]byte
174         runnerCmd    string
175
176         mContainersRunning        prometheus.Gauge
177         mInstances                *prometheus.GaugeVec
178         mInstancesPrice           *prometheus.GaugeVec
179         mVCPUs                    *prometheus.GaugeVec
180         mMemory                   *prometheus.GaugeVec
181         mBootOutcomes             *prometheus.CounterVec
182         mDisappearances           *prometheus.CounterVec
183         mTimeToSSH                prometheus.Summary
184         mTimeToReadyForContainer  prometheus.Summary
185         mTimeFromShutdownToGone   prometheus.Summary
186         mTimeFromQueueToCrunchRun prometheus.Summary
187         mRunProbeDuration         *prometheus.SummaryVec
188 }
189
190 type createCall struct {
191         time         time.Time
192         instanceType arvados.InstanceType
193 }
194
195 func (wp *Pool) CheckHealth() error {
196         wp.setupOnce.Do(wp.setup)
197         if err := wp.loadRunnerData(); err != nil {
198                 return fmt.Errorf("error loading runner binary: %s", err)
199         }
200         return nil
201 }
202
203 // Subscribe returns a buffered channel that becomes ready after any
204 // change to the pool's state that could have scheduling implications:
205 // a worker's state changes, a new worker appears, the cloud
206 // provider's API rate limiting period ends, etc.
207 //
208 // Additional events that occur while the channel is already ready
209 // will be dropped, so it is OK if the caller services the channel
210 // slowly.
211 //
212 // Example:
213 //
214 //      ch := wp.Subscribe()
215 //      defer wp.Unsubscribe(ch)
216 //      for range ch {
217 //              tryScheduling(wp)
218 //              if done {
219 //                      break
220 //              }
221 //      }
222 func (wp *Pool) Subscribe() <-chan struct{} {
223         wp.setupOnce.Do(wp.setup)
224         wp.mtx.Lock()
225         defer wp.mtx.Unlock()
226         ch := make(chan struct{}, 1)
227         wp.subscribers[ch] = ch
228         return ch
229 }
230
231 // Unsubscribe stops sending updates to the given channel.
232 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
233         wp.setupOnce.Do(wp.setup)
234         wp.mtx.Lock()
235         defer wp.mtx.Unlock()
236         delete(wp.subscribers, ch)
237 }
238
239 // Unallocated returns the number of unallocated (creating + booting +
240 // idle + unknown) workers for each instance type.  Workers in
241 // hold/drain mode are not included.
242 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
243         wp.setupOnce.Do(wp.setup)
244         wp.mtx.RLock()
245         defer wp.mtx.RUnlock()
246         unalloc := map[arvados.InstanceType]int{}
247         creating := map[arvados.InstanceType]int{}
248         oldestCreate := map[arvados.InstanceType]time.Time{}
249         for _, cc := range wp.creating {
250                 it := cc.instanceType
251                 creating[it]++
252                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
253                         oldestCreate[it] = cc.time
254                 }
255         }
256         for _, wkr := range wp.workers {
257                 // Skip workers that are not expected to become
258                 // available soon. Note len(wkr.running)>0 is not
259                 // redundant here: it can be true even in
260                 // StateUnknown.
261                 if wkr.state == StateShutdown ||
262                         wkr.state == StateRunning ||
263                         wkr.idleBehavior != IdleBehaviorRun ||
264                         len(wkr.running) > 0 {
265                         continue
266                 }
267                 it := wkr.instType
268                 unalloc[it]++
269                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
270                         // If up to N new workers appear in
271                         // Instances() while we are waiting for N
272                         // Create() calls to complete, we assume we're
273                         // just seeing a race between Instances() and
274                         // Create() responses.
275                         //
276                         // The other common reason why nodes have
277                         // state==Unknown is that they appeared at
278                         // startup, before any Create calls. They
279                         // don't match the above timing condition, so
280                         // we never mistakenly attribute them to
281                         // pending Create calls.
282                         creating[it]--
283                 }
284         }
285         for it, c := range creating {
286                 unalloc[it] += c
287         }
288         return unalloc
289 }
290
291 // Create a new instance with the given type, and add it to the worker
292 // pool. The worker is added immediately; instance creation runs in
293 // the background.
294 //
295 // Create returns false if a pre-existing error state prevents it from
296 // even attempting to create a new instance. Those errors are logged
297 // by the Pool, so the caller does not need to log anything in such
298 // cases.
299 func (wp *Pool) Create(it arvados.InstanceType) bool {
300         logger := wp.logger.WithField("InstanceType", it.Name)
301         wp.setupOnce.Do(wp.setup)
302         if wp.loadRunnerData() != nil {
303                 // Boot probe is certain to fail.
304                 return false
305         }
306         wp.mtx.Lock()
307         defer wp.mtx.Unlock()
308         if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
309                 return false
310         }
311         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
312         // requests in flight. It was added to work around a limitation in Azure's
313         // managed disks, which support no more than 20 concurrent node creation
314         // requests from a single disk image (cf.
315         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
316         // The code assumes that node creation, from Azure's perspective, means the
317         // period until the instance appears in the "get all instances" list.
318         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
319                 logger.Info("reached MaxConcurrentInstanceCreateOps")
320                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
321                 return false
322         }
323         now := time.Now()
324         secret := randomHex(instanceSecretLength)
325         wp.creating[secret] = createCall{time: now, instanceType: it}
326         go func() {
327                 defer wp.notify()
328                 tags := cloud.InstanceTags{
329                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
330                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
331                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
332                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
333                 }
334                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
335                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
336                 wp.mtx.Lock()
337                 defer wp.mtx.Unlock()
338                 // delete() is deferred so the updateWorker() call
339                 // below knows to use StateBooting when adding a new
340                 // worker.
341                 defer delete(wp.creating, secret)
342                 if err != nil {
343                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
344                                 wp.atQuotaErr = err
345                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
346                                 time.AfterFunc(quotaErrorTTL, wp.notify)
347                         }
348                         logger.WithError(err).Error("create failed")
349                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
350                         return
351                 }
352                 wp.updateWorker(inst, it)
353         }()
354         return true
355 }
356
357 // AtQuota returns true if Create is not expected to work at the
358 // moment.
359 func (wp *Pool) AtQuota() bool {
360         wp.mtx.Lock()
361         defer wp.mtx.Unlock()
362         return time.Now().Before(wp.atQuotaUntil)
363 }
364
365 // SetIdleBehavior determines how the indicated instance will behave
366 // when it has no containers running.
367 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
368         wp.mtx.Lock()
369         defer wp.mtx.Unlock()
370         wkr, ok := wp.workers[id]
371         if !ok {
372                 return errors.New("requested instance does not exist")
373         }
374         wkr.setIdleBehavior(idleBehavior)
375         return nil
376 }
377
378 // Successful connection to the SSH daemon, update the mTimeToSSH metric
379 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
380         wp.mtx.Lock()
381         defer wp.mtx.Unlock()
382         wkr := wp.workers[inst.ID()]
383         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
384                 // the node is not in booting state (can happen if a-d-c is restarted) OR
385                 // this is not the first SSH connection
386                 return
387         }
388
389         wkr.firstSSHConnection = time.Now()
390         if wp.mTimeToSSH != nil {
391                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
392         }
393 }
394
395 // Add or update worker attached to the given instance.
396 //
397 // The second return value is true if a new worker is created.
398 //
399 // A newly added instance has state=StateBooting if its tags match an
400 // entry in wp.creating, otherwise StateUnknown.
401 //
402 // Caller must have lock.
403 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
404         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
405         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
406         id := inst.ID()
407         if wkr := wp.workers[id]; wkr != nil {
408                 wkr.executor.SetTarget(inst)
409                 wkr.instance = inst
410                 wkr.updated = time.Now()
411                 wkr.saveTags()
412                 return wkr, false
413         }
414
415         state := StateUnknown
416         if _, ok := wp.creating[secret]; ok {
417                 state = StateBooting
418         }
419
420         // If an instance has a valid IdleBehavior tag when it first
421         // appears, initialize the new worker accordingly (this is how
422         // we restore IdleBehavior that was set by a prior dispatch
423         // process); otherwise, default to "run". After this,
424         // wkr.idleBehavior is the source of truth, and will only be
425         // changed via SetIdleBehavior().
426         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
427         if !validIdleBehavior[idleBehavior] {
428                 idleBehavior = IdleBehaviorRun
429         }
430
431         logger := wp.logger.WithFields(logrus.Fields{
432                 "InstanceType": it.Name,
433                 "Instance":     inst.ID(),
434                 "Address":      inst.Address(),
435         })
436         logger.WithFields(logrus.Fields{
437                 "State":        state,
438                 "IdleBehavior": idleBehavior,
439         }).Infof("instance appeared in cloud")
440         now := time.Now()
441         wkr := &worker{
442                 mtx:          &wp.mtx,
443                 wp:           wp,
444                 logger:       logger,
445                 executor:     wp.newExecutor(inst),
446                 state:        state,
447                 idleBehavior: idleBehavior,
448                 instance:     inst,
449                 instType:     it,
450                 appeared:     now,
451                 probed:       now,
452                 busy:         now,
453                 updated:      now,
454                 running:      make(map[string]*remoteRunner),
455                 starting:     make(map[string]*remoteRunner),
456                 probing:      make(chan struct{}, 1),
457         }
458         wp.workers[id] = wkr
459         return wkr, true
460 }
461
462 // Shutdown shuts down a worker with the given type, or returns false
463 // if all workers with the given type are busy.
464 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
465         wp.setupOnce.Do(wp.setup)
466         wp.mtx.Lock()
467         defer wp.mtx.Unlock()
468         logger := wp.logger.WithField("InstanceType", it.Name)
469         logger.Info("shutdown requested")
470         for _, tryState := range []State{StateBooting, StateIdle} {
471                 // TODO: shutdown the worker with the longest idle
472                 // time (Idle) or the earliest create time (Booting)
473                 for _, wkr := range wp.workers {
474                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
475                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
476                                 wkr.reportBootOutcome(BootOutcomeAborted)
477                                 wkr.shutdown()
478                                 return true
479                         }
480                 }
481         }
482         return false
483 }
484
485 // CountWorkers returns the current number of workers in each state.
486 //
487 // CountWorkers blocks, if necessary, until the initial instance list
488 // has been loaded from the cloud provider.
489 func (wp *Pool) CountWorkers() map[State]int {
490         wp.setupOnce.Do(wp.setup)
491         wp.waitUntilLoaded()
492         wp.mtx.Lock()
493         defer wp.mtx.Unlock()
494         r := map[State]int{}
495         for _, w := range wp.workers {
496                 r[w.state]++
497         }
498         return r
499 }
500
501 // Running returns the container UUIDs being prepared/run on workers.
502 //
503 // In the returned map, the time value indicates when the Pool
504 // observed that the container process had exited. A container that
505 // has not yet exited has a zero time value. The caller should use
506 // ForgetContainer() to garbage-collect the entries for exited
507 // containers.
508 func (wp *Pool) Running() map[string]time.Time {
509         wp.setupOnce.Do(wp.setup)
510         wp.mtx.Lock()
511         defer wp.mtx.Unlock()
512         r := map[string]time.Time{}
513         for _, wkr := range wp.workers {
514                 for uuid := range wkr.running {
515                         r[uuid] = time.Time{}
516                 }
517                 for uuid := range wkr.starting {
518                         r[uuid] = time.Time{}
519                 }
520         }
521         for uuid, exited := range wp.exited {
522                 r[uuid] = exited
523         }
524         return r
525 }
526
527 // StartContainer starts a container on an idle worker immediately if
528 // possible, otherwise returns false.
529 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
530         wp.setupOnce.Do(wp.setup)
531         wp.mtx.Lock()
532         defer wp.mtx.Unlock()
533         var wkr *worker
534         for _, w := range wp.workers {
535                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
536                         if wkr == nil || w.busy.After(wkr.busy) {
537                                 wkr = w
538                         }
539                 }
540         }
541         if wkr == nil {
542                 return false
543         }
544         wkr.startContainer(ctr)
545         return true
546 }
547
548 // KillContainer kills the crunch-run process for the given container
549 // UUID, if it's running on any worker.
550 //
551 // KillContainer returns immediately; the act of killing the container
552 // takes some time, and runs in the background.
553 //
554 // KillContainer returns false if the container has already ended.
555 func (wp *Pool) KillContainer(uuid string, reason string) bool {
556         wp.mtx.Lock()
557         defer wp.mtx.Unlock()
558         logger := wp.logger.WithFields(logrus.Fields{
559                 "ContainerUUID": uuid,
560                 "Reason":        reason,
561         })
562         for _, wkr := range wp.workers {
563                 rr := wkr.running[uuid]
564                 if rr == nil {
565                         rr = wkr.starting[uuid]
566                 }
567                 if rr != nil {
568                         rr.Kill(reason)
569                         return true
570                 }
571         }
572         logger.Debug("cannot kill: already disappeared")
573         return false
574 }
575
576 // ForgetContainer clears the placeholder for the given exited
577 // container, so it isn't returned by subsequent calls to Running().
578 //
579 // ForgetContainer has no effect if the container has not yet exited.
580 //
581 // The "container exited at time T" placeholder (which necessitates
582 // ForgetContainer) exists to make it easier for the caller
583 // (scheduler) to distinguish a container that exited without
584 // finalizing its state from a container that exited too recently for
585 // its final state to have appeared in the scheduler's queue cache.
586 func (wp *Pool) ForgetContainer(uuid string) {
587         wp.mtx.Lock()
588         defer wp.mtx.Unlock()
589         if _, ok := wp.exited[uuid]; ok {
590                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
591                 delete(wp.exited, uuid)
592         }
593 }
594
595 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
596         if reg == nil {
597                 reg = prometheus.NewRegistry()
598         }
599         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
600                 Namespace: "arvados",
601                 Subsystem: "dispatchcloud",
602                 Name:      "containers_running",
603                 Help:      "Number of containers reported running by cloud VMs.",
604         })
605         reg.MustRegister(wp.mContainersRunning)
606         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
607                 Namespace: "arvados",
608                 Subsystem: "dispatchcloud",
609                 Name:      "instances_total",
610                 Help:      "Number of cloud VMs.",
611         }, []string{"category", "instance_type"})
612         reg.MustRegister(wp.mInstances)
613         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
614                 Namespace: "arvados",
615                 Subsystem: "dispatchcloud",
616                 Name:      "instances_price",
617                 Help:      "Price of cloud VMs.",
618         }, []string{"category"})
619         reg.MustRegister(wp.mInstancesPrice)
620         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
621                 Namespace: "arvados",
622                 Subsystem: "dispatchcloud",
623                 Name:      "vcpus_total",
624                 Help:      "Total VCPUs on all cloud VMs.",
625         }, []string{"category"})
626         reg.MustRegister(wp.mVCPUs)
627         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
628                 Namespace: "arvados",
629                 Subsystem: "dispatchcloud",
630                 Name:      "memory_bytes_total",
631                 Help:      "Total memory on all cloud VMs.",
632         }, []string{"category"})
633         reg.MustRegister(wp.mMemory)
634         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
635                 Namespace: "arvados",
636                 Subsystem: "dispatchcloud",
637                 Name:      "boot_outcomes",
638                 Help:      "Boot outcomes by type.",
639         }, []string{"outcome"})
640         for k := range validBootOutcomes {
641                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
642         }
643         reg.MustRegister(wp.mBootOutcomes)
644         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
645                 Namespace: "arvados",
646                 Subsystem: "dispatchcloud",
647                 Name:      "instances_disappeared",
648                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
649         }, []string{"state"})
650         for _, v := range stateString {
651                 wp.mDisappearances.WithLabelValues(v).Add(0)
652         }
653         reg.MustRegister(wp.mDisappearances)
654         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
655                 Namespace:  "arvados",
656                 Subsystem:  "dispatchcloud",
657                 Name:       "instances_time_to_ssh_seconds",
658                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
659                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
660         })
661         reg.MustRegister(wp.mTimeToSSH)
662         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
663                 Namespace:  "arvados",
664                 Subsystem:  "dispatchcloud",
665                 Name:       "instances_time_to_ready_for_container_seconds",
666                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
667                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
668         })
669         reg.MustRegister(wp.mTimeToReadyForContainer)
670         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
671                 Namespace:  "arvados",
672                 Subsystem:  "dispatchcloud",
673                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
674                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
675                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
676         })
677         reg.MustRegister(wp.mTimeFromShutdownToGone)
678         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
679                 Namespace:  "arvados",
680                 Subsystem:  "dispatchcloud",
681                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
682                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
683                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
684         })
685         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
686         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
687                 Namespace:  "arvados",
688                 Subsystem:  "dispatchcloud",
689                 Name:       "instances_run_probe_duration_seconds",
690                 Help:       "Number of seconds per runProbe call.",
691                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
692         }, []string{"outcome"})
693         reg.MustRegister(wp.mRunProbeDuration)
694 }
695
696 func (wp *Pool) runMetrics() {
697         ch := wp.Subscribe()
698         defer wp.Unsubscribe(ch)
699         wp.updateMetrics()
700         for range ch {
701                 wp.updateMetrics()
702         }
703 }
704
705 func (wp *Pool) updateMetrics() {
706         wp.mtx.RLock()
707         defer wp.mtx.RUnlock()
708
709         type entKey struct {
710                 cat      string
711                 instType string
712         }
713         instances := map[entKey]int64{}
714         price := map[string]float64{}
715         cpu := map[string]int64{}
716         mem := map[string]int64{}
717         var running int64
718         for _, wkr := range wp.workers {
719                 var cat string
720                 switch {
721                 case len(wkr.running)+len(wkr.starting) > 0:
722                         cat = "inuse"
723                 case wkr.idleBehavior == IdleBehaviorHold:
724                         cat = "hold"
725                 case wkr.state == StateBooting:
726                         cat = "booting"
727                 case wkr.state == StateUnknown:
728                         cat = "unknown"
729                 default:
730                         cat = "idle"
731                 }
732                 instances[entKey{cat, wkr.instType.Name}]++
733                 price[cat] += wkr.instType.Price
734                 cpu[cat] += int64(wkr.instType.VCPUs)
735                 mem[cat] += int64(wkr.instType.RAM)
736                 running += int64(len(wkr.running) + len(wkr.starting))
737         }
738         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
739                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
740                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
741                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
742                 // make sure to reset gauges for non-existing category/nodetype combinations
743                 for _, it := range wp.instanceTypes {
744                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
745                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
746                         }
747                 }
748         }
749         for k, v := range instances {
750                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
751         }
752         wp.mContainersRunning.Set(float64(running))
753 }
754
755 func (wp *Pool) runProbes() {
756         maxPPS := wp.maxProbesPerSecond
757         if maxPPS < 1 {
758                 maxPPS = defaultMaxProbesPerSecond
759         }
760         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
761         defer limitticker.Stop()
762
763         probeticker := time.NewTicker(wp.probeInterval)
764         defer probeticker.Stop()
765
766         workers := []cloud.InstanceID{}
767         for range probeticker.C {
768                 workers = workers[:0]
769                 wp.mtx.Lock()
770                 for id, wkr := range wp.workers {
771                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
772                                 continue
773                         }
774                         workers = append(workers, id)
775                 }
776                 wp.mtx.Unlock()
777
778                 for _, id := range workers {
779                         wp.mtx.Lock()
780                         wkr, ok := wp.workers[id]
781                         wp.mtx.Unlock()
782                         if !ok {
783                                 // Deleted while we were probing
784                                 // others
785                                 continue
786                         }
787                         go wkr.ProbeAndUpdate()
788                         select {
789                         case <-wp.stop:
790                                 return
791                         case <-limitticker.C:
792                         }
793                 }
794         }
795 }
796
797 func (wp *Pool) runSync() {
798         // sync once immediately, then wait syncInterval, sync again,
799         // etc.
800         timer := time.NewTimer(1)
801         for {
802                 select {
803                 case <-timer.C:
804                         err := wp.getInstancesAndSync()
805                         if err != nil {
806                                 wp.logger.WithError(err).Warn("sync failed")
807                         }
808                         timer.Reset(wp.syncInterval)
809                 case <-wp.stop:
810                         wp.logger.Debug("worker.Pool stopped")
811                         return
812                 }
813         }
814 }
815
816 // Stop synchronizing with the InstanceSet.
817 func (wp *Pool) Stop() {
818         wp.setupOnce.Do(wp.setup)
819         close(wp.stop)
820 }
821
822 // Instances returns an InstanceView for each worker in the pool,
823 // summarizing its current state and recent activity.
824 func (wp *Pool) Instances() []InstanceView {
825         var r []InstanceView
826         wp.setupOnce.Do(wp.setup)
827         wp.mtx.Lock()
828         for _, w := range wp.workers {
829                 r = append(r, InstanceView{
830                         Instance:             w.instance.ID(),
831                         Address:              w.instance.Address(),
832                         Price:                w.instType.Price,
833                         ArvadosInstanceType:  w.instType.Name,
834                         ProviderInstanceType: w.instType.ProviderType,
835                         LastContainerUUID:    w.lastUUID,
836                         LastBusy:             w.busy,
837                         WorkerState:          w.state.String(),
838                         IdleBehavior:         w.idleBehavior,
839                 })
840         }
841         wp.mtx.Unlock()
842         sort.Slice(r, func(i, j int) bool {
843                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
844         })
845         return r
846 }
847
848 // KillInstance destroys a cloud VM instance. It returns an error if
849 // the given instance does not exist.
850 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
851         wkr, ok := wp.workers[id]
852         if !ok {
853                 return errors.New("instance not found")
854         }
855         wkr.logger.WithField("Reason", reason).Info("shutting down")
856         wkr.reportBootOutcome(BootOutcomeAborted)
857         wkr.shutdown()
858         return nil
859 }
860
861 func (wp *Pool) setup() {
862         wp.creating = map[string]createCall{}
863         wp.exited = map[string]time.Time{}
864         wp.workers = map[cloud.InstanceID]*worker{}
865         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
866         wp.loadRunnerData()
867 }
868
869 // Load the runner program to be deployed on worker nodes into
870 // wp.runnerData, if necessary. Errors are logged.
871 //
872 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
873 //
874 // Caller must not have lock.
875 func (wp *Pool) loadRunnerData() error {
876         wp.mtx.Lock()
877         defer wp.mtx.Unlock()
878         if wp.runnerData != nil {
879                 return nil
880         } else if wp.runnerSource == "" {
881                 wp.runnerCmd = "crunch-run"
882                 wp.runnerData = []byte{}
883                 return nil
884         }
885         logger := wp.logger.WithField("source", wp.runnerSource)
886         logger.Debug("loading runner")
887         buf, err := ioutil.ReadFile(wp.runnerSource)
888         if err != nil {
889                 logger.WithError(err).Error("failed to load runner program")
890                 return err
891         }
892         wp.runnerData = buf
893         wp.runnerMD5 = md5.Sum(buf)
894         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
895         return nil
896 }
897
898 func (wp *Pool) notify() {
899         wp.mtx.RLock()
900         defer wp.mtx.RUnlock()
901         for _, send := range wp.subscribers {
902                 select {
903                 case send <- struct{}{}:
904                 default:
905                 }
906         }
907 }
908
909 func (wp *Pool) getInstancesAndSync() error {
910         wp.setupOnce.Do(wp.setup)
911         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
912                 return err
913         }
914         wp.logger.Debug("getting instance list")
915         threshold := time.Now()
916         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
917         if err != nil {
918                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
919                 return err
920         }
921         wp.sync(threshold, instances)
922         wp.logger.Debug("sync done")
923         return nil
924 }
925
926 // Add/remove/update workers based on instances, which was obtained
927 // from the instanceSet. However, don't clobber any other updates that
928 // already happened after threshold.
929 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
930         wp.mtx.Lock()
931         defer wp.mtx.Unlock()
932         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
933         notify := false
934
935         for _, inst := range instances {
936                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
937                 it, ok := wp.instanceTypes[itTag]
938                 if !ok {
939                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
940                         continue
941                 }
942                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
943                         notify = true
944                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
945                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
946                         wkr.shutdown()
947                 }
948         }
949
950         for id, wkr := range wp.workers {
951                 if wkr.updated.After(threshold) {
952                         continue
953                 }
954                 logger := wp.logger.WithFields(logrus.Fields{
955                         "Instance":    wkr.instance.ID(),
956                         "WorkerState": wkr.state,
957                 })
958                 logger.Info("instance disappeared in cloud")
959                 wkr.reportBootOutcome(BootOutcomeDisappeared)
960                 if wp.mDisappearances != nil {
961                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
962                 }
963                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
964                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
965                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
966                 }
967                 delete(wp.workers, id)
968                 go wkr.Close()
969                 notify = true
970         }
971
972         if !wp.loaded {
973                 notify = true
974                 wp.loaded = true
975                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
976         }
977
978         if notify {
979                 go wp.notify()
980         }
981 }
982
983 func (wp *Pool) waitUntilLoaded() {
984         ch := wp.Subscribe()
985         wp.mtx.RLock()
986         defer wp.mtx.RUnlock()
987         for !wp.loaded {
988                 wp.mtx.RUnlock()
989                 <-ch
990                 wp.mtx.RLock()
991         }
992 }
993
994 // Return a random string of n hexadecimal digits (n*4 random bits). n
995 // must be even.
996 func randomHex(n int) string {
997         buf := make([]byte, n/2)
998         _, err := rand.Read(buf)
999         if err != nil {
1000                 panic(err)
1001         }
1002         return fmt.Sprintf("%x", buf)
1003 }