14324: Azure driver for crunch-dispatch-cloud
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "context"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         math_rand "math/rand"
14         "regexp"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/lib/cloud"
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/Sirupsen/logrus"
22         "github.com/mitchellh/mapstructure"
23         "golang.org/x/crypto/ssh"
24 )
25
// A StubDriver implements cloud.Driver by setting up local SSH
// servers that do fake command executions.
type StubDriver struct {
	// HostKey is handed to the SSH service of every StubVM created
	// through this driver, and is the key VerifyHostKey signs its
	// challenge with. AuthorizedKeys are passed to every StubVM's
	// SSH service as authorized keys, after the key (if any) given
	// to Create.
	HostKey        ssh.Signer
	AuthorizedKeys []ssh.PublicKey

	// SetupVM, if set, is called upon creation of each new
	// StubVM. This is the caller's opportunity to customize the
	// VM's error rate and other behaviors.
	SetupVM func(*StubVM)

	// StubVM's fake crunch-run uses this Queue to read and update
	// container state.
	Queue *Queue

	// Frequency of artificially introduced errors on calls to
	// Destroy. 0=always succeed, 1=always fail.
	ErrorRateDestroy float64

	// instanceSets accumulates every StubInstanceSet returned by
	// InstanceSet, in creation order.
	instanceSets []*StubInstanceSet
}
47
48 // InstanceSet returns a new *StubInstanceSet.
49 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
50         sis := StubInstanceSet{
51                 driver:  sd,
52                 servers: map[cloud.InstanceID]*StubVM{},
53         }
54         sd.instanceSets = append(sd.instanceSets, &sis)
55         return &sis, mapstructure.Decode(params, &sis)
56 }
57
// InstanceSets returns all instance sets that have been created by
// the driver, in creation order. This can be used to test a component
// that uses the driver but doesn't expose the InstanceSets it has
// created.
//
// The returned slice is the driver's own slice, not a copy.
func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
	return sd.instanceSets
}
64
// StubInstanceSet is the instance set returned by
// (*StubDriver)InstanceSet. It keeps track of the StubVMs created
// through it.
type StubInstanceSet struct {
	// driver is the StubDriver that created this set.
	driver *StubDriver
	// servers holds the VMs that have been created and not yet
	// destroyed, keyed by instance ID.
	servers map[cloud.InstanceID]*StubVM
	// mtx guards servers and stopped.
	mtx sync.RWMutex
	// stopped is set by Stop; Create fails once it is true.
	stopped bool
}
71
72 func (sis *StubInstanceSet) Create(_ context.Context, it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
73         sis.mtx.Lock()
74         defer sis.mtx.Unlock()
75         if sis.stopped {
76                 return nil, errors.New("StubInstanceSet: Create called after Stop")
77         }
78         ak := sis.driver.AuthorizedKeys
79         if authKey != nil {
80                 ak = append([]ssh.PublicKey{authKey}, ak...)
81         }
82         svm := &StubVM{
83                 sis:          sis,
84                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
85                 tags:         copyTags(tags),
86                 providerType: it.ProviderType,
87         }
88         svm.SSHService = SSHService{
89                 HostKey:        sis.driver.HostKey,
90                 AuthorizedKeys: ak,
91                 Exec:           svm.Exec,
92         }
93         if setup := sis.driver.SetupVM; setup != nil {
94                 setup(svm)
95         }
96         sis.servers[svm.id] = svm
97         return svm.Instance(), nil
98 }
99
100 func (sis *StubInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
101         sis.mtx.RLock()
102         defer sis.mtx.RUnlock()
103         var r []cloud.Instance
104         for _, ss := range sis.servers {
105                 r = append(r, ss.Instance())
106         }
107         return r, nil
108 }
109
110 func (sis *StubInstanceSet) Stop() {
111         sis.mtx.Lock()
112         defer sis.mtx.Unlock()
113         if sis.stopped {
114                 panic("Stop called twice")
115         }
116         sis.stopped = true
117 }
118
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
// Note this is distinct from a stubInstance, which is a snapshot of
// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
// running (and might change IP addresses, shut down, etc.)  without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
	// Boot is the time the VM finishes booting. Until then, every
	// command fails with exit code 1.
	Boot time.Time
	// Broken, if non-zero, is the time after which every command
	// fails with "cannot fork" and exit code 2.
	Broken time.Time
	// CrunchRunMissing makes every command containing "crunch-run"
	// fail with "command not found".
	CrunchRunMissing bool
	// CrunchRunCrashRate is compared against a random number in
	// [0,1) drawn for each started container, to decide whether
	// the fake crunch-run crashes instead of reporting completion.
	CrunchRunCrashRate float64
	// CrunchRunDetachDelay is how long "crunch-run --detach"
	// sleeps before returning.
	CrunchRunDetachDelay time.Duration
	// ExecuteContainer, if set, is called by the fake crunch-run
	// to obtain the container's exit code.
	ExecuteContainer func(arvados.Container) int

	sis          *StubInstanceSet
	id           cloud.InstanceID
	tags         cloud.InstanceTags
	providerType string
	SSHService   SSHService
	// running is the set of container UUIDs whose fake crunch-run
	// process has been started and has not yet exited or been
	// killed. It is created lazily by Exec.
	running map[string]bool
	// The embedded Mutex guards tags and running.
	sync.Mutex
}
142
143 func (svm *StubVM) Instance() stubInstance {
144         svm.Lock()
145         defer svm.Unlock()
146         return stubInstance{
147                 svm:  svm,
148                 addr: svm.SSHService.Address(),
149                 // We deliberately return a cached/stale copy of the
150                 // real tags here, so that (Instance)Tags() sometimes
151                 // returns old data after a call to
152                 // (Instance)SetTags().  This is permitted by the
153                 // driver interface, and this might help remind
154                 // callers that they need to tolerate it.
155                 tags: copyTags(svm.tags),
156         }
157 }
158
159 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
160         queue := svm.sis.driver.Queue
161         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
162         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
163                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
164                 return 1
165         }
166         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
167                 fmt.Fprintf(stderr, "cannot fork\n")
168                 return 2
169         }
170         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
171                 fmt.Fprint(stderr, "crunch-run: command not found\n")
172                 return 1
173         }
174         if strings.HasPrefix(command, "crunch-run --detach ") {
175                 svm.Lock()
176                 if svm.running == nil {
177                         svm.running = map[string]bool{}
178                 }
179                 svm.running[uuid] = true
180                 svm.Unlock()
181                 time.Sleep(svm.CrunchRunDetachDelay)
182                 fmt.Fprintf(stderr, "starting %s\n", uuid)
183                 logger := logrus.WithField("ContainerUUID", uuid)
184                 logger.Printf("[test] starting crunch-run stub")
185                 go func() {
186                         crashluck := math_rand.Float64()
187                         ctr, ok := queue.Get(uuid)
188                         if !ok {
189                                 logger.Print("[test] container not in queue")
190                                 return
191                         }
192                         if crashluck > svm.CrunchRunCrashRate/2 {
193                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
194                                 ctr.State = arvados.ContainerStateRunning
195                                 queue.Notify(ctr)
196                         }
197
198                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
199                         svm.Lock()
200                         _, running := svm.running[uuid]
201                         svm.Unlock()
202                         if !running {
203                                 logger.Print("[test] container was killed")
204                                 return
205                         }
206                         if svm.ExecuteContainer != nil {
207                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
208                         }
209                         // TODO: Check whether the stub instance has
210                         // been destroyed, and if so, don't call
211                         // queue.Notify. Then "container finished
212                         // twice" can be classified as a bug.
213                         if crashluck < svm.CrunchRunCrashRate {
214                                 logger.Print("[test] crashing crunch-run stub")
215                         } else {
216                                 ctr.State = arvados.ContainerStateComplete
217                                 queue.Notify(ctr)
218                         }
219                         logger.Print("[test] exiting crunch-run stub")
220                         svm.Lock()
221                         defer svm.Unlock()
222                         delete(svm.running, uuid)
223                 }()
224                 return 0
225         }
226         if command == "crunch-run --list" {
227                 svm.Lock()
228                 defer svm.Unlock()
229                 for uuid := range svm.running {
230                         fmt.Fprintf(stdout, "%s\n", uuid)
231                 }
232                 return 0
233         }
234         if strings.HasPrefix(command, "crunch-run --kill ") {
235                 svm.Lock()
236                 defer svm.Unlock()
237                 if svm.running[uuid] {
238                         delete(svm.running, uuid)
239                 } else {
240                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
241                 }
242                 return 0
243         }
244         if command == "true" {
245                 return 0
246         }
247         fmt.Fprintf(stderr, "%q: command not found", command)
248         return 1
249 }
250
// stubInstance is a snapshot of a StubVM's metadata, as returned by
// (*StubVM)Instance. addr and tags hold the values captured when the
// snapshot was taken; svm points back to the live VM.
type stubInstance struct {
	svm  *StubVM
	addr string
	tags cloud.InstanceTags
}
256
// ID returns the instance ID of the underlying StubVM.
func (si stubInstance) ID() cloud.InstanceID {
	return si.svm.id
}
260
// Address returns the VM's SSH service address as it was when this
// snapshot was taken.
func (si stubInstance) Address() string {
	return si.addr
}
264
265 func (si stubInstance) Destroy(_ context.Context) error {
266         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
267                 return errors.New("instance could not be destroyed")
268         }
269         si.svm.SSHService.Close()
270         sis := si.svm.sis
271         sis.mtx.Lock()
272         defer sis.mtx.Unlock()
273         delete(sis.servers, si.svm.id)
274         return nil
275 }
276
// ProviderType returns the provider type of the underlying StubVM,
// i.e., the ProviderType of the instance type it was created with.
func (si stubInstance) ProviderType() string {
	return si.svm.providerType
}
280
281 func (si stubInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
282         tags = copyTags(tags)
283         svm := si.svm
284         go func() {
285                 svm.Lock()
286                 defer svm.Unlock()
287                 svm.tags = tags
288         }()
289         return nil
290 }
291
// Tags returns the tags captured when this snapshot was taken. It
// does not reflect calls to SetTags made since then.
func (si stubInstance) Tags() cloud.InstanceTags {
	return si.tags
}
295
// String returns the instance ID of the underlying StubVM as a
// string.
func (si stubInstance) String() string {
	return string(si.svm.id)
}
299
300 func (si stubInstance) VerifyHostKey(_ context.Context, key ssh.PublicKey, client *ssh.Client) error {
301         buf := make([]byte, 512)
302         _, err := io.ReadFull(rand.Reader, buf)
303         if err != nil {
304                 return err
305         }
306         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
307         if err != nil {
308                 return err
309         }
310         return key.Verify(buf, sig)
311 }
312
313 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
314         dst := cloud.InstanceTags{}
315         for k, v := range src {
316                 dst[k] = v
317         }
318         return dst
319 }