// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "errors"
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
)

const (
        tagKeyInstanceType = "InstanceType"
        tagKeyIdleBehavior = "IdleBehavior"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
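
// The sketch below illustrates one way an Executor might be shaped; it
// is not part of this package, and the stubExecutor type and its
// fields are hypothetical. A real implementation would typically wrap
// an SSH client and reconnect lazily when the target changes:
//
//      type stubExecutor struct {
//              mtx    sync.Mutex
//              target cloud.ExecutorTarget
//      }
//
//      func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) {
//              // Must not block on concurrent Execute calls: just
//              // record the new target and reconnect lazily.
//              e.mtx.Lock()
//              defer e.mtx.Unlock()
//              e.target = t
//      }
//
//      func (e *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//              return nil, nil, errors.New("stubExecutor: not implemented")
//      }
//
//      func (e *stubExecutor) Close() {}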

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
                instanceSet:        instanceSet,
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
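
// A hedged usage sketch for NewPool. The logger, client, registry,
// instanceSet, cluster, and newSSHExecutor values are assumed to be
// supplied by the caller; only the NewPool/Stop calls come from this
// package:
//
//      pool := worker.NewPool(logger, client, registry, instanceSet,
//              func(inst cloud.Instance) worker.Executor {
//                      return newSSHExecutor(inst)
//              },
//              cluster)
//      defer pool.Stop()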

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
        instanceSet        cloud.InstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        mInstances         prometheus.Gauge
        mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
        mMemory            prometheus.Gauge
        mMemoryInuse       prometheus.Gauge
}

// Subscribe returns a channel that becomes ready whenever a worker's
// state changes.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              // ...try scheduling some work...
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        for it, times := range wp.creating {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
                if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
func (wp *Pool) Create(it arvados.InstanceType) error {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) {
                return wp.atQuotaErr
        }
        tags := cloud.InstanceTags{
                tagKeyInstanceType: it.Name,
                tagKeyIdleBehavior: string(IdleBehaviorRun),
        }
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
                for i, t := range wp.creating[it] {
                        if t == now {
                                copy(wp.creating[it][i:], wp.creating[it][i+1:])
                                wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
                                break
                        }
                }
                if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                        wp.atQuotaErr = err
                        wp.atQuotaUntil = time.Now().Add(time.Minute)
                }
                if err != nil {
                        logger.WithError(err).Error("create failed")
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
        return nil
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}
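
// A rough sketch of how a dispatcher might combine Unallocated,
// AtQuota, and Create to bring up capacity; the queuedContainers map
// (desired workers per instance type) is hypothetical:
//
//      unalloc := pool.Unallocated()
//      for it, want := range queuedContainers {
//              for unalloc[it] < want && !pool.AtQuota() {
//                      if err := pool.Create(it); err != nil {
//                              break
//                      }
//                      unalloc[it]++
//              }
//      }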

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.idleBehavior = idleBehavior
        wkr.saveTags()
        wkr.shutdownIfIdle()
        return nil
}
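
// For example, a management API handler might take a worker out of
// service before maintenance and later return it to normal scheduling
// (instID is assumed to come from the caller):
//
//      if err := pool.SetIdleBehavior(instID, worker.IdleBehaviorHold); err != nil {
//              // instance has already left the pool
//      }
//      // ...later...
//      pool.SetIdleBehavior(instID, worker.IdleBehaviorRun)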

// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
// The second return value is true if a new worker is created.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                if initialState == StateBooting && wkr.state == StateUnknown {
                        wkr.state = StateBooting
                }
                wkr.saveTags()
                return wkr, false
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
        })
        logger.WithFields(logrus.Fields{
                "State":        initialState,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        initialState,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]struct{}),
                starting:     make(map[string]struct{}),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
        wp.exited[uuid] = t
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}
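
// A hedged sketch of a dispatch loop built on Subscribe and
// StartContainer; nextQueuedContainer and chooseInstanceType are
// hypothetical helpers supplied by the caller:
//
//      ch := pool.Subscribe()
//      defer pool.Unsubscribe(ch)
//      for range ch {
//              for {
//                      ctr, ok := nextQueuedContainer()
//                      if !ok {
//                              break
//                      }
//                      it := chooseInstanceType(ctr)
//                      if !pool.StartContainer(it, ctr) {
//                              // no idle worker of this type yet
//                              break
//                      }
//              }
//      }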

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
                return
        }
        for _, wkr := range wp.workers {
                if _, ok := wkr.running[uuid]; ok {
                        go wp.kill(wkr, uuid)
                        return
                }
        }
        wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
}

func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
        stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
                        "stdout": string(stdout),
                        "error":  err,
                }).Warn("kill failed")
                return
        }
        logger.Debug("killing process succeeded")
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
                if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
                        wkr.state = StateIdle
                }
                wkr.updated = time.Now()
                go wp.notify()
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price_total",
                Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)

        wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        })
        reg.MustRegister(wp.mVCPUs)
        wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_inuse",
                Help:      "VCPUs on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mVCPUsInuse)
        wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        })
        reg.MustRegister(wp.mMemory)
        wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_inuse",
                Help:      "Memory on cloud VMs that are running containers.",
        })
        reg.MustRegister(wp.mMemoryInuse)
}
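
// A minimal sketch of exposing these gauges over HTTP, assuming the
// caller owns the *prometheus.Registry passed to NewPool; the promhttp
// import and the listen address are assumptions, not part of this
// package:
//
//      mux := http.NewServeMux()
//      mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
//      go http.ListenAndServe(":9006", mux)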

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        var price float64
        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
                price += wkr.instType.Price
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
                        continue
                }
                alloc += int64(len(wkr.running) + len(wkr.starting))
                cpuInuse += int64(wkr.instType.VCPUs)
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
        wp.mInstancesPrice.Set(price)
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
        wp.mVCPUsInuse.Set(float64(cpuInuse))
        wp.mMemoryInuse.Set(float64(memInuse))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

func (wp *Pool) setup() {
        wp.creating = map[arvados.InstanceType][]time.Time{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// sync adds, removes, and updates workers based on the given instance
// list, which was obtained from the instanceSet. However, it doesn't
// clobber any other updates that already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance,
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                delete(wp.workers, id)
                go wkr.executor.Close()
                notify = true
        }

        if !wp.loaded {
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}