1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/mitchellh/mapstructure"
21 "github.com/sirupsen/logrus"
22 "golang.org/x/crypto/ssh"
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
// AuthorizedKeys is the driver-wide list of public keys for new
// VMs. StubInstanceSet.Create builds each VM's key list from this
// slice, placing its own authKey argument first.
// NOTE(review): presumably the result becomes the SSH service's
// authorized keys; confirm against Create.
29 AuthorizedKeys []ssh.PublicKey
31 // SetupVM, if set, is called upon creation of each new
32 // StubVM. This is the caller's opportunity to customize the
33 // VM's error rate and other behaviors.
36 // StubVM's fake crunch-run uses this Queue to read and update
40 // Frequency of artificially introduced errors on calls to
41 // Destroy. 0=always succeed, 1=always fail.
42 ErrorRateDestroy float64
// instanceSets records every StubInstanceSet returned by
// InstanceSet, in creation order, so that tests can retrieve them
// through InstanceSets().
44 instanceSets []*StubInstanceSet
47 // InstanceSet returns a new *StubInstanceSet.
//
// The new set is appended to the driver's list so that
// InstanceSets can return it later. params are decoded into the set
// with mapstructure.Decode. A decoding error is returned alongside
// the (already recorded) set rather than in place of it.
48 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
49 logger logrus.FieldLogger) (cloud.InstanceSet, error) {
51 sis := StubInstanceSet{
53 servers: map[cloud.InstanceID]*StubVM{},
55 sd.instanceSets = append(sd.instanceSets, &sis)
// The set is returned even when Decode fails; callers must check
// the error before using it.
56 return &sis, mapstructure.Decode(params, &sis)
59 // InstanceSets returns all instance sets that have been created by the
60 // driver. This can be used to test a component that uses the driver
61 // but doesn't expose the InstanceSets it has created.
//
// The returned slice shares its backing array with the driver's
// internal list; callers should treat it as read-only.
62 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
63 return sd.instanceSets
// StubInstanceSet is the cloud.InstanceSet returned by
// StubDriver.InstanceSet. Each successful Create adds one fake VM
// to it.
66 type StubInstanceSet struct {
// servers holds the set's live fake VMs, keyed by instance ID.
// Create adds entries and stubInstance.Destroy removes them, both
// under the write lock of mtx. Instances reads it under the read
// lock.
68 servers map[cloud.InstanceID]*StubVM
// Create starts a new fake VM for instance type it and returns a
// snapshot of it as a cloud.Instance.
//
// The VM's ID has the form "stub-<ProviderType>-<random hex>". Its
// SSH service uses the driver's HostKey. If the driver has a SetupVM
// hook, the hook runs before the VM is added to the set (and so
// before Instances can see it). Create returns an error once Stop
// has been called.
73 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
75 defer sis.mtx.Unlock()
77 return nil, errors.New("StubInstanceSet: Create called after Stop")
// The caller-supplied key goes first, ahead of the driver-wide keys.
79 ak := sis.driver.AuthorizedKeys
81 ak = append([]ssh.PublicKey{authKey}, ak...)
85 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
87 providerType: it.ProviderType,
89 svm.SSHService = SSHService{
90 HostKey: sis.driver.HostKey,
// Give the test a chance to customize this VM (error rates etc.).
// NOTE(review): Create defers sis.mtx.Unlock(), so the hook
// presumably runs with the set's lock held; SetupVM must not call
// back into this instance set.
94 if setup := sis.driver.SetupVM; setup != nil {
97 sis.servers[svm.id] = svm
98 return svm.Instance(), nil
// Instances returns a snapshot of every VM currently in the set, in
// unspecified (map iteration) order. The tags argument is unnamed and
// therefore ignored, so all VMs are returned regardless of their
// tags.
101 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
103 defer sis.mtx.RUnlock()
104 var r []cloud.Instance
105 for _, ss := range sis.servers {
106 r = append(r, ss.Instance())
// Stop marks the instance set as stopped. After Stop, Create returns
// an error. Stop must be called at most once; a second call panics.
111 func (sis *StubInstanceSet) Stop() {
113 defer sis.mtx.Unlock()
115 panic("Stop called twice")
120 // StubVM is a fake server that runs an SSH service. It represents a
121 // VM running in a fake cloud.
123 // Note this is distinct from a stubInstance, which is a snapshot of
124 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
125 // running (and might change IP addresses, shut down, etc.) without
126 // updating any stubInstances that have been returned to callers.
// CrunchRunMissing, if true, makes Exec fail every command that
// contains "crunch-run" with "crunch-run: command not found".
130 CrunchRunMissing bool
// CrunchRunCrashRate is the probability (0..1) that a fake
// crunch-run crashes instead of marking its container Complete.
// Exec draws one uniform random number per container. At or below
// half this rate, the container is never marked Running. Below the
// full rate, the run crashes instead of completing.
131 CrunchRunCrashRate float64
// CrunchRunDetachDelay is how long "crunch-run --detach" sleeps
// before reporting "starting <uuid>" on stderr.
132 CrunchRunDetachDelay time.Duration
// ExecuteContainer, if set, is called with the container record
// while the fake crunch-run runs it. Its return value becomes the
// container's ExitCode.
133 ExecuteContainer func(arvados.Container) int
// tags are the VM's current tags. Instance() hands out copies, so
// snapshots may be stale after a later (Instance)SetTags.
137 tags cloud.InstanceTags
// SSHService is this VM's fake SSH server, configured by
// StubInstanceSet.Create.
139 SSHService SSHService
// running is the set of container UUIDs with a live fake crunch-run.
// Exec allocates it lazily and adds to it on "crunch-run --detach".
// Entries are removed on "crunch-run --kill <uuid>" or when the
// fake run finishes.
140 running map[string]bool
// Instance returns a stubInstance snapshot of the VM. The snapshot
// holds the VM's current SSH service address and a copy of its tags.
144 func (svm *StubVM) Instance() stubInstance {
149 addr: svm.SSHService.Address(),
150 // We deliberately return a cached/stale copy of the
151 // real tags here, so that (Instance)Tags() sometimes
152 // returns old data after a call to
153 // (Instance)SetTags(). This is permitted by the
154 // driver interface, and this might help remind
155 // callers that they need to tolerate it.
156 tags: copyTags(svm.tags),
// Exec simulates running command on the VM. It writes output to
// stdout/stderr and returns a uint32 exit status. It recognizes a
// small set of commands:
//
//   - "crunch-run --detach <uuid>": starts a fake crunch-run for the
//     container. The fake run reads the container from the driver's
//     Queue and moves it through its states.
//   - "crunch-run --list": prints the UUIDs of running fake
//     crunch-runs, one per line.
//   - "crunch-run --kill <uuid>": stops a fake crunch-run.
//   - "true": presumably exits 0.
//
// Any other command gets a "command not found" message. Commands
// also fail while the VM is still booting (Boot in the future), once
// Broken is set and in the past, and, for crunch-run commands, when
// CrunchRunMissing is set.
160 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
161 queue := svm.sis.driver.Queue
// The container UUID, if any, is taken from anywhere in the command.
// NOTE(review): this regexp is recompiled on every call; it could be
// compiled once at package scope.
162 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Simulate a VM that has not finished booting.
163 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
164 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// Simulate a VM that has become broken.
167 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
168 fmt.Fprintf(stderr, "cannot fork\n")
171 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
172 fmt.Fprint(stderr, "crunch-run: command not found\n")
175 if strings.HasPrefix(command, "crunch-run --detach ") {
// crunch-run requires API credentials in its environment.
176 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
178 fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
// The container is recorded as running before the detach delay
// elapses.
183 if svm.running == nil {
184 svm.running = map[string]bool{}
186 svm.running[uuid] = true
188 time.Sleep(svm.CrunchRunDetachDelay)
189 fmt.Fprintf(stderr, "starting %s\n", uuid)
190 logger := logrus.WithField("ContainerUUID", uuid)
191 logger.Printf("[test] starting crunch-run stub")
// A single random draw decides this run's fate. At or below
// CrunchRunCrashRate/2 the container is never marked Running.
// Below CrunchRunCrashRate the run crashes instead of completing.
193 crashluck := math_rand.Float64()
194 ctr, ok := queue.Get(uuid)
196 logger.Print("[test] container not in queue")
199 if crashluck > svm.CrunchRunCrashRate/2 {
// Random pause of 0-19ms.
200 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
201 ctr.State = arvados.ContainerStateRunning
205 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
// A missing entry means the run was killed meanwhile (e.g. by
// "crunch-run --kill").
207 _, running := svm.running[uuid]
210 logger.Print("[test] container was killed")
213 if svm.ExecuteContainer != nil {
214 ctr.ExitCode = svm.ExecuteContainer(ctr)
216 // TODO: Check whether the stub instance has
217 // been destroyed, and if so, don't call
218 // queue.Notify. Then "container finished
219 // twice" can be classified as a bug.
220 if crashluck < svm.CrunchRunCrashRate {
221 logger.Print("[test] crashing crunch-run stub")
223 ctr.State = arvados.ContainerStateComplete
226 logger.Print("[test] exiting crunch-run stub")
229 delete(svm.running, uuid)
// Print the running container UUIDs, one per line, in unspecified
// (map iteration) order.
233 if command == "crunch-run --list" {
236 for uuid := range svm.running {
237 fmt.Fprintf(stdout, "%s\n", uuid)
241 if strings.HasPrefix(command, "crunch-run --kill ") {
244 if svm.running[uuid] {
245 delete(svm.running, uuid)
247 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
251 if command == "true" {
// NOTE(review): unlike the other messages in Exec, this one has no
// trailing newline.
254 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the cloud.Instance handed out by the stub driver.
// It is a point-in-time snapshot of a StubVM's metadata. Methods
// such as Destroy act on the underlying StubVM through the svm
// reference.
258 type stubInstance struct {
// tags is a copy of the VM's tags taken when the snapshot was made.
// It is not refreshed afterwards.
261 tags cloud.InstanceTags
// ID returns the instance's cloud.InstanceID.
264 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the network address of the instance's fake SSH
// service.
// NOTE(review): presumably this is the addr captured by
// StubVM.Instance, so it may be stale; confirm.
268 func (si stubInstance) Address() string {
// Destroy closes the VM's SSH service and removes the VM from its
// instance set. With probability ErrorRateDestroy (a StubDriver
// setting), it instead returns an error without closing or removing
// anything.
272 func (si stubInstance) Destroy() error {
273 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
274 return errors.New("instance could not be destroyed")
276 si.svm.SSHService.Close()
279 defer sis.mtx.Unlock()
280 delete(sis.servers, si.svm.id)
// ProviderType returns the ProviderType of the arvados.InstanceType
// the VM was created with.
284 func (si stubInstance) ProviderType() string {
285 return si.svm.providerType
// SetTags sets the tags of the underlying VM. The argument is copied
// first, so the caller's map is not retained.
// NOTE(review): presumably this replaces svm.tags wholesale; confirm.
// This snapshot's own tags (returned by Tags) are not updated.
288 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
289 tags = copyTags(tags)
// Tags returns the tags captured when this snapshot was taken. They
// may be stale if SetTags has been called since. Callers of the
// driver interface are expected to tolerate that.
299 func (si stubInstance) Tags() cloud.InstanceTags {
// String implements fmt.Stringer by returning the instance ID.
303 func (si stubInstance) String() string {
304 return string(si.svm.id)
// VerifyHostKey checks that key matches the driver's HostKey. It
// signs 512 random bytes with HostKey and verifies that signature
// with key, returning the verification error (nil on success).
307 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
// Fresh random data for each check.
308 buf := make([]byte, 512)
309 _, err := io.ReadFull(rand.Reader, buf)
313 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
317 return key.Verify(buf, sig)
320 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
321 dst := cloud.InstanceTags{}
322 for k, v := range src {