20846: Merge branch '19213-ubuntu2204-support' into 20846-ubuntu2204
[arvados.git] / lib / service / cmd_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
// Package service provides a cmd.Handler that brings up a system service.
6 package service
7
8 import (
9         "bytes"
10         "context"
11         "crypto/tls"
12         "encoding/json"
13         "fmt"
14         "io/ioutil"
15         "net"
16         "net/http"
17         "net/url"
18         "os"
19         "strings"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/prometheus/client_golang/prometheus"
26         check "gopkg.in/check.v1"
27 )
28
29 func Test(t *testing.T) {
30         check.TestingT(t)
31 }
32
33 var _ = check.Suite(&Suite{})
34
35 type Suite struct{}
36 type key int
37
38 const (
39         contextKey key = iota
40 )
41
42 func unusedPort(c *check.C) string {
43         // Find an available port on the testing host, so the test
44         // cases don't get confused by "already in use" errors.
45         listener, err := net.Listen("tcp", ":")
46         c.Assert(err, check.IsNil)
47         listener.Close()
48         _, port, err := net.SplitHostPort(listener.Addr().String())
49         c.Assert(err, check.IsNil)
50         return port
51 }
52
// TestGetListenAddress checks how getListenAddr picks a listen URL and an
// internal URL for the arvados-controller service, given the
// Controller's InternalURLs configuration and the
// ARVADOS_SERVICE_INTERNAL_URL environment variable.
func (*Suite) TestGetListenAddress(c *check.C) {
	port := unusedPort(c)
	// Put ARVADOS_SERVICE_INTERNAL_URL back the way we found it when
	// the test finishes, instead of unconditionally unsetting it.
	if orig, ok := os.LookupEnv("ARVADOS_SERVICE_INTERNAL_URL"); ok {
		defer os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", orig)
	} else {
		defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
	}
	for idx, trial := range []struct {
		// internalURL => listenURL, both with trailing "/"
		// because config loader always adds it
		internalURLs     map[string]string
		envVar           string
		expectErrorMatch string
		expectLogsMatch  string
		expectListen     string
		expectInternal   string
	}{
		{
			internalURLs:   map[string]string{"http://localhost:" + port + "/": ""},
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "http://localhost:" + port + "/",
		},
		{ // implicit port 80 in InternalURLs
			internalURLs:     map[string]string{"http://localhost/": ""},
			expectErrorMatch: `.*:80: bind: permission denied`,
		},
		{ // implicit port 443 in InternalURLs
			internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "https://host.example/",
		},
		{ // implicit port 443 in ListenURL
			internalURLs:     map[string]string{"wss://host.example/": "wss://localhost/"},
			expectErrorMatch: `.*:443: bind: permission denied`,
		},
		{
			// getListenAddr actually tries to listen on the
			// ListenURL (see the "bind: permission denied"
			// trials), so use the known-free port rather than a
			// fixed one that might already be in use.
			internalURLs:   map[string]string{"https://hostname.example/": "http://localhost:" + port + "/"},
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "https://hostname.example/",
		},
		{
			internalURLs: map[string]string{
				"https://hostname1.example/": "http://localhost:12435/",
				"https://hostname2.example/": "http://localhost:" + port + "/",
			},
			envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "https://hostname2.example/",
		},
		{ // cannot listen on any of the ListenURLs
			internalURLs: map[string]string{
				"https://hostname1.example/": "http://1.2.3.4:" + port + "/",
				"https://hostname2.example/": "http://1.2.3.4:" + port + "/",
			},
			expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
		},
		{ // cannot listen on any of the (implied) ListenURLs
			internalURLs: map[string]string{
				"https://1.2.3.4/": "",
				"https://1.2.3.5/": "",
			},
			expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
		},
		{ // impossible port number
			internalURLs: map[string]string{
				"https://host.example/": "http://0.0.0.0:1234567",
			},
			expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
		},
		{
			// env var URL not mentioned in config = obey env var, with warning
			internalURLs:    map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
			envVar:          "https://hostname2.example",
			expectListen:    "https://hostname2.example/",
			expectInternal:  "https://hostname2.example/",
			expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
		},
		{
			// env var + empty config = obey env var, with warning
			envVar:          "https://hostname.example",
			expectListen:    "https://hostname.example/",
			expectInternal:  "https://hostname.example/",
			expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
		},
	} {
		c.Logf("trial %d %+v", idx, trial)
		os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
		var logbuf bytes.Buffer
		log := ctxlog.New(&logbuf, "text", "info")
		services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
		for internal, listen := range trial.internalURLs {
			iu, err := url.Parse(internal)
			c.Assert(err, check.IsNil)
			si := arvados.ServiceInstance{}
			if listen != "" {
				lu, err := url.Parse(listen)
				c.Assert(err, check.IsNil)
				si.ListenURL = arvados.URL(*lu)
			}
			services.Controller.InternalURLs[arvados.URL(*iu)] = si
		}
		listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
		if trial.expectLogsMatch != "" {
			c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
		}
		if trial.expectErrorMatch != "" {
			c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
			continue
		}
		if !c.Check(err, check.IsNil) {
			continue
		}
		c.Check(listenURL.String(), check.Equals, trial.expectListen)
		c.Check(internalURL.String(), check.Equals, trial.expectInternal)
	}
}
165
// TestCommand runs the arvados-controller command with a handler
// constructor that checks the token and context it receives, and
// verifies that the handler's CheckHealth is called and logged.
func (*Suite) TestCommand(c *check.C) {
	cf, err := ioutil.TempFile("", "cmd_test.")
	c.Assert(err, check.IsNil)
	defer os.Remove(cf.Name())
	defer cf.Close()
	fmt.Fprintf(cf, "Clusters:\n zzzzz:\n  SystemRootToken: abcde\n  NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")

	healthCheck := make(chan bool, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
		c.Check(ctx.Value(contextKey), check.Equals, "bar")
		c.Check(token, check.Equals, "abcde")
		return &testHandler{ctx: ctx, healthCheck: healthCheck}
	})
	cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")

	done := make(chan bool)
	var stdin, stdout, stderr bytes.Buffer

	go func() {
		cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
		close(done)
	}()
	select {
	case <-healthCheck:
	case <-done:
		c.Error("command exited without health check")
	case <-time.After(10 * time.Second):
		c.Error("timed out waiting for health check")
	}
	cancel()
	// RunCommand writes to stdout/stderr until it returns, so wait
	// for it to exit before reading them; otherwise this is a data
	// race on the buffers.
	select {
	case <-done:
	case <-time.After(10 * time.Second):
		c.Fatal("timed out waiting for command to exit")
	}
	c.Check(stdout.String(), check.Equals, "")
	c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
}
200
201 func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
202         s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
203 }
204
205 func (s *Suite) TestDumpRequestsController(c *check.C) {
206         s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
207 }
208
// testDumpRequests starts serviceName with its concurrent-request limit
// (API.<maxReqsConfigKey>) set to max and API.MaxQueuedRequests set to
// 0, then sends max+1 requests to a handler that blocks until the test
// releases it. It checks that:
//
//   - the service writes the active requests to
//     <RequestQueueDumpDirectory>/<serviceName>-requests.json, and
//   - the management endpoints (/_inspect/requests and /metrics)
//     still respond while all normal request slots are in use.
func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
	defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
	requestQueueDumpCheckInterval = time.Second / 10

	port := unusedPort(c)
	tmpdir := c.MkDir()
	cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
	c.Assert(err, check.IsNil)
	defer os.Remove(cf.Name())
	defer cf.Close()

	max := 24
	fmt.Fprintf(cf, `
Clusters:
 zzzzz:
  SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
  ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
  API:
   `+maxReqsConfigKey+`: %d
   MaxQueuedRequests: 0
  SystemLogs: {RequestQueueDumpDirectory: %q}
  Services:
   Controller:
    ExternalURL: "http://localhost:`+port+`"
    InternalURLs: {"http://localhost:`+port+`": {}}
   WebDAV:
    ExternalURL: "http://localhost:`+port+`"
    InternalURLs: {"http://localhost:`+port+`": {}}
`, max, tmpdir)
	cf.Close()

	started := make(chan bool, max+1)
	hold := make(chan bool)
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		started <- true
		<-hold
	})
	healthCheck := make(chan bool, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
		return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
	})
	cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")

	exited := make(chan bool)
	var stdin, stdout, stderr bytes.Buffer

	go func() {
		cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
		close(exited)
	}()
	// On the way out (including after c.Fatal/c.Assert failures),
	// stop the service and wait for RunCommand to return before
	// reading stderr, which it writes to concurrently. Waiting here
	// also keeps the service from running while the deferred restore
	// of requestQueueDumpCheckInterval (registered earlier, so run
	// later) modifies that variable.
	defer func() {
		cancel()
		select {
		case <-exited:
			c.Logf("stderr:\n%s", stderr.String())
		case <-time.After(10 * time.Second):
			c.Error("timed out waiting for command to exit")
		}
	}()
	select {
	case <-healthCheck:
	case <-exited:
		c.Error("command exited without health check")
	case <-time.After(10 * time.Second):
		c.Fatal("timed out waiting for health check")
	}

	client := http.Client{}
	deadline := time.Now().Add(time.Second * 2)
	// Each client goroutine sends one value here when it finishes,
	// so the test can wait for all of them before returning (they
	// call c.Check, which must not happen after the test ends).
	clientDone := make(chan bool, max+1)
	for i := 0; i < max+1; i++ {
		go func() {
			defer func() { clientDone <- true }()
			resp, err := client.Get("http://localhost:" + port + "/testpath")
			// Retry connection errors until the deadline, in case
			// the service is not accepting connections yet.
			for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
				time.Sleep(time.Second / 100)
				resp, err = client.Get("http://localhost:" + port + "/testpath")
			}
			if c.Check(err, check.IsNil) {
				c.Logf("resp StatusCode %d", resp.StatusCode)
				resp.Body.Close()
			}
		}()
	}
	for i := 0; i < max; i++ {
		select {
		case <-started:
		case <-time.After(time.Second):
			c.Fatal("timed out")
		}
	}

	dumpfile := tmpdir + "/" + string(serviceName) + "-requests.json"
	// Bound the polling loop below so it cannot wait indefinitely
	// (the sleep doubles on every iteration).
	dumpDeadline := time.Now().Add(5 * time.Second)
	for delay := time.Second / 100; ; delay *= 2 {
		time.Sleep(delay)
		j, err := os.ReadFile(dumpfile)
		if os.IsNotExist(err) && time.Now().Before(dumpDeadline) {
			continue
		}
		c.Assert(err, check.IsNil)
		c.Logf("json:\n%s", string(j))

		var loaded []struct{ URL string }
		err = json.Unmarshal(j, &loaded)
		c.Check(err, check.IsNil)
		if len(loaded) < max && time.Now().Before(dumpDeadline) {
			// Dumped when #requests was >90% but <100% of
			// limit. If we stop now, we won't be able to
			// confirm (below) that management endpoints
			// are still accessible when normal requests
			// are at 100%.
			c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
			continue
		}
		// Assert (not Check) so loaded[0] below cannot panic
		// if the deadline passed with too few entries.
		c.Assert(loaded, check.HasLen, max)
		c.Check(loaded[0].URL, check.Equals, "/testpath")
		break
	}

	for _, path := range []string{"/_inspect/requests", "/metrics"} {
		req, err := http.NewRequest("GET", "http://localhost:"+port+path, nil)
		c.Assert(err, check.IsNil)
		req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
		resp, err := client.Do(req)
		if !c.Check(err, check.IsNil) {
			break
		}
		c.Logf("got response for %s", path)
		c.Check(resp.StatusCode, check.Equals, http.StatusOK)
		buf, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		c.Check(err, check.IsNil)
		switch path {
		case "/metrics":
			c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
		case "/_inspect/requests":
			c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
		default:
			c.Error("oops, testing bug")
		}
	}

	// Release the held requests, then wait for every client
	// goroutine to finish before the deferred cleanup stops the
	// service.
	close(hold)
	clientTimeout := time.After(10 * time.Second)
	for i := 0; i < max+1; i++ {
		select {
		case <-clientDone:
		case <-clientTimeout:
			c.Fatal("timed out waiting for client requests to finish")
		}
	}
}
341
// TestTLS starts arvados-controller with an https InternalURL and the
// self-signed key/certificate from services/api/tmp, then checks that
// an HTTPS client (with certificate verification disabled) reaches the
// handler and gets a 200 response.
func (*Suite) TestTLS(c *check.C) {
	port := unusedPort(c)
	cwd, err := os.Getwd()
	c.Assert(err, check.IsNil)

	stdin := bytes.NewBufferString(`
Clusters:
 zzzzz:
  SystemRootToken: abcde
  Services:
   Controller:
    ExternalURL: "https://localhost:` + port + `"
    InternalURLs: {"https://localhost:` + port + `": {}}
  TLS:
   Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
   Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
`)

	// Give the command a cancelable context so the service (and the
	// client goroutine below) can be shut down when the test ends,
	// instead of leaving the server listening on port.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	called := make(chan bool)
	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
		return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Write([]byte("ok"))
			close(called)
		})}
	})
	cmd.(*command).ctx = ctx

	exited := make(chan bool)
	var stdout, stderr bytes.Buffer
	go func() {
		cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
		close(exited)
	}()
	got := make(chan bool)
	go func() {
		defer close(got)
		client := &http.Client{
			Timeout:   10 * time.Second,
			Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
		}
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		// Poll until the server answers, or until the test
		// cancels ctx.
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			resp, err := client.Get("https://localhost:" + port)
			if err != nil {
				c.Log(err)
				continue
			}
			body, err := ioutil.ReadAll(resp.Body)
			resp.Body.Close()
			c.Check(err, check.IsNil)
			c.Logf("status %d, body %s", resp.StatusCode, string(body))
			c.Check(resp.StatusCode, check.Equals, http.StatusOK)
			return
		}
	}()
	select {
	case <-called:
	case <-exited:
		c.Error("command exited without calling handler")
	case <-time.After(time.Second):
		c.Error("timed out")
	}
	select {
	case <-got:
	case <-exited:
		c.Error("command exited before client received response")
	case <-time.After(time.Second):
		c.Error("timed out")
	}

	// Stop the client goroutine and the service, and wait for both,
	// so nothing calls c.Log after the test ends and stderr is not
	// read while RunCommand may still be writing to it.
	cancel()
	select {
	case <-got:
	case <-time.After(10 * time.Second):
		c.Error("timed out waiting for client goroutine to finish")
	}
	select {
	case <-exited:
		c.Log(stderr.String())
	case <-time.After(10 * time.Second):
		c.Error("timed out waiting for command to exit")
	}
}
407
408 type testHandler struct {
409         ctx         context.Context
410         handler     http.Handler
411         healthCheck chan bool
412 }
413
414 func (th *testHandler) Done() <-chan struct{}                            { return nil }
415 func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
416 func (th *testHandler) CheckHealth() error {
417         ctxlog.FromContext(th.ctx).Info("CheckHealth called")
418         select {
419         case th.healthCheck <- true:
420         default:
421         }
422         return nil
423 }