]> git.arvados.org - arvados.git/blob - lib/dispatchcloud/test/stub_driver.go
14807: Expose instance IP addresses in logs and management API.
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "encoding/json"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         math_rand "math/rand"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/lib/cloud"
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
29         HostKey        ssh.Signer
30         AuthorizedKeys []ssh.PublicKey
31
32         // SetupVM, if set, is called upon creation of each new
33         // StubVM. This is the caller's opportunity to customize the
34         // VM's error rate and other behaviors.
35         SetupVM func(*StubVM)
36
37         // StubVM's fake crunch-run uses this Queue to read and update
38         // container state.
39         Queue *Queue
40
41         // Frequency of artificially introduced errors on calls to
42         // Destroy. 0=always succeed, 1=always fail.
43         ErrorRateDestroy float64
44
45         // If Create() or Instances() is called too frequently, return
46         // rate-limiting errors.
47         MinTimeBetweenCreateCalls    time.Duration
48         MinTimeBetweenInstancesCalls time.Duration
49
50         // If true, Create and Destroy calls block until Release() is
51         // called.
52         HoldCloudOps bool
53
54         instanceSets []*StubInstanceSet
55         holdCloudOps chan bool
56 }
57
58 // InstanceSet returns a new *StubInstanceSet.
59 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
60         if sd.holdCloudOps == nil {
61                 sd.holdCloudOps = make(chan bool)
62         }
63         sis := StubInstanceSet{
64                 driver:  sd,
65                 servers: map[cloud.InstanceID]*StubVM{},
66         }
67         sd.instanceSets = append(sd.instanceSets, &sis)
68
69         var err error
70         if params != nil {
71                 err = json.Unmarshal(params, &sis)
72         }
73         return &sis, err
74 }
75
76 // InstanceSets returns all instances that have been created by the
77 // driver. This can be used to test a component that uses the driver
78 // but doesn't expose the InstanceSets it has created.
79 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
80         return sd.instanceSets
81 }
82
83 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
84 // are fewer than n blocked calls pending, it waits for the rest to
85 // arrive.
86 func (sd *StubDriver) ReleaseCloudOps(n int) {
87         for i := 0; i < n; i++ {
88                 <-sd.holdCloudOps
89         }
90 }
91
92 type StubInstanceSet struct {
93         driver  *StubDriver
94         servers map[cloud.InstanceID]*StubVM
95         mtx     sync.RWMutex
96         stopped bool
97
98         allowCreateCall    time.Time
99         allowInstancesCall time.Time
100 }
101
102 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
103         if sis.driver.HoldCloudOps {
104                 sis.driver.holdCloudOps <- true
105         }
106         sis.mtx.Lock()
107         defer sis.mtx.Unlock()
108         if sis.stopped {
109                 return nil, errors.New("StubInstanceSet: Create called after Stop")
110         }
111         if sis.allowCreateCall.After(time.Now()) {
112                 return nil, RateLimitError{sis.allowCreateCall}
113         } else {
114                 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
115         }
116
117         ak := sis.driver.AuthorizedKeys
118         if authKey != nil {
119                 ak = append([]ssh.PublicKey{authKey}, ak...)
120         }
121         svm := &StubVM{
122                 sis:          sis,
123                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
124                 tags:         copyTags(tags),
125                 providerType: it.ProviderType,
126         }
127         svm.SSHService = SSHService{
128                 HostKey:        sis.driver.HostKey,
129                 AuthorizedUser: "root",
130                 AuthorizedKeys: ak,
131                 Exec:           svm.Exec,
132         }
133         if setup := sis.driver.SetupVM; setup != nil {
134                 setup(svm)
135         }
136         sis.servers[svm.id] = svm
137         return svm.Instance(), nil
138 }
139
140 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
141         sis.mtx.RLock()
142         defer sis.mtx.RUnlock()
143         if sis.allowInstancesCall.After(time.Now()) {
144                 return nil, RateLimitError{sis.allowInstancesCall}
145         } else {
146                 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
147         }
148         var r []cloud.Instance
149         for _, ss := range sis.servers {
150                 r = append(r, ss.Instance())
151         }
152         return r, nil
153 }
154
155 func (sis *StubInstanceSet) Stop() {
156         sis.mtx.Lock()
157         defer sis.mtx.Unlock()
158         if sis.stopped {
159                 panic("Stop called twice")
160         }
161         sis.stopped = true
162 }
163
// RateLimitError is returned by the stub instance set when Create or
// Instances is called too soon after a previous call. Retry is the
// earliest time at which another attempt will be accepted.
type RateLimitError struct {
	Retry time.Time
}

