Merge branch 'master' into 14360-dispatch-cloud
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         math_rand "math/rand"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/Sirupsen/logrus"
21         "github.com/mitchellh/mapstructure"
22         "golang.org/x/crypto/ssh"
23 )
24
// A StubDriver implements cloud.Driver by setting up local SSH
// servers that do fake command executions.
type StubDriver struct {
	// HostKey is the host key given to every stub VM's SSH service;
	// stubInstance.VerifyHostKey checks presented keys against it.
	HostKey ssh.Signer
	// AuthorizedKeys are accepted by every stub VM's SSH service. Any
	// authKey passed to Create is accepted in addition to these.
	AuthorizedKeys []ssh.PublicKey

	// SetupVM, if set, is called upon creation of each new VM.
	SetupVM func(*StubVM)
	// ErrorRateDestroy is the probability that a call to an
	// instance's Destroy method fails with an error instead of
	// destroying the VM.
	ErrorRateDestroy float64
	// Queue is the fake container queue that the crunch-run stubs on
	// this driver's VMs read containers from and send state
	// notifications to.
	Queue *Queue

	// instanceSets records every StubInstanceSet created by
	// InstanceSet. NOTE(review): accessed without any locking.
	instanceSets []*StubInstanceSet
}
38
39 // InstanceSet returns a new *StubInstanceSet.
40 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
41         sis := StubInstanceSet{
42                 driver:  sd,
43                 servers: map[cloud.InstanceID]*StubVM{},
44         }
45         sd.instanceSets = append(sd.instanceSets, &sis)
46         return &sis, mapstructure.Decode(params, &sis)
47 }
48
49 // InstanceSets returns all instances that have been created by the
50 // driver. This can be used to test a component that uses the driver
51 // but doesn't expose the InstanceSets it has created.
52 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
53         return sd.instanceSets
54 }
55
// StubInstanceSet is the cloud.InstanceSet returned by
// (*StubDriver)InstanceSet. It keeps track of the StubVMs it has
// created and not yet destroyed.
type StubInstanceSet struct {
	// driver supplies keys, SetupVM, error rates and the queue.
	driver *StubDriver
	// servers maps instance IDs to live VMs: Create adds entries
	// and Destroy removes them. Guarded by mtx.
	servers map[cloud.InstanceID]*StubVM
	mtx     sync.RWMutex
	// stopped is set by Stop; Create refuses to run once it is
	// true. Guarded by mtx.
	stopped bool
}
62
63 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
64         sis.mtx.Lock()
65         defer sis.mtx.Unlock()
66         if sis.stopped {
67                 return nil, errors.New("StubInstanceSet: Create called after Stop")
68         }
69         ak := sis.driver.AuthorizedKeys
70         if authKey != nil {
71                 ak = append([]ssh.PublicKey{authKey}, ak...)
72         }
73         svm := &StubVM{
74                 sis:          sis,
75                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
76                 tags:         copyTags(tags),
77                 providerType: it.ProviderType,
78         }
79         svm.SSHService = SSHService{
80                 HostKey:        sis.driver.HostKey,
81                 AuthorizedKeys: ak,
82                 Exec:           svm.Exec,
83         }
84         if setup := sis.driver.SetupVM; setup != nil {
85                 setup(svm)
86         }
87         sis.servers[svm.id] = svm
88         return svm.Instance(), nil
89 }
90
91 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
92         sis.mtx.RLock()
93         defer sis.mtx.RUnlock()
94         var r []cloud.Instance
95         for _, ss := range sis.servers {
96                 r = append(r, ss.Instance())
97         }
98         return r, nil
99 }
100
101 func (sis *StubInstanceSet) Stop() {
102         sis.mtx.Lock()
103         defer sis.mtx.Unlock()
104         if sis.stopped {
105                 panic("Stop called twice")
106         }
107         sis.stopped = true
108 }
109
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
// Note this is distinct from a stubInstance, which is a snapshot of
// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
// running (and might change IP addresses, shut down, etc.) without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
	// Boot is when the VM finishes booting; until then every
	// command run via Exec exits 1.
	Boot time.Time
	// Broken, if non-zero, is when the VM breaks; after that every
	// command run via Exec exits 2 ("cannot fork").
	Broken time.Time
	// CrunchRunMissing makes any command mentioning crunch-run
	// exit 1 with "command not found".
	CrunchRunMissing bool
	// CrunchRunCrashRate is the probability that a detached
	// crunch-run stub crashes instead of completing its container.
	// Half of that probability is spent on crashes that happen
	// before the container is marked Running.
	CrunchRunCrashRate float64
	// CrunchRunDetachDelay is how long "crunch-run --detach" takes
	// before returning.
	CrunchRunDetachDelay time.Duration
	// CtrExit is the exit code reported for containers that
	// complete on this VM.
	CtrExit int
	// OnCancel, if set, is called with the container UUID when a
	// crunch-run stub crashes after the container was marked
	// Running.
	OnCancel func(string)
	// OnComplete, if set, is called with the container UUID after
	// the container has been marked Complete.
	OnComplete func(string)

	sis          *StubInstanceSet
	id           cloud.InstanceID
	tags         cloud.InstanceTags
	providerType string
	SSHService   SSHService
	// running is the set of container UUIDs that
	// "crunch-run --list" reports. Guarded by the embedded Mutex,
	// which also guards tags.
	running map[string]bool
	sync.Mutex
}
135
136 func (svm *StubVM) Instance() stubInstance {
137         svm.Lock()
138         defer svm.Unlock()
139         return stubInstance{
140                 svm:  svm,
141                 addr: svm.SSHService.Address(),
142                 // We deliberately return a cached/stale copy of the
143                 // real tags here, so that (Instance)Tags() sometimes
144                 // returns old data after a call to
145                 // (Instance)SetTags().  This is permitted by the
146                 // driver interface, and this might help remind
147                 // callers that they need to tolerate it.
148                 tags: copyTags(svm.tags),
149         }
150 }
151
152 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
153         queue := svm.sis.driver.Queue
154         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
155         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
156                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
157                 return 1
158         }
159         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
160                 fmt.Fprintf(stderr, "cannot fork\n")
161                 return 2
162         }
163         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
164                 fmt.Fprint(stderr, "crunch-run: command not found\n")
165                 return 1
166         }
167         if strings.HasPrefix(command, "crunch-run --detach ") {
168                 svm.Lock()
169                 if svm.running == nil {
170                         svm.running = map[string]bool{}
171                 }
172                 svm.running[uuid] = true
173                 svm.Unlock()
174                 time.Sleep(svm.CrunchRunDetachDelay)
175                 fmt.Fprintf(stderr, "starting %s\n", uuid)
176                 logger := logrus.WithField("ContainerUUID", uuid)
177                 logger.Printf("[test] starting crunch-run stub")
178                 go func() {
179                         crashluck := math_rand.Float64()
180                         ctr, ok := queue.Get(uuid)
181                         if !ok {
182                                 logger.Print("[test] container not in queue")
183                                 return
184                         }
185                         if crashluck > svm.CrunchRunCrashRate/2 {
186                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
187                                 ctr.State = arvados.ContainerStateRunning
188                                 queue.Notify(ctr)
189                         }
190
191                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
192                         svm.Lock()
193                         _, running := svm.running[uuid]
194                         svm.Unlock()
195                         if !running {
196                                 logger.Print("[test] container was killed")
197                                 return
198                         }
199                         // TODO: Check whether the stub instance has
200                         // been destroyed, and if so, don't call
201                         // onComplete. Then "container finished twice"
202                         // can be classified as a bug.
203                         if crashluck < svm.CrunchRunCrashRate {
204                                 logger.Print("[test] crashing crunch-run stub")
205                                 if svm.OnCancel != nil && ctr.State == arvados.ContainerStateRunning {
206                                         svm.OnCancel(uuid)
207                                 }
208                         } else {
209                                 ctr.State = arvados.ContainerStateComplete
210                                 ctr.ExitCode = svm.CtrExit
211                                 queue.Notify(ctr)
212                                 if svm.OnComplete != nil {
213                                         svm.OnComplete(uuid)
214                                 }
215                         }
216                         logger.Print("[test] exiting crunch-run stub")
217                         svm.Lock()
218                         defer svm.Unlock()
219                         delete(svm.running, uuid)
220                 }()
221                 return 0
222         }
223         if command == "crunch-run --list" {
224                 svm.Lock()
225                 defer svm.Unlock()
226                 for uuid := range svm.running {
227                         fmt.Fprintf(stdout, "%s\n", uuid)
228                 }
229                 return 0
230         }
231         if strings.HasPrefix(command, "crunch-run --kill ") {
232                 svm.Lock()
233                 defer svm.Unlock()
234                 if svm.running[uuid] {
235                         delete(svm.running, uuid)
236                 } else {
237                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
238                 }
239                 return 0
240         }
241         if command == "true" {
242                 return 0
243         }
244         fmt.Fprintf(stderr, "%q: command not found", command)
245         return 1
246 }
247
// stubInstance is a snapshot of a StubVM's metadata, as returned by
// (*StubVM)Instance. Its addr and tags are copies taken when the
// snapshot was made; they are not updated when the VM changes.
type stubInstance struct {
	// svm is the live VM this snapshot describes.
	svm *StubVM
	// addr is the VM's SSH service address at snapshot time.
	addr string
	// tags is a private copy of the VM's tags at snapshot time.
	tags cloud.InstanceTags
}
253
254 func (si stubInstance) ID() cloud.InstanceID {
255         return si.svm.id
256 }
257
258 func (si stubInstance) Address() string {
259         return si.addr
260 }
261
262 func (si stubInstance) Destroy() error {
263         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
264                 return errors.New("instance could not be destroyed")
265         }
266         si.svm.SSHService.Close()
267         sis := si.svm.sis
268         sis.mtx.Lock()
269         defer sis.mtx.Unlock()
270         delete(sis.servers, si.svm.id)
271         return nil
272 }
273
274 func (si stubInstance) ProviderType() string {
275         return si.svm.providerType
276 }
277
278 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
279         tags = copyTags(tags)
280         svm := si.svm
281         go func() {
282                 svm.Lock()
283                 defer svm.Unlock()
284                 svm.tags = tags
285         }()
286         return nil
287 }
288
289 func (si stubInstance) Tags() cloud.InstanceTags {
290         return si.tags
291 }
292
293 func (si stubInstance) String() string {
294         return string(si.svm.id)
295 }
296
297 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
298         buf := make([]byte, 512)
299         _, err := io.ReadFull(rand.Reader, buf)
300         if err != nil {
301                 return err
302         }
303         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
304         if err != nil {
305                 return err
306         }
307         return key.Verify(buf, sig)
308 }
309
310 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
311         dst := cloud.InstanceTags{}
312         for k, v := range src {
313                 dst[k] = v
314         }
315         return dst
316 }