1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/lib/crunchrun"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "github.com/sirupsen/logrus"
24 "golang.org/x/crypto/ssh"
27 // A StubDriver implements cloud.Driver by setting up local SSH
28 // servers that do fake command executions.
29 type StubDriver struct {
31 AuthorizedKeys []ssh.PublicKey
33 // SetupVM, if set, is called upon creation of each new
34 // StubVM. This is the caller's opportunity to customize the
35 // VM's error rate and other behaviors.
38 // Bugf, if set, is called if a bug is detected in the caller
39 // or stub. Typically set to (*check.C)Errorf. If unset,
40 // logger.Warnf is called instead.
41 Bugf func(string, ...interface{})
// StubVM's fake crunch-run uses this Queue to read and update
// container state.
47 // Frequency of artificially introduced errors on calls to
48 // Create and Destroy. 0=always succeed, 1=always fail.
49 ErrorRateCreate float64
50 ErrorRateDestroy float64
52 // If Create() or Instances() is called too frequently, return
53 // rate-limiting errors.
54 MinTimeBetweenCreateCalls time.Duration
55 MinTimeBetweenInstancesCalls time.Duration
// If true, Create and Destroy calls block until ReleaseCloudOps()
// is called.
61 instanceSets []*StubInstanceSet
62 holdCloudOps chan bool
65 // InstanceSet returns a new *StubInstanceSet.
66 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
67 if sd.holdCloudOps == nil {
68 sd.holdCloudOps = make(chan bool)
70 sis := StubInstanceSet{
73 servers: map[cloud.InstanceID]*StubVM{},
75 sd.instanceSets = append(sd.instanceSets, &sis)
79 err = json.Unmarshal(params, &sis)
// InstanceSets returns all instance sets that have been created by
// the driver. This can be used to test a component that uses the
// driver but doesn't expose the InstanceSets it has created.
87 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
88 return sd.instanceSets
// ReleaseCloudOps releases n pending Create/Destroy calls. If there
// are fewer than n blocked calls pending, it waits for the rest to
// arrive.
94 func (sd *StubDriver) ReleaseCloudOps(n int) {
95 for i := 0; i < n; i++ {
100 type StubInstanceSet struct {
102 logger logrus.FieldLogger
103 servers map[cloud.InstanceID]*StubVM
107 allowCreateCall time.Time
108 allowInstancesCall time.Time
112 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
113 if sis.driver.HoldCloudOps {
114 sis.driver.holdCloudOps <- true
117 defer sis.mtx.Unlock()
119 return nil, errors.New("StubInstanceSet: Create called after Stop")
121 if sis.allowCreateCall.After(time.Now()) {
122 return nil, RateLimitError{sis.allowCreateCall}
124 if math_rand.Float64() < sis.driver.ErrorRateCreate {
125 return nil, fmt.Errorf("StubInstanceSet: rand < ErrorRateCreate %f", sis.driver.ErrorRateCreate)
127 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
128 ak := sis.driver.AuthorizedKeys
130 ak = append([]ssh.PublicKey{authKey}, ak...)
134 InitCommand: initCommand,
136 id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
137 tags: copyTags(tags),
138 providerType: it.ProviderType,
139 running: map[string]stubProcess{},
140 killing: map[string]bool{},
142 svm.SSHService = SSHService{
143 HostKey: sis.driver.HostKey,
144 AuthorizedUser: "root",
148 if setup := sis.driver.SetupVM; setup != nil {
151 sis.servers[svm.id] = svm
152 return svm.Instance(), nil
155 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
157 defer sis.mtx.RUnlock()
158 if sis.allowInstancesCall.After(time.Now()) {
159 return nil, RateLimitError{sis.allowInstancesCall}
161 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
162 var r []cloud.Instance
163 for _, ss := range sis.servers {
164 r = append(r, ss.Instance())
169 func (sis *StubInstanceSet) Stop() {
171 defer sis.mtx.Unlock()
173 panic("Stop called twice")
178 func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
180 defer sis.mtx.Unlock()
181 for _, vm := range sis.servers {
182 svms = append(svms, vm)
// RateLimitError is the error value the stub instance set returns
// from Create and Instances when a call arrives before the time at
// which the next call is allowed.
type RateLimitError struct {
	// Retry is the time reported by EarliestRetry and included in
	// the error message.
	Retry time.Time
}

// Error implements the error interface. The message contains the
// Retry time in its default string form.
func (rl RateLimitError) Error() string {
	msg := fmt.Sprintf("rate limited until %s", rl.Retry)
	return msg
}

// EarliestRetry returns the Retry time carried by the error.
func (rl RateLimitError) EarliestRetry() time.Time {
	retryAt := rl.Retry
	return retryAt
}
192 // StubVM is a fake server that runs an SSH service. It represents a
193 // VM running in a fake cloud.
195 // Note this is distinct from a stubInstance, which is a snapshot of
196 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
197 // running (and might change IP addresses, shut down, etc.) without
198 // updating any stubInstances that have been returned to callers.
202 ReportBroken time.Time
203 CrunchRunMissing bool
204 CrunchRunCrashRate float64
205 CrunchRunDetachDelay time.Duration
206 ArvMountMaxExitLag time.Duration
207 ArvMountDeadlockRate float64
208 ExecuteContainer func(arvados.Container) int
209 CrashRunningContainer func(arvados.Container)
210 ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
212 // Populated by (*StubInstanceSet)Create()
213 InitCommand cloud.InitCommand
217 tags cloud.InstanceTags
219 SSHService SSHService
220 running map[string]stubProcess
221 killing map[string]bool
227 type stubProcess struct {
230 // crunch-run has exited, but arv-mount process (or something)
231 // still holds lock in /var/run/
235 func (svm *StubVM) Instance() stubInstance {
240 addr: svm.SSHService.Address(),
241 // We deliberately return a cached/stale copy of the
242 // real tags here, so that (Instance)Tags() sometimes
243 // returns old data after a call to
244 // (Instance)SetTags(). This is permitted by the
245 // driver interface, and this might help remind
246 // callers that they need to tolerate it.
247 tags: copyTags(svm.tags),
251 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
252 stdinData, err := ioutil.ReadAll(stdin)
254 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
257 queue := svm.sis.driver.Queue
258 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
259 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
260 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
263 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
264 fmt.Fprintf(stderr, "cannot fork\n")
267 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
268 fmt.Fprint(stderr, "crunch-run: command not found\n")
271 if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
272 var configData crunchrun.ConfigData
273 err := json.Unmarshal(stdinData, &configData)
275 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
278 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
279 if configData.Env[name] == "" {
280 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
287 svm.running[uuid] = stubProcess{pid: pid}
289 time.Sleep(svm.CrunchRunDetachDelay)
290 fmt.Fprintf(stderr, "starting %s\n", uuid)
291 logger := svm.sis.logger.WithFields(logrus.Fields{
293 "ContainerUUID": uuid,
296 logger.Printf("[test] starting crunch-run stub")
298 var ctr arvados.Container
299 var started, completed bool
301 logger.Print("[test] exiting crunch-run stub")
304 if svm.running[uuid].pid != pid {
305 bugf := svm.sis.driver.Bugf
309 bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
313 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
314 if started && svm.CrashRunningContainer != nil {
315 svm.CrashRunningContainer(ctr)
318 sproc := svm.running[uuid]
320 svm.running[uuid] = sproc
322 time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
324 if math_rand.Float64() >= svm.ArvMountDeadlockRate {
325 delete(svm.running, uuid)
329 crashluck := math_rand.Float64()
330 wantCrash := crashluck < svm.CrunchRunCrashRate
331 wantCrashEarly := crashluck < svm.CrunchRunCrashRate/2
333 ctr, ok := queue.Get(uuid)
335 logger.Print("[test] container not in queue")
339 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
342 killed := svm.killing[uuid]
344 if killed || wantCrashEarly {
348 ctr.State = arvados.ContainerStateRunning
349 started = queue.Notify(ctr)
351 ctr, _ = queue.Get(uuid)
352 logger.Print("[test] erroring out because state=Running update was rejected")
357 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
360 if svm.ExecuteContainer != nil {
361 ctr.ExitCode = svm.ExecuteContainer(ctr)
363 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] completing container")
364 ctr.State = arvados.ContainerStateComplete
365 completed = queue.Notify(ctr)
369 if command == "crunch-run --list" {
372 for uuid, sproc := range svm.running {
374 fmt.Fprintf(stdout, "%s stale\n", uuid)
376 fmt.Fprintf(stdout, "%s\n", uuid)
379 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
380 fmt.Fprintln(stdout, "broken")
382 fmt.Fprintln(stdout, svm.deadlocked)
385 if strings.HasPrefix(command, "crunch-run --kill ") {
387 sproc, running := svm.running[uuid]
388 if running && !sproc.exited {
389 svm.killing[uuid] = true
391 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
393 sproc, running = svm.running[uuid]
396 if running && !sproc.exited {
397 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
400 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
403 if command == "true" {
406 fmt.Fprintf(stderr, "%q: command not found", command)
410 type stubInstance struct {
413 tags cloud.InstanceTags
416 func (si stubInstance) ID() cloud.InstanceID {
420 func (si stubInstance) Address() string {
424 func (si stubInstance) RemoteUser() string {
425 return si.svm.SSHService.AuthorizedUser
428 func (si stubInstance) Destroy() error {
430 if sis.driver.HoldCloudOps {
431 sis.driver.holdCloudOps <- true
433 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
434 return errors.New("instance could not be destroyed")
436 si.svm.SSHService.Close()
438 defer sis.mtx.Unlock()
439 delete(sis.servers, si.svm.id)
443 func (si stubInstance) ProviderType() string {
444 return si.svm.providerType
447 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
448 tags = copyTags(tags)
458 func (si stubInstance) Tags() cloud.InstanceTags {
459 // Return a copy to ensure a caller can't change our saved
460 // tags just by writing to the returned map.
461 return copyTags(si.tags)
464 func (si stubInstance) String() string {
465 return string(si.svm.id)
468 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
469 buf := make([]byte, 512)
470 _, err := io.ReadFull(rand.Reader, buf)
474 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
478 return key.Verify(buf, sig)
481 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
482 dst := cloud.InstanceTags{}
483 for k, v := range src {
489 func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {