// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package test

import (
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	math_rand "math/rand"
	"regexp"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/lib/crunchrun"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)
// A StubDriver implements cloud.Driver by setting up local SSH
// servers that do fake command executions.
type StubDriver struct {
	HostKey        ssh.Signer
	AuthorizedKeys []ssh.PublicKey

	// SetupVM, if set, is called upon creation of each new
	// StubVM. This is the caller's opportunity to customize the
	// VM's error rate and other behaviors.
	SetupVM func(*StubVM)

	// Bugf, if set, is called if a bug is detected in the caller
	// or stub. Typically set to (*check.C).Errorf. If unset,
	// logger.Warnf is called instead.
	Bugf func(string, ...interface{})

	// StubVM's fake crunch-run uses this Queue to read and update
	// container state.
	Queue *Queue

	// Frequency of artificially introduced errors on calls to
	// Create and Destroy. 0=always succeed, 1=always fail.
	ErrorRateCreate  float64
	ErrorRateDestroy float64

	// If Create() or Instances() is called too frequently, return
	// rate-limiting errors.
	MinTimeBetweenCreateCalls    time.Duration
	MinTimeBetweenInstancesCalls time.Duration

	// If true, Create and Destroy calls block until Release() is
	// called.
	HoldCloudOps bool

	instanceSets []*StubInstanceSet
	holdCloudOps chan bool
}
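// Tests typically instantiate the driver directly. A hedged sketch of
// that setup (field values and the logger variable are illustrative,
// not taken from any particular test):
//
//	drv := &StubDriver{
//		ErrorRateCreate:           0.1,
//		MinTimeBetweenCreateCalls: time.Millisecond,
//	}
//	is, err := drv.InstanceSet(nil, "test-instance-set", nil, logger, nil)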
// InstanceSet returns a new *StubInstanceSet.
func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
	if sd.holdCloudOps == nil {
		sd.holdCloudOps = make(chan bool)
	}
	sis := StubInstanceSet{
		driver:  sd,
		logger:  logger,
		servers: map[cloud.InstanceID]*StubVM{},
	}
	sd.instanceSets = append(sd.instanceSets, &sis)

	var err error
	if len(params) > 0 {
		err = json.Unmarshal(params, &sis)
	}
	return &sis, err
}
// InstanceSets returns all instance sets that have been created by
// the driver. This can be used to test a component that uses the
// driver but doesn't expose the InstanceSets it has created.
func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
	return sd.instanceSets
}
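// allStubVMs is a hypothetical convenience helper (not used elsewhere
// in this package) illustrating the intended use of InstanceSets(): a
// test can reach the fake VMs even when the component under test
// keeps its cloud.InstanceSet private.
func allStubVMs(sd *StubDriver) (vms []*StubVM) {
	for _, sis := range sd.InstanceSets() {
		vms = append(vms, sis.StubVMs()...)
	}
	return
}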
// ReleaseCloudOps releases n pending Create/Destroy calls. If there
// are fewer than n blocked calls pending, it waits for the rest to
// arrive.
func (sd *StubDriver) ReleaseCloudOps(n int) {
	for i := 0; i < n; i++ {
		<-sd.holdCloudOps
	}
}
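// A hedged usage sketch for HoldCloudOps/ReleaseCloudOps (the pool
// variable and instance type are illustrative): with HoldCloudOps set,
// Create and Destroy block inside the stub until the test releases
// them, which makes otherwise racy sequences deterministic.
//
//	drv.HoldCloudOps = true
//	go pool.Create(instType)  // blocks in StubInstanceSet.Create
//	drv.ReleaseCloudOps(1)    // unblocks exactly one pending call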
type StubInstanceSet struct {
	driver  *StubDriver
	logger  logrus.FieldLogger
	servers map[cloud.InstanceID]*StubVM
	mtx     sync.RWMutex
	stopped bool

	allowCreateCall    time.Time
	allowInstancesCall time.Time
	lastInstanceID     int
}
func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
	if sis.driver.HoldCloudOps {
		sis.driver.holdCloudOps <- true
	}
	sis.mtx.Lock()
	defer sis.mtx.Unlock()
	if sis.stopped {
		return nil, errors.New("StubInstanceSet: Create called after Stop")
	}
	if sis.allowCreateCall.After(time.Now()) {
		return nil, RateLimitError{sis.allowCreateCall}
	}
	if math_rand.Float64() < sis.driver.ErrorRateCreate {
		return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
	}
	sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
	ak := sis.driver.AuthorizedKeys
	if authKey != nil {
		ak = append([]ssh.PublicKey{authKey}, ak...)
	}
	sis.lastInstanceID++
	svm := &StubVM{
		InitCommand:  initCommand,
		sis:          sis,
		id:           cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
		tags:         copyTags(tags),
		providerType: it.ProviderType,
		running:      map[string]stubProcess{},
		killing:      map[string]bool{},
	}
	svm.SSHService = SSHService{
		HostKey:        sis.driver.HostKey,
		AuthorizedUser: "root",
		AuthorizedKeys: ak,
		Exec:           svm.Exec,
	}
	if setup := sis.driver.SetupVM; setup != nil {
		setup(svm)
	}
	sis.servers[svm.id] = svm
	return svm.Instance(), nil
}
func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
	sis.mtx.RLock()
	defer sis.mtx.RUnlock()
	if sis.allowInstancesCall.After(time.Now()) {
		return nil, RateLimitError{sis.allowInstancesCall}
	}
	sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
	var r []cloud.Instance
	for _, ss := range sis.servers {
		r = append(r, ss.Instance())
	}
	return r, nil
}
func (sis *StubInstanceSet) Stop() {
	sis.mtx.Lock()
	defer sis.mtx.Unlock()
	if sis.stopped {
		panic("Stop called twice")
	}
	sis.stopped = true
}
func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
	sis.mtx.Lock()
	defer sis.mtx.Unlock()
	for _, vm := range sis.servers {
		svms = append(svms, vm)
	}
	return
}
// RateLimitError is returned by Create() and Instances() when the
// stub is simulating cloud-provider rate limiting.
type RateLimitError struct{ Retry time.Time }

func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
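// waitForRetry is a hypothetical helper (not used by the stub itself)
// sketching how a caller might honor the retry hint carried by
// RateLimitError before calling Create() or Instances() again.
func waitForRetry(err error) {
	var rle RateLimitError
	if errors.As(err, &rle) {
		time.Sleep(time.Until(rle.EarliestRetry()))
	}
}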
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
// Note this is distinct from a stubInstance, which is a snapshot of
// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
// running (and might change IP addresses, shut down, etc.) without
// updating any stubInstances that have been returned to callers.
type StubVM struct {
	Boot                  time.Time
	Broken                time.Time
	ReportBroken          time.Time
	CrunchRunMissing      bool
	CrunchRunCrashRate    float64
	CrunchRunDetachDelay  time.Duration
	ArvMountMaxExitLag    time.Duration
	ArvMountDeadlockRate  float64
	ExecuteContainer      func(arvados.Container) int
	CrashRunningContainer func(arvados.Container)
	ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "

	// Populated by (*StubInstanceSet)Create()
	InitCommand cloud.InitCommand

	sis          *StubInstanceSet
	id           cloud.InstanceID
	tags         cloud.InstanceTags
	providerType string
	SSHService   SSHService
	running      map[string]stubProcess
	killing      map[string]bool
	lastPID      int64
	deadlocked   string

	sync.Mutex
}
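// A hedged sketch of per-VM customization via the driver's SetupVM
// hook (values are illustrative only):
//
//	drv.SetupVM = func(vm *StubVM) {
//		vm.Boot = time.Now().Add(time.Millisecond)
//		vm.CrunchRunDetachDelay = 10 * time.Millisecond
//		vm.CrunchRunCrashRate = 0.1
//		vm.ExecuteContainer = func(ctr arvados.Container) int { return 0 }
//	}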
type stubProcess struct {
	pid int64

	// crunch-run has exited, but arv-mount process (or something)
	// still holds lock in /var/run/
	exited bool
}
func (svm *StubVM) Instance() stubInstance {
	svm.Lock()
	defer svm.Unlock()
	return stubInstance{
		svm:  svm,
		addr: svm.SSHService.Address(),
		// We deliberately return a cached/stale copy of the
		// real tags here, so that (Instance)Tags() sometimes
		// returns old data after a call to
		// (Instance)SetTags(). This is permitted by the
		// driver interface, and this might help remind
		// callers that they need to tolerate it.
		tags: copyTags(svm.tags),
	}
}
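// Exec implements the handful of shell commands the dispatcher issues
// over SSH. A hedged summary of the fake protocol, derived from the
// handlers in the function body (the UUID and extra-args placeholders
// are illustrative):
//
//	crunch-run --detach --stdin-config <ExtraCrunchRunArgs> <container UUID>
//	crunch-run --list
//	crunch-run --kill <container UUID>
//	true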
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
	stdinData, err := ioutil.ReadAll(stdin)
	if err != nil {
		fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
		return 1
	}
	queue := svm.sis.driver.Queue
	uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
	if eta := svm.Boot.Sub(time.Now()); eta > 0 {
		fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
		return 1
	}
	if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
		fmt.Fprintf(stderr, "cannot fork\n")
		return 2
	}
	if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
		fmt.Fprint(stderr, "crunch-run: command not found\n")
		return 1
	}
	if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
		var configData crunchrun.ConfigData
		err := json.Unmarshal(stdinData, &configData)
		if err != nil {
			fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
			return 1
		}
		for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
			if configData.Env[name] == "" {
				fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
				return 1
			}
		}
		svm.Lock()
		svm.lastPID++
		pid := svm.lastPID
		svm.running[uuid] = stubProcess{pid: pid}
		svm.Unlock()
		time.Sleep(svm.CrunchRunDetachDelay)
		fmt.Fprintf(stderr, "starting %s\n", uuid)
		logger := svm.sis.logger.WithFields(logrus.Fields{
			"Instance":      svm.id,
			"ContainerUUID": uuid,
			"PID":           pid,
		})
		logger.Printf("[test] starting crunch-run stub")
		go func() {
			var ctr arvados.Container
			var started, completed bool
			defer func() {
				logger.Print("[test] exiting crunch-run stub")
				svm.Lock()
				defer svm.Unlock()
				if svm.running[uuid].pid != pid {
					bugf := svm.sis.driver.Bugf
					if bugf == nil {
						bugf = logger.Warnf
					}
					bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
					return
				}
				if !completed {
					logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
					if started && svm.CrashRunningContainer != nil {
						svm.CrashRunningContainer(ctr)
					}
				}
				sproc := svm.running[uuid]
				sproc.exited = true
				svm.running[uuid] = sproc
				svm.Unlock()
				time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
				svm.Lock()
				if math_rand.Float64() >= svm.ArvMountDeadlockRate {
					delete(svm.running, uuid)
				}
			}()

			crashluck := math_rand.Float64()
			wantCrash := crashluck < svm.CrunchRunCrashRate
			wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2

			ctr, ok := queue.Get(uuid)
			if !ok {
				logger.Print("[test] container not in queue")
				return
			}

			time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)

			svm.Lock()
			killed := svm.killing[uuid]
			svm.Unlock()
			if killed || wantCrashEarly {
				return
			}

			ctr.State = arvados.ContainerStateRunning
			started = queue.Notify(ctr)
			if !started {
				ctr, _ = queue.Get(uuid)
				logger.Print("[test] erroring out because state=Running update was rejected")
				return
			}

			if wantCrash {
				logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
				return
			}
			if svm.ExecuteContainer != nil {
				ctr.ExitCode = svm.ExecuteContainer(ctr)
			}
			logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
			ctr.State = arvados.ContainerStateComplete
			completed = queue.Notify(ctr)
		}()
		return 0
	}
	if command == "crunch-run --list" {
		svm.Lock()
		defer svm.Unlock()
		for uuid, sproc := range svm.running {
			if sproc.exited {
				fmt.Fprintf(stdout, "%s stale\n", uuid)
			} else {
				fmt.Fprintf(stdout, "%s\n", uuid)
			}
		}
		if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
			fmt.Fprintln(stdout, "broken")
		}
		fmt.Fprintln(stdout, svm.deadlocked)
		return 0
	}
	if strings.HasPrefix(command, "crunch-run --kill ") {
		svm.Lock()
		sproc, running := svm.running[uuid]
		if running && !sproc.exited {
			svm.killing[uuid] = true
			svm.Unlock()
			time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
			svm.Lock()
			sproc, running = svm.running[uuid]
		}
		svm.Unlock()
		if running && !sproc.exited {
			fmt.Fprintf(stderr, "%s: container is running\n", uuid)
			return 1
		}
		fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
		return 0
	}
	if command == "true" {
		return 0
	}
	fmt.Fprintf(stderr, "%q: command not found", command)
	return 1
}
type stubInstance struct {
	svm  *StubVM
	addr string
	tags cloud.InstanceTags
}
func (si stubInstance) ID() cloud.InstanceID {
	return si.svm.id
}

func (si stubInstance) Address() string {
	return si.addr
}

func (si stubInstance) RemoteUser() string {
	return si.svm.SSHService.AuthorizedUser
}

func (si stubInstance) Destroy() error {
	sis := si.svm.sis
	if sis.driver.HoldCloudOps {
		sis.driver.holdCloudOps <- true
	}
	if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
		return errors.New("instance could not be destroyed")
	}
	si.svm.SSHService.Close()
	sis.mtx.Lock()
	defer sis.mtx.Unlock()
	delete(sis.servers, si.svm.id)
	return nil
}
func (si stubInstance) ProviderType() string {
	return si.svm.providerType
}

func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
	tags = copyTags(tags)
	svm := si.svm
	go func() {
		svm.Lock()
		defer svm.Unlock()
		svm.tags = tags
	}()
	return nil
}
func (si stubInstance) Tags() cloud.InstanceTags {
	// Return a copy to ensure a caller can't change our saved
	// tags just by writing to the returned map.
	return copyTags(si.tags)
}

func (si stubInstance) String() string {
	return string(si.svm.id)
}

func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
	buf := make([]byte, 512)
	_, err := io.ReadFull(rand.Reader, buf)
	if err != nil {
		return err
	}
	sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
	if err != nil {
		return err
	}
	return key.Verify(buf, sig)
}
func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
	dst := cloud.InstanceTags{}
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
	return nil
}