Merge branch 'master' into 16811-public-favs
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
// Keys of the tags attached to cloud instances created by a Pool.
// Each key is prefixed with the pool's tagKeyPrefix before it is
// written (see Create) or read back (see updateWorker).
const (
	tagKeyInstanceType   = "InstanceType"   // Create stores the arvados.InstanceType name here
	tagKeyIdleBehavior   = "IdleBehavior"   // Create stores IdleBehaviorRun; updateWorker reads it for newly seen instances
	tagKeyInstanceSecret = "InstanceSecret" // random secret generated by Create; also the key into Pool.creating
	tagKeyInstanceSetID  = "InstanceSetID"  // Create stores the pool's instanceSetID here
)
32
// An InstanceView shows a worker's current state and recent activity.
//
// It carries only plain data fields and serializes to JSON under the
// snake_case keys given in the struct tags.
//
// NOTE(review): the code that fills in an InstanceView is not in this
// part of the file; the exact meaning of each field (e.g. the units of
// Price) should be confirmed there.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
45
// An Executor executes shell commands on a remote host.
//
// The Pool obtains one Executor per worker from its newExecutor
// callback and calls SetTarget on it each time the worker's instance
// is refreshed (see updateWorker).
type Executor interface {
	// Run cmd on the current target.
	//
	// The command's output is returned in stdout and stderr; err
	// is non-nil if the command could not be run successfully.
	// env presumably supplies environment variables for cmd, and
	// stdin its standard input -- confirm against the
	// implementations.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close presumably releases any resources held for the
	// current target -- TODO confirm against the implementations.
	Close()
}
65
// Defaults for the Pool's timing parameters. For each interval and
// timeout below, NewPool uses the default (via duration) when the
// corresponding cluster configuration value is not greater than
// zero.
//
// NOTE(review): defaultMaxProbesPerSecond and
// logRateLimitErrorInterval are not referenced in this part of the
// file; they are presumably used elsewhere in the package.
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down. Create sets atQuotaUntil to
	// now+quotaErrorTTL and schedules a notification for when the
	// period ends.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
85
86 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87         if conf > 0 {
88                 return time.Duration(conf)
89         }
90         return def
91 }
92
93 // NewPool creates a Pool of workers backed by instanceSet.
94 //
95 // New instances are configured and set up according to the given
96 // cluster configuration.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
98         wp := &Pool{
99                 logger:                         logger,
100                 arvClient:                      arvClient,
101                 instanceSetID:                  instanceSetID,
102                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
103                 newExecutor:                    newExecutor,
104                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
105                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
106                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107                 instanceTypes:                  cluster.InstanceTypes,
108                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
109                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
110                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
111                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
112                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
113                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
114                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
115                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
116                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
117                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
118                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
119                 installPublicKey:               installPublicKey,
120                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
121                 stop:                           make(chan bool),
122         }
123         wp.registerMetrics(reg)
124         go func() {
125                 wp.setupOnce.Do(wp.setup)
126                 go wp.runMetrics()
127                 go wp.runProbes()
128                 go wp.runSync()
129         }()
130         return wp
131 }
132
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
//
// The configuration fields are filled in by NewPool from its
// arguments and the cluster configuration.
//
// The Pool methods visible in this file hold mtx while they read or
// modify the private state. Exported methods that touch the maps call
// setupOnce.Do(wp.setup) first. Notable private fields:
//
//   - subscribers: channels handed out by Subscribe, removed by
//     Unsubscribe.
//   - atQuotaUntil, atQuotaErr: set by Create when the cloud provider
//     returns a quota error; AtQuota and Create compare atQuotaUntil
//     with the current time.
//   - runnerData, runnerMD5, runnerCmd: presumably populated by
//     loadRunnerData from runnerSource -- confirm where that function
//     is defined.
//
// The m* fields are the Prometheus collectors created and registered
// by registerMetrics.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	// metrics (see registerMetrics)
	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}
