1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/lib/cloud"
19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 "github.com/mitchellh/mapstructure"
21 "github.com/sirupsen/logrus"
22 "golang.org/x/crypto/ssh"
25 // A StubDriver implements cloud.Driver by setting up local SSH
26 // servers that do fake command executions.
27 type StubDriver struct {
// AuthorizedKeys is the base SSH key list for every new VM. Create
// builds each VM's list from it, with Create's own authKey argument
// prepended in a fresh slice, so this slice itself is not modified.
29 AuthorizedKeys []ssh.PublicKey
31 // SetupVM, if set, is called upon creation of each new
32 // StubVM. This is the caller's opportunity to customize the
33 // VM's error rate and other behaviors.
36 // StubVM's fake crunch-run uses this Queue to read and update
// container state (see StubVM.Exec).
40 // Frequency of artificially introduced errors on calls to
41 // Destroy. 0=always succeed, 1=always fail.
42 ErrorRateDestroy float64
44 // If Create() or Instances() is called too frequently, return
45 // rate-limiting errors.
// A zero duration imposes no minimum gap between calls.
46 MinTimeBetweenCreateCalls time.Duration
47 MinTimeBetweenInstancesCalls time.Duration
// instanceSets records every StubInstanceSet returned by InstanceSet,
// in creation order. It is exposed via InstanceSets.
49 instanceSets []*StubInstanceSet
52 // InstanceSet returns a new *StubInstanceSet.
//
// The params map is decoded into the new set with
// mapstructure.Decode, and any decode error is returned alongside
// the set. The set is appended to sd.instanceSets before decoding,
// so it is recorded (and returned) even when decoding fails.
//
// NOTE(review): sd.instanceSets is appended to without any visible
// locking. Confirm that InstanceSet is never called concurrently.
53 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
54 sis := StubInstanceSet{
56 servers: map[cloud.InstanceID]*StubVM{},
58 sd.instanceSets = append(sd.instanceSets, &sis)
59 return &sis, mapstructure.Decode(params, &sis)
62 // InstanceSets returns all instance sets that have been created by
63 // the driver. This can be used to test a component that uses the
64 // driver but doesn't expose the InstanceSets it has created.
// The driver's internal slice is returned directly, not a copy.
65 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
66 return sd.instanceSets
// StubInstanceSet is the cloud.InstanceSet implementation returned by
// StubDriver.InstanceSet.
69 type StubInstanceSet struct {
// servers holds the live stub VMs keyed by instance ID. Create adds
// entries, stubInstance.Destroy removes them, and Instances lists
// them.
71 servers map[cloud.InstanceID]*StubVM
// allowCreateCall and allowInstancesCall are the earliest times at
// which Create and Instances (respectively) will next be accepted.
// Calls made before then fail with RateLimitError.
75 allowCreateCall time.Time
76 allowInstancesCall time.Time
// Create starts a new fake VM (StubVM) and returns a snapshot of it
// as a cloud.Instance. It does the following, in order:
//   - fails with an error if Stop has already been called;
//   - fails with RateLimitError if called before allowCreateCall;
//     otherwise it pushes allowCreateCall forward by
//     driver.MinTimeBetweenCreateCalls;
//   - builds the VM's key list from driver.AuthorizedKeys with
//     authKey prepended (a new slice, so the driver's slice is not
//     modified);
//   - names the VM "stub-<ProviderType>-<random hex>";
//   - gives the VM an SSHService that uses driver.HostKey;
//   - invokes driver.SetupVM, if set, before registering the VM in
//     sis.servers.
// sis.mtx is released by the deferred Unlock when Create returns. The
// matching Lock call is not visible here.
79 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
81 defer sis.mtx.Unlock()
83 return nil, errors.New("StubInstanceSet: Create called after Stop")
85 if sis.allowCreateCall.After(time.Now()) {
86 return nil, RateLimitError{sis.allowCreateCall}
88 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
91 ak := sis.driver.AuthorizedKeys
// Prepend into a new slice so the driver's AuthorizedKeys is left
// untouched.
93 ak = append([]ssh.PublicKey{authKey}, ak...)
97 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
99 providerType: it.ProviderType,
101 svm.SSHService = SSHService{
102 HostKey: sis.driver.HostKey,
106 if setup := sis.driver.SetupVM; setup != nil {
109 sis.servers[svm.id] = svm
// Callers get a snapshot, not the live VM (see the StubVM doc
// comment).
110 return svm.Instance(), nil
// Instances collects a snapshot (stubInstance) of every VM currently
// in sis.servers. The order is unspecified because it follows map
// iteration order. The tags filter argument is unnamed and therefore
// ignored.
//
// Calls made before allowInstancesCall fail with RateLimitError.
// Otherwise allowInstancesCall is pushed forward by
// driver.MinTimeBetweenInstancesCalls.
//
// NOTE(review): the deferred release is RUnlock, so this runs under a
// read lock, yet it assigns sis.allowInstancesCall. Concurrent
// Instances calls can both pass the rate-limit check and race on that
// write. Consider taking the write lock (Lock/Unlock) instead.
113 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
115 defer sis.mtx.RUnlock()
116 if sis.allowInstancesCall.After(time.Now()) {
117 return nil, RateLimitError{sis.allowInstancesCall}
119 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
121 var r []cloud.Instance
122 for _, ss := range sis.servers {
123 r = append(r, ss.Instance())
// Stop shuts down the instance set. After it has been called, Create
// returns an error. The state-change line itself is not visible here.
// Calling Stop a second time panics. sis.mtx is released by the
// deferred Unlock.
128 func (sis *StubInstanceSet) Stop() {
130 defer sis.mtx.Unlock()
132 panic("Stop called twice")
// RateLimitError is returned by StubInstanceSet.Create and
// StubInstanceSet.Instances when they are called before the driver's
// configured minimum interval has elapsed. Retry is the earliest time
// at which the call will be accepted.
137 type RateLimitError struct{ Retry time.Time }

// Error reports the time until which calls are rate limited.
139 func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
// EarliestRetry returns Retry.
// NOTE(review): presumably this satisfies a rate-limit error
// interface defined in lib/cloud. Confirm against that package.
140 func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
142 // StubVM is a fake server that runs an SSH service. It represents a
143 // VM running in a fake cloud.
145 // Note this is distinct from a stubInstance, which is a snapshot of
146 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
147 // running (and might change IP addresses, shut down, etc.) without
148 // updating any stubInstances that have been returned to callers.
// CrunchRunMissing makes Exec fail any command containing
// "crunch-run" with "crunch-run: command not found".
152 CrunchRunMissing bool
// CrunchRunCrashRate is the probability (0..1) that the fake
// crunch-run started by "crunch-run --detach" crashes instead of
// marking the container Complete. See Exec.
153 CrunchRunCrashRate float64
// CrunchRunDetachDelay is how long "crunch-run --detach" sleeps
// before reporting that it is starting the container.
154 CrunchRunDetachDelay time.Duration
// ExecuteContainer, if set, is called with the container, and its
// return value becomes the container's ExitCode.
155 ExecuteContainer func(arvados.Container) int
// tags is the VM's current tag set. Instance() hands out copies.
159 tags cloud.InstanceTags
// SSHService is this VM's local SSH server. stubInstance.Destroy
// closes it.
161 SSHService SSHService
// running is the set of container UUIDs the fake crunch-run considers
// running. UUIDs are added by --detach, listed by --list, and removed
// by --kill or when the fake run finishes.
162 running map[string]bool
// Instance returns a stubInstance snapshot of the VM: its current SSH
// address and a copy of its tags. The snapshot is not updated when
// the VM changes later.
166 func (svm *StubVM) Instance() stubInstance {
171 addr: svm.SSHService.Address(),
172 // We deliberately return a cached/stale copy of the
173 // real tags here, so that (Instance)Tags() sometimes
174 // returns old data after a call to
175 // (Instance)SetTags(). This is permitted by the
176 // driver interface, and this might help remind
177 // callers that they need to tolerate it.
178 tags: copyTags(svm.tags),
// Exec simulates running command on the stub VM. It writes to stdout
// and stderr and returns an exit status. It acts as a tiny fake shell
// that only understands these commands:
//   - "crunch-run --detach <uuid>": records uuid in svm.running and
//     drives the fake container's state through driver.Queue;
//   - "crunch-run --list": prints one running UUID per line;
//   - "crunch-run --kill <uuid>": removes uuid from svm.running;
//   - "true".
// Anything else prints "command not found" to stderr. Before any of
// that, the VM refuses commands in three cases:
//   - it is still booting (svm.Boot is in the future);
//   - it is broken (svm.Broken is set and in the past);
//   - svm.CrunchRunMissing is set and the command mentions crunch-run.
// The exact exit status values are on lines not visible here.
182 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
183 queue := svm.sis.driver.Queue
// The container UUID is taken from anywhere in the command line.
// NOTE(review): this pattern is recompiled on every call, and it is
// unanchored with "." wildcards. Consider a package-level var with a
// stricter pattern such as `[0-9a-z]{5}-dz642-[0-9a-z]{15}`.
184 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
185 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
186 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
189 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
190 fmt.Fprintf(stderr, "cannot fork\n")
193 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
194 fmt.Fprint(stderr, "crunch-run: command not found\n")
197 if strings.HasPrefix(command, "crunch-run --detach ") {
// Both Arvados API credentials are expected in env. A missing one is
// reported on stderr.
198 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
200 fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
205 if svm.running == nil {
206 svm.running = map[string]bool{}
208 svm.running[uuid] = true
210 time.Sleep(svm.CrunchRunDetachDelay)
211 fmt.Fprintf(stderr, "starting %s\n", uuid)
212 logger := logrus.WithFields(logrus.Fields{
214 "ContainerUUID": uuid,
216 logger.Printf("[test] starting crunch-run stub")
// A single random draw in [0,1) decides this run's fate. Judging by
// the two thresholds below:
//   - a draw <= CrunchRunCrashRate/2 skips the Running transition;
//   - any draw < CrunchRunCrashRate ends in a crash instead of
//     Complete.
218 crashluck := math_rand.Float64()
219 ctr, ok := queue.Get(uuid)
221 logger.Print("[test] container not in queue")
224 if crashluck > svm.CrunchRunCrashRate/2 {
// Random delay of 0-19 ms. The float is truncated to a whole number
// before being scaled to milliseconds.
225 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
226 ctr.State = arvados.ContainerStateRunning
230 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
// If "crunch-run --kill" removed the UUID in the meantime, the fake
// run logs that it was killed.
232 _, running := svm.running[uuid]
235 logger.Print("[test] container was killed")
238 if svm.ExecuteContainer != nil {
239 ctr.ExitCode = svm.ExecuteContainer(ctr)
241 // TODO: Check whether the stub instance has
242 // been destroyed, and if so, don't call
243 // queue.Notify. Then "container finished
244 // twice" can be classified as a bug.
245 if crashluck < svm.CrunchRunCrashRate {
246 logger.Print("[test] crashing crunch-run stub")
248 ctr.State = arvados.ContainerStateComplete
251 logger.Print("[test] exiting crunch-run stub")
254 delete(svm.running, uuid)
258 if command == "crunch-run --list" {
// Output order follows map iteration and is therefore unspecified.
261 for uuid := range svm.running {
262 fmt.Fprintf(stdout, "%s\n", uuid)
266 if strings.HasPrefix(command, "crunch-run --kill ") {
// Killing only forgets the UUID. A fake crunch-run that later checks
// svm.running (above) then reports "container was killed".
269 if svm.running[uuid] {
270 delete(svm.running, uuid)
272 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
276 if command == "true" {
// NOTE(review): unlike the other stderr messages in this method, this
// one has no trailing newline.
279 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the snapshot of a StubVM's metadata that
// StubVM.Instance returns. StubInstanceSet hands these to callers as
// cloud.Instance values. A snapshot is not updated when the VM
// changes.
283 type stubInstance struct {
// tags is a private copy of the VM's tags, taken when the snapshot was
// made. It can be stale after SetTags.
286 tags cloud.InstanceTags
// ID returns the instance ID of the underlying VM. The body is not
// visible here; presumably it returns si.svm.id, as String does.
289 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the SSH address recorded in the snapshot. The body
// is not visible here; StubVM.Instance fills the snapshot's addr from
// SSHService.Address().
293 func (si stubInstance) Address() string {
// Destroy simulates terminating the VM. With probability
// driver.ErrorRateDestroy it fails immediately, doing nothing else.
// Otherwise it closes the VM's SSH service and then, holding the
// instance set's mtx, removes the VM from that set's servers map. The
// set variable sis is assigned on a line not visible here, presumably
// from si.svm.sis.
297 func (si stubInstance) Destroy() error {
298 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
299 return errors.New("instance could not be destroyed")
301 si.svm.SSHService.Close()
304 defer sis.mtx.Unlock()
305 delete(sis.servers, si.svm.id)
// ProviderType returns the provider type the VM was created with.
// Create copies it from the requested InstanceType.
309 func (si stubInstance) ProviderType() string {
310 return si.svm.providerType
// SetTags updates the VM's tags, working from a private copy of tags
// so later changes to the caller's map are not seen. The rest of the
// body is not visible here. Per the note in StubVM.Instance,
// snapshots obtained earlier may keep returning their old tags.
313 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
314 tags = copyTags(tags)
// Tags returns the tags captured in this snapshot. The body is not
// visible here. As noted in StubVM.Instance, the result may be stale
// after SetTags.
324 func (si stubInstance) Tags() cloud.InstanceTags {
// String returns the VM's instance ID as a plain string.
328 func (si stubInstance) String() string {
329 return string(si.svm.id)
// VerifyHostKey checks that key matches the driver's host key. It has
// driver.HostKey sign 512 random bytes and then verifies that
// signature with key. The client argument is not used in the visible
// lines. Errors from reading random bytes or signing are handled on
// lines not visible here.
332 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
333 buf := make([]byte, 512)
334 _, err := io.ReadFull(rand.Reader, buf)
338 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
340 return key.Verify(buf, sig)
345 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
346 dst := cloud.InstanceTags{}
347 for k, v := range src {