// Error implements the error interface.
func (rle RateLimitError) Error() string {
	return fmt.Sprintf("rate limited until %s", rle.Retry)
}

// EarliestRetry reports the time before which the caller should not
// retry.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
168
169 // StubVM is a fake server that runs an SSH service. It represents a
170 // VM running in a fake cloud.
171 //
172 // Note this is distinct from a stubInstance, which is a snapshot of
173 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
174 // running (and might change IP addresses, shut down, etc.)  without
175 // updating any stubInstances that have been returned to callers.
176 type StubVM struct {
177         Boot                 time.Time
178         Broken               time.Time
179         CrunchRunMissing     bool
180         CrunchRunCrashRate   float64
181         CrunchRunDetachDelay time.Duration
182         ExecuteContainer     func(arvados.Container) int
183
184         sis          *StubInstanceSet
185         id           cloud.InstanceID
186         tags         cloud.InstanceTags
187         providerType string
188         SSHService   SSHService
189         running      map[string]bool
190         sync.Mutex
191 }
192
193 func (svm *StubVM) Instance() stubInstance {
194         svm.Lock()
195         defer svm.Unlock()
196         return stubInstance{
197                 svm:  svm,
198                 addr: svm.SSHService.Address(),
199                 // We deliberately return a cached/stale copy of the
200                 // real tags here, so that (Instance)Tags() sometimes
201                 // returns old data after a call to
202                 // (Instance)SetTags().  This is permitted by the
203                 // driver interface, and this might help remind
204                 // callers that they need to tolerate it.
205                 tags: copyTags(svm.tags),
206         }
207 }
208
209 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
210         stdinData, err := ioutil.ReadAll(stdin)
211         if err != nil {
212                 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
213                 return 1
214         }
215         queue := svm.sis.driver.Queue
216         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
217         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
218                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
219                 return 1
220         }
221         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
222                 fmt.Fprintf(stderr, "cannot fork\n")
223                 return 2
224         }
225         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
226                 fmt.Fprint(stderr, "crunch-run: command not found\n")
227                 return 1
228         }
229         if strings.HasPrefix(command, "source /dev/stdin; crunch-run --detach ") {
230                 stdinKV := map[string]string{}
231                 for _, line := range strings.Split(string(stdinData), "\n") {
232                         kv := strings.SplitN(strings.TrimPrefix(line, "export "), "=", 2)
233                         if len(kv) == 2 && len(kv[1]) > 0 {
234                                 stdinKV[kv[0]] = kv[1]
235                         }
236                 }
237                 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
238                         if stdinKV[name] == "" {
239                                 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
240                                 return 1
241                         }
242                 }
243                 svm.Lock()
244                 if svm.running == nil {
245                         svm.running = map[string]bool{}
246                 }
247                 svm.running[uuid] = true
248                 svm.Unlock()
249                 time.Sleep(svm.CrunchRunDetachDelay)
250                 fmt.Fprintf(stderr, "starting %s\n", uuid)
251                 logger := logrus.WithFields(logrus.Fields{
252                         "Instance":      svm.id,
253                         "ContainerUUID": uuid,
254                 })
255                 logger.Printf("[test] starting crunch-run stub")
256                 go func() {
257                         crashluck := math_rand.Float64()
258                         ctr, ok := queue.Get(uuid)
259                         if !ok {
260                                 logger.Print("[test] container not in queue")
261                                 return
262                         }
263                         if crashluck > svm.CrunchRunCrashRate/2 {
264                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
265                                 ctr.State = arvados.ContainerStateRunning
266                                 queue.Notify(ctr)
267                         }
268
269                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
270                         svm.Lock()
271                         _, running := svm.running[uuid]
272                         svm.Unlock()
273                         if !running {
274                                 logger.Print("[test] container was killed")
275                                 return
276                         }
277                         if svm.ExecuteContainer != nil {
278                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
279                         }
280                         // TODO: Check whether the stub instance has
281                         // been destroyed, and if so, don't call
282                         // queue.Notify. Then "container finished
283                         // twice" can be classified as a bug.
284                         if crashluck < svm.CrunchRunCrashRate {
285                                 logger.Print("[test] crashing crunch-run stub")
286                         } else {
287                                 ctr.State = arvados.ContainerStateComplete
288                                 queue.Notify(ctr)
289                         }
290                         logger.Print("[test] exiting crunch-run stub")
291                         svm.Lock()
292                         defer svm.Unlock()
293                         delete(svm.running, uuid)
294                 }()
295                 return 0
296         }
297         if command == "crunch-run --list" {
298                 svm.Lock()
299                 defer svm.Unlock()
300                 for uuid := range svm.running {
301                         fmt.Fprintf(stdout, "%s\n", uuid)
302                 }
303                 return 0
304         }
305         if strings.HasPrefix(command, "crunch-run --kill ") {
306                 svm.Lock()
307                 defer svm.Unlock()
308                 if svm.running[uuid] {
309                         delete(svm.running, uuid)
310                 } else {
311                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
312                 }
313                 return 0
314         }
315         if command == "true" {
316                 return 0
317         }
318         fmt.Fprintf(stderr, "%q: command not found", command)
319         return 1
320 }
321
322 type stubInstance struct {
323         svm  *StubVM
324         addr string
325         tags cloud.InstanceTags
326 }
327
328 func (si stubInstance) ID() cloud.InstanceID {
329         return si.svm.id
330 }
331
332 func (si stubInstance) Address() string {
333         return si.addr
334 }
335
336 func (si stubInstance) RemoteUser() string {
337         return si.svm.SSHService.AuthorizedUser
338 }
339
340 func (si stubInstance) Destroy() error {
341         sis := si.svm.sis
342         if sis.driver.HoldCloudOps {
343                 sis.driver.holdCloudOps <- true
344         }
345         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
346                 return errors.New("instance could not be destroyed")
347         }
348         si.svm.SSHService.Close()
349         sis.mtx.Lock()
350         defer sis.mtx.Unlock()
351         delete(sis.servers, si.svm.id)
352         return nil
353 }
354
355 func (si stubInstance) ProviderType() string {
356         return si.svm.providerType
357 }
358
359 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
360         tags = copyTags(tags)
361         svm := si.svm
362         go func() {
363                 svm.Lock()
364                 defer svm.Unlock()
365                 svm.tags = tags
366         }()
367         return nil
368 }
369
370 func (si stubInstance) Tags() cloud.InstanceTags {
371         return si.tags
372 }
373
374 func (si stubInstance) String() string {
375         return string(si.svm.id)
376 }
377
378 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
379         buf := make([]byte, 512)
380         _, err := io.ReadFull(rand.Reader, buf)
381         if err != nil {
382                 return err
383         }
384         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
385         if err != nil {
386                 return err
387         }
388         return key.Verify(buf, sig)
389 }
390
391 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
392         dst := cloud.InstanceTags{}
393         for k, v := range src {
394                 dst[k] = v
395         }
396         return dst
397 }