14287: Dedup "UUIDs seen" list before diff in test.
[arvados.git] / lib / dispatchcloud / driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "fmt"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/cloud"
12         "git.curoverse.com/arvados.git/lib/cloud/azure"
13         "git.curoverse.com/arvados.git/lib/cloud/ec2"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/prometheus/client_golang/prometheus"
16         "github.com/sirupsen/logrus"
17         "golang.org/x/crypto/ssh"
18 )
19
20 var drivers = map[string]cloud.Driver{
21         "azure": azure.Driver,
22         "ec2":   ec2.Driver,
23 }
24
25 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
26         driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
27         if !ok {
28                 return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
29         }
30         sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
31         is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
32         is = newInstrumentedInstanceSet(is, reg)
33         if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
34                 is = rateLimitedInstanceSet{
35                         InstanceSet: is,
36                         ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
37                 }
38         }
39         is = defaultTaggingInstanceSet{
40                 InstanceSet: is,
41                 defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
42         }
43         is = filteringInstanceSet{
44                 InstanceSet: is,
45                 logger:      logger,
46         }
47         return is, err
48 }
49
50 type rateLimitedInstanceSet struct {
51         cloud.InstanceSet
52         ticker *time.Ticker
53 }
54
55 func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
56         <-is.ticker.C
57         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
58         return &rateLimitedInstance{inst, is.ticker}, err
59 }
60
61 type rateLimitedInstance struct {
62         cloud.Instance
63         ticker *time.Ticker
64 }
65
66 func (inst *rateLimitedInstance) Destroy() error {
67         <-inst.ticker.C
68         return inst.Instance.Destroy()
69 }
70
71 // Adds the specified defaultTags to every Create() call.
72 type defaultTaggingInstanceSet struct {
73         cloud.InstanceSet
74         defaultTags cloud.InstanceTags
75 }
76
77 func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
78         allTags := cloud.InstanceTags{}
79         for k, v := range is.defaultTags {
80                 allTags[k] = v
81         }
82         for k, v := range tags {
83                 allTags[k] = v
84         }
85         return is.InstanceSet.Create(it, image, allTags, init, pk)
86 }
87
88 // Filters the instances returned by the wrapped InstanceSet's
89 // Instances() method (in case the wrapped InstanceSet didn't do this
90 // itself).
91 type filteringInstanceSet struct {
92         cloud.InstanceSet
93         logger logrus.FieldLogger
94 }
95
96 func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
97         instances, err := is.InstanceSet.Instances(tags)
98
99         skipped := 0
100         var returning []cloud.Instance
101 nextInstance:
102         for _, inst := range instances {
103                 instTags := inst.Tags()
104                 for k, v := range tags {
105                         if instTags[k] != v {
106                                 skipped++
107                                 continue nextInstance
108                         }
109                 }
110                 returning = append(returning, inst)
111         }
112         is.logger.WithFields(logrus.Fields{
113                 "returning": len(returning),
114                 "skipped":   skipped,
115         }).WithError(err).Debugf("filteringInstanceSet returning instances")
116         return returning, err
117 }
118
119 func newInstrumentedInstanceSet(is cloud.InstanceSet, reg *prometheus.Registry) cloud.InstanceSet {
120         cv := prometheus.NewCounterVec(prometheus.CounterOpts{
121                 Namespace: "arvados",
122                 Subsystem: "dispatchcloud",
123                 Name:      "driver_operations",
124                 Help:      "Number of instance-create/destroy/list operations performed via cloud driver.",
125         }, []string{"operation", "error"})
126
127         // Create all counters, so they are reported with zero values
128         // (instead of being missing) until they are incremented.
129         for _, op := range []string{"Create", "List", "Destroy", "SetTags"} {
130                 for _, error := range []string{"0", "1"} {
131                         cv.WithLabelValues(op, error).Add(0)
132                 }
133         }
134
135         reg.MustRegister(cv)
136         return instrumentedInstanceSet{is, cv}
137 }
138
139 type instrumentedInstanceSet struct {
140         cloud.InstanceSet
141         cv *prometheus.CounterVec
142 }
143
144 func (is instrumentedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
145         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
146         is.cv.WithLabelValues("Create", boolLabelValue(err != nil)).Inc()
147         return instrumentedInstance{inst, is.cv}, err
148 }
149
150 func (is instrumentedInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
151         instances, err := is.InstanceSet.Instances(tags)
152         is.cv.WithLabelValues("List", boolLabelValue(err != nil)).Inc()
153         return instances, err
154 }
155
156 type instrumentedInstance struct {
157         cloud.Instance
158         cv *prometheus.CounterVec
159 }
160
161 func (inst instrumentedInstance) Destroy() error {
162         err := inst.Instance.Destroy()
163         inst.cv.WithLabelValues("Destroy", boolLabelValue(err != nil)).Inc()
164         return err
165 }
166
167 func (inst instrumentedInstance) SetTags(tags cloud.InstanceTags) error {
168         err := inst.Instance.SetTags(tags)
169         inst.cv.WithLabelValues("SetTags", boolLabelValue(err != nil)).Inc()
170         return err
171 }
172
173 func boolLabelValue(v bool) string {
174         if v {
175                 return "1"
176         } else {
177                 return "0"
178         }
179 }