Merge branch 'master' into 14930-arvput-trash-at
[arvados.git] / lib / dispatchcloud / driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "fmt"
9         "time"
10
11         "git.curoverse.com/arvados.git/lib/cloud"
12         "git.curoverse.com/arvados.git/lib/cloud/azure"
13         "git.curoverse.com/arvados.git/lib/cloud/ec2"
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "github.com/sirupsen/logrus"
16         "golang.org/x/crypto/ssh"
17 )
18
19 var drivers = map[string]cloud.Driver{
20         "azure": azure.Driver,
21         "ec2":   ec2.Driver,
22 }
23
24 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
25         driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
26         if !ok {
27                 return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
28         }
29         sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
30         is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
31         if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
32                 is = rateLimitedInstanceSet{
33                         InstanceSet: is,
34                         ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
35                 }
36         }
37         is = defaultTaggingInstanceSet{
38                 InstanceSet: is,
39                 defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
40         }
41         is = filteringInstanceSet{
42                 InstanceSet: is,
43                 logger:      logger,
44         }
45         return is, err
46 }
47
48 type rateLimitedInstanceSet struct {
49         cloud.InstanceSet
50         ticker *time.Ticker
51 }
52
53 func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
54         <-is.ticker.C
55         inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
56         return &rateLimitedInstance{inst, is.ticker}, err
57 }
58
59 type rateLimitedInstance struct {
60         cloud.Instance
61         ticker *time.Ticker
62 }
63
64 func (inst *rateLimitedInstance) Destroy() error {
65         <-inst.ticker.C
66         return inst.Instance.Destroy()
67 }
68
69 // Adds the specified defaultTags to every Create() call.
70 type defaultTaggingInstanceSet struct {
71         cloud.InstanceSet
72         defaultTags cloud.InstanceTags
73 }
74
75 func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
76         allTags := cloud.InstanceTags{}
77         for k, v := range is.defaultTags {
78                 allTags[k] = v
79         }
80         for k, v := range tags {
81                 allTags[k] = v
82         }
83         return is.InstanceSet.Create(it, image, allTags, init, pk)
84 }
85
86 // Filters the instances returned by the wrapped InstanceSet's
87 // Instances() method (in case the wrapped InstanceSet didn't do this
88 // itself).
89 type filteringInstanceSet struct {
90         cloud.InstanceSet
91         logger logrus.FieldLogger
92 }
93
94 func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
95         instances, err := is.InstanceSet.Instances(tags)
96
97         skipped := 0
98         var returning []cloud.Instance
99 nextInstance:
100         for _, inst := range instances {
101                 instTags := inst.Tags()
102                 for k, v := range tags {
103                         if instTags[k] != v {
104                                 skipped++
105                                 continue nextInstance
106                         }
107                 }
108                 returning = append(returning, inst)
109         }
110         is.logger.WithFields(logrus.Fields{
111                 "returning": len(returning),
112                 "skipped":   skipped,
113         }).WithError(err).Debugf("filteringInstanceSet returning instances")
114         return returning, err
115 }