Fix 2.4.1 release date refs #19017
[arvados.git] / lib / dispatchcloud / driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "fmt"
9         "time"
10
11         "git.arvados.org/arvados.git/lib/cloud"
12         "git.arvados.org/arvados.git/lib/cloud/azure"
13         "git.arvados.org/arvados.git/lib/cloud/ec2"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "github.com/prometheus/client_golang/prometheus"
16         "github.com/sirupsen/logrus"
17         "golang.org/x/crypto/ssh"
18 )
19
20 // Drivers is a map of available cloud drivers.
21 // Clusters.*.Containers.CloudVMs.Driver configuration values
22 // correspond to keys in this map.
23 var Drivers = map[string]cloud.Driver{
24         "azure": azure.Driver,
25         "ec2":   ec2.Driver,
26 }
27
28 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
29         driver, ok := Drivers[cluster.Containers.CloudVMs.Driver]
30         if !ok {
31                 return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
32         }
33         sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
34         is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
35         is = newInstrumentedInstanceSet(is, reg)
36         if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
37                 is = rateLimitedInstanceSet{
38                         InstanceSet: is,
39                         ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
40                 }
41         }
42         is = defaultTaggingInstanceSet{
43                 InstanceSet: is,
44                 defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
45         }
46         is = filteringInstanceSet{
47                 InstanceSet: is,
48                 logger:      logger,
49         }
50         return is, err
51 }
52
53 type rateLimitedInstanceSet struct {
54         cloud.InstanceSet
55         ticker *time.Ticker
56 }
57
58 func (is rateLimitedInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
59         <-is.ticker.C
60         insts, err := is.InstanceSet.Instances(tags)
61         for i, inst := range insts {
62                 insts[i] = &rateLimitedInstance{inst, is.ticker}
63         }
64         return insts, err
65 }
66
67 func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
68         <-is.ticker.C
69         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
70         return &rateLimitedInstance{inst, is.ticker}, err
71 }
72
73 type rateLimitedInstance struct {
74         cloud.Instance
75         ticker *time.Ticker
76 }
77
78 func (inst *rateLimitedInstance) Destroy() error {
79         <-inst.ticker.C
80         return inst.Instance.Destroy()
81 }
82
83 func (inst *rateLimitedInstance) SetTags(tags cloud.InstanceTags) error {
84         <-inst.ticker.C
85         return inst.Instance.SetTags(tags)
86 }
87
88 // Adds the specified defaultTags to every Create() call.
89 type defaultTaggingInstanceSet struct {
90         cloud.InstanceSet
91         defaultTags cloud.InstanceTags
92 }
93
94 func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
95         allTags := cloud.InstanceTags{}
96         for k, v := range is.defaultTags {
97                 allTags[k] = v
98         }
99         for k, v := range tags {
100                 allTags[k] = v
101         }
102         return is.InstanceSet.Create(it, image, allTags, init, pk)
103 }
104
105 // Filter the instances returned by the wrapped InstanceSet's
106 // Instances() method (in case the wrapped InstanceSet didn't do this
107 // itself).
108 type filteringInstanceSet struct {
109         cloud.InstanceSet
110         logger logrus.FieldLogger
111 }
112
113 func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
114         instances, err := is.InstanceSet.Instances(tags)
115
116         skipped := 0
117         var returning []cloud.Instance
118 nextInstance:
119         for _, inst := range instances {
120                 instTags := inst.Tags()
121                 for k, v := range tags {
122                         if instTags[k] != v {
123                                 skipped++
124                                 continue nextInstance
125                         }
126                 }
127                 returning = append(returning, inst)
128         }
129         is.logger.WithFields(logrus.Fields{
130                 "returning": len(returning),
131                 "skipped":   skipped,
132         }).WithError(err).Debugf("filteringInstanceSet returning instances")
133         return returning, err
134 }
135
136 func newInstrumentedInstanceSet(is cloud.InstanceSet, reg *prometheus.Registry) cloud.InstanceSet {
137         cv := prometheus.NewCounterVec(prometheus.CounterOpts{
138                 Namespace: "arvados",
139                 Subsystem: "dispatchcloud",
140                 Name:      "driver_operations",
141                 Help:      "Number of instance-create/destroy/list operations performed via cloud driver.",
142         }, []string{"operation", "error"})
143
144         // Create all counters, so they are reported with zero values
145         // (instead of being missing) until they are incremented.
146         for _, op := range []string{"Create", "List", "Destroy", "SetTags"} {
147                 for _, error := range []string{"0", "1"} {
148                         cv.WithLabelValues(op, error).Add(0)
149                 }
150         }
151
152         reg.MustRegister(cv)
153         return instrumentedInstanceSet{is, cv}
154 }
155
156 type instrumentedInstanceSet struct {
157         cloud.InstanceSet
158         cv *prometheus.CounterVec
159 }
160
161 func (is instrumentedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
162         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
163         is.cv.WithLabelValues("Create", boolLabelValue(err != nil)).Inc()
164         return instrumentedInstance{inst, is.cv}, err
165 }
166
167 func (is instrumentedInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
168         instances, err := is.InstanceSet.Instances(tags)
169         is.cv.WithLabelValues("List", boolLabelValue(err != nil)).Inc()
170         var instrumented []cloud.Instance
171         for _, i := range instances {
172                 instrumented = append(instrumented, instrumentedInstance{i, is.cv})
173         }
174         return instrumented, err
175 }
176
177 type instrumentedInstance struct {
178         cloud.Instance
179         cv *prometheus.CounterVec
180 }
181
182 func (inst instrumentedInstance) Destroy() error {
183         err := inst.Instance.Destroy()
184         inst.cv.WithLabelValues("Destroy", boolLabelValue(err != nil)).Inc()
185         return err
186 }
187
188 func (inst instrumentedInstance) SetTags(tags cloud.InstanceTags) error {
189         err := inst.Instance.SetTags(tags)
190         inst.cv.WithLabelValues("SetTags", boolLabelValue(err != nil)).Inc()
191         return err
192 }
193
194 func boolLabelValue(v bool) string {
195         if v {
196                 return "1"
197         }
198         return "0"
199 }