1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // Bugf, if set, is called if a bug is detected in the caller
38 // or stub. Typically set to (*check.C)Errorf. If unset,
39 // logger.Warnf is called instead.
40 Bugf func(string, ...interface{})
42 // StubVM's fake crunch-run uses this Queue to read and update
46 // Frequency of artificially introduced errors on calls to
47 // Destroy. 0=always succeed, 1=always fail.
48 ErrorRateDestroy float64
50 // If Create() or Instances() is called too frequently, return
51 // rate-limiting errors.
52 MinTimeBetweenCreateCalls time.Duration
53 MinTimeBetweenInstancesCalls time.Duration
55 // If true, Create and Destroy calls block until Release() is
59 instanceSets []*StubInstanceSet
60 holdCloudOps chan bool
63 // InstanceSet returns a new *StubInstanceSet.
64 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
65 if sd.holdCloudOps == nil {
66 sd.holdCloudOps = make(chan bool)
68 sis := StubInstanceSet{
71 servers: map[cloud.InstanceID]*StubVM{},
73 sd.instanceSets = append(sd.instanceSets, &sis)
77 err = json.Unmarshal(params, &sis)
// InstanceSets returns all instance sets that have been created by
// the driver. This can be used to test a component that uses the
// driver but doesn't expose the InstanceSets it has created.
85 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
86 return sd.instanceSets
89 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
90 // are fewer than n blocked calls pending, it waits for the rest to
92 func (sd *StubDriver) ReleaseCloudOps(n int) {
93 for i := 0; i < n; i++ {
98 type StubInstanceSet struct {
100 logger logrus.FieldLogger
101 servers map[cloud.InstanceID]*StubVM
105 allowCreateCall time.Time
106 allowInstancesCall time.Time
110 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
111 if sis.driver.HoldCloudOps {
112 sis.driver.holdCloudOps <- true
115 defer sis.mtx.Unlock()
117 return nil, errors.New("StubInstanceSet: Create called after Stop")
119 if sis.allowCreateCall.After(time.Now()) {
120 return nil, RateLimitError{sis.allowCreateCall}
122 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
123 ak := sis.driver.AuthorizedKeys
125 ak = append([]ssh.PublicKey{authKey}, ak...)
130 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
131 tags: copyTags(tags),
132 providerType: it.ProviderType,
134 running: map[string]stubProcess{},
135 killing: map[string]bool{},
137 svm.SSHService = SSHService{
138 HostKey: sis.driver.HostKey,
139 AuthorizedUser: "root",
143 if setup := sis.driver.SetupVM; setup != nil {
146 sis.servers[svm.id] = svm
147 return svm.Instance(), nil
150 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
152 defer sis.mtx.RUnlock()
153 if sis.allowInstancesCall.After(time.Now()) {
154 return nil, RateLimitError{sis.allowInstancesCall}
156 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
157 var r []cloud.Instance
158 for _, ss := range sis.servers {
159 r = append(r, ss.Instance())
164 func (sis *StubInstanceSet) Stop() {
166 defer sis.mtx.Unlock()
168 panic("Stop called twice")
// RateLimitError is the error value the stub returns from Create and
// Instances when a call arrives before the instance set's next
// permitted call time.
type RateLimitError struct {
	// Retry is the time reported by EarliestRetry and included in
	// the error message.
	Retry time.Time
}

// Error implements the error interface. The message includes the
// Retry time in time.Time's default string format.
func (rle RateLimitError) Error() string {
	return "rate limited until " + rle.Retry.String()
}

// EarliestRetry returns the Retry time stored in the error.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
178 // StubVM is a fake server that runs an SSH service. It represents a
179 // VM running in a fake cloud.
181 // Note this is distinct from a stubInstance, which is a snapshot of
182 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
183 // running (and might change IP addresses, shut down, etc.) without
184 // updating any stubInstances that have been returned to callers.
188 ReportBroken time.Time
189 CrunchRunMissing bool
190 CrunchRunCrashRate float64
191 CrunchRunDetachDelay time.Duration
192 ArvMountMaxExitLag time.Duration
193 ArvMountDeadlockRate float64
194 ExecuteContainer func(arvados.Container) int
195 CrashRunningContainer func(arvados.Container)
199 tags cloud.InstanceTags
200 initCommand cloud.InitCommand
202 SSHService SSHService
203 running map[string]stubProcess
204 killing map[string]bool
210 type stubProcess struct {
213 // crunch-run has exited, but arv-mount process (or something)
214 // still holds lock in /var/run/
218 func (svm *StubVM) Instance() stubInstance {
223 addr: svm.SSHService.Address(),
224 // We deliberately return a cached/stale copy of the
225 // real tags here, so that (Instance)Tags() sometimes
226 // returns old data after a call to
227 // (Instance)SetTags(). This is permitted by the
228 // driver interface, and this might help remind
229 // callers that they need to tolerate it.
230 tags: copyTags(svm.tags),
234 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
235 stdinData, err := ioutil.ReadAll(stdin)
237 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
240 queue := svm.sis.driver.Queue
241 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
242 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
243 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
246 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
247 fmt.Fprintf(stderr, "cannot fork\n")
250 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
251 fmt.Fprint(stderr, "crunch-run: command not found\n")
254 if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
255 var stdinKV map[string]string
256 err := json.Unmarshal(stdinData, &stdinKV)
258 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
261 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
262 if stdinKV[name] == "" {
263 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
270 svm.running[uuid] = stubProcess{pid: pid}
272 time.Sleep(svm.CrunchRunDetachDelay)
273 fmt.Fprintf(stderr, "starting %s\n", uuid)
274 logger := svm.sis.logger.WithFields(logrus.Fields{
276 "ContainerUUID": uuid,
279 logger.Printf("[test] starting crunch-run stub")
281 var ctr arvados.Container
282 var started, completed bool
284 logger.Print("[test] exiting crunch-run stub")
287 if svm.running[uuid].pid != pid {
288 bugf := svm.sis.driver.Bugf
292 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
296 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
297 if started && svm.CrashRunningContainer != nil {
298 svm.CrashRunningContainer(ctr)
301 sproc := svm.running[uuid]
303 svm.running[uuid] = sproc
305 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
307 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
308 delete(svm.running, uuid)
312 crashluck := math_rand.Float64()
313 wantCrash := crashluck < svm.CrunchRunCrashRate
314 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
316 ctr, ok := queue.Get(uuid)
318 logger.Print("[test] container not in queue")
322 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
325 killed := svm.killing[uuid]
327 if killed || wantCrashEarly {
331 ctr.State = arvados.ContainerStateRunning
332 started = queue.Notify(ctr)
334 ctr, _ = queue.Get(uuid)
335 logger.Print("[test] erroring out because state=Running update was rejected")
340 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
343 if svm.ExecuteContainer != nil {
344 ctr.ExitCode = svm.ExecuteContainer(ctr)
346 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
347 ctr.State = arvados.ContainerStateComplete
348 completed = queue.Notify(ctr)
352 if command == "crunch-run --list" {
355 for uuid, sproc := range svm.running {
357 fmt.Fprintf(stdout, "%s stale\n", uuid)
359 fmt.Fprintf(stdout, "%s\n", uuid)
362 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
363 fmt.Fprintln(stdout, "broken")
365 fmt.Fprintln(stdout, svm.deadlocked)
368 if strings.HasPrefix(command, "crunch-run --kill ") {
370 sproc, running := svm.running[uuid]
371 if running && !sproc.exited {
372 svm.killing[uuid] = true
374 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
376 sproc, running = svm.running[uuid]
379 if running && !sproc.exited {
380 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
383 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
386 if command == "true" {
389 fmt.Fprintf(stderr, "%q: command not found", command)
393 type stubInstance struct {
396 tags cloud.InstanceTags
399 func (si stubInstance) ID() cloud.InstanceID {
403 func (si stubInstance) Address() string {
407 func (si stubInstance) RemoteUser() string {
408 return si.svm.SSHService.AuthorizedUser
411 func (si stubInstance) Destroy() error {
413 if sis.driver.HoldCloudOps {
414 sis.driver.holdCloudOps <- true
416 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
417 return errors.New("instance could not be destroyed")
419 si.svm.SSHService.Close()
421 defer sis.mtx.Unlock()
422 delete(sis.servers, si.svm.id)
426 func (si stubInstance) ProviderType() string {
427 return si.svm.providerType
430 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
431 tags = copyTags(tags)
441 func (si stubInstance) Tags() cloud.InstanceTags {
442 // Return a copy to ensure a caller can't change our saved
443 // tags just by writing to the returned map.
444 return copyTags(si.tags)
447 func (si stubInstance) String() string {
448 return string(si.svm.id)
451 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
452 buf := make([]byte, 512)
453 _, err := io.ReadFull(rand.Reader, buf)
457 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
461 return key.Verify(buf, sig)
464 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
465 dst := cloud.InstanceTags{}
466 for k, v := range src {