// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/hmac"
	"crypto/md5"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	mathrand "math/rand"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}

const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

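// duration returns conf as a time.Duration if it is positive,
// otherwise def.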
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
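//
// A hypothetical caller (the surrounding variables are assumed, not
// defined in this file; a nil registry is allowed):
//
//	wp := NewPool(logger, arvClient, nil, instanceSetID, instanceSet, newExecutor, pubKey, cluster)
//	defer wp.Stop()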
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		cluster:                        cluster,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
		systemRootToken:                cluster.SystemRootToken,
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
		runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	instanceInitCommand            cloud.InitCommand
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	maxInstances                   int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
	mProbeAgeMax              prometheus.Gauge
	mProbeAgeMedian           prometheus.Gauge
}

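// createCall records an unfinished (cloud.InstanceSet)Create call.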
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

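// CheckHealth returns an error if the runner binary to be deployed
// on new instances cannot be loaded.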
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
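//
// Example (hypothetical caller):
//
//	for it, n := range wp.Unallocated() {
//		fmt.Printf("%s: %d unallocated\n", it.Name, n)
//	}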
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) ||
		wp.instanceSet.throttleCreate.Error() != nil ||
		(wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// reportSSHConnected records the first successful SSH connection to
// an instance, and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[inst.ID()]
	if !ok {
		// race: inst was removed from the pool
		return
	}
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if
		// a-d-c is restarted) OR this is not the first SSH
		// connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
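//
// Example (hypothetical caller):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			// crunch-run has exited; clean up, then:
//			wp.ForgetContainer(uuid)
//		}
//	}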
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_max",
		Help:      "Maximum number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMax)
	wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "probe_age_seconds_median",
		Help:      "Median number of seconds since an instance's most recent successful probe.",
	})
	reg.MustRegister(wp.mProbeAgeMedian)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
	wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_run_probe_duration_seconds",
		Help:       "Number of seconds per runProbe call.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	}, []string{"outcome"})
	reg.MustRegister(wp.mRunProbeDuration)
}

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	now := time.Now()
	var probed []time.Time
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
		probed = append(probed, wkr.probed)
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))

	if len(probed) == 0 {
		wp.mProbeAgeMax.Set(0)
		wp.mProbeAgeMedian.Set(0)
	} else {
		sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
		wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
		wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
	}
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		// Add some jitter. Without this, if probeInterval is
		// a multiple of syncInterval and sync is
		// instantaneous (as with the loopback driver), the
		// first few probes race with sync operations and
		// don't update the workers.
		time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wp.setupOnce.Do(wp.setup)
	// Take the lock, matching Shutdown() and SetIdleBehavior(),
	// so the wp.workers lookup and the shutdown call don't race
	// with sync().
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = wp.runnerCmdDefault
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
	return nil
}

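// notify sends a non-blocking wakeup to every subscriber channel.
// Caller must not have lock.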
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		// wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
		if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	// Unsubscribe when done, so repeated calls don't leave stale
	// entries in wp.subscribers.
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

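// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: an HMAC-SHA256 of the UUID, keyed with the
// cluster's SystemRootToken, rendered as hex.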
func (wp *Pool) gatewayAuthSecret(uuid string) string {
	h := hmac.New(sha256.New, []byte(wp.systemRootToken))
	fmt.Fprint(h, uuid)
	return fmt.Sprintf("%x", h.Sum(nil))
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}