1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/lib/cloud"
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // StubVM's fake crunch-run uses this Queue to read and update
41 // Frequency of artificially introduced errors on calls to
42 // Destroy. 0=always succeed, 1=always fail.
43 ErrorRateDestroy float64
45 // If Create() or Instances() is called too frequently, return
46 // rate-limiting errors.
47 MinTimeBetweenCreateCalls time.Duration
48 MinTimeBetweenInstancesCalls time.Duration
50 // If true, Create and Destroy calls block until Release() is
54 instanceSets []*StubInstanceSet
55 holdCloudOps chan bool
58 // InstanceSet returns a new *StubInstanceSet.
59 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
60 if sd.holdCloudOps == nil {
61 sd.holdCloudOps = make(chan bool)
63 sis := StubInstanceSet{
66 servers: map[cloud.InstanceID]*StubVM{},
68 sd.instanceSets = append(sd.instanceSets, &sis)
72 err = json.Unmarshal(params, &sis)
77 // InstanceSets returns all instance sets that have been created by
78 // the driver. This can be used to test a component that uses the
79 // driver but doesn't expose the InstanceSets it has created.
80 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
81 return sd.instanceSets
84 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
85 // are fewer than n blocked calls pending, it waits for the rest to
87 func (sd *StubDriver) ReleaseCloudOps(n int) {
88 for i := 0; i < n; i++ {
93 type StubInstanceSet struct {
95 logger logrus.FieldLogger
96 servers map[cloud.InstanceID]*StubVM
100 allowCreateCall time.Time
101 allowInstancesCall time.Time
104 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
105 if sis.driver.HoldCloudOps {
106 sis.driver.holdCloudOps <- true
109 defer sis.mtx.Unlock()
111 return nil, errors.New("StubInstanceSet: Create called after Stop")
113 if sis.allowCreateCall.After(time.Now()) {
114 return nil, RateLimitError{sis.allowCreateCall}
116 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
119 ak := sis.driver.AuthorizedKeys
121 ak = append([]ssh.PublicKey{authKey}, ak...)
125 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
126 tags: copyTags(tags),
127 providerType: it.ProviderType,
129 running: map[string]int64{},
130 killing: map[string]bool{},
132 svm.SSHService = SSHService{
133 HostKey: sis.driver.HostKey,
134 AuthorizedUser: "root",
138 if setup := sis.driver.SetupVM; setup != nil {
141 sis.servers[svm.id] = svm
142 return svm.Instance(), nil
145 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
147 defer sis.mtx.RUnlock()
148 if sis.allowInstancesCall.After(time.Now()) {
149 return nil, RateLimitError{sis.allowInstancesCall}
151 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
153 var r []cloud.Instance
154 for _, ss := range sis.servers {
155 r = append(r, ss.Instance())
160 func (sis *StubInstanceSet) Stop() {
162 defer sis.mtx.Unlock()
164 panic("Stop called twice")
// RateLimitError is the error value handed back when a call is
// rejected for arriving too early. Retry records the time before
// which the call is not accepted.
//
// NOTE(review): presumably this satisfies the cloud package's
// rate-limit error interface via EarliestRetry -- confirm against
// lib/cloud.
type RateLimitError struct {
	Retry time.Time
}

// Error implements the error interface. The message embeds the Retry
// time using its default string formatting.
func (rle RateLimitError) Error() string {
	msg := fmt.Sprintf("rate limited until %s", rle.Retry)
	return msg
}

// EarliestRetry returns the Retry time stored in the error.
func (rle RateLimitError) EarliestRetry() time.Time {
	retryAt := rle.Retry
	return retryAt
}
174 // StubVM is a fake server that runs an SSH service. It represents a
175 // VM running in a fake cloud.
177 // Note this is distinct from a stubInstance, which is a snapshot of
178 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
179 // running (and might change IP addresses, shut down, etc.) without
180 // updating any stubInstances that have been returned to callers.
184 ReportBroken time.Time
185 CrunchRunMissing bool
186 CrunchRunCrashRate float64
187 CrunchRunDetachDelay time.Duration
188 ExecuteContainer func(arvados.Container) int
189 CrashRunningContainer func(arvados.Container)
193 tags cloud.InstanceTags
194 initCommand cloud.InitCommand
196 SSHService SSHService
197 running map[string]int64
198 killing map[string]bool
203 func (svm *StubVM) Instance() stubInstance {
208 addr: svm.SSHService.Address(),
209 // We deliberately return a cached/stale copy of the
210 // real tags here, so that (Instance)Tags() sometimes
211 // returns old data after a call to
212 // (Instance)SetTags(). This is permitted by the
213 // driver interface, and this might help remind
214 // callers that they need to tolerate it.
215 tags: copyTags(svm.tags),
219 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
220 stdinData, err := ioutil.ReadAll(stdin)
222 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
225 queue := svm.sis.driver.Queue
226 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
227 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
228 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
231 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
232 fmt.Fprintf(stderr, "cannot fork\n")
235 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
236 fmt.Fprint(stderr, "crunch-run: command not found\n")
239 if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
240 var stdinKV map[string]string
241 err := json.Unmarshal(stdinData, &stdinKV)
243 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
246 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
247 if stdinKV[name] == "" {
248 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
255 svm.running[uuid] = pid
257 time.Sleep(svm.CrunchRunDetachDelay)
258 fmt.Fprintf(stderr, "starting %s\n", uuid)
259 logger := svm.sis.logger.WithFields(logrus.Fields{
261 "ContainerUUID": uuid,
264 logger.Printf("[test] starting crunch-run stub")
266 crashluck := math_rand.Float64()
267 ctr, ok := queue.Get(uuid)
269 logger.Print("[test] container not in queue")
274 if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil {
275 svm.CrashRunningContainer(ctr)
279 if crashluck > svm.CrunchRunCrashRate/2 {
280 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
281 ctr.State = arvados.ContainerStateRunning
282 if !queue.Notify(ctr) {
283 ctr, _ = queue.Get(uuid)
284 logger.Print("[test] erroring out because state=Running update was rejected")
289 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
293 if svm.running[uuid] != pid {
294 logger.Print("[test] container was killed")
297 delete(svm.running, uuid)
299 if crashluck < svm.CrunchRunCrashRate {
300 logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
302 if svm.ExecuteContainer != nil {
303 ctr.ExitCode = svm.ExecuteContainer(ctr)
305 logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub")
306 ctr.State = arvados.ContainerStateComplete
312 if command == "crunch-run --list" {
315 for uuid := range svm.running {
316 fmt.Fprintf(stdout, "%s\n", uuid)
318 if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
319 fmt.Fprintln(stdout, "broken")
323 if strings.HasPrefix(command, "crunch-run --kill ") {
325 pid, running := svm.running[uuid]
326 if running && !svm.killing[uuid] {
327 svm.killing[uuid] = true
329 time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond)
332 if svm.running[uuid] == pid {
333 // Kill only if the running entry
334 // hasn't since been killed and
335 // replaced with a different one.
336 delete(svm.running, uuid)
338 delete(svm.killing, uuid)
341 time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
343 _, running = svm.running[uuid]
347 fmt.Fprintf(stderr, "%s: container is running\n", uuid)
350 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
354 if command == "true" {
357 fmt.Fprintf(stderr, "%q: command not found", command)
361 type stubInstance struct {
364 tags cloud.InstanceTags
367 func (si stubInstance) ID() cloud.InstanceID {
371 func (si stubInstance) Address() string {
375 func (si stubInstance) RemoteUser() string {
376 return si.svm.SSHService.AuthorizedUser
379 func (si stubInstance) Destroy() error {
381 if sis.driver.HoldCloudOps {
382 sis.driver.holdCloudOps <- true
384 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
385 return errors.New("instance could not be destroyed")
387 si.svm.SSHService.Close()
389 defer sis.mtx.Unlock()
390 delete(sis.servers, si.svm.id)
394 func (si stubInstance) ProviderType() string {
395 return si.svm.providerType
398 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
399 tags = copyTags(tags)
409 func (si stubInstance) Tags() cloud.InstanceTags {
410 // Return a copy to ensure a caller can't change our saved
411 // tags just by writing to the returned map.
412 return copyTags(si.tags)
415 func (si stubInstance) String() string {
416 return string(si.svm.id)
419 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
420 buf := make([]byte, 512)
421 _, err := io.ReadFull(rand.Reader, buf)
425 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
429 return key.Verify(buf, sig)
432 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
433 dst := cloud.InstanceTags{}
434 for k, v := range src {