14807: Accept .../instances/_/drain?instance_id=X.
[arvados.git] / lib / dispatchcloud / dispatcher_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "encoding/json"
9         "io/ioutil"
10         "math/rand"
11         "net/http"
12         "net/http/httptest"
13         "os"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "golang.org/x/crypto/ssh"
20         check "gopkg.in/check.v1"
21 )
22
23 var _ = check.Suite(&DispatcherSuite{})
24
25 type DispatcherSuite struct {
26         cluster    *arvados.Cluster
27         stubDriver *test.StubDriver
28         disp       *dispatcher
29 }
30
31 func (s *DispatcherSuite) SetUpTest(c *check.C) {
32         dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
33         dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
34         c.Assert(err, check.IsNil)
35
36         _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
37         s.stubDriver = &test.StubDriver{
38                 HostKey:                   hostpriv,
39                 AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
40                 ErrorRateDestroy:          0.1,
41                 MinTimeBetweenCreateCalls: time.Millisecond,
42         }
43
44         s.cluster = &arvados.Cluster{
45                 CloudVMs: arvados.CloudVMs{
46                         Driver:          "test",
47                         SyncInterval:    arvados.Duration(10 * time.Millisecond),
48                         TimeoutIdle:     arvados.Duration(150 * time.Millisecond),
49                         TimeoutBooting:  arvados.Duration(150 * time.Millisecond),
50                         TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
51                         TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
52                 },
53                 Dispatch: arvados.Dispatch{
54                         PrivateKey:         string(dispatchprivraw),
55                         PollInterval:       arvados.Duration(5 * time.Millisecond),
56                         ProbeInterval:      arvados.Duration(5 * time.Millisecond),
57                         StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
58                         MaxProbesPerSecond: 1000,
59                 },
60                 InstanceTypes: arvados.InstanceTypeMap{
61                         test.InstanceType(1).Name:  test.InstanceType(1),
62                         test.InstanceType(2).Name:  test.InstanceType(2),
63                         test.InstanceType(3).Name:  test.InstanceType(3),
64                         test.InstanceType(4).Name:  test.InstanceType(4),
65                         test.InstanceType(6).Name:  test.InstanceType(6),
66                         test.InstanceType(8).Name:  test.InstanceType(8),
67                         test.InstanceType(16).Name: test.InstanceType(16),
68                 },
69                 NodeProfiles: map[string]arvados.NodeProfile{
70                         "*": {
71                                 Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
72                                 DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
73                         },
74                 },
75         }
76         s.disp = &dispatcher{Cluster: s.cluster}
77         // Test cases can modify s.cluster before calling
78         // initialize(), and then modify private state before calling
79         // go run().
80 }
81
82 func (s *DispatcherSuite) TearDownTest(c *check.C) {
83         s.disp.Close()
84 }
85
86 // DispatchToStubDriver checks that the dispatcher wires everything
87 // together effectively. It uses a real scheduler and worker pool with
88 // a fake queue and cloud driver. The fake cloud driver injects
89 // artificial errors in order to exercise a variety of code paths.
90 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
91         drivers["test"] = s.stubDriver
92         s.disp.setupOnce.Do(s.disp.initialize)
93         queue := &test.Queue{
94                 ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
95                         return ChooseInstanceType(s.cluster, ctr)
96                 },
97         }
98         for i := 0; i < 200; i++ {
99                 queue.Containers = append(queue.Containers, arvados.Container{
100                         UUID:     test.ContainerUUID(i + 1),
101                         State:    arvados.ContainerStateQueued,
102                         Priority: int64(i%20 + 1),
103                         RuntimeConstraints: arvados.RuntimeConstraints{
104                                 RAM:   int64(i%3+1) << 30,
105                                 VCPUs: i%8 + 1,
106                         },
107                 })
108         }
109         s.disp.queue = queue
110
111         var mtx sync.Mutex
112         done := make(chan struct{})
113         waiting := map[string]struct{}{}
114         for _, ctr := range queue.Containers {
115                 waiting[ctr.UUID] = struct{}{}
116         }
117         executeContainer := func(ctr arvados.Container) int {
118                 mtx.Lock()
119                 defer mtx.Unlock()
120                 if _, ok := waiting[ctr.UUID]; !ok {
121                         c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
122                         return 1
123                 }
124                 delete(waiting, ctr.UUID)
125                 if len(waiting) == 0 {
126                         close(done)
127                 }
128                 return int(rand.Uint32() & 0x3)
129         }
130         n := 0
131         s.stubDriver.Queue = queue
132         s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
133                 n++
134                 stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
135                 stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
136                 stubvm.ExecuteContainer = executeContainer
137                 switch n % 7 {
138                 case 0:
139                         stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
140                 case 1:
141                         stubvm.CrunchRunMissing = true
142                 default:
143                         stubvm.CrunchRunCrashRate = 0.1
144                 }
145         }
146
147         start := time.Now()
148         go s.disp.run()
149         err := s.disp.CheckHealth()
150         c.Check(err, check.IsNil)
151
152         select {
153         case <-done:
154                 c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
155         case <-time.After(10 * time.Second):
156                 c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
157         }
158
159         deadline := time.Now().Add(5 * time.Second)
160         for range time.NewTicker(10 * time.Millisecond).C {
161                 insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
162                 c.Check(err, check.IsNil)
163                 queue.Update()
164                 ents, _ := queue.Entries()
165                 if len(ents) == 0 && len(insts) == 0 {
166                         break
167                 }
168                 if time.Now().After(deadline) {
169                         c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
170                 }
171         }
172 }
173
174 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
175         s.cluster.ManagementToken = "abcdefgh"
176         drivers["test"] = s.stubDriver
177         s.disp.setupOnce.Do(s.disp.initialize)
178         s.disp.queue = &test.Queue{}
179         go s.disp.run()
180
181         for _, token := range []string{"abc", ""} {
182                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
183                 if token != "" {
184                         req.Header.Set("Authorization", "Bearer "+token)
185                 }
186                 resp := httptest.NewRecorder()
187                 s.disp.ServeHTTP(resp, req)
188                 if token == "" {
189                         c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
190                 } else {
191                         c.Check(resp.Code, check.Equals, http.StatusForbidden)
192                 }
193         }
194 }
195
196 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
197         s.cluster.ManagementToken = ""
198         drivers["test"] = s.stubDriver
199         s.disp.setupOnce.Do(s.disp.initialize)
200         s.disp.queue = &test.Queue{}
201         go s.disp.run()
202
203         for _, token := range []string{"abc", ""} {
204                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
205                 if token != "" {
206                         req.Header.Set("Authorization", "Bearer "+token)
207                 }
208                 resp := httptest.NewRecorder()
209                 s.disp.ServeHTTP(resp, req)
210                 c.Check(resp.Code, check.Equals, http.StatusForbidden)
211         }
212 }
213
214 func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
215         s.cluster.ManagementToken = "abcdefgh"
216         s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
217         drivers["test"] = s.stubDriver
218         s.disp.setupOnce.Do(s.disp.initialize)
219         s.disp.queue = &test.Queue{}
220         go s.disp.run()
221
222         type instance struct {
223                 Instance             string
224                 WorkerState          string `json:"worker_state"`
225                 Price                float64
226                 LastContainerUUID    string `json:"last_container_uuid"`
227                 ArvadosInstanceType  string `json:"arvados_instance_type"`
228                 ProviderInstanceType string `json:"provider_instance_type"`
229         }
230         type instancesResponse struct {
231                 Items []instance
232         }
233         getInstances := func() instancesResponse {
234                 req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
235                 req.Header.Set("Authorization", "Bearer abcdefgh")
236                 resp := httptest.NewRecorder()
237                 s.disp.ServeHTTP(resp, req)
238                 var sr instancesResponse
239                 c.Check(resp.Code, check.Equals, http.StatusOK)
240                 err := json.Unmarshal(resp.Body.Bytes(), &sr)
241                 c.Check(err, check.IsNil)
242                 return sr
243         }
244
245         sr := getInstances()
246         c.Check(len(sr.Items), check.Equals, 0)
247
248         ch := s.disp.pool.Subscribe()
249         defer s.disp.pool.Unsubscribe(ch)
250         ok := s.disp.pool.Create(test.InstanceType(1))
251         c.Check(ok, check.Equals, true)
252         <-ch
253
254         sr = getInstances()
255         c.Assert(len(sr.Items), check.Equals, 1)
256         c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
257         c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
258         c.Check(sr.Items[0].Price, check.Equals, 0.123)
259         c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
260         c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
261         c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)
262 }