1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/lib/cloud"
21 "git.curoverse.com/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // StubVM's fake crunch-run uses this Queue to read and update
41 // Frequency of artificially introduced errors on calls to
42 // Destroy. 0=always succeed, 1=always fail.
43 ErrorRateDestroy float64
45 // If Create() or Instances() is called too frequently, return
46 // rate-limiting errors.
47 MinTimeBetweenCreateCalls time.Duration
48 MinTimeBetweenInstancesCalls time.Duration
50 // If true, Create and Destroy calls block until Release() is
54 instanceSets []*StubInstanceSet
55 holdCloudOps chan bool
58 // InstanceSet returns a new *StubInstanceSet.
59 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
// Lazily create the channel that Create and Destroy send on when
// HoldCloudOps is set. It is unbuffered, so each send waits for a
// receiver.
60 if sd.holdCloudOps == nil {
61 sd.holdCloudOps = make(chan bool)
63 sis := StubInstanceSet{
65 servers: map[cloud.InstanceID]*StubVM{},
// Remember the new set so InstanceSets() can return it later.
// NOTE(review): sd.instanceSets and sd.holdCloudOps are modified here
// with no locking visible in this view -- confirm InstanceSet is never
// called concurrently.
67 sd.instanceSets = append(sd.instanceSets, &sis)
// The raw JSON params are decoded directly into the new StubInstanceSet.
71 err = json.Unmarshal(params, &sis)
76 // InstanceSets returns all instance sets that have been created by
77 // the driver. This can be used to test a component that uses the
78 // driver but doesn't expose the InstanceSets it has created.
79 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
// The driver's own slice is returned, not a copy.
80 return sd.instanceSets
83 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
84 // are fewer than n blocked calls pending, it waits for the rest to
86 func (sd *StubDriver) ReleaseCloudOps(n int) {
87 for i := 0; i < n; i++ {
92 type StubInstanceSet struct {
94 servers map[cloud.InstanceID]*StubVM
98 allowCreateCall time.Time
99 allowInstancesCall time.Time
// Create adds a new StubVM to the set and returns its Instance()
// snapshot. It returns an error if called after Stop, and a
// RateLimitError if called before sis.allowCreateCall.
102 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
// When the driver's HoldCloudOps is set, send on the driver's
// holdCloudOps channel before doing anything else. The channel is
// created unbuffered in StubDriver.InstanceSet, so this waits for a
// receiver.
103 if sis.driver.HoldCloudOps {
104 sis.driver.holdCloudOps <- true
107 defer sis.mtx.Unlock()
109 return nil, errors.New("StubInstanceSet: Create called after Stop")
// Reject the call if the previously computed earliest allowed time is
// still in the future.
111 if sis.allowCreateCall.After(time.Now()) {
112 return nil, RateLimitError{sis.allowCreateCall}
// The next Create is allowed no sooner than MinTimeBetweenCreateCalls
// from now.
114 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
117 ak := sis.driver.AuthorizedKeys
// authKey is prepended by appending to a fresh one-element slice, so the
// driver's AuthorizedKeys backing array is not written to.
119 ak = append([]ssh.PublicKey{authKey}, ak...)
// The instance ID has the form "stub-<ProviderType>-<random hex>".
123 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
// The VM gets its own copy of the caller's tags.
124 tags: copyTags(tags),
125 providerType: it.ProviderType,
127 svm.SSHService = SSHService{
128 HostKey: sis.driver.HostKey,
// Optional caller-supplied SetupVM hook from the driver.
132 if setup := sis.driver.SetupVM; setup != nil {
135 sis.servers[svm.id] = svm
136 return svm.Instance(), nil
// Instances returns an Instance() snapshot of every StubVM in
// sis.servers, or a RateLimitError if called before
// sis.allowInstancesCall. The tags argument is unnamed and therefore
// unused.
139 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
141 defer sis.mtx.RUnlock()
142 if sis.allowInstancesCall.After(time.Now()) {
143 return nil, RateLimitError{sis.allowInstancesCall}
// NOTE(review): this assignment happens while the deferred release is
// RUnlock. If the acquire (not visible in this view) is RLock,
// concurrent callers write this field under a shared lock -- confirm,
// and consider Lock/Unlock instead.
145 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
147 var r []cloud.Instance
// Map iteration order is unspecified, so the order of the returned
// slice is too.
148 for _, ss := range sis.servers {
149 r = append(r, ss.Instance())
154 func (sis *StubInstanceSet) Stop() {
156 defer sis.mtx.Unlock()
158 panic("Stop called twice")
// RateLimitError is the error returned by StubInstanceSet when a call
// arrives before the earliest time at which the next call is allowed.
type RateLimitError struct {
	// Retry is the earliest time at which the rejected call would
	// have been accepted.
	Retry time.Time
}

// Error implements the error interface.
func (e RateLimitError) Error() string {
	return fmt.Sprintf("rate limited until %s", e.Retry)
}

// EarliestRetry returns the time carried in the Retry field.
func (e RateLimitError) EarliestRetry() time.Time {
	return e.Retry
}
168 // StubVM is a fake server that runs an SSH service. It represents a
169 // VM running in a fake cloud.
171 // Note this is distinct from a stubInstance, which is a snapshot of
172 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
173 // running (and might change IP addresses, shut down, etc.) without
174 // updating any stubInstances that have been returned to callers.
178 CrunchRunMissing bool
179 CrunchRunCrashRate float64
180 CrunchRunDetachDelay time.Duration
181 ExecuteContainer func(arvados.Container) int
185 tags cloud.InstanceTags
187 SSHService SSHService
188 running map[string]bool
192 func (svm *StubVM) Instance() stubInstance {
197 addr: svm.SSHService.Address(),
198 // We deliberately return a cached/stale copy of the
199 // real tags here, so that (Instance)Tags() sometimes
200 // returns old data after a call to
201 // (Instance)SetTags(). This is permitted by the
202 // driver interface, and this might help remind
203 // callers that they need to tolerate it.
204 tags: copyTags(svm.tags),
// Exec fakes the execution of command on the stub VM, writing any
// output to stdout/stderr and returning a uint32 exit status. The
// behavior is selected by inspecting the command string: the
// "crunch-run --detach", "crunch-run --list", "crunch-run --kill" and
// "true" forms are recognized; anything else is reported as "command
// not found".
208 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
// All of stdin is consumed up front, regardless of the command.
209 stdinData, err := ioutil.ReadAll(stdin)
211 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
214 queue := svm.sis.driver.Queue
// Take the first substring of command matching the pattern below as the
// container UUID (empty string if nothing matches).
// NOTE(review): the pattern is recompiled on every Exec call; it could
// be hoisted to a package-level variable.
215 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// While svm.Boot is still in the future the VM reports that it is
// booting.
216 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
217 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// A non-zero svm.Broken time that has already passed makes every
// command fail with "cannot fork".
220 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
221 fmt.Fprintf(stderr, "cannot fork\n")
224 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
225 fmt.Fprint(stderr, "crunch-run: command not found\n")
228 if strings.HasPrefix(command, "source /dev/stdin; crunch-run --detach ") {
// Parse stdin as lines of the form "[export ]KEY=VALUE", keeping only
// entries whose value is non-empty.
229 stdinKV := map[string]string{}
230 for _, line := range strings.Split(string(stdinData), "\n") {
231 kv := strings.SplitN(strings.TrimPrefix(line, "export "), "=", 2)
232 if len(kv) == 2 && len(kv[1]) > 0 {
233 stdinKV[kv[0]] = kv[1]
// Both API connection variables must have been supplied on stdin.
236 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
237 if stdinKV[name] == "" {
// NOTE(review): %q is given stdin (the io.Reader), not stdinData (the
// bytes read from it); the message probably intends the latter --
// confirm.
238 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
// NOTE(review): svm.running is read and written at several points in
// this function; any locking around those accesses is not visible in
// this view.
243 if svm.running == nil {
244 svm.running = map[string]bool{}
246 svm.running[uuid] = true
248 time.Sleep(svm.CrunchRunDetachDelay)
249 fmt.Fprintf(stderr, "starting %s\n", uuid)
250 logger := logrus.WithFields(logrus.Fields{
252 "ContainerUUID": uuid,
254 logger.Printf("[test] starting crunch-run stub")
// One random draw in [0,1) is compared against CrunchRunCrashRate at
// two points below.
256 crashluck := math_rand.Float64()
257 ctr, ok := queue.Get(uuid)
259 logger.Print("[test] container not in queue")
// Only when the draw exceeds half the crash rate is the container moved
// to the Running state, after a random delay.
262 if crashluck > svm.CrunchRunCrashRate/2 {
// The float is truncated to an integer before the multiplication, so
// the delay is a whole number of milliseconds from 0 to 19.
263 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
264 ctr.State = arvados.ContainerStateRunning
268 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
// The "crunch-run --kill" branch deletes the uuid from svm.running, so
// a missing entry here means the container was killed in the meantime.
270 _, running := svm.running[uuid]
273 logger.Print("[test] container was killed")
// Optional caller-supplied hook that determines the container's exit
// code.
276 if svm.ExecuteContainer != nil {
277 ctr.ExitCode = svm.ExecuteContainer(ctr)
279 // TODO: Check whether the stub instance has
280 // been destroyed, and if so, don't call
281 // queue.Notify. Then "container finished
282 // twice" can be classified as a bug.
283 if crashluck < svm.CrunchRunCrashRate {
284 logger.Print("[test] crashing crunch-run stub")
286 ctr.State = arvados.ContainerStateComplete
289 logger.Print("[test] exiting crunch-run stub")
292 delete(svm.running, uuid)
// List the UUIDs currently recorded in svm.running, one per line, on
// stdout.
296 if command == "crunch-run --list" {
299 for uuid := range svm.running {
300 fmt.Fprintf(stdout, "%s\n", uuid)
// Kill: forget the container if it is recorded as running, otherwise
// report that it is not running.
304 if strings.HasPrefix(command, "crunch-run --kill ") {
307 if svm.running[uuid] {
308 delete(svm.running, uuid)
310 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
314 if command == "true" {
// Unlike the other messages in this function, this one is written
// without a trailing newline.
317 fmt.Fprintf(stderr, "%q: command not found", command)
321 type stubInstance struct {
324 tags cloud.InstanceTags
327 func (si stubInstance) ID() cloud.InstanceID {
331 func (si stubInstance) Address() string {
// Destroy closes the VM's SSH service and removes the VM from its
// instance set's servers map. With probability ErrorRateDestroy it
// instead returns an error and leaves the VM in place.
335 func (si stubInstance) Destroy() error {
// When the driver's HoldCloudOps is set, send on the driver's
// holdCloudOps channel before doing anything else.
337 if sis.driver.HoldCloudOps {
338 sis.driver.holdCloudOps <- true
// math_rand.Float64 yields values in [0,1), so a rate of 0 never fails
// and a rate of 1 always fails.
340 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
341 return errors.New("instance could not be destroyed")
// The SSH service is closed before the map entry is removed.
343 si.svm.SSHService.Close()
345 defer sis.mtx.Unlock()
346 delete(sis.servers, si.svm.id)
350 func (si stubInstance) ProviderType() string {
351 return si.svm.providerType
354 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
355 tags = copyTags(tags)
365 func (si stubInstance) Tags() cloud.InstanceTags {
369 func (si stubInstance) String() string {
370 return string(si.svm.id)
// VerifyHostKey checks that key matches the driver's HostKey: it signs
// 512 random bytes with HostKey and verifies that signature against
// key. The client argument is not used in the lines visible here.
373 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
// Fresh random challenge for each verification.
374 buf := make([]byte, 512)
375 _, err := io.ReadFull(rand.Reader, buf)
379 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
// A nil result means key verified the signature made by HostKey.
383 return key.Verify(buf, sig)
386 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
387 dst := cloud.InstanceTags{}
388 for k, v := range src {