Merge branch 'github-pr-223'
[arvados.git] / lib / service / cmd_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
// Package service provides a cmd.Handler that brings up a system service.
6 package service
7
8 import (
9         "bytes"
10         "context"
11         "crypto/tls"
12         "encoding/json"
13         "fmt"
14         "io/ioutil"
15         "net"
16         "net/http"
17         "net/url"
18         "os"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "testing"
23         "time"
24
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "github.com/prometheus/client_golang/prometheus"
28         check "gopkg.in/check.v1"
29 )
30
31 func Test(t *testing.T) {
32         check.TestingT(t)
33 }
34
35 var _ = check.Suite(&Suite{})
36
37 type Suite struct{}
38 type key int
39
40 const (
41         contextKey key = iota
42 )
43
44 func unusedPort(c *check.C) string {
45         // Find an available port on the testing host, so the test
46         // cases don't get confused by "already in use" errors.
47         listener, err := net.Listen("tcp", ":")
48         c.Assert(err, check.IsNil)
49         listener.Close()
50         _, port, err := net.SplitHostPort(listener.Addr().String())
51         c.Assert(err, check.IsNil)
52         return port
53 }
54
55 func (*Suite) TestGetListenAddress(c *check.C) {
56         port := unusedPort(c)
57         defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
58         for idx, trial := range []struct {
59                 // internalURL => listenURL, both with trailing "/"
60                 // because config loader always adds it
61                 internalURLs     map[string]string
62                 envVar           string
63                 expectErrorMatch string
64                 expectLogsMatch  string
65                 expectListen     string
66                 expectInternal   string
67         }{
68                 {
69                         internalURLs:   map[string]string{"http://localhost:" + port + "/": ""},
70                         expectListen:   "http://localhost:" + port + "/",
71                         expectInternal: "http://localhost:" + port + "/",
72                 },
73                 { // implicit port 80 in InternalURLs
74                         internalURLs:     map[string]string{"http://localhost/": ""},
75                         expectErrorMatch: `.*:80: bind: permission denied`,
76                 },
77                 { // implicit port 443 in InternalURLs
78                         internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
79                         expectListen:   "http://localhost:" + port + "/",
80                         expectInternal: "https://host.example/",
81                 },
82                 { // implicit port 443 in ListenURL
83                         internalURLs:     map[string]string{"wss://host.example/": "wss://localhost/"},
84                         expectErrorMatch: `.*:443: bind: permission denied`,
85                 },
86                 {
87                         internalURLs:   map[string]string{"https://hostname.example/": "http://localhost:8000/"},
88                         expectListen:   "http://localhost:8000/",
89                         expectInternal: "https://hostname.example/",
90                 },
91                 {
92                         internalURLs: map[string]string{
93                                 "https://hostname1.example/": "http://localhost:12435/",
94                                 "https://hostname2.example/": "http://localhost:" + port + "/",
95                         },
96                         envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
97                         expectListen:   "http://localhost:" + port + "/",
98                         expectInternal: "https://hostname2.example/",
99                 },
100                 { // cannot listen on any of the ListenURLs
101                         internalURLs: map[string]string{
102                                 "https://hostname1.example/": "http://1.2.3.4:" + port + "/",
103                                 "https://hostname2.example/": "http://1.2.3.4:" + port + "/",
104                         },
105                         expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
106                 },
107                 { // cannot listen on any of the (implied) ListenURLs
108                         internalURLs: map[string]string{
109                                 "https://1.2.3.4/": "",
110                                 "https://1.2.3.5/": "",
111                         },
112                         expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
113                 },
114                 { // impossible port number
115                         internalURLs: map[string]string{
116                                 "https://host.example/": "http://0.0.0.0:1234567",
117                         },
118                         expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
119                 },
120                 {
121                         // env var URL not mentioned in config = obey env var, with warning
122                         internalURLs:    map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
123                         envVar:          "https://hostname2.example",
124                         expectListen:    "https://hostname2.example/",
125                         expectInternal:  "https://hostname2.example/",
126                         expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
127                 },
128                 {
129                         // env var + empty config = obey env var, with warning
130                         envVar:          "https://hostname.example",
131                         expectListen:    "https://hostname.example/",
132                         expectInternal:  "https://hostname.example/",
133                         expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
134                 },
135         } {
136                 c.Logf("trial %d %+v", idx, trial)
137                 os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
138                 var logbuf bytes.Buffer
139                 log := ctxlog.New(&logbuf, "text", "info")
140                 services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
141                 for k, v := range trial.internalURLs {
142                         u, err := url.Parse(k)
143                         c.Assert(err, check.IsNil)
144                         si := arvados.ServiceInstance{}
145                         if v != "" {
146                                 u, err := url.Parse(v)
147                                 c.Assert(err, check.IsNil)
148                                 si.ListenURL = arvados.URL(*u)
149                         }
150                         services.Controller.InternalURLs[arvados.URL(*u)] = si
151                 }
152                 listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
153                 if trial.expectLogsMatch != "" {
154                         c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
155                 }
156                 if trial.expectErrorMatch != "" {
157                         c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
158                         continue
159                 }
160                 if !c.Check(err, check.IsNil) {
161                         continue
162                 }
163                 c.Check(listenURL.String(), check.Equals, trial.expectListen)
164                 c.Check(internalURL.String(), check.Equals, trial.expectInternal)
165         }
166 }
167
168 func (*Suite) TestCommand(c *check.C) {
169         cf, err := ioutil.TempFile("", "cmd_test.")
170         c.Assert(err, check.IsNil)
171         defer os.Remove(cf.Name())
172         defer cf.Close()
173         fmt.Fprintf(cf, "Clusters:\n zzzzz:\n  SystemRootToken: abcde\n  NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
174
175         healthCheck := make(chan bool, 1)
176         ctx, cancel := context.WithCancel(context.Background())
177         defer cancel()
178
179         cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
180                 c.Check(ctx.Value(contextKey), check.Equals, "bar")
181                 c.Check(token, check.Equals, "abcde")
182                 return &testHandler{ctx: ctx, healthCheck: healthCheck}
183         })
184         cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
185
186         done := make(chan bool)
187         var stdin, stdout, stderr bytes.Buffer
188
189         go func() {
190                 cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
191                 close(done)
192         }()
193         select {
194         case <-healthCheck:
195         case <-done:
196                 c.Error("command exited without health check")
197         }
198         cancel()
199         c.Check(stdout.String(), check.Equals, "")
200         c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
201 }
202
203 func (s *Suite) TestTunnelPathRegexp(c *check.C) {
204         c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
205         c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
206         c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
207         c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
208         c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false)
209         c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false)
210 }
211
212 func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
213         s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
214 }
215
216 func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
217         s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
218 }
219
220 func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
221         defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
222         requestQueueDumpCheckInterval = time.Second / 10
223
224         port := unusedPort(c)
225         tmpdir := c.MkDir()
226         cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
227         c.Assert(err, check.IsNil)
228         defer os.Remove(cf.Name())
229         defer cf.Close()
230
231         max := 24
232         maxTunnels := 30
233         fmt.Fprintf(cf, `
234 Clusters:
235  zzzzz:
236   SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
237   ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
238   API:
239    `+maxReqsConfigKey+`: %d
240    MaxQueuedRequests: 1
241    MaxGatewayTunnels: %d
242   SystemLogs: {RequestQueueDumpDirectory: %q}
243   Services:
244    Controller:
245     ExternalURL: "http://localhost:`+port+`"
246     InternalURLs: {"http://localhost:`+port+`": {}}
247    WebDAV:
248     ExternalURL: "http://localhost:`+port+`"
249     InternalURLs: {"http://localhost:`+port+`": {}}
250 `, max, maxTunnels, tmpdir)
251         cf.Close()
252
253         started := make(chan bool, max+1)
254         hold := make(chan bool)
255         handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
256                 if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") {
257                         <-hold
258                 } else {
259                         started <- true
260                         <-hold
261                 }
262         })
263         healthCheck := make(chan bool, 1)
264         ctx, cancel := context.WithCancel(context.Background())
265         defer cancel()
266
267         cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
268                 return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
269         })
270         cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
271
272         exited := make(chan bool)
273         var stdin, stdout, stderr bytes.Buffer
274
275         go func() {
276                 cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
277                 close(exited)
278         }()
279         select {
280         case <-healthCheck:
281         case <-exited:
282                 c.Logf("%s", stderr.String())
283                 c.Error("command exited without health check")
284         }
285         client := http.Client{}
286         deadline := time.Now().Add(time.Second * 2)
287         var activeReqs sync.WaitGroup
288
289         // Start some API reqs
290         var apiResp200, apiResp503 int64
291         for i := 0; i < max+1; i++ {
292                 activeReqs.Add(1)
293                 go func() {
294                         defer activeReqs.Done()
295                         target := "http://localhost:" + port + "/testpath"
296                         resp, err := client.Get(target)
297                         for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
298                                 time.Sleep(time.Second / 100)
299                                 resp, err = client.Get(target)
300                         }
301                         if c.Check(err, check.IsNil) {
302                                 if resp.StatusCode == http.StatusOK {
303                                         atomic.AddInt64(&apiResp200, 1)
304                                 } else if resp.StatusCode == http.StatusServiceUnavailable {
305                                         atomic.AddInt64(&apiResp503, 1)
306                                 }
307                         }
308                 }()
309         }
310
311         // Start some gateway tunnel reqs that don't count toward our
312         // API req limit
313         extraTunnelReqs := 20
314         var tunnelResp200, tunnelResp503 int64
315         var paths = []string{
316                 "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
317                 "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
318                 "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
319                 "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
320         }
321         for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
322                 i := i
323                 activeReqs.Add(1)
324                 go func() {
325                         defer activeReqs.Done()
326                         target := "http://localhost:" + port + paths[i%len(paths)]
327                         resp, err := client.Post(target, "application/octet-stream", nil)
328                         for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
329                                 time.Sleep(time.Second / 100)
330                                 resp, err = client.Post(target, "application/octet-stream", nil)
331                         }
332                         if c.Check(err, check.IsNil) {
333                                 if resp.StatusCode == http.StatusOK {
334                                         atomic.AddInt64(&tunnelResp200, 1)
335                                 } else if resp.StatusCode == http.StatusServiceUnavailable {
336                                         atomic.AddInt64(&tunnelResp503, 1)
337                                 } else {
338                                         c.Errorf("tunnel response code %d", resp.StatusCode)
339                                 }
340                         }
341                 }()
342         }
343         for i := 0; i < max; i++ {
344                 select {
345                 case <-started:
346                 case <-time.After(time.Second):
347                         c.Logf("%s", stderr.String())
348                         c.Logf("apiResp200 %d", apiResp200)
349                         c.Logf("apiResp503 %d", apiResp503)
350                         c.Logf("tunnelResp200 %d", tunnelResp200)
351                         c.Logf("tunnelResp503 %d", tunnelResp503)
352                         c.Fatal("timed out")
353                 }
354         }
355         for delay := time.Second / 100; ; delay = delay * 2 {
356                 time.Sleep(delay)
357                 j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
358                 if os.IsNotExist(err) && deadline.After(time.Now()) {
359                         continue
360                 }
361                 c.Assert(err, check.IsNil)
362                 c.Logf("stderr:\n%s", stderr.String())
363                 c.Logf("json:\n%s", string(j))
364
365                 var loaded []struct{ URL string }
366                 err = json.Unmarshal(j, &loaded)
367                 c.Check(err, check.IsNil)
368
369                 for i := 0; i < len(loaded); i++ {
370                         if strings.Contains(loaded[i].URL, "/ssh") || strings.Contains(loaded[i].URL, "/gateway_tunnel") {
371                                 // Filter out a gateway tunnel req
372                                 // that doesn't count toward our API
373                                 // req limit
374                                 if i < len(loaded)-1 {
375                                         copy(loaded[i:], loaded[i+1:])
376                                         i--
377                                 }
378                                 loaded = loaded[:len(loaded)-1]
379                         }
380                 }
381
382                 if len(loaded) < max {
383                         // Dumped when #requests was >90% but <100% of
384                         // limit. If we stop now, we won't be able to
385                         // confirm (below) that management endpoints
386                         // are still accessible when normal requests
387                         // are at 100%.
388                         c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
389                         continue
390                 }
391                 c.Check(loaded, check.HasLen, max+1)
392                 c.Check(loaded[0].URL, check.Equals, "/testpath")
393                 break
394         }
395
396         for _, path := range []string{"/_inspect/requests", "/metrics"} {
397                 req, err := http.NewRequest("GET", "http://localhost:"+port+""+path, nil)
398                 c.Assert(err, check.IsNil)
399                 req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
400                 resp, err := client.Do(req)
401                 if !c.Check(err, check.IsNil) {
402                         break
403                 }
404                 c.Logf("got response for %s", path)
405                 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
406                 buf, err := ioutil.ReadAll(resp.Body)
407                 c.Check(err, check.IsNil)
408                 switch path {
409                 case "/metrics":
410                         c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
411                         c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
412                 case "/_inspect/requests":
413                         c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
414                 default:
415                         c.Error("oops, testing bug")
416                 }
417         }
418         close(hold)
419         activeReqs.Wait()
420         c.Check(int(apiResp200), check.Equals, max+1)
421         c.Check(int(apiResp503), check.Equals, 0)
422         c.Check(int(tunnelResp200), check.Equals, maxTunnels)
423         c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
424         cancel()
425 }
426
427 func (*Suite) TestTLS(c *check.C) {
428         port := unusedPort(c)
429         cwd, err := os.Getwd()
430         c.Assert(err, check.IsNil)
431
432         stdin := bytes.NewBufferString(`
433 Clusters:
434  zzzzz:
435   SystemRootToken: abcde
436   Services:
437    Controller:
438     ExternalURL: "https://localhost:` + port + `"
439     InternalURLs: {"https://localhost:` + port + `": {}}
440   TLS:
441    Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
442    Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
443 `)
444
445         called := make(chan bool)
446         cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
447                 return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
448                         w.Write([]byte("ok"))
449                         close(called)
450                 })}
451         })
452
453         exited := make(chan bool)
454         var stdout, stderr bytes.Buffer
455         go func() {
456                 cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
457                 close(exited)
458         }()
459         got := make(chan bool)
460         go func() {
461                 defer close(got)
462                 client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
463                 for range time.NewTicker(time.Millisecond).C {
464                         resp, err := client.Get("https://localhost:" + port)
465                         if err != nil {
466                                 c.Log(err)
467                                 continue
468                         }
469                         body, err := ioutil.ReadAll(resp.Body)
470                         c.Check(err, check.IsNil)
471                         c.Logf("status %d, body %s", resp.StatusCode, string(body))
472                         c.Check(resp.StatusCode, check.Equals, http.StatusOK)
473                         break
474                 }
475         }()
476         select {
477         case <-called:
478         case <-exited:
479                 c.Error("command exited without calling handler")
480         case <-time.After(time.Second):
481                 c.Error("timed out")
482         }
483         select {
484         case <-got:
485         case <-exited:
486                 c.Error("command exited before client received response")
487         case <-time.After(time.Second):
488                 c.Error("timed out")
489         }
490         c.Log(stderr.String())
491 }
492
493 type testHandler struct {
494         ctx         context.Context
495         handler     http.Handler
496         healthCheck chan bool
497 }
498
499 func (th *testHandler) Done() <-chan struct{}                            { return nil }
500 func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
501 func (th *testHandler) CheckHealth() error {
502         ctxlog.FromContext(th.ctx).Info("CheckHealth called")
503         select {
504         case th.healthCheck <- true:
505         default:
506         }
507         return nil
508 }