16811: Add a test that system users/groups can't be deleted.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
// Suffixes of the tag keys that the pool attaches to the cloud
// instances it creates. The actual tag key is the pool's configured
// tagKeyPrefix followed by one of these suffixes.
const (
	// Tag holding the name of the instance's arvados.InstanceType.
	tagKeyInstanceType   = "InstanceType"
	// Tag holding the instance's IdleBehavior.
	tagKeyIdleBehavior   = "IdleBehavior"
	// Tag holding the random secret generated for the instance.
	tagKeyInstanceSecret = "InstanceSecret"
	// Tag holding the ID of the instance set that owns the instance.
	tagKeyInstanceSetID  = "InstanceSetID"
)
32
// An InstanceView shows a worker's current state and recent activity.
//
// The struct tags give the key under which each field appears when an
// InstanceView is encoded as JSON.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
45
// An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor per worker by calling its newExecutor
// function with the worker's instance.
type Executor interface {
	// Execute runs cmd on the current target and returns the
	// command's stdout and stderr along with any error.
	//
	// NOTE(review): env is presumably the set of environment
	// variables for cmd, and stdin the data fed to its standard
	// input -- confirm against the implementations.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is not described further here.
	//
	// NOTE(review): presumably releases any resources held by the
	// executor (e.g., an open connection) -- confirm.
	Close()
}
65
const (
	// Defaults used in place of cluster configuration values that
	// are not greater than zero (see duration() for the intervals
	// and timeouts, and runProbes() for the probe rate).
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
84
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
86         if conf > 0 {
87                 return time.Duration(conf)
88         } else {
89                 return def
90         }
91 }
92
93 // NewPool creates a Pool of workers backed by instanceSet.
94 //
95 // New instances are configured and set up according to the given
96 // cluster configuration.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
98         wp := &Pool{
99                 logger:                         logger,
100                 arvClient:                      arvClient,
101                 instanceSetID:                  instanceSetID,
102                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
103                 newExecutor:                    newExecutor,
104                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
105                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
106                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107                 instanceTypes:                  cluster.InstanceTypes,
108                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
109                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
110                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
111                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
112                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
113                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
114                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
115                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
116                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
117                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
118                 installPublicKey:               installPublicKey,
119                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
120                 stop:                           make(chan bool),
121         }
122         wp.registerMetrics(reg)
123         go func() {
124                 wp.setupOnce.Do(wp.setup)
125                 go wp.runMetrics()
126                 go wp.runProbes()
127                 go wp.runSync()
128         }()
129         return wp
130 }
131
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration (assigned by NewPool)
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	//
	// subscribers holds the channels handed out by Subscribe;
	// each channel is stored under itself as the key.
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	// atQuotaUntil and atQuotaErr record the most recent quota
	// error returned by a create call, and the time until which
	// Create refuses to start new instances because of it.
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	// mtx is held by the pool's methods while they read or modify
	// the private state above.
	mtx          sync.RWMutex
	// setupOnce ensures wp.setup runs only once, whichever method
	// is called first.
	setupOnce    sync.Once
	// NOTE(review): runnerData, runnerMD5 and runnerCmd are
	// presumably filled in by loadRunnerData -- confirm.
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	// Prometheus metrics, created by registerMetrics.
	mContainersRunning       prometheus.Gauge
	mInstances               *prometheus.GaugeVec
	mInstancesPrice          *prometheus.GaugeVec
	mVCPUs                   *prometheus.GaugeVec
	mMemory                  *prometheus.GaugeVec
	mBootOutcomes            *prometheus.CounterVec
	mDisappearances          *prometheus.CounterVec
	mTimeToSSH               prometheus.Summary
	mTimeToReadyForContainer prometheus.Summary
}
183
// A createCall describes an unfinished cloud instance create call:
// the time it was started and the instance type that was requested.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
188
189 func (wp *Pool) CheckHealth() error {
190         wp.setupOnce.Do(wp.setup)
191         if err := wp.loadRunnerData(); err != nil {
192                 return fmt.Errorf("error loading runner binary: %s", err)
193         }
194         return nil
195 }
196
197 // Subscribe returns a buffered channel that becomes ready after any
198 // change to the pool's state that could have scheduling implications:
199 // a worker's state changes, a new worker appears, the cloud
200 // provider's API rate limiting period ends, etc.
201 //
202 // Additional events that occur while the channel is already ready
203 // will be dropped, so it is OK if the caller services the channel
204 // slowly.
205 //
206 // Example:
207 //
208 //      ch := wp.Subscribe()
209 //      defer wp.Unsubscribe(ch)
210 //      for range ch {
211 //              tryScheduling(wp)
212 //              if done {
213 //                      break
214 //              }
215 //      }
216 func (wp *Pool) Subscribe() <-chan struct{} {
217         wp.setupOnce.Do(wp.setup)
218         wp.mtx.Lock()
219         defer wp.mtx.Unlock()
220         ch := make(chan struct{}, 1)
221         wp.subscribers[ch] = ch
222         return ch
223 }
224
225 // Unsubscribe stops sending updates to the given channel.
226 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
227         wp.setupOnce.Do(wp.setup)
228         wp.mtx.Lock()
229         defer wp.mtx.Unlock()
230         delete(wp.subscribers, ch)
231 }
232
233 // Unallocated returns the number of unallocated (creating + booting +
234 // idle + unknown) workers for each instance type.  Workers in
235 // hold/drain mode are not included.
236 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
237         wp.setupOnce.Do(wp.setup)
238         wp.mtx.RLock()
239         defer wp.mtx.RUnlock()
240         unalloc := map[arvados.InstanceType]int{}
241         creating := map[arvados.InstanceType]int{}
242         oldestCreate := map[arvados.InstanceType]time.Time{}
243         for _, cc := range wp.creating {
244                 it := cc.instanceType
245                 creating[it]++
246                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
247                         oldestCreate[it] = cc.time
248                 }
249         }
250         for _, wkr := range wp.workers {
251                 // Skip workers that are not expected to become
252                 // available soon. Note len(wkr.running)>0 is not
253                 // redundant here: it can be true even in
254                 // StateUnknown.
255                 if wkr.state == StateShutdown ||
256                         wkr.state == StateRunning ||
257                         wkr.idleBehavior != IdleBehaviorRun ||
258                         len(wkr.running) > 0 {
259                         continue
260                 }
261                 it := wkr.instType
262                 unalloc[it]++
263                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
264                         // If up to N new workers appear in
265                         // Instances() while we are waiting for N
266                         // Create() calls to complete, we assume we're
267                         // just seeing a race between Instances() and
268                         // Create() responses.
269                         //
270                         // The other common reason why nodes have
271                         // state==Unknown is that they appeared at
272                         // startup, before any Create calls. They
273                         // don't match the above timing condition, so
274                         // we never mistakenly attribute them to
275                         // pending Create calls.
276                         creating[it]--
277                 }
278         }
279         for it, c := range creating {
280                 unalloc[it] += c
281         }
282         return unalloc
283 }
284
// Create starts creating a new instance of the given type, and
// reports whether the attempt was started.
//
// When Create returns true, a pending entry has been added to
// wp.creating and the cloud provider's create call is running in the
// background. If that call succeeds, the new instance is added to
// the pool as a worker.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance: the runner binary cannot
// be loaded, a recent quota error has not expired yet, create calls
// are currently throttled, or MaxConcurrentInstanceCreateOps create
// calls are already pending. Those errors are logged by the Pool, so
// the caller does not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	// The random secret identifies this create call in
	// wp.creating, and is also attached to the instance as a tag
	// so updateWorker can match the instance to the call.
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		// Deferred first, so it runs last: after the pending
		// entry has been removed and the lock released.
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		// The cloud provider call is made without holding
		// wp.mtx; the lock is taken only once it returns.
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				// Remember the quota error, refuse
				// further create attempts until
				// quotaErrorTTL has passed, and call
				// wp.notify at that time.
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}
350
351 // AtQuota returns true if Create is not expected to work at the
352 // moment.
353 func (wp *Pool) AtQuota() bool {
354         wp.mtx.Lock()
355         defer wp.mtx.Unlock()
356         return time.Now().Before(wp.atQuotaUntil)
357 }
358
359 // SetIdleBehavior determines how the indicated instance will behave
360 // when it has no containers running.
361 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
362         wp.mtx.Lock()
363         defer wp.mtx.Unlock()
364         wkr, ok := wp.workers[id]
365         if !ok {
366                 return errors.New("requested instance does not exist")
367         }
368         wkr.setIdleBehavior(idleBehavior)
369         return nil
370 }
371
372 // Successful connection to the SSH daemon, update the mTimeToSSH metric
373 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
374         wp.mtx.Lock()
375         defer wp.mtx.Unlock()
376         wkr := wp.workers[inst.ID()]
377         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
378                 // the node is not in booting state (can happen if a-d-c is restarted) OR
379                 // this is not the first SSH connection
380                 return
381         }
382
383         wkr.firstSSHConnection = time.Now()
384         if wp.mTimeToSSH != nil {
385                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
386         }
387 }
388
// updateWorker adds or updates the worker attached to the given
// instance, and returns that worker.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	// From here on, inst is the instance wrapped in a TagVerifier
	// that carries the instance's secret and reports back through
	// wp.reportSSHConnected.
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		// Existing worker: point it (and its executor) at the
		// latest instance data.
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	// The secret tag matches a pending Create call only if this
	// pool started creating the instance and that call has not
	// been cleared from wp.creating yet.
	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	// All of the new worker's timestamps start at the moment it
	// is first seen.
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}
455
456 // Shutdown shuts down a worker with the given type, or returns false
457 // if all workers with the given type are busy.
458 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
459         wp.setupOnce.Do(wp.setup)
460         wp.mtx.Lock()
461         defer wp.mtx.Unlock()
462         logger := wp.logger.WithField("InstanceType", it.Name)
463         logger.Info("shutdown requested")
464         for _, tryState := range []State{StateBooting, StateIdle} {
465                 // TODO: shutdown the worker with the longest idle
466                 // time (Idle) or the earliest create time (Booting)
467                 for _, wkr := range wp.workers {
468                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
469                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
470                                 wkr.reportBootOutcome(BootOutcomeAborted)
471                                 wkr.shutdown()
472                                 return true
473                         }
474                 }
475         }
476         return false
477 }
478
479 // CountWorkers returns the current number of workers in each state.
480 //
481 // CountWorkers blocks, if necessary, until the initial instance list
482 // has been loaded from the cloud provider.
483 func (wp *Pool) CountWorkers() map[State]int {
484         wp.setupOnce.Do(wp.setup)
485         wp.waitUntilLoaded()
486         wp.mtx.Lock()
487         defer wp.mtx.Unlock()
488         r := map[State]int{}
489         for _, w := range wp.workers {
490                 r[w.state]++
491         }
492         return r
493 }
494
495 // Running returns the container UUIDs being prepared/run on workers.
496 //
497 // In the returned map, the time value indicates when the Pool
498 // observed that the container process had exited. A container that
499 // has not yet exited has a zero time value. The caller should use
500 // ForgetContainer() to garbage-collect the entries for exited
501 // containers.
502 func (wp *Pool) Running() map[string]time.Time {
503         wp.setupOnce.Do(wp.setup)
504         wp.mtx.Lock()
505         defer wp.mtx.Unlock()
506         r := map[string]time.Time{}
507         for _, wkr := range wp.workers {
508                 for uuid := range wkr.running {
509                         r[uuid] = time.Time{}
510                 }
511                 for uuid := range wkr.starting {
512                         r[uuid] = time.Time{}
513                 }
514         }
515         for uuid, exited := range wp.exited {
516                 r[uuid] = exited
517         }
518         return r
519 }
520
521 // StartContainer starts a container on an idle worker immediately if
522 // possible, otherwise returns false.
523 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
524         wp.setupOnce.Do(wp.setup)
525         wp.mtx.Lock()
526         defer wp.mtx.Unlock()
527         var wkr *worker
528         for _, w := range wp.workers {
529                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
530                         if wkr == nil || w.busy.After(wkr.busy) {
531                                 wkr = w
532                         }
533                 }
534         }
535         if wkr == nil {
536                 return false
537         }
538         wkr.startContainer(ctr)
539         return true
540 }
541
542 // KillContainer kills the crunch-run process for the given container
543 // UUID, if it's running on any worker.
544 //
545 // KillContainer returns immediately; the act of killing the container
546 // takes some time, and runs in the background.
547 //
548 // KillContainer returns false if the container has already ended.
549 func (wp *Pool) KillContainer(uuid string, reason string) bool {
550         wp.mtx.Lock()
551         defer wp.mtx.Unlock()
552         logger := wp.logger.WithFields(logrus.Fields{
553                 "ContainerUUID": uuid,
554                 "Reason":        reason,
555         })
556         for _, wkr := range wp.workers {
557                 rr := wkr.running[uuid]
558                 if rr == nil {
559                         rr = wkr.starting[uuid]
560                 }
561                 if rr != nil {
562                         rr.Kill(reason)
563                         return true
564                 }
565         }
566         logger.Debug("cannot kill: already disappeared")
567         return false
568 }
569
570 // ForgetContainer clears the placeholder for the given exited
571 // container, so it isn't returned by subsequent calls to Running().
572 //
573 // ForgetContainer has no effect if the container has not yet exited.
574 //
575 // The "container exited at time T" placeholder (which necessitates
576 // ForgetContainer) exists to make it easier for the caller
577 // (scheduler) to distinguish a container that exited without
578 // finalizing its state from a container that exited too recently for
579 // its final state to have appeared in the scheduler's queue cache.
580 func (wp *Pool) ForgetContainer(uuid string) {
581         wp.mtx.Lock()
582         defer wp.mtx.Unlock()
583         if _, ok := wp.exited[uuid]; ok {
584                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
585                 delete(wp.exited, uuid)
586         }
587 }
588
589 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
590         if reg == nil {
591                 reg = prometheus.NewRegistry()
592         }
593         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
594                 Namespace: "arvados",
595                 Subsystem: "dispatchcloud",
596                 Name:      "containers_running",
597                 Help:      "Number of containers reported running by cloud VMs.",
598         })
599         reg.MustRegister(wp.mContainersRunning)
600         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
601                 Namespace: "arvados",
602                 Subsystem: "dispatchcloud",
603                 Name:      "instances_total",
604                 Help:      "Number of cloud VMs.",
605         }, []string{"category", "instance_type"})
606         reg.MustRegister(wp.mInstances)
607         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
608                 Namespace: "arvados",
609                 Subsystem: "dispatchcloud",
610                 Name:      "instances_price",
611                 Help:      "Price of cloud VMs.",
612         }, []string{"category"})
613         reg.MustRegister(wp.mInstancesPrice)
614         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
615                 Namespace: "arvados",
616                 Subsystem: "dispatchcloud",
617                 Name:      "vcpus_total",
618                 Help:      "Total VCPUs on all cloud VMs.",
619         }, []string{"category"})
620         reg.MustRegister(wp.mVCPUs)
621         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
622                 Namespace: "arvados",
623                 Subsystem: "dispatchcloud",
624                 Name:      "memory_bytes_total",
625                 Help:      "Total memory on all cloud VMs.",
626         }, []string{"category"})
627         reg.MustRegister(wp.mMemory)
628         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
629                 Namespace: "arvados",
630                 Subsystem: "dispatchcloud",
631                 Name:      "boot_outcomes",
632                 Help:      "Boot outcomes by type.",
633         }, []string{"outcome"})
634         for k := range validBootOutcomes {
635                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
636         }
637         reg.MustRegister(wp.mBootOutcomes)
638         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
639                 Namespace: "arvados",
640                 Subsystem: "dispatchcloud",
641                 Name:      "instances_disappeared",
642                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
643         }, []string{"state"})
644         for _, v := range stateString {
645                 wp.mDisappearances.WithLabelValues(v).Add(0)
646         }
647         reg.MustRegister(wp.mDisappearances)
648         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
649                 Namespace:  "arvados",
650                 Subsystem:  "dispatchcloud",
651                 Name:       "instances_time_to_ssh_seconds",
652                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
653                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
654         })
655         reg.MustRegister(wp.mTimeToSSH)
656         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
657                 Namespace:  "arvados",
658                 Subsystem:  "dispatchcloud",
659                 Name:       "instances_time_to_ready_for_container_seconds",
660                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
661                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
662         })
663         reg.MustRegister(wp.mTimeToReadyForContainer)
664 }
665
666 func (wp *Pool) runMetrics() {
667         ch := wp.Subscribe()
668         defer wp.Unsubscribe(ch)
669         wp.updateMetrics()
670         for range ch {
671                 wp.updateMetrics()
672         }
673 }
674
675 func (wp *Pool) updateMetrics() {
676         wp.mtx.RLock()
677         defer wp.mtx.RUnlock()
678
679         type entKey struct {
680                 cat      string
681                 instType string
682         }
683         instances := map[entKey]int64{}
684         price := map[string]float64{}
685         cpu := map[string]int64{}
686         mem := map[string]int64{}
687         var running int64
688         for _, wkr := range wp.workers {
689                 var cat string
690                 switch {
691                 case len(wkr.running)+len(wkr.starting) > 0:
692                         cat = "inuse"
693                 case wkr.idleBehavior == IdleBehaviorHold:
694                         cat = "hold"
695                 case wkr.state == StateBooting:
696                         cat = "booting"
697                 case wkr.state == StateUnknown:
698                         cat = "unknown"
699                 default:
700                         cat = "idle"
701                 }
702                 instances[entKey{cat, wkr.instType.Name}]++
703                 price[cat] += wkr.instType.Price
704                 cpu[cat] += int64(wkr.instType.VCPUs)
705                 mem[cat] += int64(wkr.instType.RAM)
706                 running += int64(len(wkr.running) + len(wkr.starting))
707         }
708         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
709                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
710                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
711                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
712                 // make sure to reset gauges for non-existing category/nodetype combinations
713                 for _, it := range wp.instanceTypes {
714                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
715                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
716                         }
717                 }
718         }
719         for k, v := range instances {
720                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
721         }
722         wp.mContainersRunning.Set(float64(running))
723 }
724
725 func (wp *Pool) runProbes() {
726         maxPPS := wp.maxProbesPerSecond
727         if maxPPS < 1 {
728                 maxPPS = defaultMaxProbesPerSecond
729         }
730         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
731         defer limitticker.Stop()
732
733         probeticker := time.NewTicker(wp.probeInterval)
734         defer probeticker.Stop()
735
736         workers := []cloud.InstanceID{}
737         for range probeticker.C {
738                 workers = workers[:0]
739                 wp.mtx.Lock()
740                 for id, wkr := range wp.workers {
741                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
742                                 continue
743                         }
744                         workers = append(workers, id)
745                 }
746                 wp.mtx.Unlock()
747
748                 for _, id := range workers {
749                         wp.mtx.Lock()
750                         wkr, ok := wp.workers[id]
751                         wp.mtx.Unlock()
752                         if !ok {
753                                 // Deleted while we were probing
754                                 // others
755                                 continue
756                         }
757                         go wkr.ProbeAndUpdate()
758                         select {
759                         case <-wp.stop:
760                                 return
761                         case <-limitticker.C:
762                         }
763                 }
764         }
765 }
766
767 func (wp *Pool) runSync() {
768         // sync once immediately, then wait syncInterval, sync again,
769         // etc.
770         timer := time.NewTimer(1)
771         for {
772                 select {
773                 case <-timer.C:
774                         err := wp.getInstancesAndSync()
775                         if err != nil {
776                                 wp.logger.WithError(err).Warn("sync failed")
777                         }
778                         timer.Reset(wp.syncInterval)
779                 case <-wp.stop:
780                         wp.logger.Debug("worker.Pool stopped")
781                         return
782                 }
783         }
784 }
785
786 // Stop synchronizing with the InstanceSet.
787 func (wp *Pool) Stop() {
788         wp.setupOnce.Do(wp.setup)
789         close(wp.stop)
790 }
791
792 // Instances returns an InstanceView for each worker in the pool,
793 // summarizing its current state and recent activity.
794 func (wp *Pool) Instances() []InstanceView {
795         var r []InstanceView
796         wp.setupOnce.Do(wp.setup)
797         wp.mtx.Lock()
798         for _, w := range wp.workers {
799                 r = append(r, InstanceView{
800                         Instance:             w.instance.ID(),
801                         Address:              w.instance.Address(),
802                         Price:                w.instType.Price,
803                         ArvadosInstanceType:  w.instType.Name,
804                         ProviderInstanceType: w.instType.ProviderType,
805                         LastContainerUUID:    w.lastUUID,
806                         LastBusy:             w.busy,
807                         WorkerState:          w.state.String(),
808                         IdleBehavior:         w.idleBehavior,
809                 })
810         }
811         wp.mtx.Unlock()
812         sort.Slice(r, func(i, j int) bool {
813                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
814         })
815         return r
816 }
817
818 // KillInstance destroys a cloud VM instance. It returns an error if
819 // the given instance does not exist.
820 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
821         wkr, ok := wp.workers[id]
822         if !ok {
823                 return errors.New("instance not found")
824         }
825         wkr.logger.WithField("Reason", reason).Info("shutting down")
826         wkr.reportBootOutcome(BootOutcomeAborted)
827         wkr.shutdown()
828         return nil
829 }
830
831 func (wp *Pool) setup() {
832         wp.creating = map[string]createCall{}
833         wp.exited = map[string]time.Time{}
834         wp.workers = map[cloud.InstanceID]*worker{}
835         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
836         wp.loadRunnerData()
837 }
838
839 // Load the runner program to be deployed on worker nodes into
840 // wp.runnerData, if necessary. Errors are logged.
841 //
842 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
843 //
844 // Caller must not have lock.
845 func (wp *Pool) loadRunnerData() error {
846         wp.mtx.Lock()
847         defer wp.mtx.Unlock()
848         if wp.runnerData != nil {
849                 return nil
850         } else if wp.runnerSource == "" {
851                 wp.runnerCmd = "crunch-run"
852                 wp.runnerData = []byte{}
853                 return nil
854         }
855         logger := wp.logger.WithField("source", wp.runnerSource)
856         logger.Debug("loading runner")
857         buf, err := ioutil.ReadFile(wp.runnerSource)
858         if err != nil {
859                 logger.WithError(err).Error("failed to load runner program")
860                 return err
861         }
862         wp.runnerData = buf
863         wp.runnerMD5 = md5.Sum(buf)
864         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
865         return nil
866 }
867
868 func (wp *Pool) notify() {
869         wp.mtx.RLock()
870         defer wp.mtx.RUnlock()
871         for _, send := range wp.subscribers {
872                 select {
873                 case send <- struct{}{}:
874                 default:
875                 }
876         }
877 }
878
879 func (wp *Pool) getInstancesAndSync() error {
880         wp.setupOnce.Do(wp.setup)
881         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
882                 return err
883         }
884         wp.logger.Debug("getting instance list")
885         threshold := time.Now()
886         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
887         if err != nil {
888                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
889                 return err
890         }
891         wp.sync(threshold, instances)
892         wp.logger.Debug("sync done")
893         return nil
894 }
895
896 // Add/remove/update workers based on instances, which was obtained
897 // from the instanceSet. However, don't clobber any other updates that
898 // already happened after threshold.
899 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
900         wp.mtx.Lock()
901         defer wp.mtx.Unlock()
902         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
903         notify := false
904
905         for _, inst := range instances {
906                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
907                 it, ok := wp.instanceTypes[itTag]
908                 if !ok {
909                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
910                         continue
911                 }
912                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
913                         notify = true
914                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
915                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
916                         wkr.shutdown()
917                 }
918         }
919
920         for id, wkr := range wp.workers {
921                 if wkr.updated.After(threshold) {
922                         continue
923                 }
924                 logger := wp.logger.WithFields(logrus.Fields{
925                         "Instance":    wkr.instance.ID(),
926                         "WorkerState": wkr.state,
927                 })
928                 logger.Info("instance disappeared in cloud")
929                 wkr.reportBootOutcome(BootOutcomeDisappeared)
930                 if wp.mDisappearances != nil {
931                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
932                 }
933                 delete(wp.workers, id)
934                 go wkr.Close()
935                 notify = true
936         }
937
938         if !wp.loaded {
939                 notify = true
940                 wp.loaded = true
941                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
942         }
943
944         if notify {
945                 go wp.notify()
946         }
947 }
948
949 func (wp *Pool) waitUntilLoaded() {
950         ch := wp.Subscribe()
951         wp.mtx.RLock()
952         defer wp.mtx.RUnlock()
953         for !wp.loaded {
954                 wp.mtx.RUnlock()
955                 <-ch
956                 wp.mtx.RLock()
957         }
958 }
959
// randomHex returns a string of exactly n lowercase hexadecimal
// digits (n*4 random bits) drawn from crypto/rand.
//
// n may be odd: enough bytes are generated to cover n digits and the
// surplus digit is discarded. If n <= 0 the result is "".
//
// It panics if the random source returns an error.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte yields two hex digits; round up so an odd n is
	// fully covered, then trim to length.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}