1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/Sirupsen/logrus"
21 "github.com/mitchellh/mapstructure"
22 "golang.org/x/crypto/ssh"
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
// AuthorizedKeys is the starting key list used by
// (*StubInstanceSet).Create, which may prepend the per-instance key
// it is given.
29 AuthorizedKeys []ssh.PublicKey
31 // SetupVM, if set, is called upon creation of each new VM.

// ErrorRateDestroy is compared with a fresh math_rand.Float64() draw
// on every (stubInstance).Destroy call; Destroy returns an error when
// the draw is lower than this value.
33 ErrorRateDestroy float64
// instanceSets accumulates every set handed out by InstanceSet; it
// is the slice returned by InstanceSets.
36 instanceSets []*StubInstanceSet
39 // InstanceSet returns a new *StubInstanceSet.
//
// The new set starts with an empty servers map and is recorded in
// sd.instanceSets. params is decoded into the set with
// mapstructure.Decode, and the decode error (if any) is returned
// together with the set -- the returned set is non-nil even when
// decoding fails.
40 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
41 sis := StubInstanceSet{
43 servers: map[cloud.InstanceID]*StubVM{},
// Remember the set so tests can reach it through InstanceSets.
45 sd.instanceSets = append(sd.instanceSets, &sis)
46 return &sis, mapstructure.Decode(params, &sis)
49 // InstanceSets returns all instance sets that have been created by
50 // the driver. This can be used to test a component that uses the
51 // driver but doesn't expose the InstanceSets it has created.
//
// The returned slice is the driver's own slice, not a copy.
52 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
53 return sd.instanceSets
// StubInstanceSet is the concrete type handed out by
// (*StubDriver).InstanceSet.
56 type StubInstanceSet struct {
// servers maps each instance ID to its StubVM. Create adds entries
// and (stubInstance).Destroy deletes them.
58 servers map[cloud.InstanceID]*StubVM
// Create registers a new StubVM in the set and returns a snapshot of
// it (see (*StubVM).Instance). It returns an error instead if the set
// has already been stopped.
63 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
// sis.mtx is released when Create returns.
65 defer sis.mtx.Unlock()
67 return nil, errors.New("StubInstanceSet: Create called after Stop")
// Start from the driver-wide key list...
69 ak := sis.driver.AuthorizedKeys
// ...and put the per-instance key first, building a new slice so the
// driver's own list is not modified.
71 ak = append([]ssh.PublicKey{authKey}, ak...)
// IDs look like "stub-<provider type>-<random hex>".
75 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
77 providerType: it.ProviderType,
// The VM's SSH service uses the driver-wide host key.
79 svm.SSHService = SSHService{
80 HostKey: sis.driver.HostKey,
// Optional per-VM hook supplied by the test.
84 if setup := sis.driver.SetupVM; setup != nil {
87 sis.servers[svm.id] = svm
88 return svm.Instance(), nil
// Instances returns a snapshot (see (*StubVM).Instance) of every VM
// currently in sis.servers. The tags argument is unnamed and
// therefore ignored. The result order follows map iteration and is
// not stable between calls.
91 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
93 defer sis.mtx.RUnlock()
94 var r []cloud.Instance
95 for _, ss := range sis.servers {
96 r = append(r, ss.Instance())
// Stop shuts the instance set down. Calling it a second time is
// treated as a programming error and panics with "Stop called twice".
// NOTE(review): Create refuses to run after Stop (see its error
// message); presumably both rely on the same flag -- confirm.
101 func (sis *StubInstanceSet) Stop() {
103 defer sis.mtx.Unlock()
105 panic("Stop called twice")
110 // StubVM is a fake server that runs an SSH service. It represents a
111 // VM running in a fake cloud.
113 // Note this is distinct from a stubInstance, which is a snapshot of
114 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
115 // running (and might change IP addresses, shut down, etc.) without
116 // updating any stubInstances that have been returned to callers.
// CrunchRunMissing makes Exec answer any command containing
// "crunch-run" with "crunch-run: command not found".
120 CrunchRunMissing bool
// CrunchRunCrashRate is compared against a random draw in Exec to
// decide whether the crunch-run stub crashes.
121 CrunchRunCrashRate float64
// CrunchRunDetachDelay is slept for by Exec while handling
// "crunch-run --detach", before it reports "starting <uuid>".
122 CrunchRunDetachDelay time.Duration
// OnCancel and OnComplete are optional hooks; Exec checks each for
// nil before use. NOTE(review): presumably called with the container
// UUID -- confirm at the call sites.
124 OnCancel func(string)
125 OnComplete func(string)
// tags is the VM's current tag set; Instance copies it into each
// snapshot.
129 tags cloud.InstanceTags
131 SSHService SSHService
// running is keyed by container UUID. Exec creates it on demand,
// adds to it on "crunch-run --detach", lists it on "crunch-run
// --list" and removes entries on "crunch-run --kill".
132 running map[string]bool
// Instance returns a stubInstance snapshot of the VM, carrying the
// SSH service's current address and a copy of the VM's current tags.
136 func (svm *StubVM) Instance() stubInstance {
141 addr: svm.SSHService.Address(),
142 // We deliberately return a cached/stale copy of the
143 // real tags here, so that (Instance)Tags() sometimes
144 // returns old data after a call to
145 // (Instance)SetTags(). This is permitted by the
146 // driver interface, and this might help remind
147 // callers that they need to tolerate it.
148 tags: copyTags(svm.tags),
// Exec simulates running command on the VM, writing any output to
// stdout and stderr. It recognizes commands starting with
// "crunch-run --detach " or "crunch-run --kill ", and the exact
// commands "crunch-run --list" and "true"; anything else gets a
// "command not found" message on stderr. Before dispatching it
// simulates a VM that is still booting, a VM that has become broken,
// and a VM with no crunch-run installed.
152 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
153 queue := svm.sis.driver.Queue
// Pull the container UUID (if any) out of the command line; uuid is
// "" when nothing matches.
// NOTE(review): the pattern is recompiled on every call; it could be
// hoisted to a package-level variable.
154 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Until svm.Boot has passed, report that the stub is still booting.
155 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
156 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// A non-zero svm.Broken time that lies in the past means the VM has
// become broken.
159 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
160 fmt.Fprintf(stderr, "cannot fork\n")
// Simulate a VM image without crunch-run installed.
163 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
164 fmt.Fprint(stderr, "crunch-run: command not found\n")
167 if strings.HasPrefix(command, "crunch-run --detach ") {
// The running map is created lazily on first use.
169 if svm.running == nil {
170 svm.running = map[string]bool{}
172 svm.running[uuid] = true
174 time.Sleep(svm.CrunchRunDetachDelay)
175 fmt.Fprintf(stderr, "starting %s\n", uuid)
176 logger := logrus.WithField("ContainerUUID", uuid)
177 logger.Printf("[test] starting crunch-run stub")
// One random draw decides both whether the container reaches the
// Running state (draw > rate/2) and whether the stub crashes
// (draw < rate). Draws between rate/2 and rate therefore satisfy
// both conditions.
179 crashluck := math_rand.Float64()
180 ctr, ok := queue.Get(uuid)
182 logger.Print("[test] container not in queue")
185 if crashluck > svm.CrunchRunCrashRate/2 {
// Converting the float to time.Duration truncates it, so this is a
// whole number of milliseconds (0-19 if math_rand.Float64 yields
// values in [0,1), as math/rand does).
186 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
187 ctr.State = arvados.ContainerStateRunning
191 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
// Key presence is what matters here: "crunch-run --kill" deletes the
// key from svm.running.
193 _, running := svm.running[uuid]
196 logger.Print("[test] container was killed")
199 // TODO: Check whether the stub instance has
200 // been destroyed, and if so, don't call
201 // onComplete. Then "container finished twice"
202 // can be classified as a bug.
203 if crashluck < svm.CrunchRunCrashRate {
204 logger.Print("[test] crashing crunch-run stub")
// OnCancel is only considered if the container got as far as Running.
205 if svm.OnCancel != nil && ctr.State == arvados.ContainerStateRunning {
// Normal completion: record the configured exit code.
209 ctr.State = arvados.ContainerStateComplete
210 ctr.ExitCode = svm.CtrExit
212 if svm.OnComplete != nil {
216 logger.Print("[test] exiting crunch-run stub")
219 delete(svm.running, uuid)
223 if command == "crunch-run --list" {
// One UUID per line, in map iteration order (not stable).
226 for uuid := range svm.running {
227 fmt.Fprintf(stdout, "%s\n", uuid)
231 if strings.HasPrefix(command, "crunch-run --kill ") {
234 if svm.running[uuid] {
235 delete(svm.running, uuid)
237 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
// "true" is accepted as a known command.
241 if command == "true" {
// Fallback for any unrecognized command.
// NOTE(review): unlike the other messages, this one has no trailing
// newline.
244 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the snapshot value produced by (*StubVM).Instance.
248 type stubInstance struct {
// tags is the copy of the VM's tags taken when the snapshot was made.
251 tags cloud.InstanceTags
254 func (si stubInstance) ID() cloud.InstanceID {
258 func (si stubInstance) Address() string {
// Destroy shuts down the VM behind this snapshot: it closes the VM's
// SSH service and removes the VM from its instance set's servers map.
// To simulate cloud API failures it first draws a random number and,
// if that is below the driver's ErrorRateDestroy, returns an error
// without changing anything.
262 func (si stubInstance) Destroy() error {
263 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
264 return errors.New("instance could not be destroyed")
266 si.svm.SSHService.Close()
269 defer sis.mtx.Unlock()
270 delete(sis.servers, si.svm.id)
// ProviderType returns the underlying VM's providerType, which
// (*StubInstanceSet).Create sets from the requested instance type's
// ProviderType.
274 func (si stubInstance) ProviderType() string {
275 return si.svm.providerType
// SetTags replaces the instance's tags.
// NOTE(review): presumably the new tags are applied to the underlying
// StubVM rather than to this snapshot (see the staleness note in
// (*StubVM).Instance) -- confirm.
278 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
// Work on a private copy rather than the caller's map.
279 tags = copyTags(tags)
289 func (si stubInstance) Tags() cloud.InstanceTags {
// String returns the underlying VM's instance ID as a plain string.
293 func (si stubInstance) String() string {
294 return string(si.svm.id)
// VerifyHostKey checks that key is the public half of the driver's
// HostKey: it signs 512 random bytes with the driver's HostKey and
// returns the result of verifying that signature with key (nil when
// it verifies).
297 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
// Random challenge data to sign.
298 buf := make([]byte, 512)
299 _, err := io.ReadFull(rand.Reader, buf)
303 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
307 return key.Verify(buf, sig)
310 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
311 dst := cloud.InstanceTags{}
312 for k, v := range src {