1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/lib/cloud"
21 "git.curoverse.com/arvados.git/sdk/go/arvados"
22 "github.com/sirupsen/logrus"
23 "golang.org/x/crypto/ssh"
26 // A StubDriver implements cloud.Driver by setting up local SSH
27 // servers that do fake command executions.
28 type StubDriver struct {
30 AuthorizedKeys []ssh.PublicKey
32 // SetupVM, if set, is called upon creation of each new
33 // StubVM. This is the caller's opportunity to customize the
34 // VM's error rate and other behaviors.
37 // StubVM's fake crunch-run uses this Queue to read and update
41 // Frequency of artificially introduced errors on calls to
42 // Destroy. 0=always succeed, 1=always fail.
43 ErrorRateDestroy float64
45 // If Create() or Instances() is called too frequently, return
46 // rate-limiting errors.
47 MinTimeBetweenCreateCalls time.Duration
48 MinTimeBetweenInstancesCalls time.Duration
50 // If true, Create and Destroy calls block until ReleaseCloudOps() is
54 instanceSets []*StubInstanceSet
55 holdCloudOps chan bool
58 // InstanceSet returns a new *StubInstanceSet.
59 func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
60 if sd.holdCloudOps == nil {
61 sd.holdCloudOps = make(chan bool)
63 sis := StubInstanceSet{
65 servers: map[cloud.InstanceID]*StubVM{},
67 sd.instanceSets = append(sd.instanceSets, &sis)
71 err = json.Unmarshal(params, &sis)
76 // InstanceSets returns all instance sets that have been created by the
77 // driver. This can be used to test a component that uses the driver
78 // but doesn't expose the InstanceSets it has created.
79 func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
80 return sd.instanceSets
83 // ReleaseCloudOps releases n pending Create/Destroy calls. If there
84 // are fewer than n blocked calls pending, it waits for the rest to
86 func (sd *StubDriver) ReleaseCloudOps(n int) {
87 for i := 0; i < n; i++ {
92 type StubInstanceSet struct {
94 servers map[cloud.InstanceID]*StubVM
98 allowCreateCall time.Time
99 allowInstancesCall time.Time
102 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
103 if sis.driver.HoldCloudOps {
104 sis.driver.holdCloudOps <- true
107 defer sis.mtx.Unlock()
109 return nil, errors.New("StubInstanceSet: Create called after Stop")
111 if sis.allowCreateCall.After(time.Now()) {
112 return nil, RateLimitError{sis.allowCreateCall}
114 sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
117 ak := sis.driver.AuthorizedKeys
119 ak = append([]ssh.PublicKey{authKey}, ak...)
123 id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
124 tags: copyTags(tags),
125 providerType: it.ProviderType,
127 svm.SSHService = SSHService{
128 HostKey: sis.driver.HostKey,
129 AuthorizedUser: "root",
133 if setup := sis.driver.SetupVM; setup != nil {
136 sis.servers[svm.id] = svm
137 return svm.Instance(), nil
140 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
142 defer sis.mtx.RUnlock()
143 if sis.allowInstancesCall.After(time.Now()) {
144 return nil, RateLimitError{sis.allowInstancesCall}
146 sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
148 var r []cloud.Instance
149 for _, ss := range sis.servers {
150 r = append(r, ss.Instance())
155 func (sis *StubInstanceSet) Stop() {
157 defer sis.mtx.Unlock()
159 panic("Stop called twice")
// RateLimitError is the error value the stub instance set returns
// when a call arrives before the time at which the next call is
// allowed.
type RateLimitError struct {
	// Retry is the time until which calls are being rate limited.
	Retry time.Time
}

// Error implements the error interface. The message includes the
// time until which the caller is rate limited.
func (e RateLimitError) Error() string {
	msg := fmt.Sprintf("rate limited until %s", e.Retry)
	return msg
}

// EarliestRetry returns the Retry time carried by the error.
func (e RateLimitError) EarliestRetry() time.Time {
	retryAt := e.Retry
	return retryAt
}
169 // StubVM is a fake server that runs an SSH service. It represents a
170 // VM running in a fake cloud.
172 // Note this is distinct from a stubInstance, which is a snapshot of
173 // the VM's metadata. Like a VM in a real cloud, a StubVM keeps
174 // running (and might change IP addresses, shut down, etc.) without
175 // updating any stubInstances that have been returned to callers.
179 CrunchRunMissing bool
180 CrunchRunCrashRate float64
181 CrunchRunDetachDelay time.Duration
182 ExecuteContainer func(arvados.Container) int
186 tags cloud.InstanceTags
188 SSHService SSHService
189 running map[string]bool
193 func (svm *StubVM) Instance() stubInstance {
198 addr: svm.SSHService.Address(),
199 // We deliberately return a cached/stale copy of the
200 // real tags here, so that (Instance)Tags() sometimes
201 // returns old data after a call to
202 // (Instance)SetTags(). This is permitted by the
203 // driver interface, and this might help remind
204 // callers that they need to tolerate it.
205 tags: copyTags(svm.tags),
209 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
210 stdinData, err := ioutil.ReadAll(stdin)
212 fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
215 queue := svm.sis.driver.Queue
216 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
217 if eta := svm.Boot.Sub(time.Now()); eta > 0 {
218 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
221 if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
222 fmt.Fprintf(stderr, "cannot fork\n")
225 if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
226 fmt.Fprint(stderr, "crunch-run: command not found\n")
229 if strings.HasPrefix(command, "source /dev/stdin; crunch-run --detach ") {
230 stdinKV := map[string]string{}
231 for _, line := range strings.Split(string(stdinData), "\n") {
232 kv := strings.SplitN(strings.TrimPrefix(line, "export "), "=", 2)
233 if len(kv) == 2 && len(kv[1]) > 0 {
234 stdinKV[kv[0]] = kv[1]
237 for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
238 if stdinKV[name] == "" {
239 fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
244 if svm.running == nil {
245 svm.running = map[string]bool{}
247 svm.running[uuid] = true
249 time.Sleep(svm.CrunchRunDetachDelay)
250 fmt.Fprintf(stderr, "starting %s\n", uuid)
251 logger := logrus.WithFields(logrus.Fields{
253 "ContainerUUID": uuid,
255 logger.Printf("[test] starting crunch-run stub")
257 crashluck := math_rand.Float64()
258 ctr, ok := queue.Get(uuid)
260 logger.Print("[test] container not in queue")
263 if crashluck > svm.CrunchRunCrashRate/2 {
264 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
265 ctr.State = arvados.ContainerStateRunning
269 time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
271 _, running := svm.running[uuid]
274 logger.Print("[test] container was killed")
277 if svm.ExecuteContainer != nil {
278 ctr.ExitCode = svm.ExecuteContainer(ctr)
280 // TODO: Check whether the stub instance has
281 // been destroyed, and if so, don't call
282 // queue.Notify. Then "container finished
283 // twice" can be classified as a bug.
284 if crashluck < svm.CrunchRunCrashRate {
285 logger.Print("[test] crashing crunch-run stub")
287 ctr.State = arvados.ContainerStateComplete
290 logger.Print("[test] exiting crunch-run stub")
293 delete(svm.running, uuid)
297 if command == "crunch-run --list" {
300 for uuid := range svm.running {
301 fmt.Fprintf(stdout, "%s\n", uuid)
305 if strings.HasPrefix(command, "crunch-run --kill ") {
308 if svm.running[uuid] {
309 delete(svm.running, uuid)
311 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
315 if command == "true" {
318 fmt.Fprintf(stderr, "%q: command not found", command)
322 type stubInstance struct {
325 tags cloud.InstanceTags
328 func (si stubInstance) ID() cloud.InstanceID {
332 func (si stubInstance) Address() string {
336 func (si stubInstance) RemoteUser() string {
337 return si.svm.SSHService.AuthorizedUser
340 func (si stubInstance) Destroy() error {
342 if sis.driver.HoldCloudOps {
343 sis.driver.holdCloudOps <- true
345 if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
346 return errors.New("instance could not be destroyed")
348 si.svm.SSHService.Close()
350 defer sis.mtx.Unlock()
351 delete(sis.servers, si.svm.id)
355 func (si stubInstance) ProviderType() string {
356 return si.svm.providerType
359 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
360 tags = copyTags(tags)
370 func (si stubInstance) Tags() cloud.InstanceTags {
374 func (si stubInstance) String() string {
375 return string(si.svm.id)
378 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
379 buf := make([]byte, 512)
380 _, err := io.ReadFull(rand.Reader, buf)
384 sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
388 return key.Verify(buf, sig)
391 func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
392 dst := cloud.InstanceTags{}
393 for k, v := range src {