// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/hmac"
        "crypto/md5"
        "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        mathrand "math/rand"
        "sort"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}

const (
        defaultSyncInterval        = time.Minute
        defaultProbeInterval       = time.Second * 10
        defaultMaxProbesPerSecond  = 10
        defaultTimeoutIdle         = time.Minute
        defaultTimeoutBooting      = time.Minute * 10
        defaultTimeoutProbe        = time.Minute * 10
        defaultTimeoutShutdown     = time.Second * 10
        defaultTimeoutTERM         = time.Minute * 2
        defaultTimeoutSignal       = time.Second * 5
        defaultTimeoutStaleRunLock = time.Second * 5

        // Time after a quota error to try again anyway, even if no
        // instances have been shut down.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        }
        return def
}
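
// For example, duration(cluster.Containers.CloudVMs.ProbeInterval,
// defaultProbeInterval) yields the configured ProbeInterval when it is
// positive, and the 10-second default otherwise.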

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:                         logger,
                arvClient:                      arvClient,
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
                cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:                  cluster.InstanceTypes,
                maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
                maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
                maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
                probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
                systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
                runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
                runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
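
// A minimal usage sketch for a caller outside this package (the
// caller supplies its own logger, clients, registry, instance set,
// executor constructor, and SSH key):
//
//      wp := worker.NewPool(logger, arvClient, reg, instanceSetID, instanceSet, newExecutor, installPublicKey, cluster)
//      defer wp.Stop()
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)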

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger                         logrus.FieldLogger
        arvClient                      *arvados.Client
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
        cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
        instanceTypes                  map[string]arvados.InstanceType
        syncInterval                   time.Duration
        probeInterval                  time.Duration
        maxProbesPerSecond             int
        maxConcurrentInstanceCreateOps int
        maxInstances                   int
        timeoutIdle                    time.Duration
        timeoutBooting                 time.Duration
        timeoutProbe                   time.Duration
        timeoutShutdown                time.Duration
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
        timeoutStaleRunLock            time.Duration
        systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
        runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
        runnerArgs                     []string // extra args passed to crunch-run

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
        runnerData   []byte
        runnerMD5    [md5.Size]byte
        runnerCmd    string

        mContainersRunning        prometheus.Gauge
        mInstances                *prometheus.GaugeVec
        mInstancesPrice           *prometheus.GaugeVec
        mVCPUs                    *prometheus.GaugeVec
        mMemory                   *prometheus.GaugeVec
        mBootOutcomes             *prometheus.CounterVec
        mDisappearances           *prometheus.CounterVec
        mTimeToSSH                prometheus.Summary
        mTimeToReadyForContainer  prometheus.Summary
        mTimeFromShutdownToGone   prometheus.Summary
        mTimeFromQueueToCrunchRun prometheus.Summary
        mRunProbeDuration         *prometheus.SummaryVec
        mProbeAgeMax              prometheus.Gauge
        mProbeAgeMedian           prometheus.Gauge
}

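// A createCall records an unfinished (cloud.InstanceSet)Create call:
// when it was started, and which instance type was requested.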
type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

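// CheckHealth reports an error if the runner binary cannot be loaded,
// and nil otherwise.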
func (wp *Pool) CheckHealth() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.loadRunnerData(); err != nil {
                return fmt.Errorf("error loading runner binary: %s", err)
        }
        return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}
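
// For example, with two unfinished Create calls and one idle worker of
// a given type, Unallocated reports 3 for that type. If one of the new
// instances has already appeared (state Unknown) while its Create call
// is still pending, the worker is counted but one pending Create call
// is subtracted, so the total is still 3 rather than 4.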

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        if wp.loadRunnerData() != nil {
                // Boot probe is certain to fail.
                return false
        }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) ||
                wp.instanceSet.throttleCreate.Error() != nil ||
                (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
                return false
        }
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
        // requests from a single disk image (cf.
        // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
        // The code assumes that node creation, from Azure's perspective, means the
        // period until the instance appears in the "get all instances" list.
        if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
                logger.Info("reached MaxConcurrentInstanceCreateOps")
                wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := TagVerifier{nil, secret, nil}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        if len(wp.creating)+len(wp.workers) == wp.maxInstances {
                logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
        }
        return true
}

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}
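
// A caller that wants to scale up can combine the two, e.g. (a
// hypothetical scheduler fragment; "needed" is whatever demand count
// the caller tracks):
//
//      for n := needed; n > 0 && !wp.AtQuota(); n-- {
//              if !wp.Create(it) {
//                      break
//              }
//      }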

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}

// reportSSHConnected records the time of a worker's first successful
// SSH connection and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[inst.ID()]
        if !ok {
                // race: inst was removed from the pool
                return
        }
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
                // the node is not in booting state (can happen if
                // a-d-c is restarted) OR this is not the first SSH
                // connection
                return
        }

        wkr.firstSSHConnection = time.Now()
        if wp.mTimeToSSH != nil {
                wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
        }
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
                                wkr.reportBootOutcome(BootOutcomeAborted)
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
        return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
        }
}
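
// Together, Running and ForgetContainer support a cleanup loop like
// this sketch (keepExited is a hypothetical grace period chosen by
// the caller):
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() && time.Since(exited) > keepExited {
//                      wp.ForgetContainer(uuid)
//              }
//      }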

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "probe_age_seconds_max",
                Help:      "Maximum number of seconds since an instance's most recent successful probe.",
        })
        reg.MustRegister(wp.mProbeAgeMax)
        wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "probe_age_seconds_median",
                Help:      "Median number of seconds since an instance's most recent successful probe.",
        })
        reg.MustRegister(wp.mProbeAgeMedian)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "boot_outcomes",
                Help:      "Boot outcomes by type.",
        }, []string{"outcome"})
        for k := range validBootOutcomes {
                wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
        }
        reg.MustRegister(wp.mBootOutcomes)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_disappeared",
                Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
        }, []string{"state"})
        for _, v := range stateString {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
        wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ssh_seconds",
                Help:       "Number of seconds between instance creation and the first successful SSH connection.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToSSH)
        wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ready_for_container_seconds",
                Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToReadyForContainer)
        wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
                Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromShutdownToGone)
        wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "containers_time_from_queue_to_crunch_run_seconds",
                Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
        wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_run_probe_duration_seconds",
                Help:       "Number of seconds per runProbe call.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        }, []string{"outcome"})
        reg.MustRegister(wp.mRunProbeDuration)
}
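
// With the "arvados" namespace and "dispatchcloud" subsystem above,
// the probe age gauges are exported as
// arvados_dispatchcloud_probe_age_seconds_max and
// arvados_dispatchcloud_probe_age_seconds_median, so a monitoring
// rule can flag a dispatcher that is up but no longer probing its
// workers, e.g. (hypothetical Prometheus alert expression):
//
//      arvados_dispatchcloud_probe_age_seconds_max > 600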

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        type entKey struct {
                cat      string
                instType string
        }
        instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        now := time.Now()
        var probed []time.Time
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
                probed = append(probed, wkr.probed)
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
                // make sure to reset gauges for non-existing category/nodetype combinations
                for _, it := range wp.instanceTypes {
                        if _, ok := instances[entKey{cat, it.Name}]; !ok {
                                wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
                        }
                }
        }
        for k, v := range instances {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))

        if len(probed) == 0 {
                wp.mProbeAgeMax.Set(0)
                wp.mProbeAgeMedian.Set(0)
        } else {
                sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
                wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
                wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
        }
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                // Add some jitter. Without this, if probeInterval is
                // a multiple of syncInterval and sync is
                // instantaneous (as with the loopback driver), the
                // first few probes race with sync operations and
                // don't update the workers.
                time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}
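
// With the defaults above (probeInterval 10s, maxProbesPerSecond 10),
// each sweep can start roughly 100 probes before the next probeticker
// tick; larger pools simply spread their probes across more
// intervals.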

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return nil
}

func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
                wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
        logger := wp.logger.WithField("source", wp.runnerSource)
        logger.Debug("loading runner")
        buf, err := ioutil.ReadFile(wp.runnerSource)
        if err != nil {
                logger.WithError(err).Error("failed to load runner program")
                return err
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
        wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                wkr.reportBootOutcome(BootOutcomeDisappeared)
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
                if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
                        wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
                }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

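// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: the hex-encoded HMAC-SHA256 of the UUID, keyed with
// the cluster's SystemRootToken.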
func (wp *Pool) gatewayAuthSecret(uuid string) string {
        h := hmac.New(sha256.New, []byte(wp.systemRootToken))
        fmt.Fprint(h, uuid)
        return fmt.Sprintf("%x", h.Sum(nil))
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}