X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d2bdc5af995f829240593cdfa580f670a2d30e27..ecf1cbf6b843ec566890b4bf944fb893ca9481de:/lib/dispatchcloud/test/stub_driver.go diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go index 4a88bfab14..7a1f423016 100644 --- a/lib/dispatchcloud/test/stub_driver.go +++ b/lib/dispatchcloud/test/stub_driver.go @@ -17,8 +17,8 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/lib/cloud" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" ) @@ -56,12 +56,13 @@ type StubDriver struct { } // InstanceSet returns a new *StubInstanceSet. -func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) { +func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) { if sd.holdCloudOps == nil { sd.holdCloudOps = make(chan bool) } sis := StubInstanceSet{ driver: sd, + logger: logger, servers: map[cloud.InstanceID]*StubVM{}, } sd.instanceSets = append(sd.instanceSets, &sis) @@ -91,6 +92,7 @@ func (sd *StubDriver) ReleaseCloudOps(n int) { type StubInstanceSet struct { driver *StubDriver + logger logrus.FieldLogger servers map[cloud.InstanceID]*StubVM mtx sync.RWMutex stopped bool @@ -99,7 +101,7 @@ type StubInstanceSet struct { allowInstancesCall time.Time } -func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) { +func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) { if sis.driver.HoldCloudOps { sis.driver.holdCloudOps <- true } @@ -123,9 +125,13 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())), tags: copyTags(tags), providerType: it.ProviderType, + initCommand: cmd, + running: map[string]int64{}, + killing: map[string]bool{}, } svm.SSHService = SSHService{ HostKey: sis.driver.HostKey, + AuthorizedUser: "root", AuthorizedKeys: ak, Exec: svm.Exec, } @@ -173,19 +179,24 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry } // running (and might change IP addresses, shut down, etc.) without // updating any stubInstances that have been returned to callers. type StubVM struct { - Boot time.Time - Broken time.Time - CrunchRunMissing bool - CrunchRunCrashRate float64 - CrunchRunDetachDelay time.Duration - ExecuteContainer func(arvados.Container) int + Boot time.Time + Broken time.Time + ReportBroken time.Time + CrunchRunMissing bool + CrunchRunCrashRate float64 + CrunchRunDetachDelay time.Duration + ExecuteContainer func(arvados.Container) int + CrashRunningContainer func(arvados.Container) sis *StubInstanceSet id cloud.InstanceID tags cloud.InstanceTags + initCommand cloud.InitCommand providerType string SSHService SSHService - running map[string]bool + running map[string]int64 + killing map[string]bool + lastPID int64 sync.Mutex } @@ -225,31 +236,30 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, fmt.Fprint(stderr, "crunch-run: command not found\n") return 1 } - if strings.HasPrefix(command, "source /dev/stdin; crunch-run --detach ") { - stdinKV := map[string]string{} - for _, line := range strings.Split(string(stdinData), "\n") { - kv := strings.SplitN(strings.TrimPrefix(line, "export "), "=", 2) - if len(kv) == 2 && len(kv[1]) > 0 { - stdinKV[kv[0]] = kv[1] - } + if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") { + var stdinKV map[string]string + err := json.Unmarshal(stdinData, &stdinKV) + if err != nil { + fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData) + return 1 } for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} { if stdinKV[name] == "" { - fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin) + fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData) return 1 } } svm.Lock() - if svm.running == nil { - svm.running = map[string]bool{} - } - svm.running[uuid] = true + svm.lastPID++ + pid := svm.lastPID + svm.running[uuid] = pid svm.Unlock() time.Sleep(svm.CrunchRunDetachDelay) fmt.Fprintf(stderr, "starting %s\n", uuid) - logger := logrus.WithFields(logrus.Fields{ + logger := svm.sis.logger.WithFields(logrus.Fields{ "Instance": svm.id, "ContainerUUID": uuid, + "PID": pid, }) logger.Printf("[test] starting crunch-run stub") go func() { @@ -259,37 +269,43 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, logger.Print("[test] container not in queue") return } + + defer func() { + if ctr.State == arvados.ContainerStateRunning && svm.CrashRunningContainer != nil { + svm.CrashRunningContainer(ctr) + } + }() + if crashluck > svm.CrunchRunCrashRate/2 { time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond) ctr.State = arvados.ContainerStateRunning - queue.Notify(ctr) + if !queue.Notify(ctr) { + ctr, _ = queue.Get(uuid) + logger.Print("[test] erroring out because state=Running update was rejected") + return + } } time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond) + svm.Lock() - _, running := svm.running[uuid] - svm.Unlock() - if !running { + defer svm.Unlock() + if svm.running[uuid] != pid { logger.Print("[test] container was killed") return } - if svm.ExecuteContainer != nil { - ctr.ExitCode = svm.ExecuteContainer(ctr) - } - // TODO: Check whether the stub instance has - // been destroyed, and if so, don't call - // queue.Notify. Then "container finished - // twice" can be classified as a bug. + delete(svm.running, uuid) + if crashluck < svm.CrunchRunCrashRate { - logger.Print("[test] crashing crunch-run stub") + logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub") } else { + if svm.ExecuteContainer != nil { + ctr.ExitCode = svm.ExecuteContainer(ctr) + } + logger.WithField("ExitCode", ctr.ExitCode).Print("[test] exiting crunch-run stub") ctr.State = arvados.ContainerStateComplete - queue.Notify(ctr) + go queue.Notify(ctr) } - logger.Print("[test] exiting crunch-run stub") - svm.Lock() - defer svm.Unlock() - delete(svm.running, uuid) }() return 0 } @@ -299,17 +315,41 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, for uuid := range svm.running { fmt.Fprintf(stdout, "%s\n", uuid) } + if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) { + fmt.Fprintln(stdout, "broken") + } return 0 } if strings.HasPrefix(command, "crunch-run --kill ") { svm.Lock() - defer svm.Unlock() - if svm.running[uuid] { - delete(svm.running, uuid) + pid, running := svm.running[uuid] + if running && !svm.killing[uuid] { + svm.killing[uuid] = true + go func() { + time.Sleep(time.Duration(math_rand.Float64()*30) * time.Millisecond) + svm.Lock() + defer svm.Unlock() + if svm.running[uuid] == pid { + // Kill only if the running entry + // hasn't since been killed and + // replaced with a different one. + delete(svm.running, uuid) + } + delete(svm.killing, uuid) + }() + svm.Unlock() + time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond) + svm.Lock() + _, running = svm.running[uuid] + } + svm.Unlock() + if running { + fmt.Fprintf(stderr, "%s: container is running\n", uuid) + return 1 } else { fmt.Fprintf(stderr, "%s: container is not running\n", uuid) + return 0 } - return 0 } if command == "true" { return 0 @@ -332,6 +372,10 @@ func (si stubInstance) Address() string { return si.addr } +func (si stubInstance) RemoteUser() string { + return si.svm.SSHService.AuthorizedUser +} + func (si stubInstance) Destroy() error { sis := si.svm.sis if sis.driver.HoldCloudOps { @@ -363,7 +407,9 @@ func (si stubInstance) SetTags(tags cloud.InstanceTags) error { } func (si stubInstance) Tags() cloud.InstanceTags { - return si.tags + // Return a copy to ensure a caller can't change our saved + // tags just by writing to the returned map. + return copyTags(si.tags) } func (si stubInstance) String() string {