14325: Don't shutdown busy VMs even if boot probe fails.
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         math_rand "math/rand"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/mitchellh/mapstructure"
21         "github.com/sirupsen/logrus"
22         "golang.org/x/crypto/ssh"
23 )
24
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
28         HostKey        ssh.Signer
29         AuthorizedKeys []ssh.PublicKey
30
31         // SetupVM, if set, is called upon creation of each new
32         // StubVM. This is the caller's opportunity to customize the
33         // VM's error rate and other behaviors.
34         SetupVM func(*StubVM)
35
36         // StubVM's fake crunch-run uses this Queue to read and update
37         // container state.
38         Queue *Queue
39
40         // Frequency of artificially introduced errors on calls to
41         // Destroy. 0=always succeed, 1=always fail.
42         ErrorRateDestroy float64
43
44         // If Create() or Instances() is called too frequently, return
45         // rate-limiting errors.
46         MinTimeBetweenCreateCalls    time.Duration
47         MinTimeBetweenInstancesCalls time.Duration
48
49         instanceSets []*StubInstanceSet
50 }
51
52 // InstanceSet returns a new *StubInstanceSet.
53 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
54         sis := StubInstanceSet{
55                 driver:  sd,
56                 servers: map[cloud.InstanceID]*StubVM{},
57         }
58         sd.instanceSets = append(sd.instanceSets, &sis)
59         return &sis, mapstructure.Decode(params, &sis)
60 }
61
62 // InstanceSets returns all instances that have been created by the
63 // driver. This can be used to test a component that uses the driver
64 // but doesn't expose the InstanceSets it has created.
65 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
66         return sd.instanceSets
67 }
68
69 type StubInstanceSet struct {
70         driver  *StubDriver
71         servers map[cloud.InstanceID]*StubVM
72         mtx     sync.RWMutex
73         stopped bool
74
75         allowCreateCall    time.Time
76         allowInstancesCall time.Time
77 }
78
79 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
80         sis.mtx.Lock()
81         defer sis.mtx.Unlock()
82         if sis.stopped {
83                 return nil, errors.New("StubInstanceSet: Create called after Stop")
84         }
85         if sis.allowCreateCall.After(time.Now()) {
86                 return nil, RateLimitError{sis.allowCreateCall}
87         } else {
88                 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
89         }
90
91         ak := sis.driver.AuthorizedKeys
92         if authKey != nil {
93                 ak = append([]ssh.PublicKey{authKey}, ak...)
94         }
95         svm := &StubVM{
96                 sis:          sis,
97                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
98                 tags:         copyTags(tags),
99                 providerType: it.ProviderType,
100         }
101         svm.SSHService = SSHService{
102                 HostKey:        sis.driver.HostKey,
103                 AuthorizedKeys: ak,
104                 Exec:           svm.Exec,
105         }
106         if setup := sis.driver.SetupVM; setup != nil {
107                 setup(svm)
108         }
109         sis.servers[svm.id] = svm
110         return svm.Instance(), nil
111 }
112
113 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
114         sis.mtx.RLock()
115         defer sis.mtx.RUnlock()
116         if sis.allowInstancesCall.After(time.Now()) {
117                 return nil, RateLimitError{sis.allowInstancesCall}
118         } else {
119                 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
120         }
121         var r []cloud.Instance
122         for _, ss := range sis.servers {
123                 r = append(r, ss.Instance())
124         }
125         return r, nil
126 }
127
128 func (sis *StubInstanceSet) Stop() {
129         sis.mtx.Lock()
130         defer sis.mtx.Unlock()
131         if sis.stopped {
132                 panic("Stop called twice")
133         }
134         sis.stopped = true
135 }
136
// RateLimitError is the error returned when Create() or Instances()
// is called too frequently.
type RateLimitError struct {
	// Retry is the time reported by EarliestRetry.
	Retry time.Time
}

// Error implements the error interface; the message includes the
// Retry time.
func (e RateLimitError) Error() string {
	return fmt.Sprintf("rate limited until %s", e.Retry)
}

