20259: Add documentation for banner and tooltip features
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         mathrand "math/rand"
17         "sort"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         "golang.org/x/crypto/ssh"
27 )
28
// Tag keys written on every instance created by the pool. Each key is
// prefixed with the configured TagKeyPrefix (Pool.tagKeyPrefix) before
// use.
const (
	// tagKeyIdleBehavior records the worker's IdleBehavior so a later
	// dispatcher process can restore it.
	tagKeyIdleBehavior = "IdleBehavior"
	// tagKeyInstanceSecret holds the random secret used to match a
	// cloud instance with a pending Create call and verify its tags.
	tagKeyInstanceSecret = "InstanceSecret"
	// tagKeyInstanceSetID identifies the instance set that owns the
	// instance.
	tagKeyInstanceSetID = "InstanceSetID"
	// tagKeyInstanceType records the Arvados instance type name.
	tagKeyInstanceType = "InstanceType"
)
35
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38         Instance             cloud.InstanceID `json:"instance"`
39         Address              string           `json:"address"`
40         Price                float64          `json:"price"`
41         ArvadosInstanceType  string           `json:"arvados_instance_type"`
42         ProviderInstanceType string           `json:"provider_instance_type"`
43         LastContainerUUID    string           `json:"last_container_uuid"`
44         LastBusy             time.Time        `json:"last_busy"`
45         WorkerState          string           `json:"worker_state"`
46         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
47 }
48
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51         // Run cmd on the current target.
52         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
53
54         // Use the given target for subsequent operations. The new
55         // target is the same host as the previous target, but it
56         // might return a different address and verify a different
57         // host key.
58         //
59         // SetTarget is called frequently, and in most cases the new
60         // target will behave exactly the same as the old one. An
61         // implementation should optimize accordingly.
62         //
63         // SetTarget must not block on concurrent Execute calls.
64         SetTarget(cloud.ExecutorTarget)
65
66         Close()
67 }
68
// Fallback values. NewPool uses the duration defaults (via duration())
// whenever the corresponding CloudVMs setting is not positive.
const (
	defaultSyncInterval        = 1 * time.Minute
	defaultProbeInterval       = 10 * time.Second
	defaultMaxProbesPerSecond  = 10 // not applied in NewPool; presumably used where probes are rate-limited
	defaultTimeoutIdle         = 1 * time.Minute
	defaultTimeoutBooting      = 10 * time.Minute
	defaultTimeoutProbe        = 10 * time.Minute
	defaultTimeoutShutdown     = 10 * time.Second
	defaultTimeoutTERM         = 2 * time.Minute
	defaultTimeoutSignal       = 5 * time.Second
	defaultTimeoutStaleRunLock = 5 * time.Second

	// quotaErrorTTL is how long Create stays disabled after a quota
	// error before trying again, even if no instances were shut down.
	quotaErrorTTL = 1 * time.Minute

	// logRateLimitErrorInterval is the minimum gap between repeated
	// "X failed because rate limiting" log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
88
89 func duration(conf arvados.Duration, def time.Duration) time.Duration {
90         if conf > 0 {
91                 return time.Duration(conf)
92         }
93         return def
94 }
95
96 // NewPool creates a Pool of workers backed by instanceSet.
97 //
98 // New instances are configured and set up according to the given
99 // cluster configuration.
100 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
101         wp := &Pool{
102                 logger:                         logger,
103                 arvClient:                      arvClient,
104                 instanceSetID:                  instanceSetID,
105                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
106                 newExecutor:                    newExecutor,
107                 cluster:                        cluster,
108                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
109                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
110                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
111                 instanceTypes:                  cluster.InstanceTypes,
112                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
113                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
114                 maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
115                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
116                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
117                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
118                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
119                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
120                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
121                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
122                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
123                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
124                 systemRootToken:                cluster.SystemRootToken,
125                 installPublicKey:               installPublicKey,
126                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
127                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
128                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
129                 stop:                           make(chan bool),
130         }
131         wp.registerMetrics(reg)
132         go func() {
133                 wp.setupOnce.Do(wp.setup)
134                 go wp.runMetrics()
135                 go wp.runProbes()
136                 go wp.runSync()
137         }()
138         return wp
139 }
140
141 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
142 // zero Pool should not be used. Call NewPool to create a new Pool.
143 type Pool struct {
144         // configuration
145         logger                         logrus.FieldLogger
146         arvClient                      *arvados.Client
147         instanceSetID                  cloud.InstanceSetID
148         instanceSet                    *throttledInstanceSet
149         newExecutor                    func(cloud.Instance) Executor
150         cluster                        *arvados.Cluster
151         bootProbeCommand               string
152         runnerSource                   string
153         imageID                        cloud.ImageID
154         instanceTypes                  map[string]arvados.InstanceType
155         syncInterval                   time.Duration
156         probeInterval                  time.Duration
157         maxProbesPerSecond             int
158         maxConcurrentInstanceCreateOps int
159         maxInstances                   int
160         timeoutIdle                    time.Duration
161         timeoutBooting                 time.Duration
162         timeoutProbe                   time.Duration
163         timeoutShutdown                time.Duration
164         timeoutTERM                    time.Duration
165         timeoutSignal                  time.Duration
166         timeoutStaleRunLock            time.Duration
167         systemRootToken                string
168         installPublicKey               ssh.PublicKey
169         tagKeyPrefix                   string
170         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
171         runnerArgs                     []string // extra args passed to crunch-run
172
173         // private state
174         subscribers  map[<-chan struct{}]chan<- struct{}
175         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
176         workers      map[cloud.InstanceID]*worker
177         loaded       bool                 // loaded list of instances from InstanceSet at least once
178         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
179         atQuotaUntil time.Time
180         atQuotaErr   cloud.QuotaError
181         stop         chan bool
182         mtx          sync.RWMutex
183         setupOnce    sync.Once
184         runnerData   []byte
185         runnerMD5    [md5.Size]byte
186         runnerCmd    string
187
188         mContainersRunning        prometheus.Gauge
189         mInstances                *prometheus.GaugeVec
190         mInstancesPrice           *prometheus.GaugeVec
191         mVCPUs                    *prometheus.GaugeVec
192         mMemory                   *prometheus.GaugeVec
193         mBootOutcomes             *prometheus.CounterVec
194         mDisappearances           *prometheus.CounterVec
195         mTimeToSSH                prometheus.Summary
196         mTimeToReadyForContainer  prometheus.Summary
197         mTimeFromShutdownToGone   prometheus.Summary
198         mTimeFromQueueToCrunchRun prometheus.Summary
199         mRunProbeDuration         *prometheus.SummaryVec
200 }
201
202 type createCall struct {
203         time         time.Time
204         instanceType arvados.InstanceType
205 }
206
207 func (wp *Pool) CheckHealth() error {
208         wp.setupOnce.Do(wp.setup)
209         if err := wp.loadRunnerData(); err != nil {
210                 return fmt.Errorf("error loading runner binary: %s", err)
211         }
212         return nil
213 }
214
215 // Subscribe returns a buffered channel that becomes ready after any
216 // change to the pool's state that could have scheduling implications:
217 // a worker's state changes, a new worker appears, the cloud
218 // provider's API rate limiting period ends, etc.
219 //
220 // Additional events that occur while the channel is already ready
221 // will be dropped, so it is OK if the caller services the channel
222 // slowly.
223 //
224 // Example:
225 //
226 //      ch := wp.Subscribe()
227 //      defer wp.Unsubscribe(ch)
228 //      for range ch {
229 //              tryScheduling(wp)
230 //              if done {
231 //                      break
232 //              }
233 //      }
234 func (wp *Pool) Subscribe() <-chan struct{} {
235         wp.setupOnce.Do(wp.setup)
236         wp.mtx.Lock()
237         defer wp.mtx.Unlock()
238         ch := make(chan struct{}, 1)
239         wp.subscribers[ch] = ch
240         return ch
241 }
242
243 // Unsubscribe stops sending updates to the given channel.
244 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
245         wp.setupOnce.Do(wp.setup)
246         wp.mtx.Lock()
247         defer wp.mtx.Unlock()
248         delete(wp.subscribers, ch)
249 }
250
251 // Unallocated returns the number of unallocated (creating + booting +
252 // idle + unknown) workers for each instance type.  Workers in
253 // hold/drain mode are not included.
254 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
255         wp.setupOnce.Do(wp.setup)
256         wp.mtx.RLock()
257         defer wp.mtx.RUnlock()
258         unalloc := map[arvados.InstanceType]int{}
259         creating := map[arvados.InstanceType]int{}
260         oldestCreate := map[arvados.InstanceType]time.Time{}
261         for _, cc := range wp.creating {
262                 it := cc.instanceType
263                 creating[it]++
264                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
265                         oldestCreate[it] = cc.time
266                 }
267         }
268         for _, wkr := range wp.workers {
269                 // Skip workers that are not expected to become
270                 // available soon. Note len(wkr.running)>0 is not
271                 // redundant here: it can be true even in
272                 // StateUnknown.
273                 if wkr.state == StateShutdown ||
274                         wkr.state == StateRunning ||
275                         wkr.idleBehavior != IdleBehaviorRun ||
276                         len(wkr.running) > 0 {
277                         continue
278                 }
279                 it := wkr.instType
280                 unalloc[it]++
281                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
282                         // If up to N new workers appear in
283                         // Instances() while we are waiting for N
284                         // Create() calls to complete, we assume we're
285                         // just seeing a race between Instances() and
286                         // Create() responses.
287                         //
288                         // The other common reason why nodes have
289                         // state==Unknown is that they appeared at
290                         // startup, before any Create calls. They
291                         // don't match the above timing condition, so
292                         // we never mistakenly attribute them to
293                         // pending Create calls.
294                         creating[it]--
295                 }
296         }
297         for it, c := range creating {
298                 unalloc[it] += c
299         }
300         return unalloc
301 }
302
303 // Create a new instance with the given type, and add it to the worker
304 // pool. The worker is added immediately; instance creation runs in
305 // the background.
306 //
307 // Create returns false if a pre-existing error or a configuration
308 // setting prevents it from even attempting to create a new
309 // instance. Those errors are logged by the Pool, so the caller does
310 // not need to log anything in such cases.
311 func (wp *Pool) Create(it arvados.InstanceType) bool {
312         logger := wp.logger.WithField("InstanceType", it.Name)
313         wp.setupOnce.Do(wp.setup)
314         if wp.loadRunnerData() != nil {
315                 // Boot probe is certain to fail.
316                 return false
317         }
318         wp.mtx.Lock()
319         defer wp.mtx.Unlock()
320         if time.Now().Before(wp.atQuotaUntil) ||
321                 wp.instanceSet.throttleCreate.Error() != nil ||
322                 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
323                 return false
324         }
325         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
326         // requests in flight. It was added to work around a limitation in Azure's
327         // managed disks, which support no more than 20 concurrent node creation
328         // requests from a single disk image (cf.
329         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
330         // The code assumes that node creation, from Azure's perspective, means the
331         // period until the instance appears in the "get all instances" list.
332         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
333                 logger.Info("reached MaxConcurrentInstanceCreateOps")
334                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
335                 return false
336         }
337         now := time.Now()
338         secret := randomHex(instanceSecretLength)
339         wp.creating[secret] = createCall{time: now, instanceType: it}
340         go func() {
341                 defer wp.notify()
342                 tags := cloud.InstanceTags{
343                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
344                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
345                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
346                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
347                 }
348                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
349                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
350                 wp.mtx.Lock()
351                 defer wp.mtx.Unlock()
352                 // delete() is deferred so the updateWorker() call
353                 // below knows to use StateBooting when adding a new
354                 // worker.
355                 defer delete(wp.creating, secret)
356                 if err != nil {
357                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
358                                 wp.atQuotaErr = err
359                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
360                                 time.AfterFunc(quotaErrorTTL, wp.notify)
361                         }
362                         logger.WithError(err).Error("create failed")
363                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
364                         return
365                 }
366                 wp.updateWorker(inst, it)
367         }()
368         if len(wp.creating)+len(wp.workers) == wp.maxInstances {
369                 logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
370         }
371         return true
372 }
373
374 // AtQuota returns true if Create is not expected to work at the
375 // moment (e.g., cloud provider has reported quota errors, or we are
376 // already at our own configured quota).
377 func (wp *Pool) AtQuota() bool {
378         wp.mtx.Lock()
379         defer wp.mtx.Unlock()
380         return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
381 }
382
383 // SetIdleBehavior determines how the indicated instance will behave
384 // when it has no containers running.
385 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
386         wp.mtx.Lock()
387         defer wp.mtx.Unlock()
388         wkr, ok := wp.workers[id]
389         if !ok {
390                 return errors.New("requested instance does not exist")
391         }
392         wkr.setIdleBehavior(idleBehavior)
393         return nil
394 }
395
396 // Successful connection to the SSH daemon, update the mTimeToSSH metric
397 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
398         wp.mtx.Lock()
399         defer wp.mtx.Unlock()
400         wkr, ok := wp.workers[inst.ID()]
401         if !ok {
402                 // race: inst was removed from the pool
403                 return
404         }
405         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
406                 // the node is not in booting state (can happen if
407                 // a-d-c is restarted) OR this is not the first SSH
408                 // connection
409                 return
410         }
411
412         wkr.firstSSHConnection = time.Now()
413         if wp.mTimeToSSH != nil {
414                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
415         }
416 }
417
418 // Add or update worker attached to the given instance.
419 //
420 // The second return value is true if a new worker is created.
421 //
422 // A newly added instance has state=StateBooting if its tags match an
423 // entry in wp.creating, otherwise StateUnknown.
424 //
425 // Caller must have lock.
426 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
427         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
428         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
429         id := inst.ID()
430         if wkr := wp.workers[id]; wkr != nil {
431                 wkr.executor.SetTarget(inst)
432                 wkr.instance = inst
433                 wkr.updated = time.Now()
434                 wkr.saveTags()
435                 return wkr, false
436         }
437
438         state := StateUnknown
439         if _, ok := wp.creating[secret]; ok {
440                 state = StateBooting
441         }
442
443         // If an instance has a valid IdleBehavior tag when it first
444         // appears, initialize the new worker accordingly (this is how
445         // we restore IdleBehavior that was set by a prior dispatch
446         // process); otherwise, default to "run". After this,
447         // wkr.idleBehavior is the source of truth, and will only be
448         // changed via SetIdleBehavior().
449         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
450         if !validIdleBehavior[idleBehavior] {
451                 idleBehavior = IdleBehaviorRun
452         }
453
454         logger := wp.logger.WithFields(logrus.Fields{
455                 "InstanceType": it.Name,
456                 "Instance":     inst.ID(),
457                 "Address":      inst.Address(),
458         })
459         logger.WithFields(logrus.Fields{
460                 "State":        state,
461                 "IdleBehavior": idleBehavior,
462         }).Infof("instance appeared in cloud")
463         now := time.Now()
464         wkr := &worker{
465                 mtx:          &wp.mtx,
466                 wp:           wp,
467                 logger:       logger,
468                 executor:     wp.newExecutor(inst),
469                 state:        state,
470                 idleBehavior: idleBehavior,
471                 instance:     inst,
472                 instType:     it,
473                 appeared:     now,
474                 probed:       now,
475                 busy:         now,
476                 updated:      now,
477                 running:      make(map[string]*remoteRunner),
478                 starting:     make(map[string]*remoteRunner),
479                 probing:      make(chan struct{}, 1),
480         }
481         wp.workers[id] = wkr
482         return wkr, true
483 }
484
485 // Shutdown shuts down a worker with the given type, or returns false
486 // if all workers with the given type are busy.
487 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
488         wp.setupOnce.Do(wp.setup)
489         wp.mtx.Lock()
490         defer wp.mtx.Unlock()
491         logger := wp.logger.WithField("InstanceType", it.Name)
492         logger.Info("shutdown requested")
493         for _, tryState := range []State{StateBooting, StateIdle} {
494                 // TODO: shutdown the worker with the longest idle
495                 // time (Idle) or the earliest create time (Booting)
496                 for _, wkr := range wp.workers {
497                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
498                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
499                                 wkr.reportBootOutcome(BootOutcomeAborted)
500                                 wkr.shutdown()
501                                 return true
502                         }
503                 }
504         }
505         return false
506 }
507
508 // CountWorkers returns the current number of workers in each state.
509 //
510 // CountWorkers blocks, if necessary, until the initial instance list
511 // has been loaded from the cloud provider.
512 func (wp *Pool) CountWorkers() map[State]int {
513         wp.setupOnce.Do(wp.setup)
514         wp.waitUntilLoaded()
515         wp.mtx.Lock()
516         defer wp.mtx.Unlock()
517         r := map[State]int{}
518         for _, w := range wp.workers {
519                 r[w.state]++
520         }
521         return r
522 }
523
524 // Running returns the container UUIDs being prepared/run on workers.
525 //
526 // In the returned map, the time value indicates when the Pool
527 // observed that the container process had exited. A container that
528 // has not yet exited has a zero time value. The caller should use
529 // ForgetContainer() to garbage-collect the entries for exited
530 // containers.
531 func (wp *Pool) Running() map[string]time.Time {
532         wp.setupOnce.Do(wp.setup)
533         wp.mtx.Lock()
534         defer wp.mtx.Unlock()
535         r := map[string]time.Time{}
536         for _, wkr := range wp.workers {
537                 for uuid := range wkr.running {
538                         r[uuid] = time.Time{}
539                 }
540                 for uuid := range wkr.starting {
541                         r[uuid] = time.Time{}
542                 }
543         }
544         for uuid, exited := range wp.exited {
545                 r[uuid] = exited
546         }
547         return r
548 }
549
550 // StartContainer starts a container on an idle worker immediately if
551 // possible, otherwise returns false.
552 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
553         wp.setupOnce.Do(wp.setup)
554         wp.mtx.Lock()
555         defer wp.mtx.Unlock()
556         var wkr *worker
557         for _, w := range wp.workers {
558                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
559                         if wkr == nil || w.busy.After(wkr.busy) {
560                                 wkr = w
561                         }
562                 }
563         }
564         if wkr == nil {
565                 return false
566         }
567         wkr.startContainer(ctr)
568         return true
569 }
570
571 // KillContainer kills the crunch-run process for the given container
572 // UUID, if it's running on any worker.
573 //
574 // KillContainer returns immediately; the act of killing the container
575 // takes some time, and runs in the background.
576 //
577 // KillContainer returns false if the container has already ended.
578 func (wp *Pool) KillContainer(uuid string, reason string) bool {
579         wp.mtx.Lock()
580         defer wp.mtx.Unlock()
581         logger := wp.logger.WithFields(logrus.Fields{
582                 "ContainerUUID": uuid,
583                 "Reason":        reason,
584         })
585         for _, wkr := range wp.workers {
586                 rr := wkr.running[uuid]
587                 if rr == nil {
588                         rr = wkr.starting[uuid]
589                 }
590                 if rr != nil {
591                         rr.Kill(reason)
592                         return true
593                 }
594         }
595         logger.Debug("cannot kill: already disappeared")
596         return false
597 }
598
599 // ForgetContainer clears the placeholder for the given exited
600 // container, so it isn't returned by subsequent calls to Running().
601 //
602 // ForgetContainer has no effect if the container has not yet exited.
603 //
604 // The "container exited at time T" placeholder (which necessitates
605 // ForgetContainer) exists to make it easier for the caller
606 // (scheduler) to distinguish a container that exited without
607 // finalizing its state from a container that exited too recently for
608 // its final state to have appeared in the scheduler's queue cache.
609 func (wp *Pool) ForgetContainer(uuid string) {
610         wp.mtx.Lock()
611         defer wp.mtx.Unlock()
612         if _, ok := wp.exited[uuid]; ok {
613                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
614                 delete(wp.exited, uuid)
615         }
616 }
617
618 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
619         if reg == nil {
620                 reg = prometheus.NewRegistry()
621         }
622         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
623                 Namespace: "arvados",
624                 Subsystem: "dispatchcloud",
625                 Name:      "containers_running",
626                 Help:      "Number of containers reported running by cloud VMs.",
627         })
628         reg.MustRegister(wp.mContainersRunning)
629         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
630                 Namespace: "arvados",
631                 Subsystem: "dispatchcloud",
632                 Name:      "instances_total",
633                 Help:      "Number of cloud VMs.",
634         }, []string{"category", "instance_type"})
635         reg.MustRegister(wp.mInstances)
636         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
637                 Namespace: "arvados",
638                 Subsystem: "dispatchcloud",
639                 Name:      "instances_price",
640                 Help:      "Price of cloud VMs.",
641         }, []string{"category"})
642         reg.MustRegister(wp.mInstancesPrice)
643         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
644                 Namespace: "arvados",
645                 Subsystem: "dispatchcloud",
646                 Name:      "vcpus_total",
647                 Help:      "Total VCPUs on all cloud VMs.",
648         }, []string{"category"})
649         reg.MustRegister(wp.mVCPUs)
650         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
651                 Namespace: "arvados",
652                 Subsystem: "dispatchcloud",
653                 Name:      "memory_bytes_total",
654                 Help:      "Total memory on all cloud VMs.",
655         }, []string{"category"})
656         reg.MustRegister(wp.mMemory)
657         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
658                 Namespace: "arvados",
659                 Subsystem: "dispatchcloud",
660                 Name:      "boot_outcomes",
661                 Help:      "Boot outcomes by type.",
662         }, []string{"outcome"})
663         for k := range validBootOutcomes {
664                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
665         }
666         reg.MustRegister(wp.mBootOutcomes)
667         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
668                 Namespace: "arvados",
669                 Subsystem: "dispatchcloud",
670                 Name:      "instances_disappeared",
671                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
672         }, []string{"state"})
673         for _, v := range stateString {
674                 wp.mDisappearances.WithLabelValues(v).Add(0)
675         }
676         reg.MustRegister(wp.mDisappearances)
677         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
678                 Namespace:  "arvados",
679                 Subsystem:  "dispatchcloud",
680                 Name:       "instances_time_to_ssh_seconds",
681                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
682                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
683         })
684         reg.MustRegister(wp.mTimeToSSH)
685         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
686                 Namespace:  "arvados",
687                 Subsystem:  "dispatchcloud",
688                 Name:       "instances_time_to_ready_for_container_seconds",
689                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
690                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
691         })
692         reg.MustRegister(wp.mTimeToReadyForContainer)
693         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
694                 Namespace:  "arvados",
695                 Subsystem:  "dispatchcloud",
696                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
697                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
698                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
699         })
700         reg.MustRegister(wp.mTimeFromShutdownToGone)
701         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
702                 Namespace:  "arvados",
703                 Subsystem:  "dispatchcloud",
704                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
705                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
706                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
707         })
708         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
709         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
710                 Namespace:  "arvados",
711                 Subsystem:  "dispatchcloud",
712                 Name:       "instances_run_probe_duration_seconds",
713                 Help:       "Number of seconds per runProbe call.",
714                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
715         }, []string{"outcome"})
716         reg.MustRegister(wp.mRunProbeDuration)
717 }
718
719 func (wp *Pool) runMetrics() {
720         ch := wp.Subscribe()
721         defer wp.Unsubscribe(ch)
722         wp.updateMetrics()
723         for range ch {
724                 wp.updateMetrics()
725         }
726 }
727
728 func (wp *Pool) updateMetrics() {
729         wp.mtx.RLock()
730         defer wp.mtx.RUnlock()
731
732         type entKey struct {
733                 cat      string
734                 instType string
735         }
736         instances := map[entKey]int64{}
737         price := map[string]float64{}
738         cpu := map[string]int64{}
739         mem := map[string]int64{}
740         var running int64
741         for _, wkr := range wp.workers {
742                 var cat string
743                 switch {
744                 case len(wkr.running)+len(wkr.starting) > 0:
745                         cat = "inuse"
746                 case wkr.idleBehavior == IdleBehaviorHold:
747                         cat = "hold"
748                 case wkr.state == StateBooting:
749                         cat = "booting"
750                 case wkr.state == StateUnknown:
751                         cat = "unknown"
752                 default:
753                         cat = "idle"
754                 }
755                 instances[entKey{cat, wkr.instType.Name}]++
756                 price[cat] += wkr.instType.Price
757                 cpu[cat] += int64(wkr.instType.VCPUs)
758                 mem[cat] += int64(wkr.instType.RAM)
759                 running += int64(len(wkr.running) + len(wkr.starting))
760         }
761         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
762                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
763                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
764                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
765                 // make sure to reset gauges for non-existing category/nodetype combinations
766                 for _, it := range wp.instanceTypes {
767                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
768                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
769                         }
770                 }
771         }
772         for k, v := range instances {
773                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
774         }
775         wp.mContainersRunning.Set(float64(running))
776 }
777
778 func (wp *Pool) runProbes() {
779         maxPPS := wp.maxProbesPerSecond
780         if maxPPS < 1 {
781                 maxPPS = defaultMaxProbesPerSecond
782         }
783         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
784         defer limitticker.Stop()
785
786         probeticker := time.NewTicker(wp.probeInterval)
787         defer probeticker.Stop()
788
789         workers := []cloud.InstanceID{}
790         for range probeticker.C {
791                 // Add some jitter. Without this, if probeInterval is
792                 // a multiple of syncInterval and sync is
793                 // instantaneous (as with the loopback driver), the
794                 // first few probes race with sync operations and
795                 // don't update the workers.
796                 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
797
798                 workers = workers[:0]
799                 wp.mtx.Lock()
800                 for id, wkr := range wp.workers {
801                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
802                                 continue
803                         }
804                         workers = append(workers, id)
805                 }
806                 wp.mtx.Unlock()
807
808                 for _, id := range workers {
809                         wp.mtx.Lock()
810                         wkr, ok := wp.workers[id]
811                         wp.mtx.Unlock()
812                         if !ok {
813                                 // Deleted while we were probing
814                                 // others
815                                 continue
816                         }
817                         go wkr.ProbeAndUpdate()
818                         select {
819                         case <-wp.stop:
820                                 return
821                         case <-limitticker.C:
822                         }
823                 }
824         }
825 }
826
827 func (wp *Pool) runSync() {
828         // sync once immediately, then wait syncInterval, sync again,
829         // etc.
830         timer := time.NewTimer(1)
831         for {
832                 select {
833                 case <-timer.C:
834                         err := wp.getInstancesAndSync()
835                         if err != nil {
836                                 wp.logger.WithError(err).Warn("sync failed")
837                         }
838                         timer.Reset(wp.syncInterval)
839                 case <-wp.stop:
840                         wp.logger.Debug("worker.Pool stopped")
841                         return
842                 }
843         }
844 }
845
846 // Stop synchronizing with the InstanceSet.
847 func (wp *Pool) Stop() {
848         wp.setupOnce.Do(wp.setup)
849         close(wp.stop)
850 }
851
852 // Instances returns an InstanceView for each worker in the pool,
853 // summarizing its current state and recent activity.
854 func (wp *Pool) Instances() []InstanceView {
855         var r []InstanceView
856         wp.setupOnce.Do(wp.setup)
857         wp.mtx.Lock()
858         for _, w := range wp.workers {
859                 r = append(r, InstanceView{
860                         Instance:             w.instance.ID(),
861                         Address:              w.instance.Address(),
862                         Price:                w.instType.Price,
863                         ArvadosInstanceType:  w.instType.Name,
864                         ProviderInstanceType: w.instType.ProviderType,
865                         LastContainerUUID:    w.lastUUID,
866                         LastBusy:             w.busy,
867                         WorkerState:          w.state.String(),
868                         IdleBehavior:         w.idleBehavior,
869                 })
870         }
871         wp.mtx.Unlock()
872         sort.Slice(r, func(i, j int) bool {
873                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
874         })
875         return r
876 }
877
878 // KillInstance destroys a cloud VM instance. It returns an error if
879 // the given instance does not exist.
880 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
881         wkr, ok := wp.workers[id]
882         if !ok {
883                 return errors.New("instance not found")
884         }
885         wkr.logger.WithField("Reason", reason).Info("shutting down")
886         wkr.reportBootOutcome(BootOutcomeAborted)
887         wkr.shutdown()
888         return nil
889 }
890
891 func (wp *Pool) setup() {
892         wp.creating = map[string]createCall{}
893         wp.exited = map[string]time.Time{}
894         wp.workers = map[cloud.InstanceID]*worker{}
895         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
896         wp.loadRunnerData()
897 }
898
899 // Load the runner program to be deployed on worker nodes into
900 // wp.runnerData, if necessary. Errors are logged.
901 //
902 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
903 //
904 // Caller must not have lock.
905 func (wp *Pool) loadRunnerData() error {
906         wp.mtx.Lock()
907         defer wp.mtx.Unlock()
908         if wp.runnerData != nil {
909                 return nil
910         } else if wp.runnerSource == "" {
911                 wp.runnerCmd = wp.runnerCmdDefault
912                 wp.runnerData = []byte{}
913                 return nil
914         }
915         logger := wp.logger.WithField("source", wp.runnerSource)
916         logger.Debug("loading runner")
917         buf, err := ioutil.ReadFile(wp.runnerSource)
918         if err != nil {
919                 logger.WithError(err).Error("failed to load runner program")
920                 return err
921         }
922         wp.runnerData = buf
923         wp.runnerMD5 = md5.Sum(buf)
924         wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
925         return nil
926 }
927
928 func (wp *Pool) notify() {
929         wp.mtx.RLock()
930         defer wp.mtx.RUnlock()
931         for _, send := range wp.subscribers {
932                 select {
933                 case send <- struct{}{}:
934                 default:
935                 }
936         }
937 }
938
939 func (wp *Pool) getInstancesAndSync() error {
940         wp.setupOnce.Do(wp.setup)
941         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
942                 return err
943         }
944         wp.logger.Debug("getting instance list")
945         threshold := time.Now()
946         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
947         if err != nil {
948                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
949                 return err
950         }
951         wp.sync(threshold, instances)
952         wp.logger.Debug("sync done")
953         return nil
954 }
955
// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
//
// Listed instances whose InstanceType tag does not match a configured
// instance type are logged and ignored. Workers that were not updated
// after threshold are treated as having disappeared from the cloud:
// they are removed from wp.workers, metrics are recorded, and the
// worker is closed asynchronously. Subscribers are notified (also
// asynchronously) if any worker was added or removed, or if this is the
// first sync.
//
// sync acquires wp.mtx itself; caller must not hold it.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	// Set when subscribers should be told the pool changed.
	notify := false

	for _, inst := range instances {
		// Map the instance's InstanceType tag back to a configured
		// instance type.
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			// We already tried to destroy this instance more than
			// timeoutShutdown ago, but the provider still lists it.
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	// A worker whose updated time is not after threshold is taken to be
	// missing from this listing (presumably updateWorker bumps
	// wkr.updated for every listed instance -- TODO confirm). Deleting
	// from a map while ranging over it is permitted in Go.
	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		// Metrics may be nil (the nil checks suggest they are not always
		// registered).
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		// Close in the background so sync doesn't block on it while
		// holding wp.mtx.
		go wkr.Close()
		notify = true
	}

	// Always notify after the first successful sync so that
	// waitUntilLoaded callers wake up.
	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	// notify takes wp.mtx.RLock, and we hold wp.mtx.Lock until return,
	// so it must run in a separate goroutine to avoid deadlock.
	if notify {
		go wp.notify()
	}
}
1012
1013 func (wp *Pool) waitUntilLoaded() {
1014         ch := wp.Subscribe()
1015         wp.mtx.RLock()
1016         defer wp.mtx.RUnlock()
1017         for !wp.loaded {
1018                 wp.mtx.RUnlock()
1019                 <-ch
1020                 wp.mtx.RLock()
1021         }
1022 }
1023
1024 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1025         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1026         fmt.Fprint(h, uuid)
1027         return fmt.Sprintf("%x", h.Sum(nil))
1028 }
1029
// randomHex returns a random string of exactly n lowercase hexadecimal
// digits (n*4 random bits), drawn from crypto/rand. n may be odd. If
// n <= 0 the result is "". It panics if the system's secure random
// source fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte encodes as two hex digits; round up so odd n still gets
	// enough digits, then trim to exactly n.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}