188
// createCall describes an unfinished Create call: the time it was
// started and the instance type that was requested. Entries are kept
// in Pool.creating, keyed by instance secret, until the call to the
// cloud provider returns.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
193
194 func (wp *Pool) CheckHealth() error {
195         wp.setupOnce.Do(wp.setup)
196         if err := wp.loadRunnerData(); err != nil {
197                 return fmt.Errorf("error loading runner binary: %s", err)
198         }
199         return nil
200 }
201
202 // Subscribe returns a buffered channel that becomes ready after any
203 // change to the pool's state that could have scheduling implications:
204 // a worker's state changes, a new worker appears, the cloud
205 // provider's API rate limiting period ends, etc.
206 //
207 // Additional events that occur while the channel is already ready
208 // will be dropped, so it is OK if the caller services the channel
209 // slowly.
210 //
211 // Example:
212 //
213 //      ch := wp.Subscribe()
214 //      defer wp.Unsubscribe(ch)
215 //      for range ch {
216 //              tryScheduling(wp)
217 //              if done {
218 //                      break
219 //              }
220 //      }
221 func (wp *Pool) Subscribe() <-chan struct{} {
222         wp.setupOnce.Do(wp.setup)
223         wp.mtx.Lock()
224         defer wp.mtx.Unlock()
225         ch := make(chan struct{}, 1)
226         wp.subscribers[ch] = ch
227         return ch
228 }
229
230 // Unsubscribe stops sending updates to the given channel.
231 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
232         wp.setupOnce.Do(wp.setup)
233         wp.mtx.Lock()
234         defer wp.mtx.Unlock()
235         delete(wp.subscribers, ch)
236 }
237
238 // Unallocated returns the number of unallocated (creating + booting +
239 // idle + unknown) workers for each instance type.  Workers in
240 // hold/drain mode are not included.
241 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
242         wp.setupOnce.Do(wp.setup)
243         wp.mtx.RLock()
244         defer wp.mtx.RUnlock()
245         unalloc := map[arvados.InstanceType]int{}
246         creating := map[arvados.InstanceType]int{}
247         oldestCreate := map[arvados.InstanceType]time.Time{}
248         for _, cc := range wp.creating {
249                 it := cc.instanceType
250                 creating[it]++
251                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
252                         oldestCreate[it] = cc.time
253                 }
254         }
255         for _, wkr := range wp.workers {
256                 // Skip workers that are not expected to become
257                 // available soon. Note len(wkr.running)>0 is not
258                 // redundant here: it can be true even in
259                 // StateUnknown.
260                 if wkr.state == StateShutdown ||
261                         wkr.state == StateRunning ||
262                         wkr.idleBehavior != IdleBehaviorRun ||
263                         len(wkr.running) > 0 {
264                         continue
265                 }
266                 it := wkr.instType
267                 unalloc[it]++
268                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
269                         // If up to N new workers appear in
270                         // Instances() while we are waiting for N
271                         // Create() calls to complete, we assume we're
272                         // just seeing a race between Instances() and
273                         // Create() responses.
274                         //
275                         // The other common reason why nodes have
276                         // state==Unknown is that they appeared at
277                         // startup, before any Create calls. They
278                         // don't match the above timing condition, so
279                         // we never mistakenly attribute them to
280                         // pending Create calls.
281                         creating[it]--
282                 }
283         }
284         for it, c := range creating {
285                 unalloc[it] += c
286         }
287         return unalloc
288 }
289
// Create a new instance with the given type, and add it to the worker
// pool. A placeholder for the new instance is recorded in wp.creating
// immediately (so it is counted by Unallocated); instance creation
// runs in the background, and the worker itself is added when the
// cloud provider's create call returns successfully.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
//
// A true return value means only that a creation attempt has been
// started, not that it succeeded.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	// Don't try while a recent quota error is still in effect or
	// while create calls are being throttled.
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	// The secret identifies this create call: it is the key in
	// wp.creating and is also attached to the instance as a tag,
	// which is how updateWorker matches a new instance to a
	// pending call.
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		// Deferred calls run in reverse order: delete the
		// wp.creating entry, release the lock, then notify.
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		// The cloud call is made without holding wp.mtx.
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				// Remember the quota error, and wake
				// up subscribers when it expires.
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}
355
356 // AtQuota returns true if Create is not expected to work at the
357 // moment.
358 func (wp *Pool) AtQuota() bool {
359         wp.mtx.Lock()
360         defer wp.mtx.Unlock()
361         return time.Now().Before(wp.atQuotaUntil)
362 }
363
364 // SetIdleBehavior determines how the indicated instance will behave
365 // when it has no containers running.
366 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
367         wp.mtx.Lock()
368         defer wp.mtx.Unlock()
369         wkr, ok := wp.workers[id]
370         if !ok {
371                 return errors.New("requested instance does not exist")
372         }
373         wkr.setIdleBehavior(idleBehavior)
374         return nil
375 }
376
377 // Successful connection to the SSH daemon, update the mTimeToSSH metric
378 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
379         wp.mtx.Lock()
380         defer wp.mtx.Unlock()
381         wkr := wp.workers[inst.ID()]
382         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
383                 // the node is not in booting state (can happen if a-d-c is restarted) OR
384                 // this is not the first SSH connection
385                 return
386         }
387
388         wkr.firstSSHConnection = time.Now()
389         if wp.mTimeToSSH != nil {
390                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
391         }
392 }
393
394 // Add or update worker attached to the given instance.
395 //
396 // The second return value is true if a new worker is created.
397 //
398 // A newly added instance has state=StateBooting if its tags match an
399 // entry in wp.creating, otherwise StateUnknown.
400 //
401 // Caller must have lock.
402 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
403         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
404         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
405         id := inst.ID()
406         if wkr := wp.workers[id]; wkr != nil {
407                 wkr.executor.SetTarget(inst)
408                 wkr.instance = inst
409                 wkr.updated = time.Now()
410                 wkr.saveTags()
411                 return wkr, false
412         }
413
414         state := StateUnknown
415         if _, ok := wp.creating[secret]; ok {
416                 state = StateBooting
417         }
418
419         // If an instance has a valid IdleBehavior tag when it first
420         // appears, initialize the new worker accordingly (this is how
421         // we restore IdleBehavior that was set by a prior dispatch
422         // process); otherwise, default to "run". After this,
423         // wkr.idleBehavior is the source of truth, and will only be
424         // changed via SetIdleBehavior().
425         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
426         if !validIdleBehavior[idleBehavior] {
427                 idleBehavior = IdleBehaviorRun
428         }
429
430         logger := wp.logger.WithFields(logrus.Fields{
431                 "InstanceType": it.Name,
432                 "Instance":     inst.ID(),
433                 "Address":      inst.Address(),
434         })
435         logger.WithFields(logrus.Fields{
436                 "State":        state,
437                 "IdleBehavior": idleBehavior,
438         }).Infof("instance appeared in cloud")
439         now := time.Now()
440         wkr := &worker{
441                 mtx:          &wp.mtx,
442                 wp:           wp,
443                 logger:       logger,
444                 executor:     wp.newExecutor(inst),
445                 state:        state,
446                 idleBehavior: idleBehavior,
447                 instance:     inst,
448                 instType:     it,
449                 appeared:     now,
450                 probed:       now,
451                 busy:         now,
452                 updated:      now,
453                 running:      make(map[string]*remoteRunner),
454                 starting:     make(map[string]*remoteRunner),
455                 probing:      make(chan struct{}, 1),
456         }
457         wp.workers[id] = wkr
458         return wkr, true
459 }
460
461 // Shutdown shuts down a worker with the given type, or returns false
462 // if all workers with the given type are busy.
463 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
464         wp.setupOnce.Do(wp.setup)
465         wp.mtx.Lock()
466         defer wp.mtx.Unlock()
467         logger := wp.logger.WithField("InstanceType", it.Name)
468         logger.Info("shutdown requested")
469         for _, tryState := range []State{StateBooting, StateIdle} {
470                 // TODO: shutdown the worker with the longest idle
471                 // time (Idle) or the earliest create time (Booting)
472                 for _, wkr := range wp.workers {
473                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
474                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
475                                 wkr.reportBootOutcome(BootOutcomeAborted)
476                                 wkr.shutdown()
477                                 return true
478                         }
479                 }
480         }
481         return false
482 }
483
484 // CountWorkers returns the current number of workers in each state.
485 //
486 // CountWorkers blocks, if necessary, until the initial instance list
487 // has been loaded from the cloud provider.
488 func (wp *Pool) CountWorkers() map[State]int {
489         wp.setupOnce.Do(wp.setup)
490         wp.waitUntilLoaded()
491         wp.mtx.Lock()
492         defer wp.mtx.Unlock()
493         r := map[State]int{}
494         for _, w := range wp.workers {
495                 r[w.state]++
496         }
497         return r
498 }
499
500 // Running returns the container UUIDs being prepared/run on workers.
501 //
502 // In the returned map, the time value indicates when the Pool
503 // observed that the container process had exited. A container that
504 // has not yet exited has a zero time value. The caller should use
505 // ForgetContainer() to garbage-collect the entries for exited
506 // containers.
507 func (wp *Pool) Running() map[string]time.Time {
508         wp.setupOnce.Do(wp.setup)
509         wp.mtx.Lock()
510         defer wp.mtx.Unlock()
511         r := map[string]time.Time{}
512         for _, wkr := range wp.workers {
513                 for uuid := range wkr.running {
514                         r[uuid] = time.Time{}
515                 }
516                 for uuid := range wkr.starting {
517                         r[uuid] = time.Time{}
518                 }
519         }
520         for uuid, exited := range wp.exited {
521                 r[uuid] = exited
522         }
523         return r
524 }
525
526 // StartContainer starts a container on an idle worker immediately if
527 // possible, otherwise returns false.
528 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
529         wp.setupOnce.Do(wp.setup)
530         wp.mtx.Lock()
531         defer wp.mtx.Unlock()
532         var wkr *worker
533         for _, w := range wp.workers {
534                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
535                         if wkr == nil || w.busy.After(wkr.busy) {
536                                 wkr = w
537                         }
538                 }
539         }
540         if wkr == nil {
541                 return false
542         }
543         wkr.startContainer(ctr)
544         return true
545 }
546
547 // KillContainer kills the crunch-run process for the given container
548 // UUID, if it's running on any worker.
549 //
550 // KillContainer returns immediately; the act of killing the container
551 // takes some time, and runs in the background.
552 //
553 // KillContainer returns false if the container has already ended.
554 func (wp *Pool) KillContainer(uuid string, reason string) bool {
555         wp.mtx.Lock()
556         defer wp.mtx.Unlock()
557         logger := wp.logger.WithFields(logrus.Fields{
558                 "ContainerUUID": uuid,
559                 "Reason":        reason,
560         })
561         for _, wkr := range wp.workers {
562                 rr := wkr.running[uuid]
563                 if rr == nil {
564                         rr = wkr.starting[uuid]
565                 }
566                 if rr != nil {
567                         rr.Kill(reason)
568                         return true
569                 }
570         }
571         logger.Debug("cannot kill: already disappeared")
572         return false
573 }
574
575 // ForgetContainer clears the placeholder for the given exited
576 // container, so it isn't returned by subsequent calls to Running().
577 //
578 // ForgetContainer has no effect if the container has not yet exited.
579 //
580 // The "container exited at time T" placeholder (which necessitates
581 // ForgetContainer) exists to make it easier for the caller
582 // (scheduler) to distinguish a container that exited without
583 // finalizing its state from a container that exited too recently for
584 // its final state to have appeared in the scheduler's queue cache.
585 func (wp *Pool) ForgetContainer(uuid string) {
586         wp.mtx.Lock()
587         defer wp.mtx.Unlock()
588         if _, ok := wp.exited[uuid]; ok {
589                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
590                 delete(wp.exited, uuid)
591         }
592 }
593
594 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
595         if reg == nil {
596                 reg = prometheus.NewRegistry()
597         }
598         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
599                 Namespace: "arvados",
600                 Subsystem: "dispatchcloud",
601                 Name:      "containers_running",
602                 Help:      "Number of containers reported running by cloud VMs.",
603         })
604         reg.MustRegister(wp.mContainersRunning)
605         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
606                 Namespace: "arvados",
607                 Subsystem: "dispatchcloud",
608                 Name:      "instances_total",
609                 Help:      "Number of cloud VMs.",
610         }, []string{"category", "instance_type"})
611         reg.MustRegister(wp.mInstances)
612         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
613                 Namespace: "arvados",
614                 Subsystem: "dispatchcloud",
615                 Name:      "instances_price",
616                 Help:      "Price of cloud VMs.",
617         }, []string{"category"})
618         reg.MustRegister(wp.mInstancesPrice)
619         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
620                 Namespace: "arvados",
621                 Subsystem: "dispatchcloud",
622                 Name:      "vcpus_total",
623                 Help:      "Total VCPUs on all cloud VMs.",
624         }, []string{"category"})
625         reg.MustRegister(wp.mVCPUs)
626         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
627                 Namespace: "arvados",
628                 Subsystem: "dispatchcloud",
629                 Name:      "memory_bytes_total",
630                 Help:      "Total memory on all cloud VMs.",
631         }, []string{"category"})
632         reg.MustRegister(wp.mMemory)
633         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
634                 Namespace: "arvados",
635                 Subsystem: "dispatchcloud",
636                 Name:      "boot_outcomes",
637                 Help:      "Boot outcomes by type.",
638         }, []string{"outcome"})
639         for k := range validBootOutcomes {
640                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
641         }
642         reg.MustRegister(wp.mBootOutcomes)
643         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
644                 Namespace: "arvados",
645                 Subsystem: "dispatchcloud",
646                 Name:      "instances_disappeared",
647                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
648         }, []string{"state"})
649         for _, v := range stateString {
650                 wp.mDisappearances.WithLabelValues(v).Add(0)
651         }
652         reg.MustRegister(wp.mDisappearances)
653         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
654                 Namespace:  "arvados",
655                 Subsystem:  "dispatchcloud",
656                 Name:       "instances_time_to_ssh_seconds",
657                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
658                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
659         })
660         reg.MustRegister(wp.mTimeToSSH)
661         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
662                 Namespace:  "arvados",
663                 Subsystem:  "dispatchcloud",
664                 Name:       "instances_time_to_ready_for_container_seconds",
665                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
666                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
667         })
668         reg.MustRegister(wp.mTimeToReadyForContainer)
669         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
670                 Namespace:  "arvados",
671                 Subsystem:  "dispatchcloud",
672                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
673                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
674                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
675         })
676         reg.MustRegister(wp.mTimeFromShutdownToGone)
677         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
678                 Namespace:  "arvados",
679                 Subsystem:  "dispatchcloud",
680                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
681                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
682                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
683         })
684         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
685         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
686                 Namespace:  "arvados",
687                 Subsystem:  "dispatchcloud",
688                 Name:       "instances_run_probe_duration_seconds",
689                 Help:       "Number of seconds per runProbe call.",
690                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
691         }, []string{"outcome"})
692         reg.MustRegister(wp.mRunProbeDuration)
693 }
694
695 func (wp *Pool) runMetrics() {
696         ch := wp.Subscribe()
697         defer wp.Unsubscribe(ch)
698         wp.updateMetrics()
699         for range ch {
700                 wp.updateMetrics()
701         }
702 }
703
704 func (wp *Pool) updateMetrics() {
705         wp.mtx.RLock()
706         defer wp.mtx.RUnlock()
707
708         type entKey struct {
709                 cat      string
710                 instType string
711         }
712         instances := map[entKey]int64{}
713         price := map[string]float64{}
714         cpu := map[string]int64{}
715         mem := map[string]int64{}
716         var running int64
717         for _, wkr := range wp.workers {
718                 var cat string
719                 switch {
720                 case len(wkr.running)+len(wkr.starting) > 0:
721                         cat = "inuse"
722                 case wkr.idleBehavior == IdleBehaviorHold:
723                         cat = "hold"
724                 case wkr.state == StateBooting:
725                         cat = "booting"
726                 case wkr.state == StateUnknown:
727                         cat = "unknown"
728                 default:
729                         cat = "idle"
730                 }
731                 instances[entKey{cat, wkr.instType.Name}]++
732                 price[cat] += wkr.instType.Price
733                 cpu[cat] += int64(wkr.instType.VCPUs)
734                 mem[cat] += int64(wkr.instType.RAM)
735                 running += int64(len(wkr.running) + len(wkr.starting))
736         }
737         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
738                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
739                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
740                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
741                 // make sure to reset gauges for non-existing category/nodetype combinations
742                 for _, it := range wp.instanceTypes {
743                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
744                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
745                         }
746                 }
747         }
748         for k, v := range instances {
749                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
750         }
751         wp.mContainersRunning.Set(float64(running))
752 }
753
754 func (wp *Pool) runProbes() {
755         maxPPS := wp.maxProbesPerSecond
756         if maxPPS < 1 {
757                 maxPPS = defaultMaxProbesPerSecond
758         }
759         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
760         defer limitticker.Stop()
761
762         probeticker := time.NewTicker(wp.probeInterval)
763         defer probeticker.Stop()
764
765         workers := []cloud.InstanceID{}
766         for range probeticker.C {
767                 workers = workers[:0]
768                 wp.mtx.Lock()
769                 for id, wkr := range wp.workers {
770                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
771                                 continue
772                         }
773                         workers = append(workers, id)
774                 }
775                 wp.mtx.Unlock()
776
777                 for _, id := range workers {
778                         wp.mtx.Lock()
779                         wkr, ok := wp.workers[id]
780                         wp.mtx.Unlock()
781                         if !ok {
782                                 // Deleted while we were probing
783                                 // others
784                                 continue
785                         }
786                         go wkr.ProbeAndUpdate()
787                         select {
788                         case <-wp.stop:
789                                 return
790                         case <-limitticker.C:
791                         }
792                 }
793         }
794 }
795
796 func (wp *Pool) runSync() {
797         // sync once immediately, then wait syncInterval, sync again,
798         // etc.
799         timer := time.NewTimer(1)
800         for {
801                 select {
802                 case <-timer.C:
803                         err := wp.getInstancesAndSync()
804                         if err != nil {
805                                 wp.logger.WithError(err).Warn("sync failed")
806                         }
807                         timer.Reset(wp.syncInterval)
808                 case <-wp.stop:
809                         wp.logger.Debug("worker.Pool stopped")
810                         return
811                 }
812         }
813 }
814
815 // Stop synchronizing with the InstanceSet.
816 func (wp *Pool) Stop() {
817         wp.setupOnce.Do(wp.setup)
818         close(wp.stop)
819 }
820
821 // Instances returns an InstanceView for each worker in the pool,
822 // summarizing its current state and recent activity.
823 func (wp *Pool) Instances() []InstanceView {
824         var r []InstanceView
825         wp.setupOnce.Do(wp.setup)
826         wp.mtx.Lock()
827         for _, w := range wp.workers {
828                 r = append(r, InstanceView{
829                         Instance:             w.instance.ID(),
830                         Address:              w.instance.Address(),
831                         Price:                w.instType.Price,
832                         ArvadosInstanceType:  w.instType.Name,
833                         ProviderInstanceType: w.instType.ProviderType,
834                         LastContainerUUID:    w.lastUUID,
835                         LastBusy:             w.busy,
836                         WorkerState:          w.state.String(),
837                         IdleBehavior:         w.idleBehavior,
838                 })
839         }
840         wp.mtx.Unlock()
841         sort.Slice(r, func(i, j int) bool {
842                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
843         })
844         return r
845 }
846
847 // KillInstance destroys a cloud VM instance. It returns an error if
848 // the given instance does not exist.
849 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
850         wkr, ok := wp.workers[id]
851         if !ok {
852                 return errors.New("instance not found")
853         }
854         wkr.logger.WithField("Reason", reason).Info("shutting down")
855         wkr.reportBootOutcome(BootOutcomeAborted)
856         wkr.shutdown()
857         return nil
858 }
859
860 func (wp *Pool) setup() {
861         wp.creating = map[string]createCall{}
862         wp.exited = map[string]time.Time{}
863         wp.workers = map[cloud.InstanceID]*worker{}
864         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
865         wp.loadRunnerData()
866 }
867
868 // Load the runner program to be deployed on worker nodes into
869 // wp.runnerData, if necessary. Errors are logged.
870 //
871 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
872 //
873 // Caller must not have lock.
874 func (wp *Pool) loadRunnerData() error {
875         wp.mtx.Lock()
876         defer wp.mtx.Unlock()
877         if wp.runnerData != nil {
878                 return nil
879         } else if wp.runnerSource == "" {
880                 wp.runnerCmd = "crunch-run"
881                 wp.runnerData = []byte{}
882                 return nil
883         }
884         logger := wp.logger.WithField("source", wp.runnerSource)
885         logger.Debug("loading runner")
886         buf, err := ioutil.ReadFile(wp.runnerSource)
887         if err != nil {
888                 logger.WithError(err).Error("failed to load runner program")
889                 return err
890         }
891         wp.runnerData = buf
892         wp.runnerMD5 = md5.Sum(buf)
893         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
894         return nil
895 }
896
897 func (wp *Pool) notify() {
898         wp.mtx.RLock()
899         defer wp.mtx.RUnlock()
900         for _, send := range wp.subscribers {
901                 select {
902                 case send <- struct{}{}:
903                 default:
904                 }
905         }
906 }
907
908 func (wp *Pool) getInstancesAndSync() error {
909         wp.setupOnce.Do(wp.setup)
910         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
911                 return err
912         }
913         wp.logger.Debug("getting instance list")
914         threshold := time.Now()
915         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
916         if err != nil {
917                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
918                 return err
919         }
920         wp.sync(threshold, instances)
921         wp.logger.Debug("sync done")
922         return nil
923 }
924
925 // Add/remove/update workers based on instances, which was obtained
926 // from the instanceSet. However, don't clobber any other updates that
927 // already happened after threshold.
928 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
929         wp.mtx.Lock()
930         defer wp.mtx.Unlock()
931         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
932         notify := false
933
934         for _, inst := range instances {
935                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
936                 it, ok := wp.instanceTypes[itTag]
937                 if !ok {
938                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
939                         continue
940                 }
941                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
942                         notify = true
943                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
944                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
945                         wkr.shutdown()
946                 }
947         }
948
949         for id, wkr := range wp.workers {
950                 if wkr.updated.After(threshold) {
951                         continue
952                 }
953                 logger := wp.logger.WithFields(logrus.Fields{
954                         "Instance":    wkr.instance.ID(),
955                         "WorkerState": wkr.state,
956                 })
957                 logger.Info("instance disappeared in cloud")
958                 wkr.reportBootOutcome(BootOutcomeDisappeared)
959                 if wp.mDisappearances != nil {
960                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
961                 }
962                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
963                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
964                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
965                 }
966                 delete(wp.workers, id)
967                 go wkr.Close()
968                 notify = true
969         }
970
971         if !wp.loaded {
972                 notify = true
973                 wp.loaded = true
974                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
975         }
976
977         if notify {
978                 go wp.notify()
979         }
980 }
981
982 func (wp *Pool) waitUntilLoaded() {
983         ch := wp.Subscribe()
984         wp.mtx.RLock()
985         defer wp.mtx.RUnlock()
986         for !wp.loaded {
987                 wp.mtx.RUnlock()
988                 <-ch
989                 wp.mtx.RLock()
990         }
991 }
992
// randomHex returns a random string of exactly n lowercase
// hexadecimal digits (n*4 random bits), using crypto/rand.
//
// Odd n is supported: one extra random digit is generated and
// discarded. If n <= 0, the empty string is returned.
//
// randomHex panics if the system's random source fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte yields two hex digits; round up for odd n.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}