1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/mitchellh/mapstructure"
21 "github.com/sirupsen/logrus"
22 "golang.org/x/crypto/ssh"
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
29 AuthorizedKeys []ssh.PublicKey
31 // SetupVM, if set, is called upon creation of each new
32 // StubVM. This is the caller's opportunity to customize the
33 // VM's error rate and other behaviors.
36 // StubVM's fake crunch-run uses this Queue to read and update
40 // Frequency of artificially introduced errors on calls to
41 // Destroy. 0=always succeed, 1=always fail.
42 ErrorRateDestroy float64
44 instanceSets []*StubInstanceSet
47 // InstanceSet returns a new *StubInstanceSet.
48 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
49 logger logrus.FieldLogger) (cloud.InstanceSet, error) {
51 sis := StubInstanceSet{
53 servers: map[cloud.InstanceID]*StubVM{},
55 sd.instanceSets = append(sd.instanceSets, &sis)
56 return &sis, mapstructure.Decode(params, &sis)
59 // InstanceSets returns all InstanceSets that have been created by the
60 // driver. This can be used to test a component that uses the driver
61 // but doesn't expose the InstanceSets it has created.
62 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
63 return sd.instanceSets
66 type StubInstanceSet struct {
68 servers map[cloud.InstanceID]*StubVM
73 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
75 defer sis.mtx.Unlock()
77 return nil, errors.New("StubInstanceSet: Create called after Stop")
79 ak := sis.driver.AuthorizedKeys
81 ak = append([]ssh.PublicKey{authKey}, ak...)
85 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
87 providerType: it.ProviderType,
89 svm.SSHService = SSHService{
90 HostKey: sis.driver.HostKey,
94 if setup := sis.driver.SetupVM; setup != nil {
97 sis.servers[svm.id] = svm
98 return svm.Instance(), nil
101 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
103 defer sis.mtx.RUnlock()
104 var r []cloud.Instance
105 for _, ss := range sis.servers {
106 r = append(r, ss.Instance())
111 func (sis *StubInstanceSet) Stop() {
113 defer sis.mtx.Unlock()
115 panic("Stop called twice")
120 // StubVM is a fake server that runs an SSH service. It represents a
121 // VM running in a fake cloud.
123 // Note this is distinct from a stubInstance, which is a snapshot of
124 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
125 // running (and might change IP addresses, shut down, etc.) without
126 // updating any stubInstances that have been returned to callers.
130 CrunchRunMissing bool
131 CrunchRunCrashRate float64
132 CrunchRunDetachDelay time.Duration
133 ExecuteContainer func(arvados.Container) int
137 tags cloud.InstanceTags
139 SSHService SSHService
140 running map[string]bool
144 func (svm *StubVM) Instance() stubInstance {
149 addr: svm.SSHService.Address(),
150 // We deliberately return a cached/stale copy of the
151 // real tags here, so that (Instance)Tags() sometimes
152 // returns old data after a call to
153 // (Instance)SetTags(). This is permitted by the
154 // driver interface, and this might help remind
155 // callers that they need to tolerate it.
156 tags: copyTags(svm.tags),
160 func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
161 queue := svm.sis.driver.Queue
162 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
163 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
164 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
167 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
168 fmt.Fprintf(stderr, "cannot fork\n")
171 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
172 fmt.Fprint(stderr, "crunch-run: command not found\n")
175 if strings.HasPrefix(command, "crunch-run --detach ") {
177 if svm.running == nil {
178 svm.running = map[string]bool{}
180 svm.running[uuid] = true
182 time.Sleep(svm.CrunchRunDetachDelay)
183 fmt.Fprintf(stderr, "starting %s\n", uuid)
184 logger := logrus.WithField("ContainerUUID", uuid)
185 logger.Printf("[test] starting crunch-run stub")
187 crashluck := math_rand.Float64()
188 ctr, ok := queue.Get(uuid)
190 logger.Print("[test] container not in queue")
193 if crashluck > svm.CrunchRunCrashRate/2 {
194 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
195 ctr.State = arvados.ContainerStateRunning
199 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
201 _, running := svm.running[uuid]
204 logger.Print("[test] container was killed")
207 if svm.ExecuteContainer != nil {
208 ctr.ExitCode = svm.ExecuteContainer(ctr)
210 // TODO: Check whether the stub instance has
211 // been destroyed, and if so, don't call
212 // queue.Notify. Then "container finished
213 // twice" can be classified as a bug.
214 if crashluck < svm.CrunchRunCrashRate {
215 logger.Print("[test] crashing crunch-run stub")
217 ctr.State = arvados.ContainerStateComplete
220 logger.Print("[test] exiting crunch-run stub")
223 delete(svm.running, uuid)
227 if command == "crunch-run --list" {
230 for uuid := range svm.running {
231 fmt.Fprintf(stdout, "%s\n", uuid)
235 if strings.HasPrefix(command, "crunch-run --kill ") {
238 if svm.running[uuid] {
239 delete(svm.running, uuid)
241 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
245 if command == "true" {
248 fmt.Fprintf(stderr, "%q: command not found", command)
252 type stubInstance struct {
255 tags cloud.InstanceTags
258 func (si stubInstance) ID() cloud.InstanceID {
262 func (si stubInstance) Address() string {
266 func (si stubInstance) Destroy() error {
267 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
268 return errors.New("instance could not be destroyed")
270 si.svm.SSHService.Close()
273 defer sis.mtx.Unlock()
274 delete(sis.servers, si.svm.id)
278 func (si stubInstance) ProviderType() string {
279 return si.svm.providerType
282 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
283 tags = copyTags(tags)
293 func (si stubInstance) Tags() cloud.InstanceTags {
297 func (si stubInstance) String() string {
298 return string(si.svm.id)
301 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
302 buf := make([]byte, 512)
303 _, err := io.ReadFull(rand.Reader, buf)
307 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
311 return key.Verify(buf, sig)
314 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
315 dst := cloud.InstanceTags{}
316 for k, v := range src {