1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/lib/cloud"
21 "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
22 "git.curoverse.com/arvados.git/sdk/go/arvados"
23 "github.com/Sirupsen/logrus"
24 "golang.org/x/crypto/ssh"
25 check "gopkg.in/check.v1"
// Register DispatcherSuite with gocheck so its Test* methods and
// SetUp*/TearDown* fixtures are run by the test harness.
28 var _ = check.Suite(&DispatcherSuite{})
30 // fakeCloud provides an exec method that can be used as a
31 // test.StubExecFunc. It calls the provided makeVM func when called
32 // with a previously unseen instance ID. Calls to exec are passed on
33 // to the *fakeVM for the appropriate instance ID.
34 type fakeCloud struct {
// makeVM builds the fakeVM for an instance ID that exec has not seen before.
36 makeVM func(cloud.Instance) *fakeVM
// onComplete is passed through by exec to fakeVM.exec, which invokes it
// with the container UUID; fakeVM.exec tolerates a nil value.
37 onComplete func(string)
// vms maps each instance ID to its fakeVM; exec allocates it lazily.
39 vms map[cloud.InstanceID]*fakeVM
// exec implements the stub driver's exec hook. It looks up the fakeVM for
// inst, creating it with makeVM the first time an instance ID is seen, and
// delegates the command to fakeVM.exec together with the shared queue and
// the onComplete/onCancel callbacks. It returns whatever fakeVM.exec
// returns (the command's exit status).
//
// NOTE(review): fc.vms is read and written here, and the stub driver may
// call exec concurrently for different instances. Confirm that the lines
// elided from this view hold a lock around the map access.
43 func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
45 fvm, ok := fc.vms[inst.ID()]
// Lazily allocate the map (presumably guarded by a nil check on an elided line).
48 fc.vms = make(map[cloud.InstanceID]*fakeVM)
// Remember the newly created fakeVM so later calls for this instance reuse it.
51 fc.vms[inst.ID()] = fvm
54 return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
57 // fakeVM is a fake VM with configurable delays and failure modes.
// crunchRunCrashRate is compared against one rand.Float64() draw per
// detached crunch-run stub. A draw below this rate makes the stub crash
// instead of completing the container. A draw at or below half the rate
// also skips moving the container to Running.
62 crunchRunCrashRate float64
// crunchRunDetachDelay is slept by "crunch-run --detach" before it
// reports "starting <uuid>" on stderr.
63 crunchRunDetachDelay time.Duration
// running is the set of container UUIDs whose crunch-run stub is active.
// exec allocates it lazily; "--kill" removes entries from it.
65 running map[string]bool
// exec emulates running command on this fake VM, writing to stdout/stderr
// and returning the command's exit status (the return statements are
// elided in this view).
//
// Failure modes are checked first, in this order:
//   - still booting;
//   - broken;
//   - crunch-run missing.
//
// Recognized commands are "crunch-run --detach <uuid>", "crunch-run --list",
// "crunch-run --kill <uuid>" and "true". Any other command is reported as
// "command not found" on stderr.
//
// queue supplies the container record for the UUID. onComplete and
// onCancel may each be nil. When non-nil, they are called with the UUID if
// the stub completes the container, or if it crashes after the container
// reached Running.
//
// NOTE(review): fvm.running is both read and mutated here. Confirm that the
// elided lines serialize access to it.
70 func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
// Extract the first container UUID (xxxxx-dz642-xxxxxxxxxxxxxxx) mentioned
// in the command; uuid is "" if there is none.
// NOTE(review): the pattern is compiled on every call. A package-level
// `var` holding the compiled regexp would avoid the repeated compile.
71 uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
// Until fvm.boot has passed, the VM only reports that it is booting.
72 if eta := fvm.boot.Sub(time.Now()); eta > 0 {
73 fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
// Once fvm.broken (if set) is in the past, every command fails.
76 if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
77 fmt.Fprintf(stderr, "cannot fork\n")
// Simulate a VM image that lacks the crunch-run binary.
80 if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
81 fmt.Fprint(stderr, "crunch-run: command not found\n")
84 if strings.HasPrefix(command, "crunch-run --detach ") {
// Mark the container as running (allocating the set on first use) before
// the detach delay.
86 if fvm.running == nil {
87 fvm.running = map[string]bool{}
89 fvm.running[uuid] = true
91 time.Sleep(fvm.crunchRunDetachDelay)
92 fmt.Fprintf(stderr, "starting %s\n", uuid)
93 logger := logrus.WithField("ContainerUUID", uuid)
94 logger.Printf("[test] starting crunch-run stub")
// A single random draw decides this stub's fate:
//   - crashluck < crunchRunCrashRate selects the crash path below;
//   - crashluck <= crunchRunCrashRate/2 also skips the transition to Running.
96 crashluck := rand.Float64()
97 ctr, ok := queue.Get(uuid)
99 logger.Print("[test] container not in queue")
102 if crashluck > fvm.crunchRunCrashRate/2 {
// The float is truncated to a whole count before scaling, so this sleeps
// 0-19ms.
103 time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
104 ctr.State = arvados.ContainerStateRunning
108 time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
// If "crunch-run --kill" removed the UUID in the meantime, the stub logs
// that it was killed.
110 _, running := fvm.running[uuid]
113 logger.Print("[test] container was killed")
116 if crashluck < fvm.crunchRunCrashRate {
117 logger.Print("[test] crashing crunch-run stub")
// Report a cancellation only if the container actually reached Running.
118 if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
// Normal exit: complete the container with this VM's configured exit code.
122 ctr.State = arvados.ContainerStateComplete
123 ctr.ExitCode = fvm.ctrExit
125 if onComplete != nil {
129 logger.Print("[test] exiting crunch-run stub")
// The stub is finished either way, so forget the container.
132 delete(fvm.running, uuid)
// "--list" prints one running container UUID per line on stdout.
136 if command == "crunch-run --list" {
139 for uuid := range fvm.running {
140 fmt.Fprintf(stdout, "%s\n", uuid)
// "--kill" only removes the UUID from fvm.running. It complains on stderr
// if that container is not currently running.
144 if strings.HasPrefix(command, "crunch-run --kill ") {
147 if fvm.running[uuid] {
148 delete(fvm.running, uuid)
150 fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
// "true" presumably serves as a trivial probe command (its handling is
// elided in this view).
154 if command == "true" {
// NOTE(review): unlike the other stderr messages, this one has no trailing "\n".
157 fmt.Fprintf(stderr, "%q: command not found", command)
// DispatcherSuite holds per-test fixtures for the dispatcher tests.
// SetUpTest rebuilds them before each test.
161 type DispatcherSuite struct {
// cluster is the configuration handed to the dispatcher under test. Tests
// may adjust it before the dispatcher is initialized.
162 cluster *arvados.Cluster
// NOTE(review): instanceSet is not referenced in the visible portion of
// this file.
163 instanceSet *test.LameInstanceSet
// stubDriver is the fake cloud driver. Tests register it as drivers["test"].
164 stubDriver *test.StubDriver
// SetUpSuite runs once before the suite. It raises the global logrus level
// to Debug when the ARVADOS_DEBUG environment variable is non-empty.
168 func (s *DispatcherSuite) SetUpSuite(c *check.C) {
169 if os.Getenv("ARVADOS_DEBUG") != "" {
170 logrus.StandardLogger().SetLevel(logrus.DebugLevel)
// SetUpTest runs before each test. It does four things:
//   - loads the test SSH keys;
//   - builds a fresh StubDriver that accepts the dispatcher's key;
//   - builds a cluster config with millisecond-scale timeouts and intervals;
//   - creates an uninitialized dispatcher for that config.
174 func (s *DispatcherSuite) SetUpTest(c *check.C) {
175 dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
// Dispatch.PrivateKey below takes the raw key file contents, so read the
// file again rather than using the parsed key.
176 dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
177 c.Assert(err, check.IsNil)
// The VM host key; hostpriv is presumably used on an elided line of the
// StubDriver literal below — TODO confirm.
179 _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
180 s.stubDriver = &test.StubDriver{
// Default Exec hook: logs every command. Tests that need realistic
// behaviour replace it (e.g. with fakeCloud.exec).
181 Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
182 c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
// Only the dispatcher's public key is accepted by the stub VMs.
186 AuthorizedKeys: []ssh.PublicKey{dispatchpub},
// Timeouts and intervals are a few milliseconds, presumably so the
// stub-driven tests converge quickly.
189 s.cluster = &arvados.Cluster{
190 CloudVMs: arvados.CloudVMs{
192 SyncInterval: arvados.Duration(10 * time.Millisecond),
193 TimeoutIdle: arvados.Duration(30 * time.Millisecond),
194 TimeoutBooting: arvados.Duration(30 * time.Millisecond),
195 TimeoutProbe: arvados.Duration(15 * time.Millisecond),
196 TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
198 Dispatch: arvados.Dispatch{
199 PrivateKey: dispatchprivraw,
200 PollInterval: arvados.Duration(5 * time.Millisecond),
201 ProbeInterval: arvados.Duration(5 * time.Millisecond),
202 StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
203 MaxProbesPerSecond: 1000,
// A spread of stub instance types for ChooseInstanceType to pick from.
205 InstanceTypes: arvados.InstanceTypeMap{
206 test.InstanceType(1).Name: test.InstanceType(1),
207 test.InstanceType(2).Name: test.InstanceType(2),
208 test.InstanceType(3).Name: test.InstanceType(3),
209 test.InstanceType(4).Name: test.InstanceType(4),
210 test.InstanceType(6).Name: test.InstanceType(6),
211 test.InstanceType(8).Name: test.InstanceType(8),
212 test.InstanceType(16).Name: test.InstanceType(16),
// The controller address comes from ARVADOS_API_HOST. The dispatcher
// listens on ":" (an unspecified address/port).
214 NodeProfiles: map[string]arvados.NodeProfile{
216 Controller: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
217 DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
221 s.disp = &dispatcher{Cluster: s.cluster}
222 // Test cases can modify s.cluster before calling
223 // initialize(), and then modify private state before calling
// TearDownTest is the gocheck fixture run after each test. Its body is
// elided in this view.
227 func (s *DispatcherSuite) TearDownTest(c *check.C) {
// TestDispatchToStubDriver runs 200 queued containers through the
// dispatcher against fake VMs with random delays and failure modes. It
// checks three things:
//   - every container is reported complete (or cancelled) exactly once
//     within 10s;
//   - the dispatcher's health check passes;
//   - afterwards, both the queue and the instance list drain within 1s.
231 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
232 drivers["test"] = s.stubDriver
233 s.disp.setupOnce.Do(s.disp.initialize)
// Instance types are chosen with the same ChooseInstanceType logic,
// applied to the suite's cluster config.
234 queue := &test.Queue{
235 ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
236 return ChooseInstanceType(s.cluster, ctr)
// 200 queued containers: priorities cycle 1..20 and RAM cycles 1-3 GiB.
239 for i := 0; i < 200; i++ {
240 queue.Containers = append(queue.Containers, arvados.Container{
241 UUID: test.ContainerUUID(i + 1),
242 State: arvados.ContainerStateQueued,
243 Priority: int64(i%20 + 1),
244 RuntimeConstraints: arvados.RuntimeConstraints{
245 RAM: int64(i%3+1) << 30,
// waiting tracks containers that have not yet finished. onComplete removes
// each UUID, flags a second completion as a test error, and (on the elided
// line following the len check) presumably closes done once all have finished.
253 done := make(chan struct{})
254 waiting := map[string]struct{}{}
255 for _, ctr := range queue.Containers {
256 waiting[ctr.UUID] = struct{}{}
258 onComplete := func(uuid string) {
261 if _, ok := waiting[uuid]; !ok {
262 c.Errorf("container completed twice: %s", uuid)
264 delete(waiting, uuid)
265 if len(waiting) == 0 {
// Each new VM is configured as follows:
//   - it boots within 5ms;
//   - its detach delay is under 10ms;
//   - its container exit code is 0-3.
// The conditions that select a VM to break (at a random time under 90ms),
// to lack crunch-run, or to crash 10% of stubs are elided in this view.
272 makeVM: func(inst cloud.Instance) *fakeVM {
275 boot: time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
276 crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
277 ctrExit: int(rand.Uint32() & 0x3),
281 fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
283 fvm.crunchRunMissing = true
285 fvm.crunchRunCrashRate = 0.1
// A crashed-while-running container counts as finished too, so the same
// callback handles cancellation.
289 onComplete: onComplete,
290 onCancel: onComplete,
292 s.stubDriver.Exec = fc.exec
296 err := s.disp.CheckHealth()
297 c.Check(err, check.IsNil)
301 c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
302 case <-time.After(10 * time.Second):
303 c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
// Poll every 10ms, for up to 1s, until both the queue and the instance list
// are empty.
// NOTE(review): the ticker created here is never stopped. Assigning it to a
// variable and deferring Stop() would release it when the test returns.
306 deadline := time.Now().Add(time.Second)
307 for range time.NewTicker(10 * time.Millisecond).C {
308 insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
309 c.Check(err, check.IsNil)
311 ents, _ := queue.Entries()
312 if len(ents) == 0 && len(insts) == 0 {
315 if time.Now().After(deadline) {
316 c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
321 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
322 s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
323 drivers["test"] = s.stubDriver
325 type instance struct {
329 LastContainerUUID string
330 ArvadosInstanceType string
331 ProviderInstanceType string
333 type instancesResponse struct {
336 getInstances := func() instancesResponse {
337 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
338 resp := httptest.NewRecorder()
339 s.disp.ServeHTTP(resp, req)
340 var sr instancesResponse
341 err := json.Unmarshal(resp.Body.Bytes(), &sr)
342 c.Check(err, check.IsNil)
347 c.Check(len(sr.Items), check.Equals, 0)
349 ch := s.disp.pool.Subscribe()
350 defer s.disp.pool.Unsubscribe(ch)
351 err := s.disp.pool.Create(test.InstanceType(1))
352 c.Check(err, check.IsNil)
356 c.Assert(len(sr.Items), check.Equals, 1)
357 c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
358 c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
359 c.Check(sr.Items[0].Price, check.Equals, 0.123)
360 c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
361 c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
362 c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)