1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/lib/cloud"
21 "git.curoverse.com/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // StubVM's fake crunch-run uses this Queue to read and update
41 // Frequency of artificially introduced errors on calls to
42 // Destroy. 0=always succeed, 1=always fail.
43 ErrorRateDestroy float64
45 // If Create() or Instances() is called too frequently, return
46 // rate-limiting errors.
47 MinTimeBetweenCreateCalls time.Duration
48 MinTimeBetweenInstancesCalls time.Duration
50 // If true, Create and Destroy calls block until Release() is
54 instanceSets []*StubInstanceSet
55 holdCloudOps chan bool
58 // InstanceSet returns a new *StubInstanceSet.
59 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
60 if sd.holdCloudOps == nil {
61 sd.holdCloudOps = make(chan bool)
63 sis := StubInstanceSet{
66 servers: map[cloud.InstanceID]*StubVM{},
68 sd.instanceSets = append(sd.instanceSets, &sis)
72 err = json.Unmarshal(params, &sis)
// InstanceSets returns all instance sets that have been created by the
78 // driver. This can be used to test a component that uses the driver
79 // but doesn't expose the InstanceSets it has created.
80 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
81 return sd.instanceSets
84 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
85 // are fewer than n blocked calls pending, it waits for the rest to
87 func (sd *StubDriver) ReleaseCloudOps(n int) {
88 for i := 0; i < n; i++ {
93 type StubInstanceSet struct {
95 logger logrus.FieldLogger
96 servers map[cloud.InstanceID]*StubVM
100 allowCreateCall time.Time
101 allowInstancesCall time.Time
104 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
105 if sis.driver.HoldCloudOps {
106 sis.driver.holdCloudOps <- true
109 defer sis.mtx.Unlock()
111 return nil, errors.New("StubInstanceSet: Create called after Stop")
113 if sis.allowCreateCall.After(time.Now()) {
114 return nil, RateLimitError{sis.allowCreateCall}
116 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
119 ak := sis.driver.AuthorizedKeys
121 ak = append([]ssh.PublicKey{authKey}, ak...)
125 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
126 tags: copyTags(tags),
127 providerType: it.ProviderType,
130 svm.SSHService = SSHService{
131 HostKey: sis.driver.HostKey,
132 AuthorizedUser: "root",
136 if setup := sis.driver.SetupVM; setup != nil {
139 sis.servers[svm.id] = svm
140 return svm.Instance(), nil
143 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
145 defer sis.mtx.RUnlock()
146 if sis.allowInstancesCall.After(time.Now()) {
147 return nil, RateLimitError{sis.allowInstancesCall}
149 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
151 var r []cloud.Instance
152 for _, ss := range sis.servers {
153 r = append(r, ss.Instance())
158 func (sis *StubInstanceSet) Stop() {
160 defer sis.mtx.Unlock()
162 panic("Stop called twice")
// RateLimitError is returned by the stub instance set's Create and
// Instances methods when they are called again before the driver's
// configured minimum interval has elapsed. Retry is the earliest time
// at which the next call will be accepted.
type RateLimitError struct{ Retry time.Time }

// Error implements the error interface.
//
// Retry is typically derived from time.Now(), which carries a
// monotonic clock reading; Round(0) strips it so the message shows
// only the wall-clock time, without a trailing "m=+..." field.
func (e RateLimitError) Error() string {
	return fmt.Sprintf("rate limited until %s", e.Retry.Round(0))
}

// EarliestRetry returns the earliest time at which the rate-limited
// call may be retried. The monotonic reading (if any) is preserved so
// callers can compare it reliably against time.Now().
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
172 // StubVM is a fake server that runs an SSH service. It represents a
173 // VM running in a fake cloud.
175 // Note this is distinct from a stubInstance, which is a snapshot of
176 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
177 // running (and might change IP addresses, shut down, etc.) without
178 // updating any stubInstances that have been returned to callers.
182 CrunchRunMissing bool
183 CrunchRunCrashRate float64
184 CrunchRunDetachDelay time.Duration
185 ExecuteContainer func(arvados.Container) int
189 tags cloud.InstanceTags
190 initCommand cloud.InitCommand
192 SSHService SSHService
193 running map[string]bool
197 func (svm *StubVM) Instance() stubInstance {
202 addr: svm.SSHService.Address(),
203 // We deliberately return a cached/stale copy of the
204 // real tags here, so that (Instance)Tags() sometimes
205 // returns old data after a call to
206 // (Instance)SetTags(). This is permitted by the
207 // driver interface, and this might help remind
208 // callers that they need to tolerate it.
209 tags: copyTags(svm.tags),
213 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
214 stdinData, err := ioutil.ReadAll(stdin)
216 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
219 queue := svm.sis.driver.Queue
220 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
221 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
222 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
225 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
226 fmt.Fprintf(stderr, "cannot fork\n")
229 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
230 fmt.Fprint(stderr, "crunch-run: command not found\n")
233 if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
234 var stdinKV map[string]string
235 err := json.Unmarshal(stdinData, &stdinKV)
237 fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
240 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
241 if stdinKV[name] == "" {
242 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
247 if svm.running == nil {
248 svm.running = map[string]bool{}
250 svm.running[uuid] = true
252 time.Sleep(svm.CrunchRunDetachDelay)
253 fmt.Fprintf(stderr, "starting %s\n", uuid)
254 logger := svm.sis.logger.WithFields(logrus.Fields{
256 "ContainerUUID": uuid,
258 logger.Printf("[test] starting crunch-run stub")
260 crashluck := math_rand.Float64()
261 ctr, ok := queue.Get(uuid)
263 logger.Print("[test] container not in queue")
266 if crashluck > svm.CrunchRunCrashRate/2 {
267 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
268 ctr.State = arvados.ContainerStateRunning
272 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
274 _, running := svm.running[uuid]
277 logger.Print("[test] container was killed")
280 if svm.ExecuteContainer != nil {
281 ctr.ExitCode = svm.ExecuteContainer(ctr)
283 // TODO: Check whether the stub instance has
284 // been destroyed, and if so, don't call
285 // queue.Notify. Then "container finished
286 // twice" can be classified as a bug.
287 if crashluck < svm.CrunchRunCrashRate {
288 logger.Print("[test] crashing crunch-run stub")
290 ctr.State = arvados.ContainerStateComplete
293 logger.Print("[test] exiting crunch-run stub")
296 delete(svm.running, uuid)
300 if command == "crunch-run --list" {
303 for uuid := range svm.running {
304 fmt.Fprintf(stdout, "%s\n", uuid)
308 if strings.HasPrefix(command, "crunch-run --kill ") {
311 if svm.running[uuid] {
312 delete(svm.running, uuid)
314 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
318 if command == "true" {
321 fmt.Fprintf(stderr, "%q: command not found", command)
325 type stubInstance struct {
328 tags cloud.InstanceTags
331 func (si stubInstance) ID() cloud.InstanceID {
335 func (si stubInstance) Address() string {
339 func (si stubInstance) RemoteUser() string {
340 return si.svm.SSHService.AuthorizedUser
343 func (si stubInstance) Destroy() error {
345 if sis.driver.HoldCloudOps {
346 sis.driver.holdCloudOps <- true
348 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
349 return errors.New("instance could not be destroyed")
351 si.svm.SSHService.Close()
353 defer sis.mtx.Unlock()
354 delete(sis.servers, si.svm.id)
358 func (si stubInstance) ProviderType() string {
359 return si.svm.providerType
362 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
363 tags = copyTags(tags)
373 func (si stubInstance) Tags() cloud.InstanceTags {
374 // Return a copy to ensure a caller can't change our saved
375 // tags just by writing to the returned map.
376 return copyTags(si.tags)
379 func (si stubInstance) String() string {
380 return string(si.svm.id)
383 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
384 buf := make([]byte, 512)
385 _, err := io.ReadFull(rand.Reader, buf)
389 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
393 return key.Verify(buf, sig)
396 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
397 dst := cloud.InstanceTags{}
398 for k, v := range src {