1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/lib/crunchrun"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/sirupsen/logrus"
24 "golang.org/x/crypto/ssh"
27 // A StubDriver implements cloud.Driver by setting up local SSH
28 // servers that do fake command executions.
29 type StubDriver struct {
31 AuthorizedKeys []ssh.PublicKey
33 // SetupVM, if set, is called upon creation of each new
34 // StubVM. This is the caller's opportunity to customize the
35 // VM's error rate and other behaviors.
38 // Bugf, if set, is called if a bug is detected in the caller
39 // or stub. Typically set to (*check.C)Errorf. If unset,
40 // logger.Warnf is called instead.
41 Bugf func(string, ...interface{})
43 // StubVM's fake crunch-run uses this Queue to read and update
47 // Frequency of artificially introduced errors on calls to
48 // Create and Destroy. 0=always succeed, 1=always fail.
49 ErrorRateCreate float64
50 ErrorRateDestroy float64
52 // If Create() or Instances() is called too frequently, return
53 // rate-limiting errors.
54 MinTimeBetweenCreateCalls time.Duration
55 MinTimeBetweenInstancesCalls time.Duration
59 // If true, Create and Destroy calls block until Release() is
63 instanceSets []*StubInstanceSet
64 holdCloudOps chan bool
67 // InstanceSet returns a new *StubInstanceSet.
68 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
69 if sd.holdCloudOps == nil {
70 sd.holdCloudOps = make(chan bool)
72 sis := StubInstanceSet{
75 servers: map[cloud.InstanceID]*StubVM{},
77 sd.instanceSets = append(sd.instanceSets, &sis)
81 err = json.Unmarshal(params, &sis)
// InstanceSets returns all instance sets that have been created by
// the driver. This can be used to test a component that uses the
// driver but doesn't expose the InstanceSets it has created.
89 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
90 return sd.instanceSets
93 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
94 // are fewer than n blocked calls pending, it waits for the rest to
96 func (sd *StubDriver) ReleaseCloudOps(n int) {
97 for i := 0; i < n; i++ {
102 type StubInstanceSet struct {
104 logger logrus.FieldLogger
105 servers map[cloud.InstanceID]*StubVM
109 allowCreateCall time.Time
110 allowInstancesCall time.Time
114 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
115 if sis.driver.HoldCloudOps {
116 sis.driver.holdCloudOps <- true
119 defer sis.mtx.Unlock()
121 return nil, errors.New("StubInstanceSet: Create called after Stop")
123 if sis.allowCreateCall.After(time.Now()) {
124 return nil, RateLimitError{sis.allowCreateCall}
126 if math_rand.Float64() < sis.driver.ErrorRateCreate {
127 return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
129 if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
130 return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
132 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
133 ak := sis.driver.AuthorizedKeys
135 ak = append([]ssh.PublicKey{authKey}, ak...)
139 InitCommand: initCommand,
141 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
142 tags: copyTags(tags),
143 providerType: it.ProviderType,
144 running: map[string]stubProcess{},
145 killing: map[string]bool{},
147 svm.SSHService = SSHService{
148 HostKey: sis.driver.HostKey,
149 AuthorizedUser: "root",
153 if setup := sis.driver.SetupVM; setup != nil {
156 sis.servers[svm.id] = svm
157 return svm.Instance(), nil
160 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
162 defer sis.mtx.RUnlock()
163 if sis.allowInstancesCall.After(time.Now()) {
164 return nil, RateLimitError{sis.allowInstancesCall}
166 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
167 var r []cloud.Instance
168 for _, ss := range sis.servers {
169 r = append(r, ss.Instance())
174 func (sis *StubInstanceSet) Stop() {
176 defer sis.mtx.Unlock()
178 panic("Stop called twice")
183 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
185 defer sis.mtx.Unlock()
186 for _, vm := range sis.servers {
187 svms = append(svms, vm)
// RateLimitError is the error a StubInstanceSet returns when Create
// or Instances is called before its configured minimum interval has
// elapsed. Retry holds the earliest time the call will be accepted
// again.
type RateLimitError struct {
	Retry time.Time
}

// Error implements the error interface, reporting when the rate
// limit expires.
func (rle RateLimitError) Error() string {
	return "rate limited until " + rle.Retry.String()
}

// EarliestRetry returns the time after which the rate-limited call
// may be retried.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
197 // StubVM is a fake server that runs an SSH service. It represents a
198 // VM running in a fake cloud.
200 // Note this is distinct from a stubInstance, which is a snapshot of
201 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
202 // running (and might change IP addresses, shut down, etc.) without
203 // updating any stubInstances that have been returned to callers.
207 ReportBroken time.Time
208 CrunchRunMissing bool
209 CrunchRunCrashRate float64
210 CrunchRunDetachDelay time.Duration
211 ArvMountMaxExitLag time.Duration
212 ArvMountDeadlockRate float64
213 ExecuteContainer func(arvados.Container) int
214 CrashRunningContainer func(arvados.Container)
215 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
217 // Populated by (*StubInstanceSet)Create()
218 InitCommand cloud.InitCommand
222 tags cloud.InstanceTags
224 SSHService SSHService
225 running map[string]stubProcess
226 killing map[string]bool
232 type stubProcess struct {
235 // crunch-run has exited, but arv-mount process (or something)
236 // still holds lock in /var/run/
240 func (svm *StubVM) Instance() stubInstance {
245 addr: svm.SSHService.Address(),
246 // We deliberately return a cached/stale copy of the
247 // real tags here, so that (Instance)Tags() sometimes
248 // returns old data after a call to
249 // (Instance)SetTags(). This is permitted by the
250 // driver interface, and this might help remind
251 // callers that they need to tolerate it.
252 tags: copyTags(svm.tags),
256 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
257 stdinData, err := ioutil.ReadAll(stdin)
259 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
262 queue := svm.sis.driver.Queue
263 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
264 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
265 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
268 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
269 fmt.Fprintf(stderr, "cannot fork\n")
272 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
273 fmt.Fprint(stderr, "crunch-run: command not found\n")
276 if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
277 var configData crunchrun.ConfigData
278 err := json.Unmarshal(stdinData, &configData)
280 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
283 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
284 if configData.Env[name] == "" {
285 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
292 svm.running[uuid] = stubProcess{pid: pid}
294 time.Sleep(svm.CrunchRunDetachDelay)
295 fmt.Fprintf(stderr, "starting %s\n", uuid)
296 logger := svm.sis.logger.WithFields(logrus.Fields{
298 "ContainerUUID": uuid,
301 logger.Printf("[test] starting crunch-run stub")
303 var ctr arvados.Container
304 var started, completed bool
306 logger.Print("[test] exiting crunch-run stub")
309 if svm.running[uuid].pid != pid {
310 bugf := svm.sis.driver.Bugf
314 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
318 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
319 if started && svm.CrashRunningContainer != nil {
320 svm.CrashRunningContainer(ctr)
323 sproc := svm.running[uuid]
325 svm.running[uuid] = sproc
327 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
329 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
330 delete(svm.running, uuid)
334 crashluck := math_rand.Float64()
335 wantCrash := crashluck < svm.CrunchRunCrashRate
336 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
338 ctr, ok := queue.Get(uuid)
340 logger.Print("[test] container not in queue")
344 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
347 killed := svm.killing[uuid]
349 if killed || wantCrashEarly {
353 ctr.State = arvados.ContainerStateRunning
354 started = queue.Notify(ctr)
356 ctr, _ = queue.Get(uuid)
357 logger.Print("[test] erroring out because state=Running update was rejected")
362 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
365 if svm.ExecuteContainer != nil {
366 ctr.ExitCode = svm.ExecuteContainer(ctr)
368 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
369 ctr.State = arvados.ContainerStateComplete
370 completed = queue.Notify(ctr)
374 if command == "crunch-run --list" {
377 for uuid, sproc := range svm.running {
379 fmt.Fprintf(stdout, "%s stale\n", uuid)
381 fmt.Fprintf(stdout, "%s\n", uuid)
384 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
385 fmt.Fprintln(stdout, "broken")
387 fmt.Fprintln(stdout, svm.deadlocked)
390 if strings.HasPrefix(command, "crunch-run --kill ") {
392 sproc, running := svm.running[uuid]
393 if running && !sproc.exited {
394 svm.killing[uuid] = true
396 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
398 sproc, running = svm.running[uuid]
401 if running && !sproc.exited {
402 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
405 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
408 if command == "true" {
411 fmt.Fprintf(stderr, "%q: command not found", command)
415 type stubInstance struct {
418 tags cloud.InstanceTags
421 func (si stubInstance) ID() cloud.InstanceID {
425 func (si stubInstance) Address() string {
429 func (si stubInstance) RemoteUser() string {
430 return si.svm.SSHService.AuthorizedUser
433 func (si stubInstance) Destroy() error {
435 if sis.driver.HoldCloudOps {
436 sis.driver.holdCloudOps <- true
438 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
439 return errors.New("instance could not be destroyed")
441 si.svm.SSHService.Close()
443 defer sis.mtx.Unlock()
444 delete(sis.servers, si.svm.id)
448 func (si stubInstance) ProviderType() string {
449 return si.svm.providerType
452 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
453 tags = copyTags(tags)
463 func (si stubInstance) Tags() cloud.InstanceTags {
464 // Return a copy to ensure a caller can't change our saved
465 // tags just by writing to the returned map.
466 return copyTags(si.tags)
469 func (si stubInstance) String() string {
470 return string(si.svm.id)
473 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
474 buf := make([]byte, 512)
475 _, err := io.ReadFull(rand.Reader, buf)
479 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
483 return key.Verify(buf, sig)
486 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
487 dst := cloud.InstanceTags{}
488 for k, v := range src {
494 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
498 type QuotaError struct {
502 func (QuotaError) IsQuotaError() bool { return true }