1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/lib/crunchrun"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/sirupsen/logrus"
25 "golang.org/x/crypto/ssh"
28 // A StubDriver implements cloud.Driver by setting up local SSH
29 // servers that do fake command executions.
30 type StubDriver struct {
32 AuthorizedKeys []ssh.PublicKey
34 // SetupVM, if set, is called upon creation of each new
35 // StubVM. This is the caller's opportunity to customize the
36 // VM's error rate and other behaviors.
38 // If SetupVM returns an error, that error will be returned to
39 // the caller of Create(), and the new VM will be discarded.
40 SetupVM func(*StubVM) error
42 // Bugf, if set, is called if a bug is detected in the caller
43 // or stub. Typically set to (*check.C)Errorf. If unset,
44 // logger.Warnf is called instead.
45 Bugf func(string, ...interface{})
47 // StubVM's fake crunch-run uses this Queue to read and update
51 // Frequency of artificially introduced errors on calls to
52 // Create and Destroy. 0=always succeed, 1=always fail.
53 ErrorRateCreate float64
54 ErrorRateDestroy float64
56 // If Create() or Instances() is called too frequently, return
57 // rate-limiting errors.
58 MinTimeBetweenCreateCalls time.Duration
59 MinTimeBetweenInstancesCalls time.Duration
63 // If true, Create and Destroy calls block until Release() is
67 instanceSets []*StubInstanceSet
68 holdCloudOps chan bool
71 // InstanceSet returns a new *StubInstanceSet.
72 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
73 if sd.holdCloudOps == nil {
74 sd.holdCloudOps = make(chan bool)
76 sis := StubInstanceSet{
79 servers: map[cloud.InstanceID]*StubVM{},
81 sd.instanceSets = append(sd.instanceSets, &sis)
85 err = json.Unmarshal(params, &sis)
// InstanceSets returns all instance sets that have been created by the
91 // driver. This can be used to test a component that uses the driver
92 // but doesn't expose the InstanceSets it has created.
93 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
94 return sd.instanceSets
97 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
98 // are fewer than n blocked calls pending, it waits for the rest to
100 func (sd *StubDriver) ReleaseCloudOps(n int) {
101 for i := 0; i < n; i++ {
106 type StubInstanceSet struct {
108 logger logrus.FieldLogger
109 servers map[cloud.InstanceID]*StubVM
113 allowCreateCall time.Time
114 allowInstancesCall time.Time
118 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
119 if sis.driver.HoldCloudOps {
120 sis.driver.holdCloudOps <- true
123 defer sis.mtx.Unlock()
125 return nil, errors.New("StubInstanceSet: Create called after Stop")
127 if sis.allowCreateCall.After(time.Now()) {
128 return nil, RateLimitError{sis.allowCreateCall}
130 if math_rand.Float64() < sis.driver.ErrorRateCreate {
131 return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
133 if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
134 return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
136 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
137 ak := sis.driver.AuthorizedKeys
139 ak = append([]ssh.PublicKey{authKey}, ak...)
143 InitCommand: initCommand,
145 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
146 tags: copyTags(tags),
147 providerType: it.ProviderType,
148 running: map[string]stubProcess{},
149 killing: map[string]bool{},
151 svm.SSHService = SSHService{
152 HostKey: sis.driver.HostKey,
153 AuthorizedUser: "root",
157 if setup := sis.driver.SetupVM; setup != nil {
163 sis.servers[svm.id] = svm
164 return svm.Instance(), nil
167 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
169 defer sis.mtx.RUnlock()
170 if sis.allowInstancesCall.After(time.Now()) {
171 return nil, RateLimitError{sis.allowInstancesCall}
173 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
174 var r []cloud.Instance
175 for _, ss := range sis.servers {
176 r = append(r, ss.Instance())
181 func (sis *StubInstanceSet) Stop() {
183 defer sis.mtx.Unlock()
185 panic("Stop called twice")
190 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
192 defer sis.mtx.Unlock()
193 for _, vm := range sis.servers {
194 svms = append(svms, vm)
// RateLimitError is returned by the stub instance set when Create or
// Instances is called again before the configured minimum interval
// since the previous call has elapsed. Retry is the earliest time at
// which the next call will be accepted.
type RateLimitError struct {
	Retry time.Time
}

// Error describes the rate limit, including the time at which it lifts.
func (rle RateLimitError) Error() string {
	// Sprint inserts no separator here because the first operand is a
	// string; the time.Time operand is rendered via its String method.
	return fmt.Sprint("rate limited until ", rle.Retry)
}

// EarliestRetry returns the time at which the caller may try again.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
// CapacityError is an error value reporting insufficient capacity.
// InstanceTypeSpecific is reported back unchanged by
// IsInstanceTypeSpecific; presumably true means the shortage is limited
// to the requested instance type (TODO confirm against callers).
type CapacityError struct {
	InstanceTypeSpecific bool
}

// Error implements the error interface with a fixed message.
func (ce CapacityError) Error() string {
	const msg = "insufficient capacity"
	return msg
}

// IsCapacityError always reports true, marking this value as a
// capacity error.
func (ce CapacityError) IsCapacityError() bool {
	return true
}

// IsInstanceTypeSpecific returns the InstanceTypeSpecific field.
func (ce CapacityError) IsInstanceTypeSpecific() bool {
	return ce.InstanceTypeSpecific
}
210 // StubVM is a fake server that runs an SSH service. It represents a
211 // VM running in a fake cloud.
213 // Note this is distinct from a stubInstance, which is a snapshot of
214 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
215 // running (and might change IP addresses, shut down, etc.) without
216 // updating any stubInstances that have been returned to callers.
220 ReportBroken time.Time
221 CrunchRunMissing bool
222 CrunchRunCrashRate float64
223 CrunchRunDetachDelay time.Duration
224 ArvMountMaxExitLag time.Duration
225 ArvMountDeadlockRate float64
226 ExecuteContainer func(arvados.Container) int
227 CrashRunningContainer func(arvados.Container)
228 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
230 // Populated by (*StubInstanceSet)Create()
231 InitCommand cloud.InitCommand
235 tags cloud.InstanceTags
237 SSHService SSHService
238 running map[string]stubProcess
239 killing map[string]bool
242 stubprocs sync.WaitGroup
247 type stubProcess struct {
250 // crunch-run has exited, but arv-mount process (or something)
251 // still holds lock in /var/run/
255 func (svm *StubVM) Instance() stubInstance {
260 addr: svm.SSHService.Address(),
261 // We deliberately return a cached/stale copy of the
262 // real tags here, so that (Instance)Tags() sometimes
263 // returns old data after a call to
264 // (Instance)SetTags(). This is permitted by the
265 // driver interface, and this might help remind
266 // callers that they need to tolerate it.
267 tags: copyTags(svm.tags),
271 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
272 // Ensure we don't start any new stubprocs after Destroy()
273 // has started Wait()ing for stubprocs to end.
280 defer svm.stubprocs.Done()
283 stdinData, err := ioutil.ReadAll(stdin)
285 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
288 queue := svm.sis.driver.Queue
289 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
290 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
291 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
294 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
295 fmt.Fprintf(stderr, "cannot fork\n")
298 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
299 fmt.Fprint(stderr, "crunch-run: command not found\n")
302 if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
303 var configData crunchrun.ConfigData
304 err := json.Unmarshal(stdinData, &configData)
306 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
309 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
310 if configData.Env[name] == "" {
311 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
318 svm.running[uuid] = stubProcess{pid: pid}
321 time.Sleep(svm.CrunchRunDetachDelay)
326 fmt.Fprint(stderr, "crunch-run: killed by system shutdown\n")
329 fmt.Fprintf(stderr, "starting %s\n", uuid)
330 logger := svm.sis.logger.WithFields(logrus.Fields{
332 "ContainerUUID": uuid,
335 logger.Printf("[test] starting crunch-run stub")
338 defer svm.stubprocs.Done()
339 var ctr arvados.Container
340 var started, completed bool
342 logger.Print("[test] exiting crunch-run stub")
348 if svm.running[uuid].pid != pid {
349 bugf := svm.sis.driver.Bugf
353 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
357 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
358 if started && svm.CrashRunningContainer != nil {
359 svm.CrashRunningContainer(ctr)
362 sproc := svm.running[uuid]
364 svm.running[uuid] = sproc
366 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
368 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
369 delete(svm.running, uuid)
373 crashluck := math_rand.Float64()
374 wantCrash := crashluck < svm.CrunchRunCrashRate
375 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
377 ctr, ok := queue.Get(uuid)
379 logger.Print("[test] container not in queue")
383 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
386 killed := svm.killing[uuid]
387 delete(svm.killing, uuid)
388 destroying := svm.destroying
390 if killed || wantCrashEarly || destroying {
394 ctr.State = arvados.ContainerStateRunning
395 started = queue.Notify(ctr)
397 ctr, _ = queue.Get(uuid)
398 logger.Print("[test] erroring out because state=Running update was rejected")
403 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
406 if svm.ExecuteContainer != nil {
407 ctr.ExitCode = svm.ExecuteContainer(ctr)
409 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
410 ctr.State = arvados.ContainerStateComplete
411 completed = queue.Notify(ctr)
415 if command == "crunch-run --list" {
418 for uuid, sproc := range svm.running {
420 fmt.Fprintf(stdout, "%s stale\n", uuid)
422 fmt.Fprintf(stdout, "%s\n", uuid)
425 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
426 fmt.Fprintln(stdout, "broken")
428 fmt.Fprintln(stdout, svm.deadlocked)
431 if strings.HasPrefix(command, "crunch-run --kill ") {
433 sproc, running := svm.running[uuid]
434 if running && !sproc.exited {
435 svm.killing[uuid] = true
437 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
439 sproc, running = svm.running[uuid]
442 if running && !sproc.exited {
443 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
446 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
449 if command == "true" {
452 fmt.Fprintf(stderr, "%q: command not found", command)
456 type stubInstance struct {
459 tags cloud.InstanceTags
462 func (si stubInstance) ID() cloud.InstanceID {
466 func (si stubInstance) Address() string {
470 func (si stubInstance) RemoteUser() string {
471 return si.svm.SSHService.AuthorizedUser
474 func (si stubInstance) Destroy() error {
476 if sis.driver.HoldCloudOps {
477 sis.driver.holdCloudOps <- true
479 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
480 return errors.New("instance could not be destroyed")
483 si.svm.destroying = true
485 si.svm.stubprocs.Wait()
486 si.svm.SSHService.Close()
488 defer sis.mtx.Unlock()
489 delete(sis.servers, si.svm.id)
493 func (si stubInstance) ProviderType() string {
494 return si.svm.providerType
497 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
498 tags = copyTags(tags)
508 func (si stubInstance) Tags() cloud.InstanceTags {
509 // Return a copy to ensure a caller can't change our saved
510 // tags just by writing to the returned map.
511 return copyTags(si.tags)
514 func (si stubInstance) String() string {
515 return string(si.svm.id)
518 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
519 buf := make([]byte, 512)
520 _, err := io.ReadFull(rand.Reader, buf)
524 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
528 return key.Verify(buf, sig)
531 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
532 dst := cloud.InstanceTags{}
533 for k, v := range src {
539 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
543 type QuotaError struct {
547 func (QuotaError) IsQuotaError() bool { return true }