Merge branch '15370-loopback-dispatchcloud'
[arvados.git] / lib / dispatchcloud / driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "fmt"
9         "time"
10
11         "git.arvados.org/arvados.git/lib/cloud"
12         "git.arvados.org/arvados.git/lib/cloud/azure"
13         "git.arvados.org/arvados.git/lib/cloud/ec2"
14         "git.arvados.org/arvados.git/lib/cloud/loopback"
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "github.com/prometheus/client_golang/prometheus"
17         "github.com/sirupsen/logrus"
18         "golang.org/x/crypto/ssh"
19 )
20
21 // Drivers is a map of available cloud drivers.
22 // Clusters.*.Containers.CloudVMs.Driver configuration values
23 // correspond to keys in this map.
24 var Drivers = map[string]cloud.Driver{
25         "azure":    azure.Driver,
26         "ec2":      ec2.Driver,
27         "loopback": loopback.Driver,
28 }
29
30 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
31         driver, ok := Drivers[cluster.Containers.CloudVMs.Driver]
32         if !ok {
33                 return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
34         }
35         sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
36         is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
37         is = newInstrumentedInstanceSet(is, reg)
38         if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
39                 is = rateLimitedInstanceSet{
40                         InstanceSet: is,
41                         ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
42                 }
43         }
44         is = defaultTaggingInstanceSet{
45                 InstanceSet: is,
46                 defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
47         }
48         is = filteringInstanceSet{
49                 InstanceSet: is,
50                 logger:      logger,
51         }
52         return is, err
53 }
54
55 type rateLimitedInstanceSet struct {
56         cloud.InstanceSet
57         ticker *time.Ticker
58 }
59
60 func (is rateLimitedInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
61         <-is.ticker.C
62         insts, err := is.InstanceSet.Instances(tags)
63         for i, inst := range insts {
64                 insts[i] = &rateLimitedInstance{inst, is.ticker}
65         }
66         return insts, err
67 }
68
69 func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
70         <-is.ticker.C
71         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
72         return &rateLimitedInstance{inst, is.ticker}, err
73 }
74
75 type rateLimitedInstance struct {
76         cloud.Instance
77         ticker *time.Ticker
78 }
79
80 func (inst *rateLimitedInstance) Destroy() error {
81         <-inst.ticker.C
82         return inst.Instance.Destroy()
83 }
84
85 func (inst *rateLimitedInstance) SetTags(tags cloud.InstanceTags) error {
86         <-inst.ticker.C
87         return inst.Instance.SetTags(tags)
88 }
89
90 // Adds the specified defaultTags to every Create() call.
91 type defaultTaggingInstanceSet struct {
92         cloud.InstanceSet
93         defaultTags cloud.InstanceTags
94 }
95
96 func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
97         allTags := cloud.InstanceTags{}
98         for k, v := range is.defaultTags {
99                 allTags[k] = v
100         }
101         for k, v := range tags {
102                 allTags[k] = v
103         }
104         return is.InstanceSet.Create(it, image, allTags, init, pk)
105 }
106
107 // Filter the instances returned by the wrapped InstanceSet's
108 // Instances() method (in case the wrapped InstanceSet didn't do this
109 // itself).
110 type filteringInstanceSet struct {
111         cloud.InstanceSet
112         logger logrus.FieldLogger
113 }
114
115 func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
116         instances, err := is.InstanceSet.Instances(tags)
117
118         skipped := 0
119         var returning []cloud.Instance
120 nextInstance:
121         for _, inst := range instances {
122                 instTags := inst.Tags()
123                 for k, v := range tags {
124                         if instTags[k] != v {
125                                 skipped++
126                                 continue nextInstance
127                         }
128                 }
129                 returning = append(returning, inst)
130         }
131         is.logger.WithFields(logrus.Fields{
132                 "returning": len(returning),
133                 "skipped":   skipped,
134         }).WithError(err).Debugf("filteringInstanceSet returning instances")
135         return returning, err
136 }
137
138 func newInstrumentedInstanceSet(is cloud.InstanceSet, reg *prometheus.Registry) cloud.InstanceSet {
139         cv := prometheus.NewCounterVec(prometheus.CounterOpts{
140                 Namespace: "arvados",
141                 Subsystem: "dispatchcloud",
142                 Name:      "driver_operations",
143                 Help:      "Number of instance-create/destroy/list operations performed via cloud driver.",
144         }, []string{"operation", "error"})
145
146         // Create all counters, so they are reported with zero values
147         // (instead of being missing) until they are incremented.
148         for _, op := range []string{"Create", "List", "Destroy", "SetTags"} {
149                 for _, error := range []string{"0", "1"} {
150                         cv.WithLabelValues(op, error).Add(0)
151                 }
152         }
153
154         reg.MustRegister(cv)
155         return instrumentedInstanceSet{is, cv}
156 }
157
158 type instrumentedInstanceSet struct {
159         cloud.InstanceSet
160         cv *prometheus.CounterVec
161 }
162
163 func (is instrumentedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
164         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
165         is.cv.WithLabelValues("Create", boolLabelValue(err != nil)).Inc()
166         return instrumentedInstance{inst, is.cv}, err
167 }
168
169 func (is instrumentedInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
170         instances, err := is.InstanceSet.Instances(tags)
171         is.cv.WithLabelValues("List", boolLabelValue(err != nil)).Inc()
172         var instrumented []cloud.Instance
173         for _, i := range instances {
174                 instrumented = append(instrumented, instrumentedInstance{i, is.cv})
175         }
176         return instrumented, err
177 }
178
179 type instrumentedInstance struct {
180         cloud.Instance
181         cv *prometheus.CounterVec
182 }
183
184 func (inst instrumentedInstance) Destroy() error {
185         err := inst.Instance.Destroy()
186         inst.cv.WithLabelValues("Destroy", boolLabelValue(err != nil)).Inc()
187         return err
188 }
189
190 func (inst instrumentedInstance) SetTags(tags cloud.InstanceTags) error {
191         err := inst.Instance.SetTags(tags)
192         inst.cv.WithLabelValues("SetTags", boolLabelValue(err != nil)).Inc()
193         return err
194 }
195
196 func boolLabelValue(v bool) string {
197         if v {
198                 return "1"
199         }
200         return "0"
201 }