1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/mitchellh/mapstructure"
21 "github.com/sirupsen/logrus"
22 "golang.org/x/crypto/ssh"
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
// AuthorizedKeys is the base list of public keys for each stub VM.
// Create builds each VM's key list by prepending the per-instance
// authKey to a newly allocated slice holding these keys, so this
// slice itself is never modified by Create.
29 AuthorizedKeys []ssh.PublicKey
31 // SetupVM, if set, is called upon creation of each new
32 // StubVM. This is the caller's opportunity to customize the
33 // VM's error rate and other behaviors.
36 // StubVM's fake crunch-run uses this Queue to read and update
40 // Frequency of artificially introduced errors on calls to
41 // Destroy. 0=always succeed, 1=always fail.
42 ErrorRateDestroy float64
// instanceSets records every StubInstanceSet returned by
// InstanceSet, in creation order. InstanceSets exposes it to tests.
44 instanceSets []*StubInstanceSet
47 // InstanceSet returns a new *StubInstanceSet.
// The params map is decoded into the new set with mapstructure.Decode,
// and the new set is recorded so that InstanceSets can return it.
//
// NOTE(review): the set is appended to sd.instanceSets and returned
// even when Decode fails, so callers must check the error before
// using it. No locking around the append is visible here; confirm
// that InstanceSet is never called concurrently.
48 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
49 sis := StubInstanceSet{
51 servers: map[cloud.InstanceID]*StubVM{},
53 sd.instanceSets = append(sd.instanceSets, &sis)
54 return &sis, mapstructure.Decode(params, &sis)
57 // InstanceSets returns all instance sets that have been created by the
58 // driver. This can be used to test a component that uses the driver
59 // but doesn't expose the InstanceSets it has created.
// The result is the driver's own slice, not a copy, in creation order.
60 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
61 return sd.instanceSets
// StubInstanceSet is the cloud.InstanceSet handed out by
// StubDriver.InstanceSet. Each of its instances is a StubVM.
64 type StubInstanceSet struct {
// servers maps the ID of each instance that has been created and
// not yet destroyed to its StubVM. Create adds entries and
// stubInstance.Destroy removes them.
66 servers map[cloud.InstanceID]*StubVM
// Create adds a new StubVM to the set and returns a snapshot of it
// (see StubVM.Instance). The VM's ID has the form
// "stub-<ProviderType>-<random hex>". Its SSHService uses the
// driver's HostKey, and the driver's SetupVM hook, if set, runs
// before the VM is registered in sis.servers.
//
// Create returns an error if called after Stop.
71 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
73 defer sis.mtx.Unlock()
75 return nil, errors.New("StubInstanceSet: Create called after Stop")
77 ak := sis.driver.AuthorizedKeys
// Build a new slice with authKey first, so the driver's shared
// AuthorizedKeys slice is left untouched.
79 ak = append([]ssh.PublicKey{authKey}, ak...)
83 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
85 providerType: it.ProviderType,
87 svm.SSHService = SSHService{
88 HostKey: sis.driver.HostKey,
92 if setup := sis.driver.SetupVM; setup != nil {
95 sis.servers[svm.id] = svm
96 return svm.Instance(), nil
// Instances returns a fresh snapshot (see StubVM.Instance) of every
// server currently in the set. The order is map-iteration order,
// so it is unspecified. The InstanceTags argument is unnamed and
// therefore ignored: no filtering by tags is done.
99 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
101 defer sis.mtx.RUnlock()
102 var r []cloud.Instance
103 for _, ss := range sis.servers {
104 r = append(r, ss.Instance())
// Stop marks the instance set as stopped, after which Create returns
// an error. Calling Stop a second time panics.
109 func (sis *StubInstanceSet) Stop() {
111 defer sis.mtx.Unlock()
113 panic("Stop called twice")
118 // StubVM is a fake server that runs an SSH service. It represents a
119 // VM running in a fake cloud.
121 // Note this is distinct from a stubInstance, which is a snapshot of
122 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
123 // running (and might change IP addresses, shut down, etc.) without
124 // updating any stubInstances that have been returned to callers.
// CrunchRunMissing, if true, makes Exec fail every command that
// contains "crunch-run" with "crunch-run: command not found".
128 CrunchRunMissing bool
// CrunchRunCrashRate controls simulated crunch-run crashes. Each
// "crunch-run --detach" draws one random r in [0,1). The container
// is moved to Running only if r > CrunchRunCrashRate/2. If
// r < CrunchRunCrashRate, the stub logs a simulated crash instead
// of (presumably) marking the container Complete.
129 CrunchRunCrashRate float64
// CrunchRunDetachDelay is how long "crunch-run --detach" sleeps
// before writing "starting <uuid>" to stderr.
130 CrunchRunDetachDelay time.Duration
// ExecuteContainer, if non-nil, is called by the fake crunch-run
// with the container. Its return value becomes the container's
// ExitCode.
131 ExecuteContainer func(arvados.Container) int
// tags holds the VM's current tags. Snapshots from Instance get a
// copy, so they do not observe later changes.
135 tags cloud.InstanceTags
// SSHService is this VM's fake SSH server. Its Address is reported
// in snapshots, and Destroy closes it.
137 SSHService SSHService
// running is the set of container UUIDs whose fake crunch-run is
// active on this VM. It is allocated lazily by the first
// "crunch-run --detach". Entries are removed when a run ends or on
// "crunch-run --kill".
138 running map[string]bool
// Instance returns a stubInstance snapshot of the VM's current
// metadata, including its SSH address and a copy of its tags.
142 func (svm *StubVM) Instance() stubInstance {
147 addr: svm.SSHService.Address(),
148 // We deliberately return a cached/stale copy of the
149 // real tags here, so that (Instance)Tags() sometimes
150 // returns old data after a call to
151 // (Instance)SetTags(). This is permitted by the
152 // driver interface, and this might help remind
153 // callers that they need to tolerate it.
154 tags: copyTags(svm.tags),
// Exec simulates running command on the VM and writes the fake
// output to stdout/stderr. It is presumably installed as the
// SSHService's command handler in Create; confirm.
//
// Before any command is dispatched, the booting, broken and
// CrunchRunMissing conditions are checked. The recognized commands
// are:
//   - "crunch-run --detach <uuid>": records uuid as running and
//     simulates crunch-run moving the container through its states,
//     reporting changes via the driver's Queue (queue.Notify).
//   - "crunch-run --list": prints each running UUID on stdout.
//   - "crunch-run --kill <uuid>": forgets a running container, or
//     reports on stderr that it is not running.
//   - "true"
// Anything else is reported on stderr as "command not found".
158 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
159 queue := svm.sis.driver.Queue
// uuid is the first substring of command that matches the pattern,
// or "" if there is none.
// NOTE(review): the regexp is recompiled on every call. Hoisting it
// to a package-level var would avoid the repeated work.
160 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// While the VM is still booting (Boot is in the future), report the
// remaining time instead of running anything.
161 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
162 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// Once the Broken time (if set) has passed, report a fork failure.
165 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
166 fmt.Fprintf(stderr, "cannot fork\n")
169 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
170 fmt.Fprint(stderr, "crunch-run: command not found\n")
173 if strings.HasPrefix(command, "crunch-run --detach ") {
175 if svm.running == nil {
176 svm.running = map[string]bool{}
178 svm.running[uuid] = true
180 time.Sleep(svm.CrunchRunDetachDelay)
181 fmt.Fprintf(stderr, "starting %s\n", uuid)
182 logger := logrus.WithField("ContainerUUID", uuid)
183 logger.Printf("[test] starting crunch-run stub")
// A single random draw decides both whether the container reaches
// Running and whether this run crashes (see CrunchRunCrashRate).
185 crashluck := math_rand.Float64()
186 ctr, ok := queue.Get(uuid)
188 logger.Print("[test] container not in queue")
191 if crashluck > svm.CrunchRunCrashRate/2 {
192 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
193 ctr.State = arvados.ContainerStateRunning
197 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
// If a "crunch-run --kill" removed uuid from svm.running during the
// sleep above, this lookup fails and the kill is logged.
199 _, running := svm.running[uuid]
202 logger.Print("[test] container was killed")
205 if svm.ExecuteContainer != nil {
206 ctr.ExitCode = svm.ExecuteContainer(ctr)
208 // TODO: Check whether the stub instance has
209 // been destroyed, and if so, don't call
210 // queue.Notify. Then "container finished
211 // twice" can be classified as a bug.
212 if crashluck < svm.CrunchRunCrashRate {
213 logger.Print("[test] crashing crunch-run stub")
215 ctr.State = arvados.ContainerStateComplete
218 logger.Print("[test] exiting crunch-run stub")
221 delete(svm.running, uuid)
225 if command == "crunch-run --list" {
228 for uuid := range svm.running {
229 fmt.Fprintf(stdout, "%s\n", uuid)
233 if strings.HasPrefix(command, "crunch-run --kill ") {
236 if svm.running[uuid] {
237 delete(svm.running, uuid)
239 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
243 if command == "true" {
// NOTE(review): unlike the other stderr messages in Exec, this one
// has no trailing newline.
246 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is a point-in-time snapshot of a StubVM's metadata.
// It is what callers receive as a cloud.Instance (see StubVM.Instance).
250 type stubInstance struct {
// tags is a copy of the VM's tags taken when the snapshot was made,
// so it may be stale after a later SetTags.
253 tags cloud.InstanceTags
// ID returns this instance's cloud.InstanceID. It is presumably the
// underlying StubVM's id, as String also uses.
256 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the instance's network address. This is presumably
// the addr that StubVM.Instance captured from SSHService.Address()
// when the snapshot was taken.
260 func (si stubInstance) Address() string {
// Destroy closes the VM's SSH service and removes the VM from its
// instance set's servers map. With probability equal to the driver's
// ErrorRateDestroy, it instead returns an error without changing
// anything.
264 func (si stubInstance) Destroy() error {
265 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
266 return errors.New("instance could not be destroyed")
268 si.svm.SSHService.Close()
271 defer sis.mtx.Unlock()
272 delete(sis.servers, si.svm.id)
// ProviderType returns the provider instance type the VM was created
// with (it.ProviderType in Create).
276 func (si stubInstance) ProviderType() string {
277 return si.svm.providerType
// SetTags updates the tags of the underlying VM. It first copies tags,
// so later changes to the caller's map have no effect. Snapshots that
// were already handed out keep their old tags, per the comment in
// StubVM.Instance.
280 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
281 tags = copyTags(tags)
// Tags returns this snapshot's tags. They may be stale: a snapshot
// does not observe SetTags calls made after it was taken.
291 func (si stubInstance) Tags() cloud.InstanceTags {
// String returns the underlying VM's instance ID as a plain string.
295 func (si stubInstance) String() string {
296 return string(si.svm.id)
// VerifyHostKey signs 512 random bytes with the driver's HostKey and
// checks that signature against key. It therefore succeeds only if
// key is the public half of the driver's HostKey.
//
// NOTE(review): client does not appear to be used. The check compares
// against the locally configured host key rather than anything fetched
// from the remote server, which is presumably acceptable for a stub.
299 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
300 buf := make([]byte, 512)
301 _, err := io.ReadFull(rand.Reader, buf)
305 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
309 return key.Verify(buf, sig)
// copyTags returns a new cloud.InstanceTags map populated from src
// (presumably an entry-by-entry copy). Callers can therefore keep the
// result without aliasing src.
312 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
313 dst := cloud.InstanceTags{}
314 for k, v := range src {