1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/lib/crunchrun"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/sirupsen/logrus"
25 "golang.org/x/crypto/ssh"
28 // A StubDriver implements cloud.Driver by setting up local SSH
29 // servers that do fake command executions.
30 type StubDriver struct {
// AuthorizedKeys is the base list of SSH public keys for new VMs;
// (*StubInstanceSet)Create prepends its authKey argument to a
// list built from these keys.
32 AuthorizedKeys []ssh.PublicKey
34 // SetupVM, if set, is called upon creation of each new
35 // StubVM. This is the caller's opportunity to customize the
36 // VM's error rate and other behaviors.
39 // Bugf, if set, is called if a bug is detected in the caller
40 // or stub. Typically set to (*check.C)Errorf. If unset,
41 // logger.Warnf is called instead.
42 Bugf func(string, ...interface{})
44 // StubVM's fake crunch-run uses this Queue to read and update
// NOTE(review): the sentence above is cut off and the Queue field
// it documents does not follow it here. (*StubVM)Exec reads
// driver.Queue and calls Get and Notify on it — confirm the
// declaration and restore the rest of the comment.
48 // Frequency of artificially introduced errors on calls to
49 // Create and Destroy. 0=always succeed, 1=always fail.
// Each call fails when a fresh math_rand.Float64() value is less
// than the corresponding rate.
50 ErrorRateCreate float64
51 ErrorRateDestroy float64
53 // If Create() or Instances() is called too frequently, return
54 // rate-limiting errors.
// After an accepted call, the next call is rejected with a
// RateLimitError until this much time has passed.
55 MinTimeBetweenCreateCalls time.Duration
56 MinTimeBetweenInstancesCalls time.Duration
60 // If true, Create and Destroy calls block until ReleaseCloudOps() is
// NOTE(review): the sentence above is cut off and the field it
// documents does not follow it here. Create and Destroy test
// driver.HoldCloudOps and, when it is true, send on holdCloudOps —
// confirm the declaration and restore the rest of the comment.
// instanceSets records every set returned by InstanceSet, in
// creation order.
64 instanceSets []*StubInstanceSet
// holdCloudOps is created lazily (unbuffered) by InstanceSet;
// Create and Destroy send on it while HoldCloudOps is true.
65 holdCloudOps chan bool
68 // InstanceSet returns a new *StubInstanceSet.
//
// The new set starts with an empty servers map, is appended to
// sd.instanceSets, and is populated from params with json.Unmarshal.
69 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
// Lazily create the channel that Create/Destroy send on while
// HoldCloudOps is set. It is unbuffered, so each send blocks
// until something receives from it.
70 if sd.holdCloudOps == nil {
71 sd.holdCloudOps = make(chan bool)
73 sis := StubInstanceSet{
76 servers: map[cloud.InstanceID]*StubVM{},
// Remember the set so InstanceSets() can return it later.
78 sd.instanceSets = append(sd.instanceSets, &sis)
// Decode the caller-supplied JSON params into the new set.
82 err = json.Unmarshal(params, &sis)
87 // InstanceSets returns all instance sets that have been created by the
88 // driver. This can be used to test a component that uses the driver
89 // but doesn't expose the InstanceSets it has created.
//
// The returned slice is the driver's own slice, not a copy.
90 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
91 return sd.instanceSets
94 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
95 // are fewer than n blocked calls pending, it waits for the rest to
// NOTE(review): the sentence above is cut off; presumably it ends
// "arrive." — confirm and restore the missing text.
97 func (sd *StubDriver) ReleaseCloudOps(n int) {
// One iteration per call to release. Create and Destroy block on a
// send to the driver's holdCloudOps channel, so each iteration
// presumably receives one value from it — confirm against the
// loop body.
98 for i := 0; i < n; i++ {
// StubInstanceSet is the instance set created by
// (*StubDriver)InstanceSet. It tracks the stub VMs it has created and
// the earliest times at which Create and Instances will next succeed.
103 type StubInstanceSet struct {
// logger is the base logger that (*StubVM)Exec adds fields to for
// the fake crunch-run's log messages.
105 logger logrus.FieldLogger
// servers holds the existing VMs by instance ID: Create adds
// entries and (stubInstance)Destroy deletes them.
106 servers map[cloud.InstanceID]*StubVM
// Calls to Create / Instances made before these times are
// rejected with a RateLimitError.
110 allowCreateCall time.Time
111 allowInstancesCall time.Time
// Create adds a new StubVM to the set and returns a stubInstance
// snapshot of it.
//
// It returns an error instead of creating a VM when the set has been
// stopped, when it is called before allowCreateCall (RateLimitError),
// when a random draw falls below the driver's ErrorRateCreate, or when
// the driver's QuotaMaxInstances is positive and already reached
// (QuotaError).
115 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
// While HoldCloudOps is set, block here until something receives
// from the driver's holdCloudOps channel.
116 if sis.driver.HoldCloudOps {
117 sis.driver.holdCloudOps <- true
120 defer sis.mtx.Unlock()
122 return nil, errors.New("StubInstanceSet: Create called after Stop")
// Rate limiting: reject calls made before the allowed time.
124 if sis.allowCreateCall.After(time.Now()) {
125 return nil, RateLimitError{sis.allowCreateCall}
// Artificial failure with probability ErrorRateCreate.
127 if math_rand.Float64() < sis.driver.ErrorRateCreate {
128 return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
// Quota: a non-positive QuotaMaxInstances means no limit.
130 if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
131 return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
// The rate-limit window is only advanced once the checks above
// have passed.
133 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
// Build the key list: the caller's authKey first, then the
// driver-wide AuthorizedKeys.
134 ak := sis.driver.AuthorizedKeys
136 ak = append([]ssh.PublicKey{authKey}, ak...)
140 InitCommand: initCommand,
// The instance ID combines lastInstanceID with the provider type.
142 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
// Store a copy so later changes to the caller's map do not
// affect the VM's tags.
143 tags: copyTags(tags),
144 providerType: it.ProviderType,
145 running: map[string]stubProcess{},
146 killing: map[string]bool{},
148 svm.SSHService = SSHService{
149 HostKey: sis.driver.HostKey,
150 AuthorizedUser: "root",
// Give the caller's SetupVM hook, if any, a chance to customize
// the new VM.
154 if setup := sis.driver.SetupVM; setup != nil {
// Register the VM and return a snapshot of it.
157 sis.servers[svm.id] = svm
158 return svm.Instance(), nil
// Instances returns a snapshot (see (*StubVM)Instance) of every VM
// currently in the set, in unspecified (map iteration) order. The tags
// argument is unnamed and therefore ignored. Calls made before
// allowInstancesCall are rejected with a RateLimitError.
//
// NOTE(review): allowInstancesCall is assigned below while, judging by
// the deferred RUnlock, only a read lock is held. Concurrent Instances
// calls could then race on that field — confirm, and consider taking
// the write lock instead.
161 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
163 defer sis.mtx.RUnlock()
164 if sis.allowInstancesCall.After(time.Now()) {
165 return nil, RateLimitError{sis.allowInstancesCall}
// Start the next rate-limit window.
167 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
168 var r []cloud.Instance
169 for _, ss := range sis.servers {
170 r = append(r, ss.Instance())
// Stop marks the set as stopped; Create returns an error when called
// after Stop. It panics with "Stop called twice" —
// NOTE(review): presumably when the set was already stopped; confirm
// against the condition guarding the panic.
175 func (sis *StubInstanceSet) Stop() {
177 defer sis.mtx.Unlock()
179 panic("Stop called twice")
// StubVMs returns the set's current VMs as a new slice of *StubVM
// (the VMs themselves, not stubInstance snapshots). The order is
// unspecified because it follows map iteration.
184 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
186 defer sis.mtx.Unlock()
187 for _, vm := range sis.servers {
188 svms = append(svms, vm)
// RateLimitError is the error returned by Create and Instances when
// they are called before the next permitted call time. Retry is that
// time.
193 type RateLimitError struct{ Retry time.Time }
// Error returns a message that includes the Retry time.
195 func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
// EarliestRetry returns the time before which the call should not be
// retried.
196 func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
198 // StubVM is a fake server that runs an SSH service. It represents a
199 // VM running in a fake cloud.
201 // Note this is distinct from a stubInstance, which is a snapshot of
202 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
203 // running (and might change IP addresses, shut down, etc.) without
204 // updating any stubInstances that have been returned to callers.
// ReportBroken, if non-zero, is the time after which the output
// of "crunch-run --list" includes a "broken" line.
208 ReportBroken time.Time
// CrunchRunMissing makes any command containing "crunch-run"
// fail with "crunch-run: command not found".
209 CrunchRunMissing bool
// CrunchRunCrashRate is compared against one random draw per
// started container: a draw below the rate sets wantCrash, and a
// draw below half the rate sets wantCrashEarly.
210 CrunchRunCrashRate float64
// CrunchRunDetachDelay is slept before the fake crunch-run
// reports "starting <uuid>".
211 CrunchRunDetachDelay time.Duration
// ArvMountMaxExitLag scales a sleep taken while the fake
// crunch-run is exiting (see the review note at that sleep in
// Exec).
212 ArvMountMaxExitLag time.Duration
// ArvMountDeadlockRate is the probability that an exiting
// process's entry is left in the running map instead of being
// deleted.
213 ArvMountDeadlockRate float64
// ExecuteContainer, if set, is called with the container and its
// result becomes the container's ExitCode.
214 ExecuteContainer func(arvados.Container) int
// CrashRunningContainer, if set, is called on the crash path
// when the container had been started.
215 CrashRunningContainer func(arvados.Container)
216 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
218 // Populated by (*StubInstanceSet)Create()
219 InitCommand cloud.InitCommand
// tags is the VM's current tag set; Instance() hands out copies.
223 tags cloud.InstanceTags
225 SSHService SSHService
// running maps container UUID to the fake crunch-run process
// started for it.
226 running map[string]stubProcess
// killing records the UUIDs for which "crunch-run --kill" was
// received while the process was running and not yet exited.
227 killing map[string]bool
// stubProcess is the per-container record kept in StubVM.running for a
// fake crunch-run process. Exec sets pid when it starts a process and
// consults exited when listing and killing.
233 type stubProcess struct {
236 // crunch-run has exited, but arv-mount process (or something)
237 // still holds lock in /var/run/
// Instance returns a stubInstance snapshot of the VM, carrying the SSH
// service's current address and a copy of the VM's current tags.
241 func (svm *StubVM) Instance() stubInstance {
246 addr: svm.SSHService.Address(),
247 // We deliberately return a cached/stale copy of the
248 // real tags here, so that (Instance)Tags() sometimes
249 // returns old data after a call to
250 // (Instance)SetTags(). This is permitted by the
251 // driver interface, and this might help remind
252 // callers that they need to tolerate it.
253 tags: copyTags(svm.tags),
// Exec simulates running command on the VM. It reads all of stdin,
// writes simulated output to stdout/stderr, and returns a uint32
// (presumably the command's exit status — confirm against the return
// statements).
//
// Commands recognized:
//   - "crunch-run --detach --stdin-config " + ExtraCrunchRunArgs ...:
//     start a fake crunch-run for the container UUID found in command.
//   - "crunch-run --list": print the UUIDs in the running map.
//   - "crunch-run --kill ...": flag the container's process for killing.
//   - "true".
//
// Anything else is reported as "command not found".
257 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
// NOTE(review): ioutil.ReadAll is deprecated since Go 1.16 in
// favor of io.ReadAll.
258 stdinData, err := ioutil.ReadAll(stdin)
260 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
263 queue := svm.sis.driver.Queue
// Extract the container UUID (5 chars, "-dz642-", 15 chars) from
// the command line; uuid is "" if there is none.
// NOTE(review): the pattern is compiled on every call; it could
// be compiled once at package scope.
264 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Simulate a VM that has not finished booting yet.
265 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
266 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// Simulate a VM that became unable to run commands at svm.Broken.
269 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
270 fmt.Fprintf(stderr, "cannot fork\n")
// Simulate a VM image without crunch-run installed.
273 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
274 fmt.Fprint(stderr, "crunch-run: command not found\n")
// Fake "crunch-run --detach": validate the config received on
// stdin, then run the container's simulated lifecycle.
277 if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
278 var configData crunchrun.ConfigData
279 err := json.Unmarshal(stdinData, &configData)
281 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
// Both API env vars must be present in the stdin config.
284 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
285 if configData.Env[name] == "" {
286 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
// Record the new fake process under the container UUID.
293 svm.running[uuid] = stubProcess{pid: pid}
295 time.Sleep(svm.CrunchRunDetachDelay)
296 fmt.Fprintf(stderr, "starting %s\n", uuid)
297 logger := svm.sis.logger.WithFields(logrus.Fields{
299 "ContainerUUID": uuid,
302 logger.Printf("[test] starting crunch-run stub")
304 var ctr arvados.Container
305 var started, completed bool
307 logger.Print("[test] exiting crunch-run stub")
// Sanity check on exit: the running entry for this UUID should
// still belong to this pid; otherwise report a bug via the
// driver's Bugf.
310 if svm.running[uuid].pid != pid {
311 bugf := svm.sis.driver.Bugf
315 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
319 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
320 if started && svm.CrashRunningContainer != nil {
321 svm.CrashRunningContainer(ctr)
324 sproc := svm.running[uuid]
326 svm.running[uuid] = sproc
// NOTE(review): assuming math_rand is math/rand, Float64()
// returns a value in [0.0,1.0), and converting that to
// time.Duration truncates it to 0, so this always sleeps for 0
// regardless of ArvMountMaxExitLag. The intent was presumably
// time.Duration(float64(svm.ArvMountMaxExitLag) * math_rand.Float64())
// — confirm before changing, since tests may depend on the
// current timing.
328 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
// With probability ArvMountDeadlockRate the entry is left in
// running after exit.
330 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
331 delete(svm.running, uuid)
// A single random draw decides both crash flags, so
// wantCrashEarly implies wantCrash.
335 crashluck := math_rand.Float64()
336 wantCrash := crashluck < svm.CrunchRunCrashRate
337 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
339 ctr, ok := queue.Get(uuid)
341 logger.Print("[test] container not in queue")
// Random delay of a whole number of milliseconds, 0 through 19
// (the float is truncated before the multiplication).
345 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
348 killed := svm.killing[uuid]
350 if killed || wantCrashEarly {
// Try to move the container to Running; started records the
// result of queue.Notify.
354 ctr.State = arvados.ContainerStateRunning
355 started = queue.Notify(ctr)
357 ctr, _ = queue.Get(uuid)
358 logger.Print("[test] erroring out because state=Running update was rejected")
363 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
// Run the caller's container simulation, if any, to get the exit
// code.
366 if svm.ExecuteContainer != nil {
367 ctr.ExitCode = svm.ExecuteContainer(ctr)
369 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
370 ctr.State = arvados.ContainerStateComplete
371 completed = queue.Notify(ctr)
// Fake "crunch-run --list": one line per entry in running, either
// "<uuid> stale" or "<uuid>".
375 if command == "crunch-run --list" {
378 for uuid, sproc := range svm.running {
380 fmt.Fprintf(stdout, "%s stale\n", uuid)
382 fmt.Fprintf(stdout, "%s\n", uuid)
// Report the VM as broken once ReportBroken has passed.
385 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
386 fmt.Fprintln(stdout, "broken")
388 fmt.Fprintln(stdout, svm.deadlocked)
// Fake "crunch-run --kill": flag the process for killing, wait
// briefly, then report whether it is still running.
391 if strings.HasPrefix(command, "crunch-run --kill ") {
393 sproc, running := svm.running[uuid]
394 if running && !sproc.exited {
395 svm.killing[uuid] = true
// Sleeps for 0ms or 1ms (the float is truncated before the
// multiplication).
397 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
399 sproc, running = svm.running[uuid]
402 if running && !sproc.exited {
403 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
406 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
409 if command == "true" {
// NOTE(review): unlike the other messages written to stderr in
// this function, this one has no trailing newline.
412 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the value returned by (*StubVM)Instance: a snapshot
// of a StubVM's metadata taken at the time of the call.
416 type stubInstance struct {
// tags is the copy of the VM's tags made when the snapshot was
// taken.
419 tags cloud.InstanceTags
// ID returns the instance's cloud.InstanceID.
// NOTE(review): presumably the id assigned by Create — confirm
// against the method body.
422 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the instance's address as a string.
// NOTE(review): presumably the SSH service address captured by
// (*StubVM)Instance — confirm against the method body.
426 func (si stubInstance) Address() string {
// RemoteUser returns the AuthorizedUser of the VM's SSH service (Create
// sets it to "root").
430 func (si stubInstance) RemoteUser() string {
431 return si.svm.SSHService.AuthorizedUser
// Destroy shuts down the VM's SSH service and removes the VM from its
// instance set's servers map. With probability ErrorRateDestroy it
// instead returns an error and leaves the VM in place.
434 func (si stubInstance) Destroy() error {
// While HoldCloudOps is set, block here until something receives
// from the driver's holdCloudOps channel.
436 if sis.driver.HoldCloudOps {
437 sis.driver.holdCloudOps <- true
// Artificial failure.
439 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
440 return errors.New("instance could not be destroyed")
442 si.svm.SSHService.Close()
444 defer sis.mtx.Unlock()
445 delete(sis.servers, si.svm.id)
// ProviderType returns the VM's providerType, which Create sets from
// the requested instance type's ProviderType.
449 func (si stubInstance) ProviderType() string {
450 return si.svm.providerType
// SetTags begins by copying tags, so later changes to the caller's map
// cannot affect what gets stored.
// NOTE(review): presumably the copy then replaces the VM's tags —
// confirm against the rest of the body.
453 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
454 tags = copyTags(tags)
// Tags returns a copy of the tags captured in this snapshot (si.tags),
// not the VM's current tags.
464 func (si stubInstance) Tags() cloud.InstanceTags {
465 // Return a copy to ensure a caller can't change our saved
466 // tags just by writing to the returned map.
467 return copyTags(si.tags)
// String returns the VM's instance ID as a string.
470 func (si stubInstance) String() string {
471 return string(si.svm.id)
// VerifyHostKey checks that key corresponds to the driver's HostKey: it
// signs 512 random bytes with HostKey and returns the result of
// verifying that signature with key (nil on success).
474 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
// Random challenge to be signed.
475 buf := make([]byte, 512)
476 _, err := io.ReadFull(rand.Reader, buf)
480 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
484 return key.Verify(buf, sig)
// copyTags builds a new cloud.InstanceTags map from src. The result is
// non-nil even when src is nil or empty.
// NOTE(review): presumably every key/value pair of src is copied into
// the new map — confirm against the loop body.
487 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
488 dst := cloud.InstanceTags{}
489 for k, v := range src {
// PriceHistory returns a []cloud.InstancePrice; the instance type
// argument is unnamed and therefore ignored.
// NOTE(review): what the stub actually returns is not shown here —
// confirm against the method body.
495 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
// QuotaError is the error type Create returns when the driver's
// QuotaMaxInstances limit has been reached; Create builds it around an
// underlying error value.
499 type QuotaError struct {
// IsQuotaError always reports true.
503 func (QuotaError) IsQuotaError() bool { return true }