14325: Propagate API env vars to crunch-run.
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         math_rand "math/rand"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/mitchellh/mapstructure"
21         "github.com/sirupsen/logrus"
22         "golang.org/x/crypto/ssh"
23 )
24
// A StubDriver implements cloud.Driver by setting up local SSH
// servers that do fake command executions.
type StubDriver struct {
	// HostKey and AuthorizedKeys are handed to the SSH service of
	// every StubVM created through this driver. HostKey is also
	// the key used by (stubInstance)VerifyHostKey.
	HostKey        ssh.Signer
	AuthorizedKeys []ssh.PublicKey

	// SetupVM, if set, is called upon creation of each new
	// StubVM. This is the caller's opportunity to customize the
	// VM's error rate and other behaviors.
	SetupVM func(*StubVM)

	// StubVM's fake crunch-run uses this Queue to read and update
	// container state.
	Queue *Queue

	// Frequency of artificially introduced errors on calls to
	// Destroy. 0=always succeed, 1=always fail.
	ErrorRateDestroy float64

	// instanceSets holds every StubInstanceSet returned by
	// InstanceSet, in the order they were created.
	instanceSets []*StubInstanceSet
}
46
47 // InstanceSet returns a new *StubInstanceSet.
48 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
49         logger logrus.FieldLogger) (cloud.InstanceSet, error) {
50
51         sis := StubInstanceSet{
52                 driver:  sd,
53                 servers: map[cloud.InstanceID]*StubVM{},
54         }
55         sd.instanceSets = append(sd.instanceSets, &sis)
56         return &sis, mapstructure.Decode(params, &sis)
57 }
58
59 // InstanceSets returns all instances that have been created by the
60 // driver. This can be used to test a component that uses the driver
61 // but doesn't expose the InstanceSets it has created.
62 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
63         return sd.instanceSets
64 }
65
// StubInstanceSet is the instance set returned by
// (*StubDriver)InstanceSet. It keeps track of the StubVMs it has
// created and not yet destroyed.
type StubInstanceSet struct {
	// driver is the StubDriver that created this set.
	driver *StubDriver
	// servers maps instance IDs to the VMs created by Create;
	// entries are removed by (stubInstance)Destroy.
	servers map[cloud.InstanceID]*StubVM
	// mtx guards servers and stopped.
	mtx sync.RWMutex
	// stopped is set by Stop; Create fails once it is true.
	stopped bool
}
72
73 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
74         sis.mtx.Lock()
75         defer sis.mtx.Unlock()
76         if sis.stopped {
77                 return nil, errors.New("StubInstanceSet: Create called after Stop")
78         }
79         ak := sis.driver.AuthorizedKeys
80         if authKey != nil {
81                 ak = append([]ssh.PublicKey{authKey}, ak...)
82         }
83         svm := &StubVM{
84                 sis:          sis,
85                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
86                 tags:         copyTags(tags),
87                 providerType: it.ProviderType,
88         }
89         svm.SSHService = SSHService{
90                 HostKey:        sis.driver.HostKey,
91                 AuthorizedKeys: ak,
92                 Exec:           svm.Exec,
93         }
94         if setup := sis.driver.SetupVM; setup != nil {
95                 setup(svm)
96         }
97         sis.servers[svm.id] = svm
98         return svm.Instance(), nil
99 }
100
101 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
102         sis.mtx.RLock()
103         defer sis.mtx.RUnlock()
104         var r []cloud.Instance
105         for _, ss := range sis.servers {
106                 r = append(r, ss.Instance())
107         }
108         return r, nil
109 }
110
111 func (sis *StubInstanceSet) Stop() {
112         sis.mtx.Lock()
113         defer sis.mtx.Unlock()
114         if sis.stopped {
115                 panic("Stop called twice")
116         }
117         sis.stopped = true
118 }
119
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
// Note this is distinct from a stubInstance, which is a snapshot of
// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
// running (and might change IP addresses, shut down, etc.)  without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
	// Boot is the time the VM finishes booting. Until then, every
	// command fails with exit status 1.
	Boot time.Time
	// Broken, if non-zero, is the time after which every command
	// fails with "cannot fork" and exit status 2.
	Broken time.Time
	// CrunchRunMissing makes every command containing
	// "crunch-run" fail with "command not found".
	CrunchRunMissing bool
	// CrunchRunCrashRate controls how often the fake crunch-run
	// exits without marking its container Complete. Each run draws
	// a random number in [0,1): below the rate, the run crashes;
	// at or below half the rate, it also never marks the container
	// Running.
	CrunchRunCrashRate float64
	// CrunchRunDetachDelay is how long "crunch-run --detach"
	// sleeps before reporting that it has started.
	CrunchRunDetachDelay time.Duration
	// ExecuteContainer, if set, is called by the fake crunch-run
	// to obtain the container's exit code.
	ExecuteContainer func(arvados.Container) int

	// sis is the instance set this VM belongs to.
	sis *StubInstanceSet
	// id is the instance ID assigned at creation.
	id cloud.InstanceID
	// tags holds the VM's current tags; it is replaced
	// asynchronously by (stubInstance)SetTags.
	tags cloud.InstanceTags
	// providerType is copied from the instance type passed to
	// Create.
	providerType string
	// SSHService is the fake SSH server whose commands are
	// handled by Exec.
	SSHService SSHService
	// running has an entry for each container UUID whose fake
	// crunch-run process is alive. It is created lazily by Exec.
	running map[string]bool
	// The embedded Mutex guards tags and running.
	sync.Mutex
}
143
144 func (svm *StubVM) Instance() stubInstance {
145         svm.Lock()
146         defer svm.Unlock()
147         return stubInstance{
148                 svm:  svm,
149                 addr: svm.SSHService.Address(),
150                 // We deliberately return a cached/stale copy of the
151                 // real tags here, so that (Instance)Tags() sometimes
152                 // returns old data after a call to
153                 // (Instance)SetTags().  This is permitted by the
154                 // driver interface, and this might help remind
155                 // callers that they need to tolerate it.
156                 tags: copyTags(svm.tags),
157         }
158 }
159
160 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
161         queue := svm.sis.driver.Queue
162         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
163         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
164                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
165                 return 1
166         }
167         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
168                 fmt.Fprintf(stderr, "cannot fork\n")
169                 return 2
170         }
171         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
172                 fmt.Fprint(stderr, "crunch-run: command not found\n")
173                 return 1
174         }
175         if strings.HasPrefix(command, "crunch-run --detach ") {
176                 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
177                         if env[name] == "" {
178                                 fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
179                                 return 1
180                         }
181                 }
182                 svm.Lock()
183                 if svm.running == nil {
184                         svm.running = map[string]bool{}
185                 }
186                 svm.running[uuid] = true
187                 svm.Unlock()
188                 time.Sleep(svm.CrunchRunDetachDelay)
189                 fmt.Fprintf(stderr, "starting %s\n", uuid)
190                 logger := logrus.WithField("ContainerUUID", uuid)
191                 logger.Printf("[test] starting crunch-run stub")
192                 go func() {
193                         crashluck := math_rand.Float64()
194                         ctr, ok := queue.Get(uuid)
195                         if !ok {
196                                 logger.Print("[test] container not in queue")
197                                 return
198                         }
199                         if crashluck > svm.CrunchRunCrashRate/2 {
200                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
201                                 ctr.State = arvados.ContainerStateRunning
202                                 queue.Notify(ctr)
203                         }
204
205                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
206                         svm.Lock()
207                         _, running := svm.running[uuid]
208                         svm.Unlock()
209                         if !running {
210                                 logger.Print("[test] container was killed")
211                                 return
212                         }
213                         if svm.ExecuteContainer != nil {
214                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
215                         }
216                         // TODO: Check whether the stub instance has
217                         // been destroyed, and if so, don't call
218                         // queue.Notify. Then "container finished
219                         // twice" can be classified as a bug.
220                         if crashluck < svm.CrunchRunCrashRate {
221                                 logger.Print("[test] crashing crunch-run stub")
222                         } else {
223                                 ctr.State = arvados.ContainerStateComplete
224                                 queue.Notify(ctr)
225                         }
226                         logger.Print("[test] exiting crunch-run stub")
227                         svm.Lock()
228                         defer svm.Unlock()
229                         delete(svm.running, uuid)
230                 }()
231                 return 0
232         }
233         if command == "crunch-run --list" {
234                 svm.Lock()
235                 defer svm.Unlock()
236                 for uuid := range svm.running {
237                         fmt.Fprintf(stdout, "%s\n", uuid)
238                 }
239                 return 0
240         }
241         if strings.HasPrefix(command, "crunch-run --kill ") {
242                 svm.Lock()
243                 defer svm.Unlock()
244                 if svm.running[uuid] {
245                         delete(svm.running, uuid)
246                 } else {
247                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
248                 }
249                 return 0
250         }
251         if command == "true" {
252                 return 0
253         }
254         fmt.Fprintf(stderr, "%q: command not found", command)
255         return 1
256 }
257
// stubInstance is a snapshot of a StubVM's metadata, taken by
// (*StubVM)Instance. The address and tags are not updated when the
// VM changes afterwards.
type stubInstance struct {
	// svm is the VM this snapshot was taken from.
	svm *StubVM
	// addr is the VM's SSH service address at snapshot time.
	addr string
	// tags is a copy of the VM's tags at snapshot time.
	tags cloud.InstanceTags
}
263
264 func (si stubInstance) ID() cloud.InstanceID {
265         return si.svm.id
266 }
267
268 func (si stubInstance) Address() string {
269         return si.addr
270 }
271
272 func (si stubInstance) Destroy() error {
273         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
274                 return errors.New("instance could not be destroyed")
275         }
276         si.svm.SSHService.Close()
277         sis := si.svm.sis
278         sis.mtx.Lock()
279         defer sis.mtx.Unlock()
280         delete(sis.servers, si.svm.id)
281         return nil
282 }
283
284 func (si stubInstance) ProviderType() string {
285         return si.svm.providerType
286 }
287
288 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
289         tags = copyTags(tags)
290         svm := si.svm
291         go func() {
292                 svm.Lock()
293                 defer svm.Unlock()
294                 svm.tags = tags
295         }()
296         return nil
297 }
298
299 func (si stubInstance) Tags() cloud.InstanceTags {
300         return si.tags
301 }
302
303 func (si stubInstance) String() string {
304         return string(si.svm.id)
305 }
306
307 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
308         buf := make([]byte, 512)
309         _, err := io.ReadFull(rand.Reader, buf)
310         if err != nil {
311                 return err
312         }
313         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
314         if err != nil {
315                 return err
316         }
317         return key.Verify(buf, sig)
318 }
319
320 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
321         dst := cloud.InstanceTags{}
322         for k, v := range src {
323                 dst[k] = v
324         }
325         return dst
326 }