1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         mathrand "math/rand"
17         "sort"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         "golang.org/x/crypto/ssh"
27 )
28
29 const (
30         tagKeyInstanceType   = "InstanceType"
31         tagKeyIdleBehavior   = "IdleBehavior"
32         tagKeyInstanceSecret = "InstanceSecret"
33         tagKeyInstanceSetID  = "InstanceSetID"
34 )
35
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38         Instance             cloud.InstanceID `json:"instance"`
39         Address              string           `json:"address"`
40         Price                float64          `json:"price"`
41         ArvadosInstanceType  string           `json:"arvados_instance_type"`
42         ProviderInstanceType string           `json:"provider_instance_type"`
43         LastContainerUUID    string           `json:"last_container_uuid"`
44         LastBusy             time.Time        `json:"last_busy"`
45         WorkerState          string           `json:"worker_state"`
46         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
47 }
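
// The pool's Instances method returns a slice of InstanceView. Encoded as
// JSON (the struct tags above suggest this is what the dispatcher's
// management API serves), an entry looks roughly like the following; all
// values here are illustrative only:
//
//      {
//              "instance": "i-0123456789abcdef0",
//              "address": "10.1.2.3",
//              "price": 0.12,
//              "arvados_instance_type": "Standard2Core",
//              "provider_instance_type": "Standard_D2s_v3",
//              "last_container_uuid": "zzzzz-dz642-0123456789abcde",
//              "last_busy": "2024-01-01T12:00:00Z",
//              "worker_state": "idle",
//              "idle_behavior": "run"
//      }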
48
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51         // Run cmd on the current target.
52         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
53
54         // Use the given target for subsequent operations. The new
55         // target is the same host as the previous target, but it
56         // might return a different address and verify a different
57         // host key.
58         //
59         // SetTarget is called frequently, and in most cases the new
60         // target will behave exactly the same as the old one. An
61         // implementation should optimize accordingly.
62         //
63         // SetTarget must not block on concurrent Execute calls.
64         SetTarget(cloud.ExecutorTarget)
65
66         Close()
67 }
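
// A minimal sketch of an Executor implementation, for illustration only;
// localExecutor is a hypothetical name, and the dispatcher's production
// executor (SSH-based) is defined elsewhere:
//
//      type localExecutor struct {
//              mtx    sync.Mutex
//              target cloud.ExecutorTarget
//      }
//
//      func (e *localExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//              // Snapshot the current target so concurrent SetTarget calls
//              // are never blocked by a long-running command.
//              e.mtx.Lock()
//              target := e.target
//              e.mtx.Unlock()
//              _ = target // connect to target, run cmd, capture stdout/stderr...
//              return nil, nil, errors.New("not implemented")
//      }
//
//      func (e *localExecutor) SetTarget(t cloud.ExecutorTarget) {
//              e.mtx.Lock()
//              defer e.mtx.Unlock()
//              e.target = t
//      }
//
//      func (e *localExecutor) Close() {}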
68
69 const (
70         defaultSyncInterval        = time.Minute
71         defaultProbeInterval       = time.Second * 10
72         defaultMaxProbesPerSecond  = 10
73         defaultTimeoutIdle         = time.Minute
74         defaultTimeoutBooting      = time.Minute * 10
75         defaultTimeoutProbe        = time.Minute * 10
76         defaultTimeoutShutdown     = time.Second * 10
77         defaultTimeoutTERM         = time.Minute * 2
78         defaultTimeoutSignal       = time.Second * 5
79         defaultTimeoutStaleRunLock = time.Second * 5
80
81         // Time after a quota error to try again anyway, even if no
82         // instances have been shut down.
83         quotaErrorTTL = time.Minute
84
85         // Time between "X failed because of rate limiting" log messages
86         logRateLimitErrorInterval = time.Second * 10
87 )
88
89 func duration(conf arvados.Duration, def time.Duration) time.Duration {
90         if conf > 0 {
91                 return time.Duration(conf)
92         }
93         return def
94 }
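
// For example, with the defaults defined above,
// duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval)
// returns the configured interval whenever it is positive, and 10 seconds
// otherwise.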
95
96 // NewPool creates a Pool of workers backed by instanceSet.
97 //
98 // New instances are configured and set up according to the given
99 // cluster configuration.
100 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
101         wp := &Pool{
102                 logger:                         logger,
103                 arvClient:                      arvClient,
104                 instanceSetID:                  instanceSetID,
105                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
106                 newExecutor:                    newExecutor,
107                 cluster:                        cluster,
108                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
109                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
110                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
111                 instanceTypes:                  cluster.InstanceTypes,
112                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
113                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
114                 maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
115                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
116                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
117                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
118                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
119                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
120                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
121                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
122                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
123                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
124                 systemRootToken:                cluster.SystemRootToken,
125                 installPublicKey:               installPublicKey,
126                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
127                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
128                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
129                 stop:                           make(chan bool),
130         }
131         wp.registerMetrics(reg)
132         go func() {
133                 wp.setupOnce.Do(wp.setup)
134                 go wp.runMetrics()
135                 go wp.runProbes()
136                 go wp.runSync()
137         }()
138         return wp
139 }
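
// Typical startup wiring, sketched for illustration (newSSHExecutor and the
// other arguments are placeholders for whatever the caller provides):
//
//      pool := NewPool(logger, arvClient, prometheus.NewRegistry(), instanceSetID,
//              instanceSet, newSSHExecutor, installPublicKey, cluster)
//      defer pool.Stop()
//      if err := pool.CheckHealth(); err != nil {
//              logger.WithError(err).Fatal("worker pool is not usable")
//      }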
140
141 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
142 // zero Pool should not be used. Call NewPool to create a new Pool.
143 type Pool struct {
144         // configuration
145         logger                         logrus.FieldLogger
146         arvClient                      *arvados.Client
147         instanceSetID                  cloud.InstanceSetID
148         instanceSet                    *throttledInstanceSet
149         newExecutor                    func(cloud.Instance) Executor
150         cluster                        *arvados.Cluster
151         bootProbeCommand               string
152         runnerSource                   string
153         imageID                        cloud.ImageID
154         instanceTypes                  map[string]arvados.InstanceType
155         syncInterval                   time.Duration
156         probeInterval                  time.Duration
157         maxProbesPerSecond             int
158         maxConcurrentInstanceCreateOps int
159         maxInstances                   int
160         timeoutIdle                    time.Duration
161         timeoutBooting                 time.Duration
162         timeoutProbe                   time.Duration
163         timeoutShutdown                time.Duration
164         timeoutTERM                    time.Duration
165         timeoutSignal                  time.Duration
166         timeoutStaleRunLock            time.Duration
167         systemRootToken                string
168         installPublicKey               ssh.PublicKey
169         tagKeyPrefix                   string
170         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
171         runnerArgs                     []string // extra args passed to crunch-run
172
173         // private state
174         subscribers  map[<-chan struct{}]chan<- struct{}
175         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
176         workers      map[cloud.InstanceID]*worker
177         loaded       bool                 // loaded list of instances from InstanceSet at least once
178         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
179         atQuotaUntil time.Time
180         atQuotaErr   cloud.QuotaError
181         stop         chan bool
182         mtx          sync.RWMutex
183         setupOnce    sync.Once
184         runnerData   []byte
185         runnerMD5    [md5.Size]byte
186         runnerCmd    string
187
188         mContainersRunning        prometheus.Gauge
189         mInstances                *prometheus.GaugeVec
190         mInstancesPrice           *prometheus.GaugeVec
191         mVCPUs                    *prometheus.GaugeVec
192         mMemory                   *prometheus.GaugeVec
193         mBootOutcomes             *prometheus.CounterVec
194         mDisappearances           *prometheus.CounterVec
195         mTimeToSSH                prometheus.Summary
196         mTimeToReadyForContainer  prometheus.Summary
197         mTimeFromShutdownToGone   prometheus.Summary
198         mTimeFromQueueToCrunchRun prometheus.Summary
199         mRunProbeDuration         *prometheus.SummaryVec
200 }
201
202 type createCall struct {
203         time         time.Time
204         instanceType arvados.InstanceType
205 }
206
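// CheckHealth reports an error if the pool cannot be expected to work;
// currently this means the runner program configured by DeployRunnerBinary
// could not be loaded.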
207 func (wp *Pool) CheckHealth() error {
208         wp.setupOnce.Do(wp.setup)
209         if err := wp.loadRunnerData(); err != nil {
210                 return fmt.Errorf("error loading runner binary: %s", err)
211         }
212         return nil
213 }
214
215 // Subscribe returns a buffered channel that becomes ready after any
216 // change to the pool's state that could have scheduling implications:
217 // a worker's state changes, a new worker appears, the cloud
218 // provider's API rate limiting period ends, etc.
219 //
220 // Additional events that occur while the channel is already ready
221 // will be dropped, so it is OK if the caller services the channel
222 // slowly.
223 //
224 // Example:
225 //
226 //      ch := wp.Subscribe()
227 //      defer wp.Unsubscribe(ch)
228 //      for range ch {
229 //              tryScheduling(wp)
230 //              if done {
231 //                      break
232 //              }
233 //      }
234 func (wp *Pool) Subscribe() <-chan struct{} {
235         wp.setupOnce.Do(wp.setup)
236         wp.mtx.Lock()
237         defer wp.mtx.Unlock()
238         ch := make(chan struct{}, 1)
239         wp.subscribers[ch] = ch
240         return ch
241 }
242
243 // Unsubscribe stops sending updates to the given channel.
244 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
245         wp.setupOnce.Do(wp.setup)
246         wp.mtx.Lock()
247         defer wp.mtx.Unlock()
248         delete(wp.subscribers, ch)
249 }
250
251 // Unallocated returns the number of unallocated (creating + booting +
252 // idle + unknown) workers for each instance type.  Workers in
253 // hold/drain mode are not included.
254 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
255         wp.setupOnce.Do(wp.setup)
256         wp.mtx.RLock()
257         defer wp.mtx.RUnlock()
258         unalloc := map[arvados.InstanceType]int{}
259         creating := map[arvados.InstanceType]int{}
260         oldestCreate := map[arvados.InstanceType]time.Time{}
261         for _, cc := range wp.creating {
262                 it := cc.instanceType
263                 creating[it]++
264                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
265                         oldestCreate[it] = cc.time
266                 }
267         }
268         for _, wkr := range wp.workers {
269                 // Skip workers that are not expected to become
270                 // available soon. Note len(wkr.running)>0 is not
271                 // redundant here: it can be true even in
272                 // StateUnknown.
273                 if wkr.state == StateShutdown ||
274                         wkr.state == StateRunning ||
275                         wkr.idleBehavior != IdleBehaviorRun ||
276                         len(wkr.running) > 0 {
277                         continue
278                 }
279                 it := wkr.instType
280                 unalloc[it]++
281                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
282                         // If up to N new workers appear in
283                         // Instances() while we are waiting for N
284                         // Create() calls to complete, we assume we're
285                         // just seeing a race between Instances() and
286                         // Create() responses.
287                         //
288                         // The other common reason why nodes have
289                         // state==Unknown is that they appeared at
290                         // startup, before any Create calls. They
291                         // don't match the above timing condition, so
292                         // we never mistakenly attribute them to
293                         // pending Create calls.
294                         creating[it]--
295                 }
296         }
297         for it, c := range creating {
298                 unalloc[it] += c
299         }
300         return unalloc
301 }
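
// Sketch of how a scheduler might combine Unallocated and Create
// (illustrative only; queuedContainers and chooseInstanceType are
// hypothetical):
//
//      unalloc := wp.Unallocated()
//      for _, ctr := range queuedContainers {
//              it := chooseInstanceType(ctr)
//              if unalloc[it] > 0 {
//                      unalloc[it]-- // an existing/booting worker can take this container
//              } else if !wp.Create(it) {
//                      break // at quota or rate-limited; retry after the next Subscribe notification
//              }
//      }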
302
303 // Create a new instance with the given type, and add it to the worker
304 // pool. The worker is added immediately; instance creation runs in
305 // the background.
306 //
307 // Create returns false if a pre-existing error or a configuration
308 // setting prevents it from even attempting to create a new
309 // instance. Those errors are logged by the Pool, so the caller does
310 // not need to log anything in such cases.
311 func (wp *Pool) Create(it arvados.InstanceType) bool {
312         logger := wp.logger.WithField("InstanceType", it.Name)
313         wp.setupOnce.Do(wp.setup)
314         if wp.loadRunnerData() != nil {
315                 // Boot probe is certain to fail.
316                 return false
317         }
318         wp.mtx.Lock()
319         defer wp.mtx.Unlock()
320         if time.Now().Before(wp.atQuotaUntil) ||
321                 wp.instanceSet.throttleCreate.Error() != nil ||
322                 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
323                 return false
324         }
325         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
326         // requests in flight. It was added to work around a limitation in Azure's
327         // managed disks, which support no more than 20 concurrent node creation
328         // requests from a single disk image (cf.
329         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
330         // The code assumes that node creation, from Azure's perspective, means the
331         // period until the instance appears in the "get all instances" list.
332         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
333                 logger.Info("reached MaxConcurrentInstanceCreateOps")
334                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
335                 return false
336         }
337         now := time.Now()
338         secret := randomHex(instanceSecretLength)
339         wp.creating[secret] = createCall{time: now, instanceType: it}
340         go func() {
341                 defer wp.notify()
342                 tags := cloud.InstanceTags{
343                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
344                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
345                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
346                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
347                 }
348                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
349                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
350                 wp.mtx.Lock()
351                 defer wp.mtx.Unlock()
352                 // delete() is deferred so the updateWorker() call
353                 // below knows to use StateBooting when adding a new
354                 // worker.
355                 defer delete(wp.creating, secret)
356                 if err != nil {
357                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
358                                 wp.atQuotaErr = err
359                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
360                                 time.AfterFunc(quotaErrorTTL, wp.notify)
361                         }
362                         logger.WithError(err).Error("create failed")
363                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
364                         return
365                 }
366                 wp.updateWorker(inst, it)
367         }()
368         if len(wp.creating)+len(wp.workers) == wp.maxInstances {
369                 logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
370         }
371         return true
372 }
373
374 // AtQuota returns true if Create is not expected to work at the
375 // moment (e.g., cloud provider has reported quota errors, or we are
376 // already at our own configured quota).
377 func (wp *Pool) AtQuota() bool {
378         wp.mtx.Lock()
379         defer wp.mtx.Unlock()
380         return time.Now().Before(wp.atQuotaUntil) || (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
381 }
382
383 // SetIdleBehavior determines how the indicated instance will behave
384 // when it has no containers running.
385 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
386         wp.mtx.Lock()
387         defer wp.mtx.Unlock()
388         wkr, ok := wp.workers[id]
389         if !ok {
390                 return errors.New("requested instance does not exist")
391         }
392         wkr.setIdleBehavior(idleBehavior)
393         return nil
394 }
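
// For example, a management API handler might take an instance out of
// service before cloud maintenance with (illustrative):
//
//      if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//              // the instance is no longer in the pool
//      }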
395
396 // reportSSHConnected records the first successful SSH connection to a booting instance and updates the mTimeToSSH metric.
397 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
398         wp.mtx.Lock()
399         defer wp.mtx.Unlock()
400         wkr := wp.workers[inst.ID()]
401         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
402                 // the node is not in booting state (this can happen if arvados-dispatch-cloud is restarted) OR
403                 // this is not the first SSH connection
404                 return
405         }
406
407         wkr.firstSSHConnection = time.Now()
408         if wp.mTimeToSSH != nil {
409                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
410         }
411 }
412
413 // Add or update the worker attached to the given instance.
414 //
415 // The second return value is true if a new worker is created.
416 //
417 // A newly added instance has state=StateBooting if its tags match an
418 // entry in wp.creating, otherwise StateUnknown.
419 //
420 // Caller must have lock.
421 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
422         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
423         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
424         id := inst.ID()
425         if wkr := wp.workers[id]; wkr != nil {
426                 wkr.executor.SetTarget(inst)
427                 wkr.instance = inst
428                 wkr.updated = time.Now()
429                 wkr.saveTags()
430                 return wkr, false
431         }
432
433         state := StateUnknown
434         if _, ok := wp.creating[secret]; ok {
435                 state = StateBooting
436         }
437
438         // If an instance has a valid IdleBehavior tag when it first
439         // appears, initialize the new worker accordingly (this is how
440         // we restore IdleBehavior that was set by a prior dispatch
441         // process); otherwise, default to "run". After this,
442         // wkr.idleBehavior is the source of truth, and will only be
443         // changed via SetIdleBehavior().
444         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
445         if !validIdleBehavior[idleBehavior] {
446                 idleBehavior = IdleBehaviorRun
447         }
448
449         logger := wp.logger.WithFields(logrus.Fields{
450                 "InstanceType": it.Name,
451                 "Instance":     inst.ID(),
452                 "Address":      inst.Address(),
453         })
454         logger.WithFields(logrus.Fields{
455                 "State":        state,
456                 "IdleBehavior": idleBehavior,
457         }).Info("instance appeared in cloud")
458         now := time.Now()
459         wkr := &worker{
460                 mtx:          &wp.mtx,
461                 wp:           wp,
462                 logger:       logger,
463                 executor:     wp.newExecutor(inst),
464                 state:        state,
465                 idleBehavior: idleBehavior,
466                 instance:     inst,
467                 instType:     it,
468                 appeared:     now,
469                 probed:       now,
470                 busy:         now,
471                 updated:      now,
472                 running:      make(map[string]*remoteRunner),
473                 starting:     make(map[string]*remoteRunner),
474                 probing:      make(chan struct{}, 1),
475         }
476         wp.workers[id] = wkr
477         return wkr, true
478 }
479
480 // Shutdown shuts down one booting or idle worker with the given type, or returns
481 // false if no such worker is eligible for shutdown (e.g., all are busy or on hold).
482 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
483         wp.setupOnce.Do(wp.setup)
484         wp.mtx.Lock()
485         defer wp.mtx.Unlock()
486         logger := wp.logger.WithField("InstanceType", it.Name)
487         logger.Info("shutdown requested")
488         for _, tryState := range []State{StateBooting, StateIdle} {
489                 // TODO: shut down the worker with the longest idle
490                 // time (Idle) or the earliest create time (Booting)
491                 for _, wkr := range wp.workers {
492                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
493                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
494                                 wkr.reportBootOutcome(BootOutcomeAborted)
495                                 wkr.shutdown()
496                                 return true
497                         }
498                 }
499         }
500         return false
501 }
502
503 // CountWorkers returns the current number of workers in each state.
504 //
505 // CountWorkers blocks, if necessary, until the initial instance list
506 // has been loaded from the cloud provider.
507 func (wp *Pool) CountWorkers() map[State]int {
508         wp.setupOnce.Do(wp.setup)
509         wp.waitUntilLoaded()
510         wp.mtx.Lock()
511         defer wp.mtx.Unlock()
512         r := map[State]int{}
513         for _, w := range wp.workers {
514                 r[w.state]++
515         }
516         return r
517 }
518
519 // Running returns the container UUIDs being prepared/run on workers.
520 //
521 // In the returned map, the time value indicates when the Pool
522 // observed that the container process had exited. A container that
523 // has not yet exited has a zero time value. The caller should use
524 // ForgetContainer() to garbage-collect the entries for exited
525 // containers.
526 func (wp *Pool) Running() map[string]time.Time {
527         wp.setupOnce.Do(wp.setup)
528         wp.mtx.Lock()
529         defer wp.mtx.Unlock()
530         r := map[string]time.Time{}
531         for _, wkr := range wp.workers {
532                 for uuid := range wkr.running {
533                         r[uuid] = time.Time{}
534                 }
535                 for uuid := range wkr.starting {
536                         r[uuid] = time.Time{}
537                 }
538         }
539         for uuid, exited := range wp.exited {
540                 r[uuid] = exited
541         }
542         return r
543 }
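
// A caller might garbage-collect exited entries roughly like this
// (illustrative; recordedFinalState is a hypothetical check against the
// caller's own container queue):
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() && recordedFinalState(uuid) {
//                      wp.ForgetContainer(uuid)
//              }
//      }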
544
545 // StartContainer starts a container on an idle worker immediately if
546 // possible, otherwise returns false.
547 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
548         wp.setupOnce.Do(wp.setup)
549         wp.mtx.Lock()
550         defer wp.mtx.Unlock()
551         var wkr *worker
552         for _, w := range wp.workers {
553                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
554                         if wkr == nil || w.busy.After(wkr.busy) {
555                                 wkr = w
556                         }
557                 }
558         }
559         if wkr == nil {
560                 return false
561         }
562         wkr.startContainer(ctr)
563         return true
564 }
565
566 // KillContainer kills the crunch-run process for the given container
567 // UUID, if it's running on any worker.
568 //
569 // KillContainer returns immediately; the act of killing the container
570 // takes some time, and runs in the background.
571 //
572 // KillContainer returns false if the container has already ended.
573 func (wp *Pool) KillContainer(uuid string, reason string) bool {
574         wp.mtx.Lock()
575         defer wp.mtx.Unlock()
576         logger := wp.logger.WithFields(logrus.Fields{
577                 "ContainerUUID": uuid,
578                 "Reason":        reason,
579         })
580         for _, wkr := range wp.workers {
581                 rr := wkr.running[uuid]
582                 if rr == nil {
583                         rr = wkr.starting[uuid]
584                 }
585                 if rr != nil {
586                         rr.Kill(reason)
587                         return true
588                 }
589         }
590         logger.Debug("cannot kill: already disappeared")
591         return false
592 }
593
594 // ForgetContainer clears the placeholder for the given exited
595 // container, so it isn't returned by subsequent calls to Running().
596 //
597 // ForgetContainer has no effect if the container has not yet exited.
598 //
599 // The "container exited at time T" placeholder (which necessitates
600 // ForgetContainer) exists to make it easier for the caller
601 // (scheduler) to distinguish a container that exited without
602 // finalizing its state from a container that exited too recently for
603 // its final state to have appeared in the scheduler's queue cache.
604 func (wp *Pool) ForgetContainer(uuid string) {
605         wp.mtx.Lock()
606         defer wp.mtx.Unlock()
607         if _, ok := wp.exited[uuid]; ok {
608                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
609                 delete(wp.exited, uuid)
610         }
611 }
612
613 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
614         if reg == nil {
615                 reg = prometheus.NewRegistry()
616         }
617         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
618                 Namespace: "arvados",
619                 Subsystem: "dispatchcloud",
620                 Name:      "containers_running",
621                 Help:      "Number of containers reported running by cloud VMs.",
622         })
623         reg.MustRegister(wp.mContainersRunning)
624         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
625                 Namespace: "arvados",
626                 Subsystem: "dispatchcloud",
627                 Name:      "instances_total",
628                 Help:      "Number of cloud VMs.",
629         }, []string{"category", "instance_type"})
630         reg.MustRegister(wp.mInstances)
631         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
632                 Namespace: "arvados",
633                 Subsystem: "dispatchcloud",
634                 Name:      "instances_price",
635                 Help:      "Price of cloud VMs.",
636         }, []string{"category"})
637         reg.MustRegister(wp.mInstancesPrice)
638         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
639                 Namespace: "arvados",
640                 Subsystem: "dispatchcloud",
641                 Name:      "vcpus_total",
642                 Help:      "Total VCPUs on all cloud VMs.",
643         }, []string{"category"})
644         reg.MustRegister(wp.mVCPUs)
645         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
646                 Namespace: "arvados",
647                 Subsystem: "dispatchcloud",
648                 Name:      "memory_bytes_total",
649                 Help:      "Total memory on all cloud VMs.",
650         }, []string{"category"})
651         reg.MustRegister(wp.mMemory)
652         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
653                 Namespace: "arvados",
654                 Subsystem: "dispatchcloud",
655                 Name:      "boot_outcomes",
656                 Help:      "Boot outcomes by type.",
657         }, []string{"outcome"})
658         for k := range validBootOutcomes {
659                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
660         }
661         reg.MustRegister(wp.mBootOutcomes)
662         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
663                 Namespace: "arvados",
664                 Subsystem: "dispatchcloud",
665                 Name:      "instances_disappeared",
666                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
667         }, []string{"state"})
668         for _, v := range stateString {
669                 wp.mDisappearances.WithLabelValues(v).Add(0)
670         }
671         reg.MustRegister(wp.mDisappearances)
672         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
673                 Namespace:  "arvados",
674                 Subsystem:  "dispatchcloud",
675                 Name:       "instances_time_to_ssh_seconds",
676                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
677                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
678         })
679         reg.MustRegister(wp.mTimeToSSH)
680         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
681                 Namespace:  "arvados",
682                 Subsystem:  "dispatchcloud",
683                 Name:       "instances_time_to_ready_for_container_seconds",
684                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
685                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
686         })
687         reg.MustRegister(wp.mTimeToReadyForContainer)
688         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
689                 Namespace:  "arvados",
690                 Subsystem:  "dispatchcloud",
691                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
692                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
693                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
694         })
695         reg.MustRegister(wp.mTimeFromShutdownToGone)
696         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
697                 Namespace:  "arvados",
698                 Subsystem:  "dispatchcloud",
699                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
700                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
701                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
702         })
703         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
704         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
705                 Namespace:  "arvados",
706                 Subsystem:  "dispatchcloud",
707                 Name:       "instances_run_probe_duration_seconds",
708                 Help:       "Number of seconds per runProbe call.",
709                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
710         }, []string{"outcome"})
711         reg.MustRegister(wp.mRunProbeDuration)
712 }
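
// The registry wired in here is normally the same one the service exposes
// over HTTP, e.g. (illustrative; promhttp is
// github.com/prometheus/client_golang/prometheus/promhttp):
//
//      http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))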
713
714 func (wp *Pool) runMetrics() {
715         ch := wp.Subscribe()
716         defer wp.Unsubscribe(ch)
717         wp.updateMetrics()
718         for range ch {
719                 wp.updateMetrics()
720         }
721 }
722
723 func (wp *Pool) updateMetrics() {
724         wp.mtx.RLock()
725         defer wp.mtx.RUnlock()
726
727         type entKey struct {
728                 cat      string
729                 instType string
730         }
731         instances := map[entKey]int64{}
732         price := map[string]float64{}
733         cpu := map[string]int64{}
734         mem := map[string]int64{}
735         var running int64
736         for _, wkr := range wp.workers {
737                 var cat string
738                 switch {
739                 case len(wkr.running)+len(wkr.starting) > 0:
740                         cat = "inuse"
741                 case wkr.idleBehavior == IdleBehaviorHold:
742                         cat = "hold"
743                 case wkr.state == StateBooting:
744                         cat = "booting"
745                 case wkr.state == StateUnknown:
746                         cat = "unknown"
747                 default:
748                         cat = "idle"
749                 }
750                 instances[entKey{cat, wkr.instType.Name}]++
751                 price[cat] += wkr.instType.Price
752                 cpu[cat] += int64(wkr.instType.VCPUs)
753                 mem[cat] += int64(wkr.instType.RAM)
754                 running += int64(len(wkr.running) + len(wkr.starting))
755         }
756         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
757                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
758                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
759                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
760                 // make sure to reset gauges for non-existing category/nodetype combinations
761                 for _, it := range wp.instanceTypes {
762                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
763                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
764                         }
765                 }
766         }
767         for k, v := range instances {
768                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
769         }
770         wp.mContainersRunning.Set(float64(running))
771 }
772
773 func (wp *Pool) runProbes() {
774         maxPPS := wp.maxProbesPerSecond
775         if maxPPS < 1 {
776                 maxPPS = defaultMaxProbesPerSecond
777         }
778         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
779         defer limitticker.Stop()
780
781         probeticker := time.NewTicker(wp.probeInterval)
782         defer probeticker.Stop()
783
784         workers := []cloud.InstanceID{}
785         for range probeticker.C {
786                 // Add some jitter. Without this, if probeInterval is
787                 // a multiple of syncInterval and sync is
788                 // instantaneous (as with the loopback driver), the
789                 // first few probes race with sync operations and
790                 // don't update the workers.
791                 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
792
793                 workers = workers[:0]
794                 wp.mtx.Lock()
795                 for id, wkr := range wp.workers {
796                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
797                                 continue
798                         }
799                         workers = append(workers, id)
800                 }
801                 wp.mtx.Unlock()
802
803                 for _, id := range workers {
804                         wp.mtx.Lock()
805                         wkr, ok := wp.workers[id]
806                         wp.mtx.Unlock()
807                         if !ok {
808                                 // Deleted while we were probing
809                                 // others
810                                 continue
811                         }
812                         go wkr.ProbeAndUpdate()
813                         select {
814                         case <-wp.stop:
815                                 return
816                         case <-limitticker.C:
817                         }
818                 }
819         }
820 }
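
// With the default settings above, each worker is probed roughly every 10s
// (probeInterval), the pool-wide probe rate is capped at 10 probes per second
// (maxProbesPerSecond), and each probe cycle starts with up to
// probeInterval/23 (about 434ms at the default interval) of random jitter.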
821
822 func (wp *Pool) runSync() {
823         // sync once immediately, then wait syncInterval, sync again,
824         // etc.
825         timer := time.NewTimer(1)
826         for {
827                 select {
828                 case <-timer.C:
829                         err := wp.getInstancesAndSync()
830                         if err != nil {
831                                 wp.logger.WithError(err).Warn("sync failed")
832                         }
833                         timer.Reset(wp.syncInterval)
834                 case <-wp.stop:
835                         wp.logger.Debug("worker.Pool stopped")
836                         return
837                 }
838         }
839 }
840
841 // Stop stops synchronizing with the InstanceSet.
842 func (wp *Pool) Stop() {
843         wp.setupOnce.Do(wp.setup)
844         close(wp.stop)
845 }
846
847 // Instances returns an InstanceView for each worker in the pool,
848 // summarizing its current state and recent activity.
849 func (wp *Pool) Instances() []InstanceView {
850         var r []InstanceView
851         wp.setupOnce.Do(wp.setup)
852         wp.mtx.Lock()
853         for _, w := range wp.workers {
854                 r = append(r, InstanceView{
855                         Instance:             w.instance.ID(),
856                         Address:              w.instance.Address(),
857                         Price:                w.instType.Price,
858                         ArvadosInstanceType:  w.instType.Name,
859                         ProviderInstanceType: w.instType.ProviderType,
860                         LastContainerUUID:    w.lastUUID,
861                         LastBusy:             w.busy,
862                         WorkerState:          w.state.String(),
863                         IdleBehavior:         w.idleBehavior,
864                 })
865         }
866         wp.mtx.Unlock()
867         sort.Slice(r, func(i, j int) bool {
868                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
869         })
870         return r
871 }
872
873 // KillInstance destroys a cloud VM instance. It returns an error if
874 // the given instance does not exist.
875 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
876         wkr, ok := wp.workers[id]
877         if !ok {
878                 return errors.New("instance not found")
879         }
880         wkr.logger.WithField("Reason", reason).Info("shutting down")
881         wkr.reportBootOutcome(BootOutcomeAborted)
882         wkr.shutdown()
883         return nil
884 }
885
886 func (wp *Pool) setup() {
887         wp.creating = map[string]createCall{}
888         wp.exited = map[string]time.Time{}
889         wp.workers = map[cloud.InstanceID]*worker{}
890         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
891         wp.loadRunnerData()
892 }
893
894 // Load the runner program to be deployed on worker nodes into
895 // wp.runnerData, if necessary. Errors are logged.
896 //
897 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
898 //
899 // Caller must not have lock.
900 func (wp *Pool) loadRunnerData() error {
901         wp.mtx.Lock()
902         defer wp.mtx.Unlock()
903         if wp.runnerData != nil {
904                 return nil
905         } else if wp.runnerSource == "" {
906                 wp.runnerCmd = wp.runnerCmdDefault
907                 wp.runnerData = []byte{}
908                 return nil
909         }
910         logger := wp.logger.WithField("source", wp.runnerSource)
911         logger.Debug("loading runner")
912         buf, err := ioutil.ReadFile(wp.runnerSource)
913         if err != nil {
914                 logger.WithError(err).Error("failed to load runner program")
915                 return err
916         }
917         wp.runnerData = buf
918         wp.runnerMD5 = md5.Sum(buf)
919         wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
920         return nil
921 }
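
// Because the deployment path embeds the binary's MD5 digest
// (/tmp/arvados-crunch-run/crunch-run~<md5>), deploying a different runner
// binary yields a different path on each worker, so a stale copy left behind
// by an earlier dispatcher is never reused.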
922
923 func (wp *Pool) notify() {
924         wp.mtx.RLock()
925         defer wp.mtx.RUnlock()
926         for _, send := range wp.subscribers {
927                 select {
928                 case send <- struct{}{}:
929                 default:
930                 }
931         }
932 }
933
934 func (wp *Pool) getInstancesAndSync() error {
935         wp.setupOnce.Do(wp.setup)
936         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
937                 return err
938         }
939         wp.logger.Debug("getting instance list")
940         threshold := time.Now()
941         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
942         if err != nil {
943                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
944                 return err
945         }
946         wp.sync(threshold, instances)
947         wp.logger.Debug("sync done")
948         return nil
949 }
950
951 // Add/remove/update workers based on instances, which was obtained
952 // from the instanceSet. However, don't clobber any other updates that
953 // already happened after threshold.
954 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
955         wp.mtx.Lock()
956         defer wp.mtx.Unlock()
957         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
958         notify := false
959
960         for _, inst := range instances {
961                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
962                 it, ok := wp.instanceTypes[itTag]
963                 if !ok {
964                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
965                         continue
966                 }
967                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
968                         notify = true
969                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
970                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
971                         wkr.shutdown()
972                 }
973         }
974
975         for id, wkr := range wp.workers {
976                 if wkr.updated.After(threshold) {
977                         continue
978                 }
979                 logger := wp.logger.WithFields(logrus.Fields{
980                         "Instance":    wkr.instance.ID(),
981                         "WorkerState": wkr.state,
982                 })
983                 logger.Info("instance disappeared in cloud")
984                 wkr.reportBootOutcome(BootOutcomeDisappeared)
985                 if wp.mDisappearances != nil {
986                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
987                 }
988                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
989                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
990                         wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
991                 }
992                 delete(wp.workers, id)
993                 go wkr.Close()
994                 notify = true
995         }
996
997         if !wp.loaded {
998                 notify = true
999                 wp.loaded = true
1000                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
1001         }
1002
1003         if notify {
1004                 go wp.notify()
1005         }
1006 }
1007
1008 func (wp *Pool) waitUntilLoaded() {
1009         ch := wp.Subscribe()
1010         wp.mtx.RLock()
1011         defer wp.mtx.RUnlock()
1012         for !wp.loaded {
1013                 wp.mtx.RUnlock()
1014                 <-ch
1015                 wp.mtx.RLock()
1016         }
1017 }
1018
1019 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1020         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1021         fmt.Fprint(h, uuid)
1022         return fmt.Sprintf("%x", h.Sum(nil))
1023 }
1024
1025 // Return a random string of n hexadecimal digits (n*4 random bits). n
1026 // must be even.
1027 func randomHex(n int) string {
1028         buf := make([]byte, n/2)
1029         _, err := rand.Read(buf)
1030         if err != nil {
1031                 panic(err)
1032         }
1033         return fmt.Sprintf("%x", buf)
1034 }