1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
//
// The exported fields are knobs a test sets before using the driver;
// the unexported fields are bookkeeping maintained by InstanceSet.
28 type StubDriver struct {
// AuthorizedKeys is read by Create, which places its own authKey
// argument ahead of these keys without modifying this slice.
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // Bugf, if set, is called if a bug is detected in the caller
38 // or stub. Typically set to (*check.C)Errorf. If unset,
39 // logger.Warnf is called instead.
40 Bugf func(string, ...interface{})
42 // StubVM's fake crunch-run uses this Queue to read and update
46 // Frequency of artificially introduced errors on calls to
47 // Destroy. 0=always succeed, 1=always fail.
48 ErrorRateDestroy float64
50 // If Create() or Instances() is called too frequently, return
51 // rate-limiting errors.
52 MinTimeBetweenCreateCalls time.Duration
53 MinTimeBetweenInstancesCalls time.Duration
55 // If true, Create and Destroy calls block until ReleaseCloudOps() is
// instanceSets records every StubInstanceSet handed out by
// InstanceSet, in creation order. InstanceSets returns it.
59 instanceSets []*StubInstanceSet
// holdCloudOps is created lazily by InstanceSet. Create and Destroy
// send on it when HoldCloudOps is true.
60 holdCloudOps chan bool
63 // InstanceSet returns a new *StubInstanceSet.
//
// The new set starts with an empty server table, is appended to
// sd.instanceSets so that InstanceSets can report it later, and is
// then populated from params with json.Unmarshal. The
// cloud.SharedResourceTags argument is ignored.
64 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
// Create the hold channel on first use. It is unbuffered, so a sender
// blocks until something receives from it.
65 if sd.holdCloudOps == nil {
66 sd.holdCloudOps = make(chan bool)
68 sis := StubInstanceSet{
71 servers: map[cloud.InstanceID]*StubVM{},
// The set is registered with the driver before params is decoded.
73 sd.instanceSets = append(sd.instanceSets, &sis)
// NOTE(review): params is decoded straight into the StubInstanceSet;
// json.Unmarshal only fills exported fields — confirm which fields are
// meant to be configurable this way.
77 err = json.Unmarshal(params, &sis)
82 // InstanceSets returns all instance sets that have been created by the
83 // driver. This can be used to test a component that uses the driver
84 // but doesn't expose the InstanceSets it has created.
//
// The returned slice is sd.instanceSets itself, not a copy.
85 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
86 return sd.instanceSets
89 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
90 // are fewer than n blocked calls pending, it waits for the rest to
92 func (sd *StubDriver) ReleaseCloudOps(n int) {
// One iteration per call to be released.
// NOTE(review): Create and Destroy block by sending on
// sd.holdCloudOps; presumably each iteration receives one value from
// that channel — confirm.
93 for i := 0; i < n; i++ {
// StubInstanceSet is the instance set type handed out by
// (*StubDriver).InstanceSet. It tracks the StubVMs created through it
// and the rate-limit deadlines for its Create and Instances methods.
98 type StubInstanceSet struct {
// logger is the base logger; Exec derives its per-container logger
// from it.
100 logger logrus.FieldLogger
// servers maps instance ID to VM. Create adds entries and
// (stubInstance).Destroy deletes them.
101 servers map[cloud.InstanceID]*StubVM
// Create returns a RateLimitError when called before allowCreateCall,
// and Instances does the same with allowInstancesCall. Each successful
// pass through the check pushes the deadline forward by the driver's
// corresponding MinTimeBetween...Calls setting.
105 allowCreateCall time.Time
106 allowInstancesCall time.Time
// Create adds a new StubVM to the set and returns a snapshot of it
// (see (*StubVM).Instance).
//
// When the driver's HoldCloudOps flag is set, Create first blocks
// sending on the driver's holdCloudOps channel. It returns an error if
// called after Stop, and a RateLimitError if called before
// sis.allowCreateCall.
110 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
111 if sis.driver.HoldCloudOps {
// Blocks until a receiver takes the value; InstanceSet creates this
// channel unbuffered.
112 sis.driver.holdCloudOps <- true
115 defer sis.mtx.Unlock()
117 return nil, errors.New("StubInstanceSet: Create called after Stop")
// Rate limiting: refuse calls that arrive before the deadline, then
// move the deadline forward for the next caller.
119 if sis.allowCreateCall.After(time.Now()) {
120 return nil, RateLimitError{sis.allowCreateCall}
122 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
123 ak := sis.driver.AuthorizedKeys
// Appending to a fresh one-element slice puts authKey first and
// leaves the driver's AuthorizedKeys slice untouched.
125 ak = append([]ssh.PublicKey{authKey}, ak...)
// Instance IDs have the form "inst<N>,<provider type>".
130 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
// The VM keeps its own copy of the caller's tag map.
131 tags: copyTags(tags),
132 providerType: it.ProviderType,
134 running: map[string]stubProcess{},
135 killing: map[string]bool{},
137 svm.SSHService = SSHService{
138 HostKey: sis.driver.HostKey,
// RemoteUser reports this value.
139 AuthorizedUser: "root",
// The SetupVM hook is consulted before the VM is added to sis.servers.
143 if setup := sis.driver.SetupVM; setup != nil {
146 sis.servers[svm.id] = svm
147 return svm.Instance(), nil
// Instances returns a snapshot (see (*StubVM).Instance) of every VM
// currently in sis.servers. The cloud.InstanceTags argument is
// unnamed and therefore ignored. A RateLimitError is returned if the
// call arrives before sis.allowInstancesCall.
//
// NOTE(review): the deferred unlock below is RUnlock, which implies
// only a read lock is held, yet this function assigns
// sis.allowInstancesCall. Two concurrent Instances calls would then
// race on that field; taking the write lock (as Create does) would
// avoid that — confirm against the lock acquisition.
150 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
152 defer sis.mtx.RUnlock()
153 if sis.allowInstancesCall.After(time.Now()) {
154 return nil, RateLimitError{sis.allowInstancesCall}
156 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
// r stays nil when the set has no servers.
157 var r []cloud.Instance
// Map iteration order is unspecified, so the result order is too.
158 for _, ss := range sis.servers {
159 r = append(r, ss.Instance())
// Stop shuts down the instance set. It panics with "Stop called
// twice" rather than returning an error, since a second call is a
// caller bug. Create returns an error once Stop has been called.
164 func (sis *StubInstanceSet) Stop() {
166 defer sis.mtx.Unlock()
168 panic("Stop called twice")
// RateLimitError is the error value Create and Instances return when
// they are called before their respective allow-call deadlines.
type RateLimitError struct {
	// Retry is the deadline that the rejected call arrived before.
	Retry time.Time
}

// Error implements the error interface. The message embeds the Retry
// time in its default string form.
func (rle RateLimitError) Error() string {
	until := rle.Retry
	return fmt.Sprintf("rate limited until %s", until)
}

// EarliestRetry reports the time stored in Retry.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
178 // StubVM is a fake server that runs an SSH service. It represents a
179 // VM running in a fake cloud.
181 // Note this is distinct from a stubInstance, which is a snapshot of
182 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
183 // running (and might change IP addresses, shut down, etc.) without
184 // updating any stubInstances that have been returned to callers.
// ReportBroken, once non-zero and in the past, makes
// "crunch-run --list" print a "broken" line.
188 ReportBroken time.Time
// CrunchRunMissing makes Exec answer any command containing
// "crunch-run" with "crunch-run: command not found" on stderr.
189 CrunchRunMissing bool
// CrunchRunCrashRate is compared against one random draw per fake
// crunch-run: a draw below the rate requests a crash, and a draw below
// half the rate requests an early crash.
190 CrunchRunCrashRate float64
// CrunchRunDetachDelay is how long the fake crunch-run sleeps before
// writing its "starting <uuid>" message.
191 CrunchRunDetachDelay time.Duration
// ArvMountMaxExitLag scales the sleep taken after the fake crunch-run
// exits and before its entry may be deleted from running.
192 ArvMountMaxExitLag time.Duration
// ArvMountDeadlockRate is the threshold a random draw must reach for
// the exited process's entry to be deleted from running; draws below
// it leave the entry in place.
193 ArvMountDeadlockRate float64
// ExecuteContainer, if set, is called with the container and its
// return value becomes the container's ExitCode.
194 ExecuteContainer func(arvados.Container) int
// CrashRunningContainer, if set, is called on the crash path when the
// container had already been started.
195 CrashRunningContainer func(arvados.Container)
196 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-env "
// tags holds the VM's current tags; Instance hands out copies.
200 tags cloud.InstanceTags
201 initCommand cloud.InitCommand
203 SSHService SSHService
// running holds one entry per fake crunch-run process, keyed by
// container UUID.
204 running map[string]stubProcess
// killing marks container UUIDs for which "crunch-run --kill" found a
// live process.
205 killing map[string]bool
// stubProcess is the bookkeeping entry that StubVM.running keeps for
// each fake crunch-run process, keyed by container UUID. Exec creates
// entries with the process's pid and consults their exited flag.
211 type stubProcess struct {
214 // crunch-run has exited, but arv-mount process (or something)
215 // still holds lock in /var/run/
// Instance returns a stubInstance snapshot of the VM, carrying the
// SSH service's current address and a copy of the VM's current tags.
219 func (svm *StubVM) Instance() stubInstance {
224 addr: svm.SSHService.Address(),
225 // We deliberately return a cached/stale copy of the
226 // real tags here, so that (Instance)Tags() sometimes
227 // returns old data after a call to
228 // (Instance)SetTags(). This is permitted by the
229 // driver interface, and this might help remind
230 // callers that they need to tolerate it.
231 tags: copyTags(svm.tags),
// Exec emulates running command on the VM, reading all of stdin up
// front and writing results to stdout/stderr.
//
// Recognized commands:
//   - "crunch-run --detach --stdin-env <ExtraCrunchRunArgs>...": starts
//     a fake crunch-run for the container UUID found in command, after
//     checking that stdin is a JSON object with non-empty
//     ARVADOS_API_HOST and ARVADOS_API_TOKEN.
//   - "crunch-run --list": prints the UUIDs in svm.running.
//   - "crunch-run --kill ...": marks the container as being killed.
//   - "true".
//
// Anything else is reported on stderr as "command not found". Before
// dispatching, Exec reports "stub is booting" until svm.Boot, "cannot
// fork" once svm.Broken has passed, and "crunch-run: command not
// found" when CrunchRunMissing is set.
//
// The uint32 result is presumably the command's exit status — TODO
// confirm against the return statements.
235 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
236 stdinData, err := ioutil.ReadAll(stdin)
238 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
241 queue := svm.sis.driver.Queue
// uuid is the first substring of command shaped like a container UUID
// (5 chars, "-dz642-", 15 chars), or "" if there is none.
// NOTE(review): the pattern is recompiled on every Exec call; it could
// be compiled once in a package-level variable.
242 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
243 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
244 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// A zero Broken time means "never broken".
247 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
248 fmt.Fprintf(stderr, "cannot fork\n")
251 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
252 fmt.Fprint(stderr, "crunch-run: command not found\n")
255 if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
// stdin must be a JSON object of environment variables.
256 var stdinKV map[string]string
257 err := json.Unmarshal(stdinData, &stdinKV)
259 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
// Both API settings must be present and non-empty.
262 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
263 if stdinKV[name] == "" {
264 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
// Record the new fake process under the container UUID.
271 svm.running[uuid] = stubProcess{pid: pid}
273 time.Sleep(svm.CrunchRunDetachDelay)
274 fmt.Fprintf(stderr, "starting %s\n", uuid)
275 logger := svm.sis.logger.WithFields(logrus.Fields{
277 "ContainerUUID": uuid,
280 logger.Printf("[test] starting crunch-run stub")
282 var ctr arvados.Container
283 var started, completed bool
285 logger.Print("[test] exiting crunch-run stub")
// Sanity check: the running entry for this UUID should still carry
// this process's pid. A mismatch is reported through the driver's
// Bugf hook.
288 if svm.running[uuid].pid != pid {
289 bugf := svm.sis.driver.Bugf
293 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
297 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
298 if started && svm.CrashRunningContainer != nil {
299 svm.CrashRunningContainer(ctr)
// Map values are not addressable, so the entry is copied out and
// written back.
302 sproc := svm.running[uuid]
304 svm.running[uuid] = sproc
// NOTE(review): time.Duration(math_rand.Float64()) converts the float
// to an integer count of nanoseconds before the multiplication. If
// math_rand.Float64 returns values in [0,1), as math/rand's does, the
// conversion always yields 0 and this sleep is always zero-length.
// time.Duration(float64(svm.ArvMountMaxExitLag) * math_rand.Float64())
// would sleep for a random fraction of the lag — confirm intent.
306 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
// Draws below ArvMountDeadlockRate leave the entry in running.
308 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
309 delete(svm.running, uuid)
// A single random draw decides both crash flags, so an early crash
// always implies wantCrash as well.
313 crashluck := math_rand.Float64()
314 wantCrash := crashluck < svm.CrunchRunCrashRate
315 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
317 ctr, ok := queue.Get(uuid)
319 logger.Print("[test] container not in queue")
// The float is truncated to a whole number before scaling, so this
// sleeps a whole number of milliseconds below 20.
323 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
326 killed := svm.killing[uuid]
328 if killed || wantCrashEarly {
// Try to move the container to Running; started records whether the
// queue accepted the update.
332 ctr.State = arvados.ContainerStateRunning
333 started = queue.Notify(ctr)
335 ctr, _ = queue.Get(uuid)
336 logger.Print("[test] erroring out because state=Running update was rejected")
341 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
344 if svm.ExecuteContainer != nil {
345 ctr.ExitCode = svm.ExecuteContainer(ctr)
347 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
348 ctr.State = arvados.ContainerStateComplete
349 completed = queue.Notify(ctr)
353 if command == "crunch-run --list" {
// Each entry is printed as "<uuid> stale" or "<uuid>"; presumably
// "stale" marks processes whose exited flag is set — confirm.
356 for uuid, sproc := range svm.running {
358 fmt.Fprintf(stdout, "%s stale\n", uuid)
360 fmt.Fprintf(stdout, "%s\n", uuid)
363 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
364 fmt.Fprintln(stdout, "broken")
366 fmt.Fprintln(stdout, svm.deadlocked)
369 if strings.HasPrefix(command, "crunch-run --kill ") {
371 sproc, running := svm.running[uuid]
// Only a process that exists and has not exited is marked for killing.
372 if running && !sproc.exited {
373 svm.killing[uuid] = true
// Truncation before scaling makes this a 0ms or 1ms pause.
375 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
// Re-read the entry after the pause to see whether the process is
// still there.
377 sproc, running = svm.running[uuid]
380 if running && !sproc.exited {
381 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
384 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
387 if command == "true" {
// Fallthrough for any command not handled above.
390 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the snapshot of a StubVM's metadata that
// (*StubVM).Instance returns.
394 type stubInstance struct {
// tags is the copy of the VM's tags taken when the snapshot was made.
397 tags cloud.InstanceTags
// ID returns the instance's cloud.InstanceID.
400 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the instance's address as a string.
404 func (si stubInstance) Address() string {
// RemoteUser returns the AuthorizedUser configured on the VM's SSH
// service. Create sets that to "root".
408 func (si stubInstance) RemoteUser() string {
409 return si.svm.SSHService.AuthorizedUser
// Destroy closes the VM's SSH service and deletes the VM from its
// instance set's server table.
//
// When the driver's HoldCloudOps flag is set, Destroy first blocks
// sending on the driver's holdCloudOps channel. A random draw below
// the driver's ErrorRateDestroy makes it return an error before
// either of those steps.
412 func (si stubInstance) Destroy() error {
414 if sis.driver.HoldCloudOps {
415 sis.driver.holdCloudOps <- true
// Artificial failure injection.
417 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
418 return errors.New("instance could not be destroyed")
420 si.svm.SSHService.Close()
422 defer sis.mtx.Unlock()
423 delete(sis.servers, si.svm.id)
// ProviderType returns the VM's providerType, which Create copies
// from the requested instance type's ProviderType.
427 func (si stubInstance) ProviderType() string {
428 return si.svm.providerType
// SetTags takes a private copy of tags before using them.
431 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
432 tags = copyTags(tags)
// Tags returns the tags held by this snapshot (si.tags).
442 func (si stubInstance) Tags() cloud.InstanceTags {
443 // Return a copy to ensure a caller can't change our saved
444 // tags just by writing to the returned map.
445 return copyTags(si.tags)
// String returns the VM's instance ID as a plain string.
448 func (si stubInstance) String() string {
449 return string(si.svm.id)
// VerifyHostKey checks key against the driver's HostKey. It fills a
// 512-byte buffer from rand.Reader, signs the buffer with HostKey, and
// returns the result of verifying that signature with key.
452 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
453 buf := make([]byte, 512)
454 _, err := io.ReadFull(rand.Reader, buf)
458 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
// key.Verify returns nil only if sig is a valid signature of buf
// under key.
462 return key.Verify(buf, sig)
465 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
466 dst := cloud.InstanceTags{}
467 for k, v := range src {