1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.curoverse.com/arvados.git/lib/cloud"
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "github.com/Sirupsen/logrus"
22 "github.com/mitchellh/mapstructure"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
// AuthorizedKeys is the base list of public keys that Create starts
// from for each new VM; Create may put the per-instance authKey in
// front of these.
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // StubVM's fake crunch-run uses this Queue to read and update
41 // Frequency of artificially introduced errors on calls to
42 // Destroy. 0=always succeed, 1=always fail.
43 ErrorRateDestroy float64
// instanceSets records every set handed out by InstanceSet, in
// creation order. It is exposed to tests through InstanceSets.
45 instanceSets []*StubInstanceSet
48 // InstanceSet returns a new *StubInstanceSet.
//
// The new set starts with an empty server map and is appended to
// sd.instanceSets so tests can find it later via InstanceSets. params
// is decoded into the set with mapstructure; the set is returned even
// if decoding fails, together with the decode error.
//
// NOTE(review): id is not used in the lines visible here -- confirm.
49 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
50 sis := StubInstanceSet{
52 servers: map[cloud.InstanceID]*StubVM{},
// Register the set before decoding, so it is tracked even when
// Decode reports an error.
54 sd.instanceSets = append(sd.instanceSets, &sis)
55 return &sis, mapstructure.Decode(params, &sis)
58 // InstanceSets returns all instance sets that have been created by
59 // the driver. This can be used to test a component that uses the
60 // driver but doesn't expose the InstanceSets it has created.
//
// The returned slice is the driver's own slice, not a copy.
61 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
62 return sd.instanceSets
// StubInstanceSet is the cloud.InstanceSet handed out by
// StubDriver.InstanceSet. It keeps track of the StubVMs created
// through it.
65 type StubInstanceSet struct {
// servers holds the set's VMs keyed by instance ID. Create adds
// entries and stubInstance.Destroy removes them.
67 servers map[cloud.InstanceID]*StubVM
// Create builds a new StubVM, registers it in sis.servers under a
// freshly generated ID, and returns a snapshot of it (svm.Instance()).
//
// It returns the error "StubInstanceSet: Create called after Stop"
// instead of creating anything -- presumably once Stop has been
// called; the guarding condition is not visible in this listing.
//
// The context argument is ignored (it is bound to "_").
72 func (sis *StubInstanceSet) Create(_ context.Context, it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
74 defer sis.mtx.Unlock()
76 return nil, errors.New("StubInstanceSet: Create called after Stop")
// Start from the driver-wide key list...
78 ak := sis.driver.AuthorizedKeys
// ...and put the per-instance key first. A new slice is built, so
// the driver's own AuthorizedKeys slice is not modified.
80 ak = append([]ssh.PublicKey{authKey}, ak...)
// IDs look like "stub-<provider type>-<random hex>".
84 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
86 providerType: it.ProviderType,
// The VM's SSH service uses the driver's host key.
88 svm.SSHService = SSHService{
89 HostKey: sis.driver.HostKey,
// Optional caller hook for customizing the new VM.
93 if setup := sis.driver.SetupVM; setup != nil {
96 sis.servers[svm.id] = svm
97 return svm.Instance(), nil
// Instances returns one snapshot (StubVM.Instance()) for each VM
// currently in sis.servers. Both arguments are unnamed and therefore
// ignored: no tag filtering is done. Because the snapshots are
// collected by ranging over a map, their order is unspecified.
100 func (sis *StubInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
102 defer sis.mtx.RUnlock()
// r stays nil when the set has no servers.
103 var r []cloud.Instance
104 for _, ss := range sis.servers {
105 r = append(r, ss.Instance())
// Stop shuts down the instance set. Create refuses to run after Stop
// (see its "Create called after Stop" error).
//
// Stop panics with "Stop called twice" -- presumably when the set was
// already stopped; the guarding condition is not visible in this
// listing.
110 func (sis *StubInstanceSet) Stop() {
112 defer sis.mtx.Unlock()
114 panic("Stop called twice")
119 // StubVM is a fake server that runs an SSH service. It represents a
120 // VM running in a fake cloud.
122 // Note this is distinct from a stubInstance, which is a snapshot of
123 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
124 // running (and might change IP addresses, shut down, etc.) without
125 // updating any stubInstances that have been returned to callers.
// CrunchRunMissing makes Exec answer any command containing
// "crunch-run" with "crunch-run: command not found".
129 CrunchRunMissing bool
// CrunchRunCrashRate is compared against one random draw in [0,1)
// per detached crunch-run to decide whether the stub crashes.
130 CrunchRunCrashRate float64
// CrunchRunDetachDelay is slept by Exec while handling
// "crunch-run --detach", before it reports "starting <uuid>".
131 CrunchRunDetachDelay time.Duration
// ExecuteContainer, if non-nil, is called by the crunch-run stub;
// its return value is stored as the container's ExitCode.
132 ExecuteContainer func(arvados.Container) int
// tags is the VM's current tag set. Instance() hands out copies.
136 tags cloud.InstanceTags
// SSHService supplies the VM's address and is closed by
// stubInstance.Destroy.
138 SSHService SSHService
// running is the set of container UUIDs that currently have a
// crunch-run stub on this VM: added by "crunch-run --detach",
// removed when the stub finishes or by "crunch-run --kill".
139 running map[string]bool
// Instance returns a stubInstance snapshot of the VM's metadata: the
// SSH service's address at the time of the call and a copy of the
// VM's tags.
143 func (svm *StubVM) Instance() stubInstance {
148 addr: svm.SSHService.Address(),
149 // We deliberately return a cached/stale copy of the
150 // real tags here, so that (Instance)Tags() sometimes
151 // returns old data after a call to
152 // (Instance)SetTags(). This is permitted by the
153 // driver interface, and this might help remind
154 // callers that they need to tolerate it.
155 tags: copyTags(svm.tags),
// Exec is the stub VM's fake command runner. It looks at the command
// string and emulates a handful of commands, writing their output to
// stdout/stderr and returning an exit status:
//
//   - "crunch-run --detach ..." starts a fake crunch-run for the
//     container UUID found in the command
//   - "crunch-run --list" prints the UUIDs in svm.running
//   - "crunch-run --kill ..." removes the UUID from svm.running
//   - "true"
//
// Anything else is reported as "command not found". Before any of
// that, Exec simulates a VM that is still booting, a broken VM, and a
// VM without crunch-run installed.
//
// NOTE(review): the returned status values, the locking around
// svm.running, and whether the detached stub runs in its own
// goroutine are on lines missing from this listing -- confirm against
// the full file. stdin is not read in the visible lines.
159 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
160 queue := svm.sis.driver.Queue
// First container UUID in the command (5 chars, "-dz642-", 15 chars),
// or "" if there is none.
// NOTE(review): the pattern is recompiled on every call; it could be
// compiled once at package scope.
161 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Boot time is still in the future: the VM is not up yet.
162 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
163 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// Broken is set and already in the past: the VM can no longer run
// commands.
166 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
167 fmt.Fprintf(stderr, "cannot fork\n")
// Simulate an image that has no crunch-run binary.
170 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
171 fmt.Fprint(stderr, "crunch-run: command not found\n")
174 if strings.HasPrefix(command, "crunch-run --detach ") {
// Lazily create the running set, then mark this container as running.
176 if svm.running == nil {
177 svm.running = map[string]bool{}
179 svm.running[uuid] = true
181 time.Sleep(svm.CrunchRunDetachDelay)
182 fmt.Fprintf(stderr, "starting %s\n", uuid)
183 logger := logrus.WithField("ContainerUUID", uuid)
184 logger.Printf("[test] starting crunch-run stub")
// One random draw in [0,1) is used for both crash checks below.
186 crashluck := math_rand.Float64()
187 ctr, ok := queue.Get(uuid)
189 logger.Print("[test] container not in queue")
// Only draws above half the crash rate get as far as reporting the
// container as Running.
192 if crashluck > svm.CrunchRunCrashRate/2 {
// Random pause of 0-19 whole milliseconds (the float is truncated
// when converted to time.Duration).
193 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
194 ctr.State = arvados.ContainerStateRunning
198 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
// Check whether the container is still in the running set; per the
// log message below, a missing entry means it was killed.
200 _, running := svm.running[uuid]
203 logger.Print("[test] container was killed")
// Let the test decide the container's exit code.
206 if svm.ExecuteContainer != nil {
207 ctr.ExitCode = svm.ExecuteContainer(ctr)
209 // TODO: Check whether the stub instance has
210 // been destroyed, and if so, don't call
211 // queue.Notify. Then "container finished
212 // twice" can be classified as a bug.
// Draws below the crash rate crash the stub; per the visible lines
// the container is marked Complete only when it does not crash.
213 if crashluck < svm.CrunchRunCrashRate {
214 logger.Print("[test] crashing crunch-run stub")
216 ctr.State = arvados.ContainerStateComplete
219 logger.Print("[test] exiting crunch-run stub")
222 delete(svm.running, uuid)
226 if command == "crunch-run --list" {
// One UUID per line, in map iteration (unspecified) order.
229 for uuid := range svm.running {
230 fmt.Fprintf(stdout, "%s\n", uuid)
234 if strings.HasPrefix(command, "crunch-run --kill ") {
// Killing just drops the UUID from the running set.
237 if svm.running[uuid] {
238 delete(svm.running, uuid)
240 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
244 if command == "true" {
// Any command not handled above is reported as not found.
// NOTE(review): unlike the other messages, this one has no trailing
// newline.
247 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the snapshot of a StubVM's metadata that
// StubVM.Instance returns to callers.
251 type stubInstance struct {
// tags is the copy of the VM's tags taken when the snapshot was made.
254 tags cloud.InstanceTags
// ID returns the instance's cloud.InstanceID.
// NOTE(review): the body is not visible in this listing; presumably
// this is si.svm.id, the value String() returns -- confirm.
257 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the instance's address as a string.
// NOTE(review): the body is not visible in this listing; presumably
// this is the addr captured by StubVM.Instance -- confirm.
261 func (si stubInstance) Address() string {
// Destroy shuts down the VM behind this snapshot: it closes the VM's
// SSH service and removes the VM from its instance set's server map.
//
// With probability ErrorRateDestroy (configured on the driver) it
// instead returns "instance could not be destroyed" without touching
// anything. The context argument is ignored (it is bound to "_").
265 func (si stubInstance) Destroy(_ context.Context) error {
// Artificial failure, decided by one random draw in [0,1).
266 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
267 return errors.New("instance could not be destroyed")
269 si.svm.SSHService.Close()
272 defer sis.mtx.Unlock()
273 delete(sis.servers, si.svm.id)
// ProviderType returns the provider type recorded on the underlying
// VM (Create sets it from the requested InstanceType's ProviderType).
277 func (si stubInstance) ProviderType() string {
278 return si.svm.providerType
// SetTags updates the instance's tags. The context argument is
// ignored (it is bound to "_").
// NOTE(review): only the first statement is visible in this listing;
// how and when the new tags are stored is not shown -- confirm.
281 func (si stubInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
// Work on a private copy, so the caller's map is not shared.
282 tags = copyTags(tags)
// Tags returns the instance's tags.
// NOTE(review): the body is not visible in this listing. Per the
// comment in StubVM.Instance, the snapshot carries a copy of the tags
// taken when it was made, so the result may be stale after SetTags --
// confirm.
292 func (si stubInstance) Tags() cloud.InstanceTags {
// String returns the underlying VM's instance ID as a plain string.
296 func (si stubInstance) String() string {
297 return string(si.svm.id)
// VerifyHostKey checks that key is the public half of the driver's
// HostKey. It signs 512 freshly generated random bytes with
// driver.HostKey and verifies the signature with key, returning the
// result of key.Verify.
//
// The context argument is ignored (it is bound to "_"), and client is
// not used in the lines visible here.
// NOTE(review): the error handling after ReadFull and Sign is on
// lines missing from this listing -- confirm.
300 func (si stubInstance) VerifyHostKey(_ context.Context, key ssh.PublicKey, client *ssh.Client) error {
// Random challenge to sign.
301 buf := make([]byte, 512)
302 _, err := io.ReadFull(rand.Reader, buf)
306 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
310 return key.Verify(buf, sig)
313 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
314 dst := cloud.InstanceTags{}
315 for k, v := range src {