14324: Use logrus in Azure driver. Fix Sirupsen->sirupsen in imports
[arvados.git] / lib / dispatchcloud / test / stub_driver.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package test
6
7 import (
8         "crypto/rand"
9         "errors"
10         "fmt"
11         "io"
12         math_rand "math/rand"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/lib/cloud"
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/mitchellh/mapstructure"
21         "github.com/sirupsen/logrus"
22         "golang.org/x/crypto/ssh"
23 )
24
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
28         HostKey        ssh.Signer
29         AuthorizedKeys []ssh.PublicKey
30
31         // SetupVM, if set, is called upon creation of each new
32         // StubVM. This is the caller's opportunity to customize the
33         // VM's error rate and other behaviors.
34         SetupVM func(*StubVM)
35
36         // StubVM's fake crunch-run uses this Queue to read and update
37         // container state.
38         Queue *Queue
39
40         // Frequency of artificially introduced errors on calls to
41         // Destroy. 0=always succeed, 1=always fail.
42         ErrorRateDestroy float64
43
44         instanceSets []*StubInstanceSet
45 }
46
47 // InstanceSet returns a new *StubInstanceSet.
48 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
49         sis := StubInstanceSet{
50                 driver:  sd,
51                 servers: map[cloud.InstanceID]*StubVM{},
52         }
53         sd.instanceSets = append(sd.instanceSets, &sis)
54         return &sis, mapstructure.Decode(params, &sis)
55 }
56
57 // InstanceSets returns all instances that have been created by the
58 // driver. This can be used to test a component that uses the driver
59 // but doesn't expose the InstanceSets it has created.
60 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
61         return sd.instanceSets
62 }
63
64 type StubInstanceSet struct {
65         driver  *StubDriver
66         servers map[cloud.InstanceID]*StubVM
67         mtx     sync.RWMutex
68         stopped bool
69 }
70
71 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
72         sis.mtx.Lock()
73         defer sis.mtx.Unlock()
74         if sis.stopped {
75                 return nil, errors.New("StubInstanceSet: Create called after Stop")
76         }
77         ak := sis.driver.AuthorizedKeys
78         if authKey != nil {
79                 ak = append([]ssh.PublicKey{authKey}, ak...)
80         }
81         svm := &StubVM{
82                 sis:          sis,
83                 id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
84                 tags:         copyTags(tags),
85                 providerType: it.ProviderType,
86         }
87         svm.SSHService = SSHService{
88                 HostKey:        sis.driver.HostKey,
89                 AuthorizedKeys: ak,
90                 Exec:           svm.Exec,
91         }
92         if setup := sis.driver.SetupVM; setup != nil {
93                 setup(svm)
94         }
95         sis.servers[svm.id] = svm
96         return svm.Instance(), nil
97 }
98
99 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
100         sis.mtx.RLock()
101         defer sis.mtx.RUnlock()
102         var r []cloud.Instance
103         for _, ss := range sis.servers {
104                 r = append(r, ss.Instance())
105         }
106         return r, nil
107 }
108
109 func (sis *StubInstanceSet) Stop() {
110         sis.mtx.Lock()
111         defer sis.mtx.Unlock()
112         if sis.stopped {
113                 panic("Stop called twice")
114         }
115         sis.stopped = true
116 }
117
118 // StubVM is a fake server that runs an SSH service. It represents a
119 // VM running in a fake cloud.
120 //
121 // Note this is distinct from a stubInstance, which is a snapshot of
122 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
123 // running (and might change IP addresses, shut down, etc.)  without
124 // updating any stubInstances that have been returned to callers.
125 type StubVM struct {
126         Boot                 time.Time
127         Broken               time.Time
128         CrunchRunMissing     bool
129         CrunchRunCrashRate   float64
130         CrunchRunDetachDelay time.Duration
131         ExecuteContainer     func(arvados.Container) int
132
133         sis          *StubInstanceSet
134         id           cloud.InstanceID
135         tags         cloud.InstanceTags
136         providerType string
137         SSHService   SSHService
138         running      map[string]bool
139         sync.Mutex
140 }
141
142 func (svm *StubVM) Instance() stubInstance {
143         svm.Lock()
144         defer svm.Unlock()
145         return stubInstance{
146                 svm:  svm,
147                 addr: svm.SSHService.Address(),
148                 // We deliberately return a cached/stale copy of the
149                 // real tags here, so that (Instance)Tags() sometimes
150                 // returns old data after a call to
151                 // (Instance)SetTags().  This is permitted by the
152                 // driver interface, and this might help remind
153                 // callers that they need to tolerate it.
154                 tags: copyTags(svm.tags),
155         }
156 }
157
158 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
159         queue := svm.sis.driver.Queue
160         uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
161         if eta := svm.Boot.Sub(time.Now()); eta > 0 {
162                 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
163                 return 1
164         }
165         if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
166                 fmt.Fprintf(stderr, "cannot fork\n")
167                 return 2
168         }
169         if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
170                 fmt.Fprint(stderr, "crunch-run: command not found\n")
171                 return 1
172         }
173         if strings.HasPrefix(command, "crunch-run --detach ") {
174                 svm.Lock()
175                 if svm.running == nil {
176                         svm.running = map[string]bool{}
177                 }
178                 svm.running[uuid] = true
179                 svm.Unlock()
180                 time.Sleep(svm.CrunchRunDetachDelay)
181                 fmt.Fprintf(stderr, "starting %s\n", uuid)
182                 logger := logrus.WithField("ContainerUUID", uuid)
183                 logger.Printf("[test] starting crunch-run stub")
184                 go func() {
185                         crashluck := math_rand.Float64()
186                         ctr, ok := queue.Get(uuid)
187                         if !ok {
188                                 logger.Print("[test] container not in queue")
189                                 return
190                         }
191                         if crashluck > svm.CrunchRunCrashRate/2 {
192                                 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
193                                 ctr.State = arvados.ContainerStateRunning
194                                 queue.Notify(ctr)
195                         }
196
197                         time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
198                         svm.Lock()
199                         _, running := svm.running[uuid]
200                         svm.Unlock()
201                         if !running {
202                                 logger.Print("[test] container was killed")
203                                 return
204                         }
205                         if svm.ExecuteContainer != nil {
206                                 ctr.ExitCode = svm.ExecuteContainer(ctr)
207                         }
208                         // TODO: Check whether the stub instance has
209                         // been destroyed, and if so, don't call
210                         // queue.Notify. Then "container finished
211                         // twice" can be classified as a bug.
212                         if crashluck < svm.CrunchRunCrashRate {
213                                 logger.Print("[test] crashing crunch-run stub")
214                         } else {
215                                 ctr.State = arvados.ContainerStateComplete
216                                 queue.Notify(ctr)
217                         }
218                         logger.Print("[test] exiting crunch-run stub")
219                         svm.Lock()
220                         defer svm.Unlock()
221                         delete(svm.running, uuid)
222                 }()
223                 return 0
224         }
225         if command == "crunch-run --list" {
226                 svm.Lock()
227                 defer svm.Unlock()
228                 for uuid := range svm.running {
229                         fmt.Fprintf(stdout, "%s\n", uuid)
230                 }
231                 return 0
232         }
233         if strings.HasPrefix(command, "crunch-run --kill ") {
234                 svm.Lock()
235                 defer svm.Unlock()
236                 if svm.running[uuid] {
237                         delete(svm.running, uuid)
238                 } else {
239                         fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
240                 }
241                 return 0
242         }
243         if command == "true" {
244                 return 0
245         }
246         fmt.Fprintf(stderr, "%q: command not found", command)
247         return 1
248 }
249
250 type stubInstance struct {
251         svm  *StubVM
252         addr string
253         tags cloud.InstanceTags
254 }
255
256 func (si stubInstance) ID() cloud.InstanceID {
257         return si.svm.id
258 }
259
260 func (si stubInstance) Address() string {
261         return si.addr
262 }
263
264 func (si stubInstance) Destroy() error {
265         if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
266                 return errors.New("instance could not be destroyed")
267         }
268         si.svm.SSHService.Close()
269         sis := si.svm.sis
270         sis.mtx.Lock()
271         defer sis.mtx.Unlock()
272         delete(sis.servers, si.svm.id)
273         return nil
274 }
275
276 func (si stubInstance) ProviderType() string {
277         return si.svm.providerType
278 }
279
280 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
281         tags = copyTags(tags)
282         svm := si.svm
283         go func() {
284                 svm.Lock()
285                 defer svm.Unlock()
286                 svm.tags = tags
287         }()
288         return nil
289 }
290
291 func (si stubInstance) Tags() cloud.InstanceTags {
292         return si.tags
293 }
294
295 func (si stubInstance) String() string {
296         return string(si.svm.id)
297 }
298
299 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
300         buf := make([]byte, 512)
301         _, err := io.ReadFull(rand.Reader, buf)
302         if err != nil {
303                 return err
304         }
305         sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
306         if err != nil {
307                 return err
308         }
309         return key.Verify(buf, sig)
310 }
311
312 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
313         dst := cloud.InstanceTags{}
314         for k, v := range src {
315                 dst[k] = v
316         }
317         return dst
318 }