1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/lib/crunchrun"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/sirupsen/logrus"
24 "golang.org/x/crypto/ssh"
27 // A StubDriver implements cloud.Driver by setting up local SSH
28 // servers that do fake command executions.
29 type StubDriver struct {
// AuthorizedKeys is read by (*StubInstanceSet).Create, which builds
// a key list with its own authKey argument placed ahead of these.
31 AuthorizedKeys []ssh.PublicKey
33 // SetupVM, if set, is called upon creation of each new
34 // StubVM. This is the caller's opportunity to customize the
35 // VM's error rate and other behaviors.
38 // Bugf, if set, is called if a bug is detected in the caller
39 // or stub. Typically set to (*check.C)Errorf. If unset,
40 // logger.Warnf is called instead.
41 Bugf func(string, ...interface{})
43 // StubVM's fake crunch-run uses this Queue to read and update
47 // Frequency of artificially introduced errors on calls to
48 // Destroy. 0=always succeed, 1=always fail.
49 ErrorRateDestroy float64
51 // If Create() or Instances() is called too frequently, return
52 // rate-limiting errors.
53 MinTimeBetweenCreateCalls time.Duration
54 MinTimeBetweenInstancesCalls time.Duration
56 // If true, Create and Destroy calls block until Release() is
// instanceSets accumulates every set handed out by InstanceSet; it
// is what InstanceSets returns.
60 instanceSets []*StubInstanceSet
// holdCloudOps is created lazily by InstanceSet. Create and Destroy
// send on it when HoldCloudOps is set.
61 holdCloudOps chan bool
64 // InstanceSet returns a new *StubInstanceSet.
//
// The new set is also appended to sd.instanceSets (see InstanceSets),
// and params is passed to json.Unmarshal with the new set as the
// target. The cloud.SharedResourceTags argument is ignored. The
// driver's holdCloudOps channel is created here on first use.
65 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
66 if sd.holdCloudOps == nil {
67 sd.holdCloudOps = make(chan bool)
69 sis := StubInstanceSet{
72 servers: map[cloud.InstanceID]*StubVM{},
// Record a pointer to the new set so tests can find it later.
74 sd.instanceSets = append(sd.instanceSets, &sis)
78 err = json.Unmarshal(params, &sis)
83 // InstanceSets returns all instance sets that have been created by
84 // the driver. This can be used to test a component that uses the driver
85 // but doesn't expose the InstanceSets it has created.
//
// The returned slice is the driver's own internal slice, not a copy.
86 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
87 return sd.instanceSets
90 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
91 // are fewer than n blocked calls pending, it waits for the rest to
93 func (sd *StubDriver) ReleaseCloudOps(n int) {
// One loop iteration per call to release; n <= 0 means no iterations.
94 for i := 0; i < n; i++ {
// StubInstanceSet is the instance set type created by
// (*StubDriver).InstanceSet. It keeps its StubVMs in a map keyed by
// instance ID and tracks when the next Create and Instances calls
// are allowed.
99 type StubInstanceSet struct {
101 logger logrus.FieldLogger
// servers holds the VMs added by Create; Destroy deletes entries.
102 servers map[cloud.InstanceID]*StubVM
// Earliest times at which Create and Instances will next succeed;
// calls made before these times get a RateLimitError.
106 allowCreateCall time.Time
107 allowInstancesCall time.Time
// Create adds a new StubVM to the instance set and returns a
// stubInstance snapshot of it (svm.Instance()).
//
// If the driver's HoldCloudOps is set, the call first blocks sending
// on the driver's holdCloudOps channel. It can fail with a "Create
// called after Stop" error, or with a RateLimitError when called
// before sis.allowCreateCall. The new VM's ID has the form
// "inst<lastInstanceID>,<ProviderType>", its tags are a copy of the
// tags argument, and its SSH service uses the driver's HostKey with
// "root" as the authorized user.
//
// NOTE(review): the image argument is not referenced on any line
// visible here — confirm it is intentionally ignored.
111 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
112 if sis.driver.HoldCloudOps {
// Blocks until something receives from the channel.
113 sis.driver.holdCloudOps <- true
116 defer sis.mtx.Unlock()
118 return nil, errors.New("StubInstanceSet: Create called after Stop")
120 if sis.allowCreateCall.After(time.Now()) {
121 return nil, RateLimitError{sis.allowCreateCall}
// Push the next allowed Create time forward by the configured minimum.
123 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
124 ak := sis.driver.AuthorizedKeys
// Builds a fresh slice with authKey first, so the driver's own
// AuthorizedKeys slice is not modified.
126 ak = append([]ssh.PublicKey{authKey}, ak...)
130 InitCommand: initCommand,
132 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
133 tags: copyTags(tags),
134 providerType: it.ProviderType,
135 running: map[string]stubProcess{},
136 killing: map[string]bool{},
138 svm.SSHService = SSHService{
139 HostKey: sis.driver.HostKey,
140 AuthorizedUser: "root",
// Optional caller hook; its invocation is on a line not shown here
// (presumably setup(svm) — confirm).
144 if setup := sis.driver.SetupVM; setup != nil {
147 sis.servers[svm.id] = svm
148 return svm.Instance(), nil
// Instances returns a stubInstance snapshot (ss.Instance()) for each
// VM in sis.servers, in map iteration order. The cloud.InstanceTags
// argument is unnamed and therefore unused. A call made before
// sis.allowInstancesCall returns a RateLimitError instead.
151 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
153 defer sis.mtx.RUnlock()
154 if sis.allowInstancesCall.After(time.Now()) {
155 return nil, RateLimitError{sis.allowInstancesCall}
// NOTE(review): the deferred RUnlock above suggests only a read lock
// is held here, yet this line writes sis.allowInstancesCall.
// Concurrent Instances calls could race on this field — confirm
// whether a write lock (Lock/Unlock) is needed.
157 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
158 var r []cloud.Instance
159 for _, ss := range sis.servers {
160 r = append(r, ss.Instance())
// Stop is called to shut down the instance set. It can panic with
// "Stop called twice"; Create reports an error once Stop has been
// called.
//
// NOTE(review): the condition guarding the panic and the state change
// itself are on lines not shown here — confirm against the full file.
165 func (sis *StubInstanceSet) Stop() {
167 defer sis.mtx.Unlock()
169 panic("Stop called twice")
// StubVMs collects the *StubVM values currently in sis.servers into
// the named result svms. The order follows map iteration and is
// therefore unspecified.
174 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
176 defer sis.mtx.Unlock()
177 for _, vm := range sis.servers {
178 svms = append(svms, vm)
// RateLimitError is returned by the stub's Create and Instances
// methods when they are called before the time at which the next
// call is allowed.
type RateLimitError struct {
	// Retry is the earliest time at which the call may be retried.
	Retry time.Time
}

// Error implements the error interface.
func (rle RateLimitError) Error() string {
	msg := fmt.Sprintf("rate limited until %s", rle.Retry)
	return msg
}

// EarliestRetry reports the time before which a retry should not be
// attempted.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
188 // StubVM is a fake server that runs an SSH service. It represents a
189 // VM running in a fake cloud.
191 // Note this is distinct from a stubInstance, which is a snapshot of
192 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
193 // running (and might change IP addresses, shut down, etc.) without
194 // updating any stubInstances that have been returned to callers.
// Once this time has passed (and it is non-zero), Exec prints
// "broken" on stdout.
198 ReportBroken time.Time
// If true, any command containing "crunch-run" gets a
// "crunch-run: command not found" message on stderr.
199 CrunchRunMissing bool
// Compared against a random number in [0,1) to decide whether the
// fake crunch-run crashes; values below half the rate are treated
// as an early crash.
200 CrunchRunCrashRate float64
// Exec sleeps this long before printing "starting <uuid>".
201 CrunchRunDetachDelay time.Duration
// Used to size a sleep in Exec's exit path (see the review note on
// that line in Exec).
202 ArvMountMaxExitLag time.Duration
// A random number in [0,1) below this rate leaves the exited
// process's entry in the running map instead of deleting it.
203 ArvMountDeadlockRate float64
// If set, called with the container; the result becomes ctr.ExitCode.
204 ExecuteContainer func(arvados.Container) int
// If set, called with the container on the crash path when the
// container had been started.
205 CrashRunningContainer func(arvados.Container)
206 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
208 // Populated by (*StubInstanceSet)Create()
209 InitCommand cloud.InitCommand
// Current tags; Instance() hands out copies of this map.
213 tags cloud.InstanceTags
215 SSHService SSHService
// Fake crunch-run processes, keyed by container UUID.
216 running map[string]stubProcess
// Container UUIDs for which "crunch-run --kill" has been received
// while the process was still running.
217 killing map[string]bool
// stubProcess is the per-container record stored in StubVM.running.
// Code elsewhere in this file reads its pid and exited fields.
223 type stubProcess struct {
226 // crunch-run has exited, but arv-mount process (or something)
227 // still holds lock in /var/run/
// Instance returns a stubInstance snapshot of the VM, capturing the
// SSH service's current address and a copy of the VM's current tags.
231 func (svm *StubVM) Instance() stubInstance {
236 addr: svm.SSHService.Address(),
237 // We deliberately return a cached/stale copy of the
238 // real tags here, so that (Instance)Tags() sometimes
239 // returns old data after a call to
240 // (Instance)SetTags(). This is permitted by the
241 // driver interface, and this might help remind
242 // callers that they need to tolerate it.
243 tags: copyTags(svm.tags),
// Exec is the stub's fake command executor. It reads all of stdin,
// inspects the command string, and emulates a handful of commands,
// writing their output to stdout and stderr. Based on the lines
// visible here it recognizes:
//
//   - commands starting with "crunch-run --detach --stdin-config "
//     followed by svm.ExtraCrunchRunArgs: stdin is decoded as
//     crunchrun.ConfigData and a fake crunch-run process is recorded
//     in svm.running under the container UUID found in the command;
//   - "crunch-run --list": prints the UUIDs in svm.running, some of
//     them with a " stale" suffix;
//   - commands starting with "crunch-run --kill ": marks a running
//     container in svm.killing and reports whether it is running;
//   - "true";
//   - anything else: "<command>: command not found" on stderr.
//
// Before any of that, it reports "stub is booting" while svm.Boot is
// in the future, "cannot fork" once svm.Broken has passed, and
// "crunch-run: command not found" when CrunchRunMissing is set.
//
// NOTE(review): the return statements (exit codes) and all locking
// are on lines not shown in this view — confirm against the full file.
247 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
// NOTE(review): ioutil.ReadAll is deprecated since Go 1.16 in favor
// of io.ReadAll.
248 stdinData, err := ioutil.ReadAll(stdin)
250 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
253 queue := svm.sis.driver.Queue
// Extract the first container-UUID-shaped substring from the command.
// NOTE(review): the pattern is compiled on every Exec call; it could
// be compiled once at package scope.
254 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// A positive eta means svm.Boot is still in the future.
255 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
256 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// A zero Broken time means "never broken".
259 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
260 fmt.Fprintf(stderr, "cannot fork\n")
263 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
264 fmt.Fprint(stderr, "crunch-run: command not found\n")
267 if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
268 var configData crunchrun.ConfigData
269 err := json.Unmarshal(stdinData, &configData)
271 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
// Both API environment variables must be present in the config
// received on stdin.
274 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
275 if configData.Env[name] == "" {
276 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
// Register the fake process under the container UUID.
283 svm.running[uuid] = stubProcess{pid: pid}
285 time.Sleep(svm.CrunchRunDetachDelay)
286 fmt.Fprintf(stderr, "starting %s\n", uuid)
287 logger := svm.sis.logger.WithFields(logrus.Fields{
289 "ContainerUUID": uuid,
292 logger.Printf("[test] starting crunch-run stub")
294 var ctr arvados.Container
// started/completed record the results of the queue.Notify calls for
// the Running and Complete state updates below.
295 var started, completed bool
297 logger.Print("[test] exiting crunch-run stub")
// Sanity check: the entry in running should still belong to this pid.
300 if svm.running[uuid].pid != pid {
301 bugf := svm.sis.driver.Bugf
305 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
309 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
310 if started && svm.CrashRunningContainer != nil {
311 svm.CrashRunningContainer(ctr)
// Read-modify-write of the map entry (the modification itself is on
// a line not shown here).
314 sproc := svm.running[uuid]
316 svm.running[uuid] = sproc
// NOTE(review): time.Duration(math_rand.Float64()) converts a float
// in [0,1) to an integer and so always yields 0, making this sleep
// zero regardless of ArvMountMaxExitLag. Scaling in floating point
// first, e.g. time.Duration(float64(svm.ArvMountMaxExitLag) *
// math_rand.Float64()), looks like the intent — confirm.
318 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
// Otherwise the entry stays in running (simulated deadlock).
320 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
321 delete(svm.running, uuid)
// One random draw decides both outcomes: an early crash is the lower
// half of the crash range.
325 crashluck := math_rand.Float64()
326 wantCrash := crashluck < svm.CrunchRunCrashRate
327 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
329 ctr, ok := queue.Get(uuid)
331 logger.Print("[test] container not in queue")
// Random delay of 0-19 whole milliseconds.
335 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
338 killed := svm.killing[uuid]
340 if killed || wantCrashEarly {
344 ctr.State = arvados.ContainerStateRunning
345 started = queue.Notify(ctr)
347 ctr, _ = queue.Get(uuid)
348 logger.Print("[test] erroring out because state=Running update was rejected")
353 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
// Let the test-supplied hook choose the exit code.
356 if svm.ExecuteContainer != nil {
357 ctr.ExitCode = svm.ExecuteContainer(ctr)
359 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
360 ctr.State = arvados.ContainerStateComplete
361 completed = queue.Notify(ctr)
365 if command == "crunch-run --list" {
368 for uuid, sproc := range svm.running {
370 fmt.Fprintf(stdout, "%s stale\n", uuid)
372 fmt.Fprintf(stdout, "%s\n", uuid)
// A zero ReportBroken time means "never report broken".
375 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
376 fmt.Fprintln(stdout, "broken")
378 fmt.Fprintln(stdout, svm.deadlocked)
381 if strings.HasPrefix(command, "crunch-run --kill ") {
383 sproc, running := svm.running[uuid]
// Only flag containers whose fake process has not exited yet.
384 if running && !sproc.exited {
385 svm.killing[uuid] = true
// Random delay of 0 or 1 millisecond before re-checking.
387 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
389 sproc, running = svm.running[uuid]
392 if running && !sproc.exited {
393 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
396 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
399 if command == "true" {
// Fallthrough for unrecognized commands. Unlike the other messages
// in this function, this one has no trailing newline.
402 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the value returned by (*StubVM).Instance. It
// refers back to its StubVM (si.svm) and carries its own copy of the
// tags taken at the time it was created.
406 type stubInstance struct {
// Snapshot of the VM's tags; Tags() returns copies of this map.
409 tags cloud.InstanceTags
// ID returns the instance's cloud.InstanceID.
// NOTE(review): body not shown here; presumably si.svm.id — confirm.
412 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the instance's address as a string.
// NOTE(review): body not shown here; presumably the addr captured by
// (*StubVM).Instance — confirm.
416 func (si stubInstance) Address() string {
// RemoteUser returns the AuthorizedUser of the underlying VM's SSH
// service (set to "root" by (*StubInstanceSet).Create).
420 func (si stubInstance) RemoteUser() string {
421 return si.svm.SSHService.AuthorizedUser
// Destroy closes the VM's SSH service and deletes the VM from its
// instance set's servers map.
//
// If the driver's HoldCloudOps is set, the call first blocks sending
// on the driver's holdCloudOps channel. With probability
// ErrorRateDestroy it returns an "instance could not be destroyed"
// error before closing or deleting anything.
//
// NOTE(review): sis is assigned on a line not shown here; presumably
// it is si.svm.sis — confirm.
424 func (si stubInstance) Destroy() error {
426 if sis.driver.HoldCloudOps {
427 sis.driver.holdCloudOps <- true
// Artificial failure injection.
429 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
430 return errors.New("instance could not be destroyed")
432 si.svm.SSHService.Close()
434 defer sis.mtx.Unlock()
435 delete(sis.servers, si.svm.id)
// ProviderType returns the underlying VM's providerType, which
// (*StubInstanceSet).Create sets from the instance type's
// ProviderType.
439 func (si stubInstance) ProviderType() string {
440 return si.svm.providerType
// SetTags takes a private copy of tags before using it, so later
// changes to the caller's map do not affect the stub.
//
// NOTE(review): the rest of the body (where the copy is stored and
// what error, if any, is returned) is not shown here.
443 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
444 tags = copyTags(tags)
// Tags returns a copy of the tags held by this stubInstance snapshot.
// These are the tags captured when the snapshot was made, which may
// differ from the VM's current tags.
454 func (si stubInstance) Tags() cloud.InstanceTags {
455 // Return a copy to ensure a caller can't change our saved
456 // tags just by writing to the returned map.
457 return copyTags(si.tags)
// String returns the underlying VM's instance ID as a plain string.
460 func (si stubInstance) String() string {
461 return string(si.svm.id)
// VerifyHostKey checks key against the driver's HostKey: it fills a
// 512-byte buffer with random data, signs it with the driver's
// HostKey, and returns the result of verifying that signature with
// key (nil on success). The client argument is not referenced on any
// line visible here.
464 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
465 buf := make([]byte, 512)
466 _, err := io.ReadFull(rand.Reader, buf)
470 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
474 return key.Verify(buf, sig)
// copyTags builds a new cloud.InstanceTags from src. The destination
// is created as an empty, non-nil value even when src is nil or
// empty.
//
// NOTE(review): the loop body and return statement are not shown
// here; presumably each key/value pair is copied and dst is returned.
477 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
478 dst := cloud.InstanceTags{}
479 for k, v := range src {
485 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {