arvados.git: lib/dispatchcloud/worker/pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
26 const (
27         tagKeyInstanceType   = "InstanceType"
28         tagKeyIdleBehavior   = "IdleBehavior"
29         tagKeyInstanceSecret = "InstanceSecret"
30         tagKeyInstanceSetID  = "InstanceSetID"
31 )
32
33 // An InstanceView shows a worker's current state and recent activity.
34 type InstanceView struct {
35         Instance             cloud.InstanceID `json:"instance"`
36         Address              string           `json:"address"`
37         Price                float64          `json:"price"`
38         ArvadosInstanceType  string           `json:"arvados_instance_type"`
39         ProviderInstanceType string           `json:"provider_instance_type"`
40         LastContainerUUID    string           `json:"last_container_uuid"`
41         LastBusy             time.Time        `json:"last_busy"`
42         WorkerState          string           `json:"worker_state"`
43         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
44 }
45
46 // An Executor executes shell commands on a remote host.
47 type Executor interface {
48         // Run cmd on the current target.
49         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
50
51         // Use the given target for subsequent operations. The new
52         // target is the same host as the previous target, but it
53         // might return a different address and verify a different
54         // host key.
55         //
56         // SetTarget is called frequently, and in most cases the new
57         // target will behave exactly the same as the old one. An
58         // implementation should optimize accordingly.
59         //
60         // SetTarget must not block on concurrent Execute calls.
61         SetTarget(cloud.ExecutorTarget)
62
63         Close()
64 }
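// An illustrative sketch of calling an Executor implementation ("exr"
// and the probe command are placeholders, not part of this package's
// API):
//
//      stdout, stderr, err := exr.Execute(nil, "uptime", nil)
//      if err != nil {
//              logger.WithError(err).Warnf("probe failed; stderr %q", stderr)
//      }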
65
66 const (
67         defaultSyncInterval       = time.Minute
68         defaultProbeInterval      = time.Second * 10
69         defaultMaxProbesPerSecond = 10
70         defaultTimeoutIdle        = time.Minute
71         defaultTimeoutBooting     = time.Minute * 10
72         defaultTimeoutProbe       = time.Minute * 10
73         defaultTimeoutShutdown    = time.Second * 10
74         defaultTimeoutTERM        = time.Minute * 2
75         defaultTimeoutSignal      = time.Second * 5
76
77         // Time after a quota error to try again anyway, even if no
78         // instances have been shut down.
79         quotaErrorTTL = time.Minute
80
81         // Time between "X failed because of rate limiting" messages
82         logRateLimitErrorInterval = time.Second * 10
83 )
84
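// duration returns conf as a time.Duration if it is set (positive),
// and def otherwise. For example, if SyncInterval is left at zero in
// the cluster config, duration(cluster.Containers.CloudVMs.SyncInterval,
// defaultSyncInterval) yields the one-minute default.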
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
86         if conf > 0 {
87                 return time.Duration(conf)
88         } else {
89                 return def
90         }
91 }
92
93 // NewPool creates a Pool of workers backed by instanceSet.
94 //
95 // New instances are configured and set up according to the given
96 // cluster configuration.
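//
// A minimal construction sketch (variable names are placeholders; the
// caller supplies the cloud.InstanceSet, an Executor constructor, an
// SSH public key to install on new instances, and the cluster config):
//
//      wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//              cloud.InstanceSetID("example"), instanceSet, newExecutor,
//              installKey, cluster)
//      defer wp.Stop()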
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
98         wp := &Pool{
99                 logger:                         logger,
100                 arvClient:                      arvClient,
101                 instanceSetID:                  instanceSetID,
102                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
103                 newExecutor:                    newExecutor,
104                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
105                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
106                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107                 instanceTypes:                  cluster.InstanceTypes,
108                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
109                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
110                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
111                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
112                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
113                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
114                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
115                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
116                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
117                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
118                 installPublicKey:               installPublicKey,
119                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
120                 stop:                           make(chan bool),
121         }
122         wp.registerMetrics(reg)
123         go func() {
124                 wp.setupOnce.Do(wp.setup)
125                 go wp.runMetrics()
126                 go wp.runProbes()
127                 go wp.runSync()
128         }()
129         return wp
130 }
131
132 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
133 // zero Pool should not be used. Call NewPool to create a new Pool.
134 type Pool struct {
135         // configuration
136         logger                         logrus.FieldLogger
137         arvClient                      *arvados.Client
138         instanceSetID                  cloud.InstanceSetID
139         instanceSet                    *throttledInstanceSet
140         newExecutor                    func(cloud.Instance) Executor
141         bootProbeCommand               string
142         runnerSource                   string
143         imageID                        cloud.ImageID
144         instanceTypes                  map[string]arvados.InstanceType
145         syncInterval                   time.Duration
146         probeInterval                  time.Duration
147         maxProbesPerSecond             int
148         maxConcurrentInstanceCreateOps int
149         timeoutIdle                    time.Duration
150         timeoutBooting                 time.Duration
151         timeoutProbe                   time.Duration
152         timeoutShutdown                time.Duration
153         timeoutTERM                    time.Duration
154         timeoutSignal                  time.Duration
155         installPublicKey               ssh.PublicKey
156         tagKeyPrefix                   string
157
158         // private state
159         subscribers  map[<-chan struct{}]chan<- struct{}
160         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
161         workers      map[cloud.InstanceID]*worker
162         loaded       bool                 // loaded list of instances from InstanceSet at least once
163         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
164         atQuotaUntil time.Time
165         atQuotaErr   cloud.QuotaError
166         stop         chan bool
167         mtx          sync.RWMutex
168         setupOnce    sync.Once
169         runnerData   []byte
170         runnerMD5    [md5.Size]byte
171         runnerCmd    string
172
173         mContainersRunning prometheus.Gauge
174         mInstances         *prometheus.GaugeVec
175         mInstancesPrice    *prometheus.GaugeVec
176         mVCPUs             *prometheus.GaugeVec
177         mMemory            *prometheus.GaugeVec
178         mBootOutcomes      *prometheus.CounterVec
179         mDisappearances    *prometheus.CounterVec
180 }
181
182 type createCall struct {
183         time         time.Time
184         instanceType arvados.InstanceType
185 }
186
// CheckHealth reports an error if the pool is unable to function,
// e.g., the configured runner binary cannot be loaded.
187 func (wp *Pool) CheckHealth() error {
188         wp.setupOnce.Do(wp.setup)
189         if err := wp.loadRunnerData(); err != nil {
190                 return fmt.Errorf("error loading runner binary: %s", err)
191         }
192         return nil
193 }
194
195 // Subscribe returns a buffered channel that becomes ready after any
196 // change to the pool's state that could have scheduling implications:
197 // a worker's state changes, a new worker appears, the cloud
198 // provider's API rate limiting period ends, etc.
199 //
200 // Additional events that occur while the channel is already ready
201 // will be dropped, so it is OK if the caller services the channel
202 // slowly.
203 //
204 // Example:
205 //
206 //      ch := wp.Subscribe()
207 //      defer wp.Unsubscribe(ch)
208 //      for range ch {
209 //              tryScheduling(wp)
210 //              if done {
211 //                      break
212 //              }
213 //      }
214 func (wp *Pool) Subscribe() <-chan struct{} {
215         wp.setupOnce.Do(wp.setup)
216         wp.mtx.Lock()
217         defer wp.mtx.Unlock()
218         ch := make(chan struct{}, 1)
219         wp.subscribers[ch] = ch
220         return ch
221 }
222
223 // Unsubscribe stops sending updates to the given channel.
224 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
225         wp.setupOnce.Do(wp.setup)
226         wp.mtx.Lock()
227         defer wp.mtx.Unlock()
228         delete(wp.subscribers, ch)
229 }
230
231 // Unallocated returns the number of unallocated (creating + booting +
232 // idle + unknown) workers for each instance type.  Workers in
233 // hold/drain mode are not included.
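//
// Example (a sketch; "it" is an arvados.InstanceType the caller needs
// capacity for):
//
//      if wp.Unallocated()[it] == 0 {
//              wp.Create(it)
//      }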
234 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
235         wp.setupOnce.Do(wp.setup)
236         wp.mtx.RLock()
237         defer wp.mtx.RUnlock()
238         unalloc := map[arvados.InstanceType]int{}
239         creating := map[arvados.InstanceType]int{}
240         oldestCreate := map[arvados.InstanceType]time.Time{}
241         for _, cc := range wp.creating {
242                 it := cc.instanceType
243                 creating[it]++
244                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
245                         oldestCreate[it] = cc.time
246                 }
247         }
248         for _, wkr := range wp.workers {
249                 // Skip workers that are not expected to become
250                 // available soon. Note len(wkr.running)>0 is not
251                 // redundant here: it can be true even in
252                 // StateUnknown.
253                 if wkr.state == StateShutdown ||
254                         wkr.state == StateRunning ||
255                         wkr.idleBehavior != IdleBehaviorRun ||
256                         len(wkr.running) > 0 {
257                         continue
258                 }
259                 it := wkr.instType
260                 unalloc[it]++
261                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
262                         // If up to N new workers appear in
263                         // Instances() while we are waiting for N
264                         // Create() calls to complete, we assume we're
265                         // just seeing a race between Instances() and
266                         // Create() responses.
267                         //
268                         // The other common reason why nodes have
269                         // state==Unknown is that they appeared at
270                         // startup, before any Create calls. They
271                         // don't match the above timing condition, so
272                         // we never mistakenly attribute them to
273                         // pending Create calls.
274                         creating[it]--
275                 }
276         }
277         for it, c := range creating {
278                 unalloc[it] += c
279         }
280         return unalloc
281 }
282
283 // Create a new instance with the given type, and add it to the worker
284 // pool. The worker is added immediately; instance creation runs in
285 // the background.
286 //
287 // Create returns false if a pre-existing error state prevents it from
288 // even attempting to create a new instance. Those errors are logged
289 // by the Pool, so the caller does not need to log anything in such
290 // cases.
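//
// Example (a sketch of a scheduler-style caller; names are
// placeholders):
//
//      if needed > 0 && !wp.AtQuota() {
//              if !wp.Create(it) {
//                      // cause already logged by the pool; try again
//                      // after the next Subscribe() notification
//              }
//      }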
291 func (wp *Pool) Create(it arvados.InstanceType) bool {
292         logger := wp.logger.WithField("InstanceType", it.Name)
293         wp.setupOnce.Do(wp.setup)
294         if wp.loadRunnerData() != nil {
295                 // Boot probe is certain to fail.
296                 return false
297         }
298         wp.mtx.Lock()
299         defer wp.mtx.Unlock()
300         if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
301                 return false
302         }
303         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
304         // requests in flight. It was added to work around a limitation in Azure's
305         // managed disks, which support no more than 20 concurrent node creation
306         // requests from a single disk image (cf.
307         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
308         // The code assumes that node creation, from Azure's perspective, means the
309         // period until the instance appears in the "get all instances" list.
310         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
311                 logger.Info("reached MaxConcurrentInstanceCreateOps")
312                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
313                 return false
314         }
315         now := time.Now()
316         secret := randomHex(instanceSecretLength)
317         wp.creating[secret] = createCall{time: now, instanceType: it}
318         go func() {
319                 defer wp.notify()
320                 tags := cloud.InstanceTags{
321                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
322                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
323                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
324                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
325                 }
326                 initCmd := TagVerifier{nil, secret}.InitCommand()
327                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
328                 wp.mtx.Lock()
329                 defer wp.mtx.Unlock()
330                 // delete() is deferred so the updateWorker() call
331                 // below knows to use StateBooting when adding a new
332                 // worker.
333                 defer delete(wp.creating, secret)
334                 if err != nil {
335                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
336                                 wp.atQuotaErr = err
337                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
338                                 time.AfterFunc(quotaErrorTTL, wp.notify)
339                         }
340                         logger.WithError(err).Error("create failed")
341                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
342                         return
343                 }
344                 wp.updateWorker(inst, it)
345         }()
346         return true
347 }
348
349 // AtQuota returns true if Create is not expected to work at the
350 // moment.
351 func (wp *Pool) AtQuota() bool {
352         wp.mtx.Lock()
353         defer wp.mtx.Unlock()
354         return time.Now().Before(wp.atQuotaUntil)
355 }
356
357 // SetIdleBehavior determines how the indicated instance will behave
358 // when it has no containers running.
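//
// Example (a sketch; IdleBehaviorDrain is one of the IdleBehavior
// values defined elsewhere in this package):
//
//      if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//              // the instance is not (or is no longer) in the pool
//      }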
359 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
360         wp.mtx.Lock()
361         defer wp.mtx.Unlock()
362         wkr, ok := wp.workers[id]
363         if !ok {
364                 return errors.New("requested instance does not exist")
365         }
366         wkr.setIdleBehavior(idleBehavior)
367         return nil
368 }
369
370 // Add or update worker attached to the given instance.
371 //
372 // The second return value is true if a new worker is created.
373 //
374 // A newly added instance has state=StateBooting if its tags match an
375 // entry in wp.creating, otherwise StateUnknown.
376 //
377 // Caller must have lock.
378 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
379         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
380         inst = TagVerifier{inst, secret}
381         id := inst.ID()
382         if wkr := wp.workers[id]; wkr != nil {
383                 wkr.executor.SetTarget(inst)
384                 wkr.instance = inst
385                 wkr.updated = time.Now()
386                 wkr.saveTags()
387                 return wkr, false
388         }
389
390         state := StateUnknown
391         if _, ok := wp.creating[secret]; ok {
392                 state = StateBooting
393         }
394
395         // If an instance has a valid IdleBehavior tag when it first
396         // appears, initialize the new worker accordingly (this is how
397         // we restore IdleBehavior that was set by a prior dispatch
398         // process); otherwise, default to "run". After this,
399         // wkr.idleBehavior is the source of truth, and will only be
400         // changed via SetIdleBehavior().
401         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
402         if !validIdleBehavior[idleBehavior] {
403                 idleBehavior = IdleBehaviorRun
404         }
405
406         logger := wp.logger.WithFields(logrus.Fields{
407                 "InstanceType": it.Name,
408                 "Instance":     inst.ID(),
409                 "Address":      inst.Address(),
410         })
411         logger.WithFields(logrus.Fields{
412                 "State":        state,
413                 "IdleBehavior": idleBehavior,
414         }).Infof("instance appeared in cloud")
415         now := time.Now()
416         wkr := &worker{
417                 mtx:          &wp.mtx,
418                 wp:           wp,
419                 logger:       logger,
420                 executor:     wp.newExecutor(inst),
421                 state:        state,
422                 idleBehavior: idleBehavior,
423                 instance:     inst,
424                 instType:     it,
425                 appeared:     now,
426                 probed:       now,
427                 busy:         now,
428                 updated:      now,
429                 running:      make(map[string]*remoteRunner),
430                 starting:     make(map[string]*remoteRunner),
431                 probing:      make(chan struct{}, 1),
432         }
433         wp.workers[id] = wkr
434         return wkr, true
435 }
436
437 // Shutdown shuts down a worker with the given type, or returns false
438 // if all workers with the given type are busy.
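//
// Example (a sketch; scale down by one node of type "it" if possible):
//
//      if !wp.Shutdown(it) {
//              // every worker of this type is busy or on hold
//      }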
439 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
440         wp.setupOnce.Do(wp.setup)
441         wp.mtx.Lock()
442         defer wp.mtx.Unlock()
443         logger := wp.logger.WithField("InstanceType", it.Name)
444         logger.Info("shutdown requested")
445         for _, tryState := range []State{StateBooting, StateIdle} {
446         // TODO: shut down the worker with the longest idle
447                 // time (Idle) or the earliest create time (Booting)
448                 for _, wkr := range wp.workers {
449                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
450                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
451                                 wkr.reportBootOutcome(BootOutcomeAborted)
452                                 wkr.shutdown()
453                                 return true
454                         }
455                 }
456         }
457         return false
458 }
459
460 // CountWorkers returns the current number of workers in each state.
461 //
462 // CountWorkers blocks, if necessary, until the initial instance list
463 // has been loaded from the cloud provider.
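//
// Example (a sketch):
//
//      counts := wp.CountWorkers()
//      idle, booting := counts[StateIdle], counts[StateBooting]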
464 func (wp *Pool) CountWorkers() map[State]int {
465         wp.setupOnce.Do(wp.setup)
466         wp.waitUntilLoaded()
467         wp.mtx.Lock()
468         defer wp.mtx.Unlock()
469         r := map[State]int{}
470         for _, w := range wp.workers {
471                 r[w.state]++
472         }
473         return r
474 }
475
476 // Running returns the container UUIDs being prepared/run on workers.
477 //
478 // In the returned map, the time value indicates when the Pool
479 // observed that the container process had exited. A container that
480 // has not yet exited has a zero time value. The caller should use
481 // ForgetContainer() to garbage-collect the entries for exited
482 // containers.
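//
// Example (a sketch; clear placeholders for containers that exited
// more than a minute ago):
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() && time.Since(exited) > time.Minute {
//                      wp.ForgetContainer(uuid)
//              }
//      }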
483 func (wp *Pool) Running() map[string]time.Time {
484         wp.setupOnce.Do(wp.setup)
485         wp.mtx.Lock()
486         defer wp.mtx.Unlock()
487         r := map[string]time.Time{}
488         for _, wkr := range wp.workers {
489                 for uuid := range wkr.running {
490                         r[uuid] = time.Time{}
491                 }
492                 for uuid := range wkr.starting {
493                         r[uuid] = time.Time{}
494                 }
495         }
496         for uuid, exited := range wp.exited {
497                 r[uuid] = exited
498         }
499         return r
500 }
501
502 // StartContainer starts a container on an idle worker immediately if
503 // possible, otherwise returns false.
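//
// Example (a sketch; "ctr" is an arvados.Container queued for
// instance type "it"):
//
//      if !wp.StartContainer(it, ctr) {
//              // no idle worker of the right type; retry after the
//              // next Subscribe() notification
//      }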
504 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
505         wp.setupOnce.Do(wp.setup)
506         wp.mtx.Lock()
507         defer wp.mtx.Unlock()
508         var wkr *worker
509         for _, w := range wp.workers {
510                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
511                         if wkr == nil || w.busy.After(wkr.busy) {
512                                 wkr = w
513                         }
514                 }
515         }
516         if wkr == nil {
517                 return false
518         }
519         wkr.startContainer(ctr)
520         return true
521 }
522
523 // KillContainer kills the crunch-run process for the given container
524 // UUID, if it's running on any worker.
525 //
526 // KillContainer returns immediately; the act of killing the container
527 // takes some time, and runs in the background.
528 //
529 // KillContainer returns false if the container has already ended.
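//
// Example (a sketch):
//
//      if !wp.KillContainer(uuid, "requeued by scheduler") {
//              // the crunch-run process had already ended
//      }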
530 func (wp *Pool) KillContainer(uuid string, reason string) bool {
531         wp.mtx.Lock()
532         defer wp.mtx.Unlock()
533         logger := wp.logger.WithFields(logrus.Fields{
534                 "ContainerUUID": uuid,
535                 "Reason":        reason,
536         })
537         for _, wkr := range wp.workers {
538                 rr := wkr.running[uuid]
539                 if rr == nil {
540                         rr = wkr.starting[uuid]
541                 }
542                 if rr != nil {
543                         rr.Kill(reason)
544                         return true
545                 }
546         }
547         logger.Debug("cannot kill: already disappeared")
548         return false
549 }
550
551 // ForgetContainer clears the placeholder for the given exited
552 // container, so it isn't returned by subsequent calls to Running().
553 //
554 // ForgetContainer has no effect if the container has not yet exited.
555 //
556 // The "container exited at time T" placeholder (which necessitates
557 // ForgetContainer) exists to make it easier for the caller
558 // (scheduler) to distinguish a container that exited without
559 // finalizing its state from a container that exited too recently for
560 // its final state to have appeared in the scheduler's queue cache.
561 func (wp *Pool) ForgetContainer(uuid string) {
562         wp.mtx.Lock()
563         defer wp.mtx.Unlock()
564         if _, ok := wp.exited[uuid]; ok {
565                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
566                 delete(wp.exited, uuid)
567         }
568 }
569
570 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
571         if reg == nil {
572                 reg = prometheus.NewRegistry()
573         }
574         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
575                 Namespace: "arvados",
576                 Subsystem: "dispatchcloud",
577                 Name:      "containers_running",
578                 Help:      "Number of containers reported running by cloud VMs.",
579         })
580         reg.MustRegister(wp.mContainersRunning)
581         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
582                 Namespace: "arvados",
583                 Subsystem: "dispatchcloud",
584                 Name:      "instances_total",
585                 Help:      "Number of cloud VMs.",
586         }, []string{"category", "instance_type"})
587         reg.MustRegister(wp.mInstances)
588         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
589                 Namespace: "arvados",
590                 Subsystem: "dispatchcloud",
591                 Name:      "instances_price",
592                 Help:      "Price of cloud VMs.",
593         }, []string{"category"})
594         reg.MustRegister(wp.mInstancesPrice)
595         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
596                 Namespace: "arvados",
597                 Subsystem: "dispatchcloud",
598                 Name:      "vcpus_total",
599                 Help:      "Total VCPUs on all cloud VMs.",
600         }, []string{"category"})
601         reg.MustRegister(wp.mVCPUs)
602         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
603                 Namespace: "arvados",
604                 Subsystem: "dispatchcloud",
605                 Name:      "memory_bytes_total",
606                 Help:      "Total memory on all cloud VMs.",
607         }, []string{"category"})
608         reg.MustRegister(wp.mMemory)
609         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
610                 Namespace: "arvados",
611                 Subsystem: "dispatchcloud",
612                 Name:      "boot_outcomes",
613                 Help:      "Boot outcomes by type.",
614         }, []string{"outcome"})
615         for k := range validBootOutcomes {
616                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
617         }
618         reg.MustRegister(wp.mBootOutcomes)
619         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
620                 Namespace: "arvados",
621                 Subsystem: "dispatchcloud",
622                 Name:      "instances_disappeared",
623                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
624         }, []string{"state"})
625         for _, v := range stateString {
626                 wp.mDisappearances.WithLabelValues(v).Add(0)
627         }
628         reg.MustRegister(wp.mDisappearances)
629 }
630
631 func (wp *Pool) runMetrics() {
632         ch := wp.Subscribe()
633         defer wp.Unsubscribe(ch)
634         wp.updateMetrics()
635         for range ch {
636                 wp.updateMetrics()
637         }
638 }
639
640 func (wp *Pool) updateMetrics() {
641         wp.mtx.RLock()
642         defer wp.mtx.RUnlock()
643
644         type entKey struct {
645                 cat      string
646                 instType string
647         }
648         instances := map[entKey]int64{}
649         price := map[string]float64{}
650         cpu := map[string]int64{}
651         mem := map[string]int64{}
652         var running int64
653         for _, wkr := range wp.workers {
654                 var cat string
655                 switch {
656                 case len(wkr.running)+len(wkr.starting) > 0:
657                         cat = "inuse"
658                 case wkr.idleBehavior == IdleBehaviorHold:
659                         cat = "hold"
660                 case wkr.state == StateBooting:
661                         cat = "booting"
662                 case wkr.state == StateUnknown:
663                         cat = "unknown"
664                 default:
665                         cat = "idle"
666                 }
667                 instances[entKey{cat, wkr.instType.Name}]++
668                 price[cat] += wkr.instType.Price
669                 cpu[cat] += int64(wkr.instType.VCPUs)
670                 mem[cat] += int64(wkr.instType.RAM)
671                 running += int64(len(wkr.running) + len(wkr.starting))
672         }
673         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
674                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
675                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
676                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
677                 // make sure to reset gauges for non-existing category/nodetype combinations
678                 for _, it := range wp.instanceTypes {
679                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
680                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
681                         }
682                 }
683         }
684         for k, v := range instances {
685                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
686         }
687         wp.mContainersRunning.Set(float64(running))
688 }
689
690 func (wp *Pool) runProbes() {
691         maxPPS := wp.maxProbesPerSecond
692         if maxPPS < 1 {
693                 maxPPS = defaultMaxProbesPerSecond
694         }
695         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
696         defer limitticker.Stop()
697
698         probeticker := time.NewTicker(wp.probeInterval)
699         defer probeticker.Stop()
700
701         workers := []cloud.InstanceID{}
702         for range probeticker.C {
703                 workers = workers[:0]
704                 wp.mtx.Lock()
705                 for id, wkr := range wp.workers {
706                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
707                                 continue
708                         }
709                         workers = append(workers, id)
710                 }
711                 wp.mtx.Unlock()
712
713                 for _, id := range workers {
714                         wp.mtx.Lock()
715                         wkr, ok := wp.workers[id]
716                         wp.mtx.Unlock()
717                         if !ok {
718                                 // Deleted while we were probing
719                                 // others
720                                 continue
721                         }
722                         go wkr.ProbeAndUpdate()
723                         select {
724                         case <-wp.stop:
725                                 return
726                         case <-limitticker.C:
727                         }
728                 }
729         }
730 }
731
732 func (wp *Pool) runSync() {
733         // sync once immediately, then wait syncInterval, sync again,
734         // etc.
735         timer := time.NewTimer(1)
736         for {
737                 select {
738                 case <-timer.C:
739                         err := wp.getInstancesAndSync()
740                         if err != nil {
741                                 wp.logger.WithError(err).Warn("sync failed")
742                         }
743                         timer.Reset(wp.syncInterval)
744                 case <-wp.stop:
745                         wp.logger.Debug("worker.Pool stopped")
746                         return
747                 }
748         }
749 }
750
751 // Stop synchronizing with the InstanceSet.
752 func (wp *Pool) Stop() {
753         wp.setupOnce.Do(wp.setup)
754         close(wp.stop)
755 }
756
757 // Instances returns an InstanceView for each worker in the pool,
758 // summarizing its current state and recent activity.
759 func (wp *Pool) Instances() []InstanceView {
760         var r []InstanceView
761         wp.setupOnce.Do(wp.setup)
762         wp.mtx.Lock()
763         for _, w := range wp.workers {
764                 r = append(r, InstanceView{
765                         Instance:             w.instance.ID(),
766                         Address:              w.instance.Address(),
767                         Price:                w.instType.Price,
768                         ArvadosInstanceType:  w.instType.Name,
769                         ProviderInstanceType: w.instType.ProviderType,
770                         LastContainerUUID:    w.lastUUID,
771                         LastBusy:             w.busy,
772                         WorkerState:          w.state.String(),
773                         IdleBehavior:         w.idleBehavior,
774                 })
775         }
776         wp.mtx.Unlock()
777         sort.Slice(r, func(i, j int) bool {
778                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
779         })
780         return r
781 }
782
783 // KillInstance destroys a cloud VM instance. It returns an error if
784 // the given instance does not exist.
785 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
786         wkr, ok := wp.workers[id]
787         if !ok {
788                 return errors.New("instance not found")
789         }
790         wkr.logger.WithField("Reason", reason).Info("shutting down")
791         wkr.reportBootOutcome(BootOutcomeAborted)
792         wkr.shutdown()
793         return nil
794 }
795
796 func (wp *Pool) setup() {
797         wp.creating = map[string]createCall{}
798         wp.exited = map[string]time.Time{}
799         wp.workers = map[cloud.InstanceID]*worker{}
800         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
801         wp.loadRunnerData()
802 }
803
804 // Load the runner program to be deployed on worker nodes into
805 // wp.runnerData, if necessary. Errors are logged.
806 //
807 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
808 //
809 // Caller must not have lock.
810 func (wp *Pool) loadRunnerData() error {
811         wp.mtx.Lock()
812         defer wp.mtx.Unlock()
813         if wp.runnerData != nil {
814                 return nil
815         } else if wp.runnerSource == "" {
816                 wp.runnerCmd = "crunch-run"
817                 wp.runnerData = []byte{}
818                 return nil
819         }
820         logger := wp.logger.WithField("source", wp.runnerSource)
821         logger.Debug("loading runner")
822         buf, err := ioutil.ReadFile(wp.runnerSource)
823         if err != nil {
824                 logger.WithError(err).Error("failed to load runner program")
825                 return err
826         }
827         wp.runnerData = buf
828         wp.runnerMD5 = md5.Sum(buf)
829         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
830         return nil
831 }
832
833 func (wp *Pool) notify() {
834         wp.mtx.RLock()
835         defer wp.mtx.RUnlock()
836         for _, send := range wp.subscribers {
837                 select {
838                 case send <- struct{}{}:
839                 default:
840                 }
841         }
842 }
843
844 func (wp *Pool) getInstancesAndSync() error {
845         wp.setupOnce.Do(wp.setup)
846         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
847                 return err
848         }
849         wp.logger.Debug("getting instance list")
850         threshold := time.Now()
851         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
852         if err != nil {
853                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
854                 return err
855         }
856         wp.sync(threshold, instances)
857         wp.logger.Debug("sync done")
858         return nil
859 }
860
861 // Add/remove/update workers based on instances, which were obtained
862 // from the instanceSet. However, don't clobber any other updates that
863 // already happened after threshold.
864 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
865         wp.mtx.Lock()
866         defer wp.mtx.Unlock()
867         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
868         notify := false
869
870         for _, inst := range instances {
871                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
872                 it, ok := wp.instanceTypes[itTag]
873                 if !ok {
874                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
875                         continue
876                 }
877                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
878                         notify = true
879                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
880                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
881                         wkr.shutdown()
882                 }
883         }
884
885         for id, wkr := range wp.workers {
886                 if wkr.updated.After(threshold) {
887                         continue
888                 }
889                 logger := wp.logger.WithFields(logrus.Fields{
890                         "Instance":    wkr.instance.ID(),
891                         "WorkerState": wkr.state,
892                 })
893                 logger.Info("instance disappeared in cloud")
894                 wkr.reportBootOutcome(BootOutcomeDisappeared)
895                 if wp.mDisappearances != nil {
896                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
897                 }
898                 delete(wp.workers, id)
899                 go wkr.Close()
900                 notify = true
901         }
902
903         if !wp.loaded {
904                 notify = true
905                 wp.loaded = true
906                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
907         }
908
909         if notify {
910                 go wp.notify()
911         }
912 }
913
914 func (wp *Pool) waitUntilLoaded() {
915         ch := wp.Subscribe()
916         wp.mtx.RLock()
917         defer wp.mtx.RUnlock()
918         for !wp.loaded {
919                 wp.mtx.RUnlock()
920                 <-ch
921                 wp.mtx.RLock()
922         }
923 }
924
925 // Return a random string of n hexadecimal digits (n*4 random bits). n
926 // must be even.
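// For example, randomHex(6) reads 3 random bytes and might return
// "a3f09c".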
927 func randomHex(n int) string {
928         buf := make([]byte, n/2)
929         _, err := rand.Read(buf)
930         if err != nil {
931                 panic(err)
932         }
933         return fmt.Sprintf("%x", buf)
934 }