// EarliestRetry returns the Retry time carried by the error.
func (e RateLimitError) EarliestRetry() time.Time {
	return e.Retry
}
141
142 // StubVM is a fake server that runs an SSH service. It represents a
143 // VM running in a fake cloud.
144 //
145 // Note this is distinct from a stubInstance, which is a snapshot of
146 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
147 // running (and might change IP addresses, shut down, etc.)  without
148 // updating any stubInstances that have been returned to callers.
149 type StubVM struct {
150         Boot                 time.Time
151         Broken               time.Time
152         CrunchRunMissing     bool
153         CrunchRunCrashRate   float64
154         CrunchRunDetachDelay time.Duration
155         ExecuteContainer     func(arvados.Container) int
156
157         sis          *StubInstanceSet
158         id           cloud.InstanceID
159         tags         cloud.InstanceTags
160         providerType string
161         SSHService   SSHService
162         running      map[string]bool
163         sync.Mutex
164 }
165
166 func (svm *StubVM) Instance() stubInstance {
167         svm.Lock()
168         defer svm.Unlock()
169         return stubInstance{
170                 svm:  svm,
171                 addr: svm.SSHService.Address(),
172                 // We deliberately return a cached/stale copy of the
173                 // real tags here, so that (Instance)Tags() sometimes
174                 // returns old data after a call to
175                 // (Instance)SetTags().  This is permitted by the
176                 // driver interface, and this might help remind
177                 // callers that they need to tolerate it.
178                 tags: copyTags(svm.tags),
179         }
180 }
181
182 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
183         queue := svm.sis.driver.Queue
184         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
185         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
186                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
187                 return 1
188         }
189         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
190                 fmt.Fprintf(stderr, "cannot fork\n")
191                 return 2
192         }
193         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
194                 fmt.Fprint(stderr, "crunch-run: command not found\n")
195                 return 1
196         }
197         if strings.HasPrefix(command, "crunch-run --detach ") {
198                 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
199                         if env[name] == "" {
200                                 fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
201                                 return 1
202                         }
203                 }
204                 svm.Lock()
205                 if svm.running == nil {
206                         svm.running = map[string]bool{}
207                 }
208                 svm.running[uuid] = true
209                 svm.Unlock()
210                 time.Sleep(svm.CrunchRunDetachDelay)
211                 fmt.Fprintf(stderr, "starting %s\n", uuid)
212                 logger := logrus.WithFields(logrus.Fields{
213                         "Instance":      svm.id,
214                         "ContainerUUID": uuid,
215                 })
216                 logger.Printf("[test] starting crunch-run stub")
217                 go func() {
218                         crashluck := math_rand.Float64()
219                         ctr, ok := queue.Get(uuid)
220                         if !ok {
221                                 logger.Print("[test] container not in queue")
222                                 return
223                         }
224                         if crashluck > svm.CrunchRunCrashRate/2 {
225                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
226                                 ctr.State = arvados.ContainerStateRunning
227                                 queue.Notify(ctr)
228                         }
229
230                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
231                         svm.Lock()
232                         _, running := svm.running[uuid]
233                         svm.Unlock()
234                         if !running {
235                                 logger.Print("[test] container was killed")
236                                 return
237                         }
238                         if svm.ExecuteContainer != nil {
239                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
240                         }
241                         // TODO: Check whether the stub instance has
242                         // been destroyed, and if so, don't call
243                         // queue.Notify. Then "container finished
244                         // twice" can be classified as a bug.
245                         if crashluck < svm.CrunchRunCrashRate {
246                                 logger.Print("[test] crashing crunch-run stub")
247                         } else {
248                                 ctr.State = arvados.ContainerStateComplete
249                                 queue.Notify(ctr)
250                         }
251                         logger.Print("[test] exiting crunch-run stub")
252                         svm.Lock()
253                         defer svm.Unlock()
254                         delete(svm.running, uuid)
255                 }()
256                 return 0
257         }
258         if command == "crunch-run --list" {
259                 svm.Lock()
260                 defer svm.Unlock()
261                 for uuid := range svm.running {
262                         fmt.Fprintf(stdout, "%s\n", uuid)
263                 }
264                 return 0
265         }
266         if strings.HasPrefix(command, "crunch-run --kill ") {
267                 svm.Lock()
268                 defer svm.Unlock()
269                 if svm.running[uuid] {
270                         delete(svm.running, uuid)
271                 } else {
272                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
273                 }
274                 return 0
275         }
276         if command == "true" {
277                 return 0
278         }
279         fmt.Fprintf(stderr, "%q: command not found", command)
280         return 1
281 }
282
283 type stubInstance struct {
284         svm  *StubVM
285         addr string
286         tags cloud.InstanceTags
287 }
288
289 func (si stubInstance) ID() cloud.InstanceID {
290         return si.svm.id
291 }
292
293 func (si stubInstance) Address() string {
294         return si.addr
295 }
296
297 func (si stubInstance) Destroy() error {
298         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
299                 return errors.New("instance could not be destroyed")
300         }
301         si.svm.SSHService.Close()
302         sis := si.svm.sis
303         sis.mtx.Lock()
304         defer sis.mtx.Unlock()
305         delete(sis.servers, si.svm.id)
306         return nil
307 }
308
309 func (si stubInstance) ProviderType() string {
310         return si.svm.providerType
311 }
312
313 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
314         tags = copyTags(tags)
315         svm := si.svm
316         go func() {
317                 svm.Lock()
318                 defer svm.Unlock()
319                 svm.tags = tags
320         }()
321         return nil
322 }
323
324 func (si stubInstance) Tags() cloud.InstanceTags {
325         return si.tags
326 }
327
328 func (si stubInstance) String() string {
329         return string(si.svm.id)
330 }
331
332 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
333         buf := make([]byte, 512)
334         _, err := io.ReadFull(rand.Reader, buf)
335         if err != nil {
336                 return err
337         }
338         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
339         if err != nil {
340                 return err
341         }
342         return key.Verify(buf, sig)
343 }
344
345 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
346         dst := cloud.InstanceTags{}
347         for k, v := range src {
348                 dst[k] = v
349         }
350         return dst
351 }