More Salt installer doc refactoring.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "sort"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/cloud"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "github.com/prometheus/client_golang/prometheus"
24         "github.com/sirupsen/logrus"
25         "golang.org/x/crypto/ssh"
26 )
27
28 const ( // Instance tag key suffixes. Create and updateWorker prepend the pool's tagKeyPrefix to each of these.
29         tagKeyInstanceType   = "InstanceType" // Create stores the Arvados instance type name here
30         tagKeyIdleBehavior   = "IdleBehavior" // Create stores IdleBehaviorRun; updateWorker reads it back for newly seen instances
31         tagKeyInstanceSecret = "InstanceSecret" // random per-instance secret generated by Create; also the key into Pool.creating
32         tagKeyInstanceSetID  = "InstanceSetID" // Create stores the pool's instanceSetID here
33 )
34
35 // An InstanceView shows a worker's current state and recent activity. It is JSON-serializable using the keys in the field tags.
36 type InstanceView struct {
37         Instance             cloud.InstanceID `json:"instance"`
38         Address              string           `json:"address"`
39         Price                float64          `json:"price"` // NOTE(review): units/currency are not stated here -- confirm against arvados.InstanceType
40         ArvadosInstanceType  string           `json:"arvados_instance_type"`
41         ProviderInstanceType string           `json:"provider_instance_type"`
42         LastContainerUUID    string           `json:"last_container_uuid"`
43         LastBusy             time.Time        `json:"last_busy"`
44         WorkerState          string           `json:"worker_state"` // presumably the string form of the worker's State -- verify where the view is built
45         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
46 }
47
48 // An Executor executes shell commands on a remote host. The pool obtains one per worker via its newExecutor func.
49 type Executor interface {
50         // Execute runs cmd on the current target with the given environment and stdin, returning the command's stdout and stderr.
51         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
52
53         // Use the given target for subsequent operations. The new
54         // target is the same host as the previous target, but it
55         // might return a different address and verify a different
56         // host key.
57         //
58         // SetTarget is called frequently, and in most cases the new
59         // target will behave exactly the same as the old one. An
60         // implementation should optimize accordingly.
61         //
62         // SetTarget must not block on concurrent Execute calls.
63         SetTarget(cloud.ExecutorTarget)
64
65         Close() // NOTE(review): contract is not documented here; presumably releases any connection held for the target -- confirm with implementations
66 }
67
68 const ( // Fallback values. NewPool applies the duration-valued defaults below when the configured value is not positive.
69         defaultSyncInterval        = time.Minute
70         defaultProbeInterval       = time.Second * 10
71         defaultMaxProbesPerSecond  = 10 // not applied by NewPool itself; used elsewhere in the package
72         defaultTimeoutIdle         = time.Minute
73         defaultTimeoutBooting      = time.Minute * 10
74         defaultTimeoutProbe        = time.Minute * 10
75         defaultTimeoutShutdown     = time.Second * 10
76         defaultTimeoutTERM         = time.Minute * 2
77         defaultTimeoutSignal       = time.Second * 5
78         defaultTimeoutStaleRunLock = time.Second * 5
79
80         // Time after a quota error to try again anyway, even if no
81         // instances have been shut down.
82         quotaErrorTTL = time.Minute
83
84         // Minimum time between "X failed because of rate limiting" log messages.
85         logRateLimitErrorInterval = time.Second * 10
86 )
87
88 func duration(conf arvados.Duration, def time.Duration) time.Duration {
89         if conf > 0 {
90                 return time.Duration(conf)
91         }
92         return def
93 }
94
95 // NewPool creates a Pool of workers backed by instanceSet.
96 //
97 // New instances are configured and set up according to the given
98 // cluster configuration.
99 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100         wp := &Pool{
101                 logger:                         logger,
102                 arvClient:                      arvClient,
103                 instanceSetID:                  instanceSetID,
104                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
105                 newExecutor:                    newExecutor,
106                 cluster:                        cluster,
107                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
108                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
109                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
110                 instanceTypes:                  cluster.InstanceTypes,
111                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
112                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
113                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
114                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
115                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
116                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
117                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
118                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
119                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
120                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
121                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
122                 systemRootToken:                cluster.SystemRootToken,
123                 installPublicKey:               installPublicKey,
124                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
125                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
126                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
127                 stop:                           make(chan bool),
128         }
129         wp.registerMetrics(reg)
130         go func() {
131                 wp.setupOnce.Do(wp.setup)
132                 go wp.runMetrics()
133                 go wp.runProbes()
134                 go wp.runSync()
135         }()
136         return wp
137 }
138
139 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
140 // zero Pool should not be used. Call NewPool to create a new Pool.
141 type Pool struct {
142         // configuration (populated once by NewPool)
143         logger                         logrus.FieldLogger
144         arvClient                      *arvados.Client
145         instanceSetID                  cloud.InstanceSetID
146         instanceSet                    *throttledInstanceSet
147         newExecutor                    func(cloud.Instance) Executor
148         cluster                        *arvados.Cluster
149         bootProbeCommand               string
150         runnerSource                   string
151         imageID                        cloud.ImageID
152         instanceTypes                  map[string]arvados.InstanceType
153         syncInterval                   time.Duration
154         probeInterval                  time.Duration
155         maxProbesPerSecond             int
156         maxConcurrentInstanceCreateOps int // if > 0, Create refuses new requests while this many Create calls are unfinished
157         timeoutIdle                    time.Duration
158         timeoutBooting                 time.Duration
159         timeoutProbe                   time.Duration
160         timeoutShutdown                time.Duration
161         timeoutTERM                    time.Duration
162         timeoutSignal                  time.Duration
163         timeoutStaleRunLock            time.Duration
164         systemRootToken                string
165         installPublicKey               ssh.PublicKey
166         tagKeyPrefix                   string // prepended to every tag key the pool sets or reads
167         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
168         runnerArgs                     []string // extra args passed to crunch-run
169
170         // private state
171         subscribers  map[<-chan struct{}]chan<- struct{} // channels handed out by Subscribe, keyed by their receive side
172         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
173         workers      map[cloud.InstanceID]*worker // known workers, keyed by instance ID (see updateWorker)
174         loaded       bool                 // loaded list of instances from InstanceSet at least once
175         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
176         atQuotaUntil time.Time // set by Create after a quota error; AtQuota reports true until this time
177         atQuotaErr   cloud.QuotaError // the quota error that set atQuotaUntil
178         stop         chan bool
179         mtx          sync.RWMutex // guards the private state; each worker also holds a pointer to it
180         setupOnce    sync.Once // ensures wp.setup runs exactly once
181         runnerData   []byte
182         runnerMD5    [md5.Size]byte
183         runnerCmd    string
184 // Prometheus collectors, created and registered by registerMetrics.
185         mContainersRunning        prometheus.Gauge
186         mInstances                *prometheus.GaugeVec
187         mInstancesPrice           *prometheus.GaugeVec
188         mVCPUs                    *prometheus.GaugeVec
189         mMemory                   *prometheus.GaugeVec
190         mBootOutcomes             *prometheus.CounterVec
191         mDisappearances           *prometheus.CounterVec
192         mTimeToSSH                prometheus.Summary
193         mTimeToReadyForContainer  prometheus.Summary
194         mTimeFromShutdownToGone   prometheus.Summary
195         mTimeFromQueueToCrunchRun prometheus.Summary
196         mRunProbeDuration         *prometheus.SummaryVec
197 }
198
199 type createCall struct { // createCall records one unfinished instance creation request; entries live in Pool.creating.
200         time         time.Time // when Create registered the request
201         instanceType arvados.InstanceType // instance type that was requested
202 }
203
204 func (wp *Pool) CheckHealth() error {
205         wp.setupOnce.Do(wp.setup)
206         if err := wp.loadRunnerData(); err != nil {
207                 return fmt.Errorf("error loading runner binary: %s", err)
208         }
209         return nil
210 }
211
212 // Subscribe returns a buffered channel that becomes ready after any
213 // change to the pool's state that could have scheduling implications:
214 // a worker's state changes, a new worker appears, the cloud
215 // provider's API rate limiting period ends, etc.
216 //
217 // Additional events that occur while the channel is already ready
218 // will be dropped, so it is OK if the caller services the channel
219 // slowly.
220 //
221 // Example:
222 //
223 //      ch := wp.Subscribe()
224 //      defer wp.Unsubscribe(ch)
225 //      for range ch {
226 //              tryScheduling(wp)
227 //              if done {
228 //                      break
229 //              }
230 //      }
231 func (wp *Pool) Subscribe() <-chan struct{} {
232         wp.setupOnce.Do(wp.setup)
233         wp.mtx.Lock()
234         defer wp.mtx.Unlock()
235         ch := make(chan struct{}, 1)
236         wp.subscribers[ch] = ch
237         return ch
238 }
239
240 // Unsubscribe stops sending updates to the given channel.
241 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
242         wp.setupOnce.Do(wp.setup)
243         wp.mtx.Lock()
244         defer wp.mtx.Unlock()
245         delete(wp.subscribers, ch)
246 }
247
248 // Unallocated returns the number of unallocated (creating + booting +
249 // idle + unknown) workers for each instance type.  Workers in
250 // hold/drain mode are not included.
251 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
252         wp.setupOnce.Do(wp.setup)
253         wp.mtx.RLock()
254         defer wp.mtx.RUnlock()
255         unalloc := map[arvados.InstanceType]int{}
256         creating := map[arvados.InstanceType]int{}
257         oldestCreate := map[arvados.InstanceType]time.Time{}
258         for _, cc := range wp.creating {
259                 it := cc.instanceType
260                 creating[it]++
261                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
262                         oldestCreate[it] = cc.time
263                 }
264         }
265         for _, wkr := range wp.workers {
266                 // Skip workers that are not expected to become
267                 // available soon. Note len(wkr.running)>0 is not
268                 // redundant here: it can be true even in
269                 // StateUnknown.
270                 if wkr.state == StateShutdown ||
271                         wkr.state == StateRunning ||
272                         wkr.idleBehavior != IdleBehaviorRun ||
273                         len(wkr.running) > 0 {
274                         continue
275                 }
276                 it := wkr.instType
277                 unalloc[it]++
278                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
279                         // If up to N new workers appear in
280                         // Instances() while we are waiting for N
281                         // Create() calls to complete, we assume we're
282                         // just seeing a race between Instances() and
283                         // Create() responses.
284                         //
285                         // The other common reason why nodes have
286                         // state==Unknown is that they appeared at
287                         // startup, before any Create calls. They
288                         // don't match the above timing condition, so
289                         // we never mistakenly attribute them to
290                         // pending Create calls.
291                         creating[it]--
292                 }
293         }
294         for it, c := range creating {
295                 unalloc[it] += c
296         }
297         return unalloc
298 }
299
300 // Create a new instance with the given type, and add it to the worker
301 // pool. The worker is added immediately; instance creation runs in
302 // the background.
303 //
304 // Create returns false if a pre-existing error state prevents it from
305 // even attempting to create a new instance. Those errors are logged
306 // by the Pool, so the caller does not need to log anything in such
307 // cases.
308 func (wp *Pool) Create(it arvados.InstanceType) bool {
309         logger := wp.logger.WithField("InstanceType", it.Name)
310         wp.setupOnce.Do(wp.setup)
311         if wp.loadRunnerData() != nil {
312                 // Boot probe is certain to fail.
313                 return false
314         }
315         wp.mtx.Lock()
316         defer wp.mtx.Unlock()
317         if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
318                 return false
319         }
320         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
321         // requests in flight. It was added to work around a limitation in Azure's
322         // managed disks, which support no more than 20 concurrent node creation
323         // requests from a single disk image (cf.
324         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
325         // The code assumes that node creation, from Azure's perspective, means the
326         // period until the instance appears in the "get all instances" list.
327         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
328                 logger.Info("reached MaxConcurrentInstanceCreateOps")
329                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
330                 return false
331         }
332         now := time.Now()
333         secret := randomHex(instanceSecretLength)
334         wp.creating[secret] = createCall{time: now, instanceType: it}
335         go func() {
336                 defer wp.notify()
337                 tags := cloud.InstanceTags{
338                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
339                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
340                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
341                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
342                 }
343                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
344                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
345                 wp.mtx.Lock()
346                 defer wp.mtx.Unlock()
347                 // delete() is deferred so the updateWorker() call
348                 // below knows to use StateBooting when adding a new
349                 // worker.
350                 defer delete(wp.creating, secret)
351                 if err != nil {
352                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
353                                 wp.atQuotaErr = err
354                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
355                                 time.AfterFunc(quotaErrorTTL, wp.notify)
356                         }
357                         logger.WithError(err).Error("create failed")
358                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
359                         return
360                 }
361                 wp.updateWorker(inst, it)
362         }()
363         return true
364 }
365
366 // AtQuota returns true if Create is not expected to work at the
367 // moment.
368 func (wp *Pool) AtQuota() bool {
369         wp.mtx.Lock()
370         defer wp.mtx.Unlock()
371         return time.Now().Before(wp.atQuotaUntil)
372 }
373
374 // SetIdleBehavior determines how the indicated instance will behave
375 // when it has no containers running.
376 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
377         wp.mtx.Lock()
378         defer wp.mtx.Unlock()
379         wkr, ok := wp.workers[id]
380         if !ok {
381                 return errors.New("requested instance does not exist")
382         }
383         wkr.setIdleBehavior(idleBehavior)
384         return nil
385 }
386
387 // Successful connection to the SSH daemon, update the mTimeToSSH metric
388 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
389         wp.mtx.Lock()
390         defer wp.mtx.Unlock()
391         wkr := wp.workers[inst.ID()]
392         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
393                 // the node is not in booting state (can happen if a-d-c is restarted) OR
394                 // this is not the first SSH connection
395                 return
396         }
397
398         wkr.firstSSHConnection = time.Now()
399         if wp.mTimeToSSH != nil {
400                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
401         }
402 }
403
404 // Add or update worker attached to the given instance.
405 //
406 // The second return value is true if a new worker is created.
407 //
408 // A newly added instance has state=StateBooting if its tags match an
409 // entry in wp.creating, otherwise StateUnknown.
410 //
411 // Caller must have lock.
412 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
413         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
414         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
415         id := inst.ID()
416         if wkr := wp.workers[id]; wkr != nil {
417                 wkr.executor.SetTarget(inst)
418                 wkr.instance = inst
419                 wkr.updated = time.Now()
420                 wkr.saveTags()
421                 return wkr, false
422         }
423
424         state := StateUnknown
425         if _, ok := wp.creating[secret]; ok {
426                 state = StateBooting
427         }
428
429         // If an instance has a valid IdleBehavior tag when it first
430         // appears, initialize the new worker accordingly (this is how
431         // we restore IdleBehavior that was set by a prior dispatch
432         // process); otherwise, default to "run". After this,
433         // wkr.idleBehavior is the source of truth, and will only be
434         // changed via SetIdleBehavior().
435         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
436         if !validIdleBehavior[idleBehavior] {
437                 idleBehavior = IdleBehaviorRun
438         }
439
440         logger := wp.logger.WithFields(logrus.Fields{
441                 "InstanceType": it.Name,
442                 "Instance":     inst.ID(),
443                 "Address":      inst.Address(),
444         })
445         logger.WithFields(logrus.Fields{
446                 "State":        state,
447                 "IdleBehavior": idleBehavior,
448         }).Infof("instance appeared in cloud")
449         now := time.Now()
450         wkr := &worker{
451                 mtx:          &wp.mtx,
452                 wp:           wp,
453                 logger:       logger,
454                 executor:     wp.newExecutor(inst),
455                 state:        state,
456                 idleBehavior: idleBehavior,
457                 instance:     inst,
458                 instType:     it,
459                 appeared:     now,
460                 probed:       now,
461                 busy:         now,
462                 updated:      now,
463                 running:      make(map[string]*remoteRunner),
464                 starting:     make(map[string]*remoteRunner),
465                 probing:      make(chan struct{}, 1),
466         }
467         wp.workers[id] = wkr
468         return wkr, true
469 }
470
471 // Shutdown shuts down a worker with the given type, or returns false
472 // if all workers with the given type are busy.
473 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
474         wp.setupOnce.Do(wp.setup)
475         wp.mtx.Lock()
476         defer wp.mtx.Unlock()
477         logger := wp.logger.WithField("InstanceType", it.Name)
478         logger.Info("shutdown requested")
479         for _, tryState := range []State{StateBooting, StateIdle} {
480                 // TODO: shutdown the worker with the longest idle
481                 // time (Idle) or the earliest create time (Booting)
482                 for _, wkr := range wp.workers {
483                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
484                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
485                                 wkr.reportBootOutcome(BootOutcomeAborted)
486                                 wkr.shutdown()
487                                 return true
488                         }
489                 }
490         }
491         return false
492 }
493
494 // CountWorkers returns the current number of workers in each state.
495 //
496 // CountWorkers blocks, if necessary, until the initial instance list
497 // has been loaded from the cloud provider.
498 func (wp *Pool) CountWorkers() map[State]int {
499         wp.setupOnce.Do(wp.setup)
500         wp.waitUntilLoaded()
501         wp.mtx.Lock()
502         defer wp.mtx.Unlock()
503         r := map[State]int{}
504         for _, w := range wp.workers {
505                 r[w.state]++
506         }
507         return r
508 }
509
510 // Running returns the container UUIDs being prepared/run on workers.
511 //
512 // In the returned map, the time value indicates when the Pool
513 // observed that the container process had exited. A container that
514 // has not yet exited has a zero time value. The caller should use
515 // ForgetContainer() to garbage-collect the entries for exited
516 // containers.
517 func (wp *Pool) Running() map[string]time.Time {
518         wp.setupOnce.Do(wp.setup)
519         wp.mtx.Lock()
520         defer wp.mtx.Unlock()
521         r := map[string]time.Time{}
522         for _, wkr := range wp.workers {
523                 for uuid := range wkr.running {
524                         r[uuid] = time.Time{}
525                 }
526                 for uuid := range wkr.starting {
527                         r[uuid] = time.Time{}
528                 }
529         }
530         for uuid, exited := range wp.exited {
531                 r[uuid] = exited
532         }
533         return r
534 }
535
536 // StartContainer starts a container on an idle worker immediately if
537 // possible, otherwise returns false.
538 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
539         wp.setupOnce.Do(wp.setup)
540         wp.mtx.Lock()
541         defer wp.mtx.Unlock()
542         var wkr *worker
543         for _, w := range wp.workers {
544                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
545                         if wkr == nil || w.busy.After(wkr.busy) {
546                                 wkr = w
547                         }
548                 }
549         }
550         if wkr == nil {
551                 return false
552         }
553         wkr.startContainer(ctr)
554         return true
555 }
556
557 // KillContainer kills the crunch-run process for the given container
558 // UUID, if it's running on any worker.
559 //
560 // KillContainer returns immediately; the act of killing the container
561 // takes some time, and runs in the background.
562 //
563 // KillContainer returns false if the container has already ended.
564 func (wp *Pool) KillContainer(uuid string, reason string) bool {
565         wp.mtx.Lock()
566         defer wp.mtx.Unlock()
567         logger := wp.logger.WithFields(logrus.Fields{
568                 "ContainerUUID": uuid,
569                 "Reason":        reason,
570         })
571         for _, wkr := range wp.workers {
572                 rr := wkr.running[uuid]
573                 if rr == nil {
574                         rr = wkr.starting[uuid]
575                 }
576                 if rr != nil {
577                         rr.Kill(reason)
578                         return true
579                 }
580         }
581         logger.Debug("cannot kill: already disappeared")
582         return false
583 }
584
585 // ForgetContainer clears the placeholder for the given exited
586 // container, so it isn't returned by subsequent calls to Running().
587 //
588 // ForgetContainer has no effect if the container has not yet exited.
589 //
590 // The "container exited at time T" placeholder (which necessitates
591 // ForgetContainer) exists to make it easier for the caller
592 // (scheduler) to distinguish a container that exited without
593 // finalizing its state from a container that exited too recently for
594 // its final state to have appeared in the scheduler's queue cache.
595 func (wp *Pool) ForgetContainer(uuid string) {
596         wp.mtx.Lock()
597         defer wp.mtx.Unlock()
598         if _, ok := wp.exited[uuid]; ok {
599                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
600                 delete(wp.exited, uuid)
601         }
602 }
603
604 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
605         if reg == nil {
606                 reg = prometheus.NewRegistry()
607         }
608         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
609                 Namespace: "arvados",
610                 Subsystem: "dispatchcloud",
611                 Name:      "containers_running",
612                 Help:      "Number of containers reported running by cloud VMs.",
613         })
614         reg.MustRegister(wp.mContainersRunning)
615         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
616                 Namespace: "arvados",
617                 Subsystem: "dispatchcloud",
618                 Name:      "instances_total",
619                 Help:      "Number of cloud VMs.",
620         }, []string{"category", "instance_type"})
621         reg.MustRegister(wp.mInstances)
622         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
623                 Namespace: "arvados",
624                 Subsystem: "dispatchcloud",
625                 Name:      "instances_price",
626                 Help:      "Price of cloud VMs.",
627         }, []string{"category"})
628         reg.MustRegister(wp.mInstancesPrice)
629         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
630                 Namespace: "arvados",
631                 Subsystem: "dispatchcloud",
632                 Name:      "vcpus_total",
633                 Help:      "Total VCPUs on all cloud VMs.",
634         }, []string{"category"})
635         reg.MustRegister(wp.mVCPUs)
636         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
637                 Namespace: "arvados",
638                 Subsystem: "dispatchcloud",
639                 Name:      "memory_bytes_total",
640                 Help:      "Total memory on all cloud VMs.",
641         }, []string{"category"})
642         reg.MustRegister(wp.mMemory)
643         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
644                 Namespace: "arvados",
645                 Subsystem: "dispatchcloud",
646                 Name:      "boot_outcomes",
647                 Help:      "Boot outcomes by type.",
648         }, []string{"outcome"})
649         for k := range validBootOutcomes {
650                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
651         }
652         reg.MustRegister(wp.mBootOutcomes)
653         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
654                 Namespace: "arvados",
655                 Subsystem: "dispatchcloud",
656                 Name:      "instances_disappeared",
657                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
658         }, []string{"state"})
659         for _, v := range stateString {
660                 wp.mDisappearances.WithLabelValues(v).Add(0)
661         }
662         reg.MustRegister(wp.mDisappearances)
663         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
664                 Namespace:  "arvados",
665                 Subsystem:  "dispatchcloud",
666                 Name:       "instances_time_to_ssh_seconds",
667                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
668                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
669         })
670         reg.MustRegister(wp.mTimeToSSH)
671         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
672                 Namespace:  "arvados",
673                 Subsystem:  "dispatchcloud",
674                 Name:       "instances_time_to_ready_for_container_seconds",
675                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
676                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
677         })
678         reg.MustRegister(wp.mTimeToReadyForContainer)
679         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
680                 Namespace:  "arvados",
681                 Subsystem:  "dispatchcloud",
682                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
683                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
684                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
685         })
686         reg.MustRegister(wp.mTimeFromShutdownToGone)
687         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
688                 Namespace:  "arvados",
689                 Subsystem:  "dispatchcloud",
690                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
691                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
692                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
693         })
694         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
695         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
696                 Namespace:  "arvados",
697                 Subsystem:  "dispatchcloud",
698                 Name:       "instances_run_probe_duration_seconds",
699                 Help:       "Number of seconds per runProbe call.",
700                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
701         }, []string{"outcome"})
702         reg.MustRegister(wp.mRunProbeDuration)
703 }
704
705 func (wp *Pool) runMetrics() {
706         ch := wp.Subscribe()
707         defer wp.Unsubscribe(ch)
708         wp.updateMetrics()
709         for range ch {
710                 wp.updateMetrics()
711         }
712 }
713
714 func (wp *Pool) updateMetrics() {
715         wp.mtx.RLock()
716         defer wp.mtx.RUnlock()
717
718         type entKey struct {
719                 cat      string
720                 instType string
721         }
722         instances := map[entKey]int64{}
723         price := map[string]float64{}
724         cpu := map[string]int64{}
725         mem := map[string]int64{}
726         var running int64
727         for _, wkr := range wp.workers {
728                 var cat string
729                 switch {
730                 case len(wkr.running)+len(wkr.starting) > 0:
731                         cat = "inuse"
732                 case wkr.idleBehavior == IdleBehaviorHold:
733                         cat = "hold"
734                 case wkr.state == StateBooting:
735                         cat = "booting"
736                 case wkr.state == StateUnknown:
737                         cat = "unknown"
738                 default:
739                         cat = "idle"
740                 }
741                 instances[entKey{cat, wkr.instType.Name}]++
742                 price[cat] += wkr.instType.Price
743                 cpu[cat] += int64(wkr.instType.VCPUs)
744                 mem[cat] += int64(wkr.instType.RAM)
745                 running += int64(len(wkr.running) + len(wkr.starting))
746         }
747         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
748                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
749                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
750                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
751                 // make sure to reset gauges for non-existing category/nodetype combinations
752                 for _, it := range wp.instanceTypes {
753                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
754                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
755                         }
756                 }
757         }
758         for k, v := range instances {
759                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
760         }
761         wp.mContainersRunning.Set(float64(running))
762 }
763
764 func (wp *Pool) runProbes() {
765         maxPPS := wp.maxProbesPerSecond
766         if maxPPS < 1 {
767                 maxPPS = defaultMaxProbesPerSecond
768         }
769         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
770         defer limitticker.Stop()
771
772         probeticker := time.NewTicker(wp.probeInterval)
773         defer probeticker.Stop()
774
775         workers := []cloud.InstanceID{}
776         for range probeticker.C {
777                 workers = workers[:0]
778                 wp.mtx.Lock()
779                 for id, wkr := range wp.workers {
780                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
781                                 continue
782                         }
783                         workers = append(workers, id)
784                 }
785                 wp.mtx.Unlock()
786
787                 for _, id := range workers {
788                         wp.mtx.Lock()
789                         wkr, ok := wp.workers[id]
790                         wp.mtx.Unlock()
791                         if !ok {
792                                 // Deleted while we were probing
793                                 // others
794                                 continue
795                         }
796                         go wkr.ProbeAndUpdate()
797                         select {
798                         case <-wp.stop:
799                                 return
800                         case <-limitticker.C:
801                         }
802                 }
803         }
804 }
805
806 func (wp *Pool) runSync() {
807         // sync once immediately, then wait syncInterval, sync again,
808         // etc.
809         timer := time.NewTimer(1)
810         for {
811                 select {
812                 case <-timer.C:
813                         err := wp.getInstancesAndSync()
814                         if err != nil {
815                                 wp.logger.WithError(err).Warn("sync failed")
816                         }
817                         timer.Reset(wp.syncInterval)
818                 case <-wp.stop:
819                         wp.logger.Debug("worker.Pool stopped")
820                         return
821                 }
822         }
823 }
824
825 // Stop synchronizing with the InstanceSet.
826 func (wp *Pool) Stop() {
827         wp.setupOnce.Do(wp.setup)
828         close(wp.stop)
829 }
830
831 // Instances returns an InstanceView for each worker in the pool,
832 // summarizing its current state and recent activity.
833 func (wp *Pool) Instances() []InstanceView {
834         var r []InstanceView
835         wp.setupOnce.Do(wp.setup)
836         wp.mtx.Lock()
837         for _, w := range wp.workers {
838                 r = append(r, InstanceView{
839                         Instance:             w.instance.ID(),
840                         Address:              w.instance.Address(),
841                         Price:                w.instType.Price,
842                         ArvadosInstanceType:  w.instType.Name,
843                         ProviderInstanceType: w.instType.ProviderType,
844                         LastContainerUUID:    w.lastUUID,
845                         LastBusy:             w.busy,
846                         WorkerState:          w.state.String(),
847                         IdleBehavior:         w.idleBehavior,
848                 })
849         }
850         wp.mtx.Unlock()
851         sort.Slice(r, func(i, j int) bool {
852                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
853         })
854         return r
855 }
856
857 // KillInstance destroys a cloud VM instance. It returns an error if
858 // the given instance does not exist.
859 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
860         wkr, ok := wp.workers[id]
861         if !ok {
862                 return errors.New("instance not found")
863         }
864         wkr.logger.WithField("Reason", reason).Info("shutting down")
865         wkr.reportBootOutcome(BootOutcomeAborted)
866         wkr.shutdown()
867         return nil
868 }
869
870 func (wp *Pool) setup() {
871         wp.creating = map[string]createCall{}
872         wp.exited = map[string]time.Time{}
873         wp.workers = map[cloud.InstanceID]*worker{}
874         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
875         wp.loadRunnerData()
876 }
877
878 // Load the runner program to be deployed on worker nodes into
879 // wp.runnerData, if necessary. Errors are logged.
880 //
881 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
882 //
883 // Caller must not have lock.
884 func (wp *Pool) loadRunnerData() error {
885         wp.mtx.Lock()
886         defer wp.mtx.Unlock()
887         if wp.runnerData != nil {
888                 return nil
889         } else if wp.runnerSource == "" {
890                 wp.runnerCmd = wp.runnerCmdDefault
891                 wp.runnerData = []byte{}
892                 return nil
893         }
894         logger := wp.logger.WithField("source", wp.runnerSource)
895         logger.Debug("loading runner")
896         buf, err := ioutil.ReadFile(wp.runnerSource)
897         if err != nil {
898                 logger.WithError(err).Error("failed to load runner program")
899                 return err
900         }
901         wp.runnerData = buf
902         wp.runnerMD5 = md5.Sum(buf)
903         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
904         return nil
905 }
906
907 func (wp *Pool) notify() {
908         wp.mtx.RLock()
909         defer wp.mtx.RUnlock()
910         for _, send := range wp.subscribers {
911                 select {
912                 case send <- struct{}{}:
913                 default:
914                 }
915         }
916 }
917
918 func (wp *Pool) getInstancesAndSync() error {
919         wp.setupOnce.Do(wp.setup)
920         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
921                 return err
922         }
923         wp.logger.Debug("getting instance list")
924         threshold := time.Now()
925         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
926         if err != nil {
927                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
928                 return err
929         }
930         wp.sync(threshold, instances)
931         wp.logger.Debug("sync done")
932         return nil
933 }
934
935 // Add/remove/update workers based on instances, which was obtained
936 // from the instanceSet. However, don't clobber any other updates that
937 // already happened after threshold.
938 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
939         wp.mtx.Lock()
940         defer wp.mtx.Unlock()
941         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
942         notify := false
943
944         for _, inst := range instances {
945                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
946                 it, ok := wp.instanceTypes[itTag]
947                 if !ok {
948                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
949                         continue
950                 }
951                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
952                         notify = true
953                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
954                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
955                         wkr.shutdown()
956                 }
957         }
958
959         for id, wkr := range wp.workers {
960                 if wkr.updated.After(threshold) {
961                         continue
962                 }
963                 logger := wp.logger.WithFields(logrus.Fields{
964                         "Instance":    wkr.instance.ID(),
965                         "WorkerState": wkr.state,
966                 })
967                 logger.Info("instance disappeared in cloud")
968                 wkr.reportBootOutcome(BootOutcomeDisappeared)
969                 if wp.mDisappearances != nil {
970                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
971                 }
972                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
973                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
974                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
975                 }
976                 delete(wp.workers, id)
977                 go wkr.Close()
978                 notify = true
979         }
980
981         if !wp.loaded {
982                 notify = true
983                 wp.loaded = true
984                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
985         }
986
987         if notify {
988                 go wp.notify()
989         }
990 }
991
992 func (wp *Pool) waitUntilLoaded() {
993         ch := wp.Subscribe()
994         wp.mtx.RLock()
995         defer wp.mtx.RUnlock()
996         for !wp.loaded {
997                 wp.mtx.RUnlock()
998                 <-ch
999                 wp.mtx.RLock()
1000         }
1001 }
1002
1003 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1004         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1005         fmt.Fprint(h, uuid)
1006         return fmt.Sprintf("%x", h.Sum(nil))
1007 }
1008
// randomHex returns n lowercase hexadecimal digits (n*4 random
// bits) read from the crypto/rand source. n must be even: the
// digits are produced from n/2 random bytes, so an odd n yields n-1
// digits.
//
// It panics if the random source returns an error.
func randomHex(n int) string {
	raw := make([]byte, n/2)
	if _, err := rand.Read(raw); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", raw)
}