1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/lib/crunchrun"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/sirupsen/logrus"
25 "golang.org/x/crypto/ssh"
28 // A StubDriver implements cloud.Driver by setting up local SSH
29 // servers that do fake command executions.
30 type StubDriver struct {
// AuthorizedKeys is read by Create, which prepends the caller's
// authKey to build each new VM's key list. NOTE(review): presumably
// that list is handed to the VM's SSHService — confirm.
32 AuthorizedKeys []ssh.PublicKey
34 // SetupVM, if set, is called upon creation of each new
35 // StubVM. This is the caller's opportunity to customize the
36 // VM's error rate and other behaviors.
38 // If SetupVM returns an error, that error will be returned to
39 // the caller of Create(), and the new VM will be discarded.
40 SetupVM func(*StubVM) error
42 // Bugf, if set, is called if a bug is detected in the caller
43 // or stub. Typically set to (*check.C)Errorf. If unset,
44 // logger.Warnf is called instead.
45 Bugf func(string, ...interface{})
47 // StubVM's fake crunch-run uses this Queue to read and update container state.
51 // Probability of artificially introduced errors on calls to
52 // Create and Destroy, compared against math/rand.Float64(). 0=always succeed, 1=always fail.
53 ErrorRateCreate float64
54 ErrorRateDestroy float64
56 // If Create() or Instances() is called too frequently, return
57 // rate-limiting errors.
58 MinTimeBetweenCreateCalls time.Duration
59 MinTimeBetweenInstancesCalls time.Duration
63 // If true, Create and Destroy calls block until ReleaseCloudOps() is called.
// instanceSets records every set returned by InstanceSet; it is
// exposed through InstanceSets().
67 instanceSets []*StubInstanceSet
// holdCloudOps is an unbuffered channel allocated lazily by
// InstanceSet. When HoldCloudOps is true, Create and Destroy send on
// it and therefore block until a receiver (ReleaseCloudOps) takes
// the value.
68 holdCloudOps chan bool
71 // InstanceSet returns a new *StubInstanceSet.
// The new set is appended to sd.instanceSets (see InstanceSets) and
// then params is JSON-decoded into it. The SharedResourceTags
// argument is ignored.
72 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
// Lazily create the unbuffered channel that Create/Destroy block on
// when HoldCloudOps is set.
73 if sd.holdCloudOps == nil {
74 sd.holdCloudOps = make(chan bool)
76 sis := StubInstanceSet{
79 servers: map[cloud.InstanceID]*StubVM{},
81 sd.instanceSets = append(sd.instanceSets, &sis)
// NOTE(review): the set is recorded in sd.instanceSets before this
// decode runs, so it stays recorded even if decoding fails — confirm
// that is intended.
85 err = json.Unmarshal(params, &sis)
90 // InstanceSets returns all instances that have been created by the
91 // driver. This can be used to test a component that uses the driver
92 // but doesn't expose the InstanceSets it has created.
93 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
94 return sd.instanceSets
97 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
98 // are fewer than n blocked calls pending, it waits for the rest to arrive.
// NOTE(review): each iteration is expected to receive once from
// sd.holdCloudOps, unblocking one Create/Destroy. That channel is only
// allocated by InstanceSet, so calling this before any InstanceSet call
// would receive from a nil channel and block forever — confirm.
100 func (sd *StubDriver) ReleaseCloudOps(n int) {
101 for i := 0; i < n; i++ {
// StubInstanceSet is the cloud.InstanceSet returned by
// StubDriver.InstanceSet. Each instance it creates is a StubVM running
// a local SSH service.
106 type StubInstanceSet struct {
108 logger logrus.FieldLogger
// servers holds every StubVM added by Create and not yet removed by
// Destroy, keyed by instance ID.
109 servers map[cloud.InstanceID]*StubVM
// allowCreateCall and allowInstancesCall are the earliest times at
// which Create and Instances, respectively, stop returning
// RateLimitError.
113 allowCreateCall time.Time
114 allowInstancesCall time.Time
// Create starts a new StubVM and returns a stubInstance snapshot of it.
//
// When HoldCloudOps is set, it first blocks until ReleaseCloudOps lets
// it proceed. It then fails in this order of precedence:
//   - with an error if the set has been stopped;
//   - with RateLimitError if called before allowCreateCall;
//   - with a random error, with probability ErrorRateCreate;
//   - with QuotaError once QuotaMaxInstances servers exist.
// The rate-limit window is only advanced after all of these checks
// pass, so failed calls do not extend it.
118 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
// Block here, before the mutex is held, until ReleaseCloudOps
// receives from holdCloudOps.
119 if sis.driver.HoldCloudOps {
120 sis.driver.holdCloudOps <- true
123 defer sis.mtx.Unlock()
125 return nil, errors.New("StubInstanceSet: Create called after Stop")
127 if sis.allowCreateCall.After(time.Now()) {
128 return nil, RateLimitError{sis.allowCreateCall}
130 if math_rand.Float64() < sis.driver.ErrorRateCreate {
131 return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
133 if max := sis.driver.QuotaMaxInstances; max > 0 && len(sis.servers) >= max {
134 return nil, QuotaError{fmt.Errorf("StubInstanceSet: reached QuotaMaxInstances %d", max)}
136 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
// The caller-supplied authKey goes in front of the driver-wide keys.
137 ak := sis.driver.AuthorizedKeys
139 ak = append([]ssh.PublicKey{authKey}, ak...)
143 InitCommand: initCommand,
// IDs have the form "inst<N>,<ProviderType>".
145 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
146 tags: copyTags(tags),
147 providerType: it.ProviderType,
148 running: map[string]stubProcess{},
149 killing: map[string]bool{},
151 svm.SSHService = SSHService{
152 HostKey: sis.driver.HostKey,
// stubInstance.RemoteUser() reports this user name.
153 AuthorizedUser: "root",
// SetupVM may customize the VM, or veto it by returning an error.
157 if setup := sis.driver.SetupVM; setup != nil {
// Register the VM so Instances, StubVMs and Destroy can find it.
163 sis.servers[svm.id] = svm
164 return svm.Instance(), nil
// Instances returns a fresh stubInstance snapshot of every VM in the
// set, in map-iteration (i.e. unspecified) order. The tags argument is
// ignored, so no filtering is done. Calls made before
// allowInstancesCall fail with RateLimitError; each successful call
// pushes allowInstancesCall MinTimeBetweenInstancesCalls into the
// future.
167 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
169 defer sis.mtx.RUnlock()
170 if sis.allowInstancesCall.After(time.Now()) {
171 return nil, RateLimitError{sis.allowInstancesCall}
// NOTE(review): this write happens while only the read lock is held
// (see the deferred RUnlock), so concurrent Instances calls race on
// allowInstancesCall. Taking the write lock (Lock/Unlock) would fix it.
173 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
174 var r []cloud.Instance
175 for _, ss := range sis.servers {
176 r = append(r, ss.Instance())
// Stop marks the instance set as stopped, after which Create fails
// with "Create called after Stop". Calling Stop a second time panics.
181 func (sis *StubInstanceSet) Stop() {
183 defer sis.mtx.Unlock()
185 panic("Stop called twice")
// StubVMs returns the live StubVMs in this set, in map-iteration
// (unspecified) order. Unlike Instances, it returns the VMs themselves
// rather than snapshots, and it is not rate-limited.
190 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
192 defer sis.mtx.Unlock()
193 for _, vm := range sis.servers {
194 svms = append(svms, vm)
// RateLimitError is returned by Create and Instances when a call
// arrives before the configured minimum interval has elapsed. Retry is
// the earliest time at which the caller may try again.
type RateLimitError struct {
	Retry time.Time
}

// Error implements the error interface.
func (rle RateLimitError) Error() string {
	return "rate limited until " + rle.Retry.String()
}

// EarliestRetry reports when the rate limit expires.
func (rle RateLimitError) EarliestRetry() time.Time {
	return rle.Retry
}
// CapacityError is an error value reporting insufficient cloud
// capacity. Its InstanceTypeSpecific field is reported by
// IsInstanceTypeSpecific; presumably it indicates that the shortage
// applies only to the requested instance type.
type CapacityError struct {
	InstanceTypeSpecific bool
}

// Error implements the error interface.
func (ce CapacityError) Error() string {
	const msg = "insufficient capacity"
	return msg
}

// IsCapacityError always reports true, identifying ce as a capacity
// error.
func (ce CapacityError) IsCapacityError() bool {
	return true
}

// IsInstanceTypeSpecific returns the InstanceTypeSpecific field.
func (ce CapacityError) IsInstanceTypeSpecific() bool {
	return ce.InstanceTypeSpecific
}
210 // StubVM is a fake server that runs an SSH service. It represents a
211 // VM running in a fake cloud.
213 // Note this is distinct from a stubInstance, which is a snapshot of
214 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
215 // running (and might change IP addresses, shut down, etc.) without
216 // updating any stubInstances that have been returned to callers.
// ReportBroken: once non-zero and in the past, "crunch-run --list"
// output includes a "broken" line.
220 ReportBroken time.Time
// CrunchRunMissing makes any command containing "crunch-run" fail
// with "crunch-run: command not found".
221 CrunchRunMissing bool
// CrunchRunCrashRate is the probability that a detached crunch-run
// crashes. Half of that probability produces an early crash, before
// the container is set to Running.
222 CrunchRunCrashRate float64
// CrunchRunDetachDelay is slept after the fake process is registered
// and before "starting <uuid>" is written to stderr.
223 CrunchRunDetachDelay time.Duration
// ArvMountMaxExitLag is meant to bound a random delay after the fake
// crunch-run exits. NOTE(review): Exec multiplies it by
// time.Duration(math_rand.Float64()), which always truncates to 0, so
// the delay is currently always zero.
224 ArvMountMaxExitLag time.Duration
// ArvMountDeadlockRate is the probability that an exited process is
// left in the running map instead of being removed.
225 ArvMountDeadlockRate float64
// ExecuteContainer, if set, is called to produce the container's exit
// code before the container is marked Complete.
226 ExecuteContainer func(arvados.Container) int
// CrashRunningContainer, if set, is called when the fake crunch-run
// crashes after the container was successfully set to Running.
227 CrashRunningContainer func(arvados.Container)
228 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
230 // Populated by (*StubInstanceSet)Create()
231 InitCommand cloud.InitCommand
235 tags cloud.InstanceTags
237 SSHService SSHService
// running holds fake crunch-run processes keyed by container UUID.
// killing records the UUIDs for which "crunch-run --kill" was issued.
238 running map[string]stubProcess
239 killing map[string]bool
// stubProcess tracks one fake crunch-run process in StubVM.running.
245 type stubProcess struct {
248 // crunch-run has exited, but arv-mount process (or something)
249 // still holds lock in /var/run/
// Instance returns a stubInstance snapshot of the VM, capturing its
// current SSH address and a copy of its tags.
253 func (svm *StubVM) Instance() stubInstance {
258 addr: svm.SSHService.Address(),
259 // We deliberately return a cached/stale copy of the
260 // real tags here, so that (Instance)Tags() sometimes
261 // returns old data after a call to
262 // (Instance)SetTags(). This is permitted by the
263 // driver interface, and this might help remind
264 // callers that they need to tolerate it.
265 tags: copyTags(svm.tags),
// Exec handles one command run on the fake VM. It writes output to
// stdout/stderr and returns the command's exit status.
// NOTE(review): presumably invoked by SSHService for each remote exec
// request — confirm.
//
// Before dispatching on the command, it simulates, in order:
//   - a VM that is still booting (svm.Boot in the future);
//   - a broken VM (svm.Broken in the past: "cannot fork");
//   - a missing crunch-run binary (CrunchRunMissing).
// Recognized commands are:
//   - "crunch-run --detach --stdin-config ...";
//   - "crunch-run --list";
//   - "crunch-run --kill ...";
//   - "true".
// Any other command writes a "command not found" message to stderr.
269 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
270 stdinData, err := ioutil.ReadAll(stdin)
272 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
275 queue := svm.sis.driver.Queue
// The container UUID may appear anywhere in the command line.
// NOTE(review): this regexp is recompiled on every call; hoisting it
// to a package-level var would avoid the repeated work.
276 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Refuse commands until the VM's Boot time has passed.
277 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
278 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
281 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
282 fmt.Fprintf(stderr, "cannot fork\n")
285 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
286 fmt.Fprint(stderr, "crunch-run: command not found\n")
// crunch-run --detach:
//   - validate the config JSON on stdin, which must set
//     ARVADOS_API_HOST and ARVADOS_API_TOKEN;
//   - register a fake process;
//   - drive the container's state through the Queue.
289 if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
290 var configData crunchrun.ConfigData
291 err := json.Unmarshal(stdinData, &configData)
293 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
296 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
297 if configData.Env[name] == "" {
298 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
305 svm.running[uuid] = stubProcess{pid: pid}
307 time.Sleep(svm.CrunchRunDetachDelay)
308 fmt.Fprintf(stderr, "starting %s\n", uuid)
309 logger := svm.sis.logger.WithFields(logrus.Fields{
311 "ContainerUUID": uuid,
314 logger.Printf("[test] starting crunch-run stub")
316 var ctr arvados.Container
317 var started, completed bool
319 logger.Print("[test] exiting crunch-run stub")
// Sanity check: the process entry should still belong to this pid.
322 if svm.running[uuid].pid != pid {
323 bugf := svm.sis.driver.Bugf
327 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
331 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
332 if started && svm.CrashRunningContainer != nil {
333 svm.CrashRunningContainer(ctr)
336 sproc := svm.running[uuid]
338 svm.running[uuid] = sproc
// NOTE(review): time.Duration(math_rand.Float64()) converts a value in
// [0,1) to an integer Duration, which truncates to 0, so this always
// sleeps 0 regardless of ArvMountMaxExitLag. The intended expression
// is probably:
//   time.Duration(float64(svm.ArvMountMaxExitLag) * math_rand.Float64())
340 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
// With probability ArvMountDeadlockRate the entry is left in
// svm.running, simulating a lingering arv-mount.
342 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
343 delete(svm.running, uuid)
// A single draw decides both crash modes. A crash happens with
// probability CrunchRunCrashRate; half of those happen early, before
// the container is set to Running.
347 crashluck := math_rand.Float64()
348 wantCrash := crashluck < svm.CrunchRunCrashRate
349 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
351 ctr, ok := queue.Get(uuid)
353 logger.Print("[test] container not in queue")
357 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
360 killed := svm.killing[uuid]
362 if killed || wantCrashEarly {
366 ctr.State = arvados.ContainerStateRunning
367 started = queue.Notify(ctr)
369 ctr, _ = queue.Get(uuid)
370 logger.Print("[test] erroring out because state=Running update was rejected")
375 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
378 if svm.ExecuteContainer != nil {
379 ctr.ExitCode = svm.ExecuteContainer(ctr)
381 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
382 ctr.State = arvados.ContainerStateComplete
383 completed = queue.Notify(ctr)
// crunch-run --list: one line per entry in svm.running, followed by
// the VM-level status lines below.
387 if command == "crunch-run --list" {
390 for uuid, sproc := range svm.running {
392 fmt.Fprintf(stdout, "%s stale\n", uuid)
394 fmt.Fprintf(stdout, "%s\n", uuid)
397 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
398 fmt.Fprintln(stdout, "broken")
400 fmt.Fprintln(stdout, svm.deadlocked)
// crunch-run --kill: flag the container for killing, wait briefly,
// then report whether it is still running.
403 if strings.HasPrefix(command, "crunch-run --kill ") {
405 sproc, running := svm.running[uuid]
406 if running && !sproc.exited {
407 svm.killing[uuid] = true
409 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
411 sproc, running = svm.running[uuid]
414 if running && !sproc.exited {
415 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
418 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
421 if command == "true" {
// NOTE(review): unlike the other stderr messages in Exec, this one has
// no trailing newline.
424 fmt.Fprintf(stderr, "%q: command not found", command)
// stubInstance is the cloud.Instance handed to callers: a snapshot of
// a StubVM's metadata taken by (*StubVM)Instance. It is not updated
// when the underlying VM changes.
428 type stubInstance struct {
431 tags cloud.InstanceTags
// ID returns this instance's cloud.InstanceID. NOTE(review): presumably
// si.svm.id, the same value String() formats — confirm.
434 func (si stubInstance) ID() cloud.InstanceID {
// Address returns the SSH address recorded for this snapshot.
// NOTE(review): presumably si.addr, filled from SSHService.Address()
// in (*StubVM)Instance, so it may be stale — confirm.
438 func (si stubInstance) Address() string {
442 func (si stubInstance) RemoteUser() string {
443 return si.svm.SSHService.AuthorizedUser
// Destroy shuts down the VM's SSH service and removes the VM from its
// instance set.
//
// When HoldCloudOps is set, it first blocks until ReleaseCloudOps lets
// it proceed. It then fails at random with probability
// ErrorRateDestroy, in which case the VM is left untouched.
446 func (si stubInstance) Destroy() error {
448 if sis.driver.HoldCloudOps {
449 sis.driver.holdCloudOps <- true
451 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
452 return errors.New("instance could not be destroyed")
454 si.svm.SSHService.Close()
456 defer sis.mtx.Unlock()
457 delete(sis.servers, si.svm.id)
461 func (si stubInstance) ProviderType() string {
462 return si.svm.providerType
// SetTags updates the tags of the underlying VM. As the comment in
// (*StubVM)Instance explains, snapshots (including this receiver) may
// keep returning the old tags from Tags() afterwards.
465 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
// Copy so later writes to the caller's map cannot alter stored tags.
466 tags = copyTags(tags)
476 func (si stubInstance) Tags() cloud.InstanceTags {
477 // Return a copy to ensure a caller can't change our saved
478 // tags just by writing to the returned map.
479 return copyTags(si.tags)
482 func (si stubInstance) String() string {
483 return string(si.svm.id)
// VerifyHostKey checks that key matches the driver's HostKey. It signs
// 512 random bytes with HostKey and verifies the signature with key,
// returning the result of key.Verify. NOTE(review): the client
// argument appears to be unused — confirm.
486 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
487 buf := make([]byte, 512)
488 _, err := io.ReadFull(rand.Reader, buf)
492 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
496 return key.Verify(buf, sig)
// copyTags returns a new cloud.InstanceTags map, so the stub and its
// callers never share a map. The result is non-nil even when src is
// nil. NOTE(review): presumably a shallow copy of every src entry —
// confirm against the loop body.
499 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
500 dst := cloud.InstanceTags{}
501 for k, v := range src {
// PriceHistory is part of the cloud.Instance interface; the
// InstanceType argument is ignored. NOTE(review): the stub presumably
// has no price data to report — confirm.
507 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
// QuotaError is returned by Create when QuotaMaxInstances VMs already
// exist. Create builds it positionally from the underlying error.
511 type QuotaError struct {
// IsQuotaError always reports true, marking this as a quota error.
515 func (QuotaError) IsQuotaError() bool { return true }