// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/hmac"
        "crypto/md5"
        "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        mathrand "math/rand"
        "sort"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

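// Tag keys used to store worker metadata in cloud instance tags.
// When written to or read from the cloud provider they are combined
// with the configured TagKeyPrefix; for example, with a prefix of
// "Arvados" (illustrative value), the instance type is stored under
// the tag key "ArvadosInstanceType".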
const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
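
// Illustrative sketch (not part of this file) of how a worker might
// use its Executor to run the configured boot probe; the env map and
// error handling are simplified here:
//
//      stdout, stderr, err := executor.Execute(nil, bootProbeCommand, nil)
//      if err != nil {
//              // instance not ready yet; try again at the next probe
//      }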

const (
        defaultSyncInterval        = time.Minute
        defaultProbeInterval       = time.Second * 10
        defaultMaxProbesPerSecond  = 10
        defaultTimeoutIdle         = time.Minute
        defaultTimeoutBooting      = time.Minute * 10
        defaultTimeoutProbe        = time.Minute * 10
        defaultTimeoutShutdown     = time.Second * 10
        defaultTimeoutTERM         = time.Minute * 2
        defaultTimeoutSignal       = time.Second * 5
        defaultTimeoutStaleRunLock = time.Second * 5

        // Time to wait after a quota error before trying again
        // anyway, even if no instances have been shut down.
        quotaErrorTTL = time.Minute

        // Minimum time between "X failed because of rate limiting"
        // log messages
        logRateLimitErrorInterval = time.Second * 10
)

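// duration returns conf as a time.Duration if it is set (>0),
// otherwise def. For example, if ProbeInterval is not set in the
// cluster config, duration(cluster.Containers.CloudVMs.ProbeInterval,
// defaultProbeInterval) returns the 10-second default.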
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        }
        return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:                         logger,
                arvClient:                      arvClient,
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
                cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:                  cluster.InstanceTypes,
                maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
                maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
                probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
                systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
                runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
                runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger                         logrus.FieldLogger
        arvClient                      *arvados.Client
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
        cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
        instanceTypes                  map[string]arvados.InstanceType
        syncInterval                   time.Duration
        probeInterval                  time.Duration
        maxProbesPerSecond             int
        maxConcurrentInstanceCreateOps int
        timeoutIdle                    time.Duration
        timeoutBooting                 time.Duration
        timeoutProbe                   time.Duration
        timeoutShutdown                time.Duration
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
        timeoutStaleRunLock            time.Duration
        systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
        runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
        runnerArgs                     []string // extra args passed to crunch-run

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
        runnerData   []byte
        runnerMD5    [md5.Size]byte
        runnerCmd    string

        mContainersRunning        prometheus.Gauge
        mInstances                *prometheus.GaugeVec
        mInstancesPrice           *prometheus.GaugeVec
        mVCPUs                    *prometheus.GaugeVec
        mMemory                   *prometheus.GaugeVec
        mBootOutcomes             *prometheus.CounterVec
        mDisappearances           *prometheus.CounterVec
        mTimeToSSH                prometheus.Summary
        mTimeToReadyForContainer  prometheus.Summary
        mTimeFromShutdownToGone   prometheus.Summary
        mTimeFromQueueToCrunchRun prometheus.Summary
        mRunProbeDuration         *prometheus.SummaryVec
}

type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

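// CheckHealth reports an error if the Pool cannot load the
// crunch-run binary it deploys to new instances.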
func (wp *Pool) CheckHealth() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.loadRunnerData(); err != nil {
                return fmt.Errorf("error loading runner binary: %s", err)
        }
        return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
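//
// For example (illustrative): with two unfinished Create() calls for
// an instance type and one idle worker of that type, Unallocated()
// reports 3 for that type.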
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        if wp.loadRunnerData() != nil {
                // Boot probe is certain to fail.
                return false
        }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
                return false
        }
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
        // requests from a single disk image (cf.
        // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
        // The code assumes that node creation, from Azure's perspective, means the
        // period until the instance appears in the "get all instances" list.
        if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
                logger.Info("reached MaxConcurrentInstanceCreateOps")
                wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := TagVerifier{nil, secret, nil}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}

// reportSSHConnected records the first successful SSH connection to
// an instance and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[inst.ID()]
        if !ok {
                // instance was removed from the pool before we got
                // here
                return
        }
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
                // the node is not in booting state (can happen if a-d-c is restarted) OR
                // this is not the first SSH connection
                return
        }

        wkr.firstSSHConnection = time.Now()
        if wp.mTimeToSSH != nil {
                wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
        }
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
                                wkr.reportBootOutcome(BootOutcomeAborted)
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
        return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
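//
// Illustrative caller pattern (queueHasFinalState is a hypothetical
// helper, not part of this package):
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() && queueHasFinalState(uuid) {
//                      wp.ForgetContainer(uuid)
//              }
//      }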
func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "boot_outcomes",
                Help:      "Boot outcomes by type.",
        }, []string{"outcome"})
        for k := range validBootOutcomes {
                wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
        }
        reg.MustRegister(wp.mBootOutcomes)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_disappeared",
                Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
        }, []string{"state"})
        for _, v := range stateString {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
        wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ssh_seconds",
                Help:       "Number of seconds between instance creation and the first successful SSH connection.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToSSH)
        wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ready_for_container_seconds",
                Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToReadyForContainer)
        wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
                Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromShutdownToGone)
        wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "containers_time_from_queue_to_crunch_run_seconds",
                Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
        wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_run_probe_duration_seconds",
                Help:       "Number of seconds per runProbe call.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        }, []string{"outcome"})
        reg.MustRegister(wp.mRunProbeDuration)
}

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        type entKey struct {
                cat      string
                instType string
        }
        instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
                // make sure to reset gauges for non-existing category/nodetype combinations
                for _, it := range wp.instanceTypes {
                        if _, ok := instances[entKey{cat, it.Name}]; !ok {
                                wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
                        }
                }
        }
        for k, v := range instances {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                // Add some jitter. Without this, if probeInterval is
                // a multiple of syncInterval and sync is
                // instantaneous (as with the loopback driver), the
                // first few probes race with sync operations and
                // don't update the workers.
                time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return nil
}

func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
                wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
        logger := wp.logger.WithField("source", wp.runnerSource)
        logger.Debug("loading runner")
        buf, err := ioutil.ReadFile(wp.runnerSource)
        if err != nil {
                logger.WithError(err).Error("failed to load runner program")
                return err
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
        wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                        // Subscriber's buffer is already full, so it
                        // will wake up anyway; drop this event.
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// sync adds/removes/updates workers based on the instance list
// obtained from the instanceSet, without clobbering any other updates
// that already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                wkr.reportBootOutcome(BootOutcomeDisappeared)
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
                if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
                        wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
                }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

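// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: the hex-encoded (64-digit) HMAC-SHA256 of the UUID,
// keyed with the cluster's SystemRootToken.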
func (wp *Pool) gatewayAuthSecret(uuid string) string {
        h := hmac.New(sha256.New, []byte(wp.systemRootToken))
        fmt.Fprint(h, uuid)
        return fmt.Sprintf("%x", h.Sum(nil))
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
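// For example, randomHex(6) might return "f0a1b2".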
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}