Merge branch '14324-cdc-azure' refs #14324
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         math_rand "math/rand"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/mitchellh/mapstructure"
21         "github.com/sirupsen/logrus"
22         "golang.org/x/crypto/ssh"
23 )
24
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
28         HostKey        ssh.Signer
29         AuthorizedKeys []ssh.PublicKey
30
31         // SetupVM, if set, is called upon creation of each new
32         // StubVM. This is the caller's opportunity to customize the
33         // VM's error rate and other behaviors.
34         SetupVM func(*StubVM)
35
36         // StubVM's fake crunch-run uses this Queue to read and update
37         // container state.
38         Queue *Queue
39
40         // Frequency of artificially introduced errors on calls to
41         // Destroy. 0=always succeed, 1=always fail.
42         ErrorRateDestroy float64
43
44         instanceSets []*StubInstanceSet
45 }
46
47 // InstanceSet returns a new *StubInstanceSet.
48 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
49         logger logrus.FieldLogger) (cloud.InstanceSet, error) {
50
51         sis := StubInstanceSet{
52                 driver:  sd,
53                 servers: map[cloud.InstanceID]*StubVM{},
54         }
55         sd.instanceSets = append(sd.instanceSets, &sis)
56         return &sis, mapstructure.Decode(params, &sis)
57 }
58
59 // InstanceSets returns all instances that have been created by the
60 // driver. This can be used to test a component that uses the driver
61 // but doesn't expose the InstanceSets it has created.
62 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
63         return sd.instanceSets
64 }
65
66 type StubInstanceSet struct {
67         driver  *StubDriver
68         servers map[cloud.InstanceID]*StubVM
69         mtx     sync.RWMutex
70         stopped bool
71 }
72
73 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
74         sis.mtx.Lock()
75         defer sis.mtx.Unlock()
76         if sis.stopped {
77                 return nil, errors.New("StubInstanceSet: Create called after Stop")
78         }
79         ak := sis.driver.AuthorizedKeys
80         if authKey != nil {
81                 ak = append([]ssh.PublicKey{authKey}, ak...)
82         }
83         svm := &StubVM{
84                 sis:          sis,
85                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
86                 tags:         copyTags(tags),
87                 providerType: it.ProviderType,
88         }
89         svm.SSHService = SSHService{
90                 HostKey:        sis.driver.HostKey,
91                 AuthorizedKeys: ak,
92                 Exec:           svm.Exec,
93         }
94         if setup := sis.driver.SetupVM; setup != nil {
95                 setup(svm)
96         }
97         sis.servers[svm.id] = svm
98         return svm.Instance(), nil
99 }
100
101 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
102         sis.mtx.RLock()
103         defer sis.mtx.RUnlock()
104         var r []cloud.Instance
105         for _, ss := range sis.servers {
106                 r = append(r, ss.Instance())
107         }
108         return r, nil
109 }
110
111 func (sis *StubInstanceSet) Stop() {
112         sis.mtx.Lock()
113         defer sis.mtx.Unlock()
114         if sis.stopped {
115                 panic("Stop called twice")
116         }
117         sis.stopped = true
118 }
119
120 // StubVM is a fake server that runs an SSH service. It represents a
121 // VM running in a fake cloud.
122 //
123 // Note this is distinct from a stubInstance, which is a snapshot of
124 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
125 // running (and might change IP addresses, shut down, etc.)  without
126 // updating any stubInstances that have been returned to callers.
127 type StubVM struct {
128         Boot                 time.Time
129         Broken               time.Time
130         CrunchRunMissing     bool
131         CrunchRunCrashRate   float64
132         CrunchRunDetachDelay time.Duration
133         ExecuteContainer     func(arvados.Container) int
134
135         sis          *StubInstanceSet
136         id           cloud.InstanceID
137         tags         cloud.InstanceTags
138         providerType string
139         SSHService   SSHService
140         running      map[string]bool
141         sync.Mutex
142 }
143
144 func (svm *StubVM) Instance() stubInstance {
145         svm.Lock()
146         defer svm.Unlock()
147         return stubInstance{
148                 svm:  svm,
149                 addr: svm.SSHService.Address(),
150                 // We deliberately return a cached/stale copy of the
151                 // real tags here, so that (Instance)Tags() sometimes
152                 // returns old data after a call to
153                 // (Instance)SetTags().  This is permitted by the
154                 // driver interface, and this might help remind
155                 // callers that they need to tolerate it.
156                 tags: copyTags(svm.tags),
157         }
158 }
159
160 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
161         queue := svm.sis.driver.Queue
162         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
163         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
164                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
165                 return 1
166         }
167         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
168                 fmt.Fprintf(stderr, "cannot fork\n")
169                 return 2
170         }
171         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
172                 fmt.Fprint(stderr, "crunch-run: command not found\n")
173                 return 1
174         }
175         if strings.HasPrefix(command, "crunch-run --detach ") {
176                 svm.Lock()
177                 if svm.running == nil {
178                         svm.running = map[string]bool{}
179                 }
180                 svm.running[uuid] = true
181                 svm.Unlock()
182                 time.Sleep(svm.CrunchRunDetachDelay)
183                 fmt.Fprintf(stderr, "starting %s\n", uuid)
184                 logger := logrus.WithField("ContainerUUID", uuid)
185                 logger.Printf("[test] starting crunch-run stub")
186                 go func() {
187                         crashluck := math_rand.Float64()
188                         ctr, ok := queue.Get(uuid)
189                         if !ok {
190                                 logger.Print("[test] container not in queue")
191                                 return
192                         }
193                         if crashluck > svm.CrunchRunCrashRate/2 {
194                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
195                                 ctr.State = arvados.ContainerStateRunning
196                                 queue.Notify(ctr)
197                         }
198
199                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
200                         svm.Lock()
201                         _, running := svm.running[uuid]
202                         svm.Unlock()
203                         if !running {
204                                 logger.Print("[test] container was killed")
205                                 return
206                         }
207                         if svm.ExecuteContainer != nil {
208                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
209                         }
210                         // TODO: Check whether the stub instance has
211                         // been destroyed, and if so, don't call
212                         // queue.Notify. Then "container finished
213                         // twice" can be classified as a bug.
214                         if crashluck < svm.CrunchRunCrashRate {
215                                 logger.Print("[test] crashing crunch-run stub")
216                         } else {
217                                 ctr.State = arvados.ContainerStateComplete
218                                 queue.Notify(ctr)
219                         }
220                         logger.Print("[test] exiting crunch-run stub")
221                         svm.Lock()
222                         defer svm.Unlock()
223                         delete(svm.running, uuid)
224                 }()
225                 return 0
226         }
227         if command == "crunch-run --list" {
228                 svm.Lock()
229                 defer svm.Unlock()
230                 for uuid := range svm.running {
231                         fmt.Fprintf(stdout, "%s\n", uuid)
232                 }
233                 return 0
234         }
235         if strings.HasPrefix(command, "crunch-run --kill ") {
236                 svm.Lock()
237                 defer svm.Unlock()
238                 if svm.running[uuid] {
239                         delete(svm.running, uuid)
240                 } else {
241                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
242                 }
243                 return 0
244         }
245         if command == "true" {
246                 return 0
247         }
248         fmt.Fprintf(stderr, "%q: command not found", command)
249         return 1
250 }
251
252 type stubInstance struct {
253         svm  *StubVM
254         addr string
255         tags cloud.InstanceTags
256 }
257
258 func (si stubInstance) ID() cloud.InstanceID {
259         return si.svm.id
260 }
261
262 func (si stubInstance) Address() string {
263         return si.addr
264 }
265
266 func (si stubInstance) Destroy() error {
267         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
268                 return errors.New("instance could not be destroyed")
269         }
270         si.svm.SSHService.Close()
271         sis := si.svm.sis
272         sis.mtx.Lock()
273         defer sis.mtx.Unlock()
274         delete(sis.servers, si.svm.id)
275         return nil
276 }
277
278 func (si stubInstance) ProviderType() string {
279         return si.svm.providerType
280 }
281
282 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
283         tags = copyTags(tags)
284         svm := si.svm
285         go func() {
286                 svm.Lock()
287                 defer svm.Unlock()
288                 svm.tags = tags
289         }()
290         return nil
291 }
292
293 func (si stubInstance) Tags() cloud.InstanceTags {
294         return si.tags
295 }
296
297 func (si stubInstance) String() string {
298         return string(si.svm.id)
299 }
300
301 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
302         buf := make([]byte, 512)
303         _, err := io.ReadFull(rand.Reader, buf)
304         if err != nil {
305                 return err
306         }
307         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
308         if err != nil {
309                 return err
310         }
311         return key.Verify(buf, sig)
312 }
313
314 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
315         dst := cloud.InstanceTags{}
316         for k, v := range src {
317                 dst[k] = v
318         }
319         return dst
320 }