20602: Disable queue when testing DumpRequests.
[arvados.git] / lib / service / cmd_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 // package service provides a cmd.Handler that brings up a system service.
6 package service
7
8 import (
9         "bytes"
10         "context"
11         "crypto/tls"
12         "encoding/json"
13         "fmt"
14         "io/ioutil"
15         "net"
16         "net/http"
17         "net/url"
18         "os"
19         "strings"
20         "testing"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/prometheus/client_golang/prometheus"
26         check "gopkg.in/check.v1"
27 )
28
29 func Test(t *testing.T) {
30         check.TestingT(t)
31 }
32
33 var _ = check.Suite(&Suite{})
34
35 type Suite struct{}
36 type key int
37
38 const (
39         contextKey key = iota
40 )
41
42 func (*Suite) TestGetListenAddress(c *check.C) {
43         // Find an available port on the testing host, so the test
44         // cases don't get confused by "already in use" errors.
45         listener, err := net.Listen("tcp", ":")
46         c.Assert(err, check.IsNil)
47         _, unusedPort, err := net.SplitHostPort(listener.Addr().String())
48         c.Assert(err, check.IsNil)
49         listener.Close()
50
51         defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
52         for idx, trial := range []struct {
53                 // internalURL => listenURL, both with trailing "/"
54                 // because config loader always adds it
55                 internalURLs     map[string]string
56                 envVar           string
57                 expectErrorMatch string
58                 expectLogsMatch  string
59                 expectListen     string
60                 expectInternal   string
61         }{
62                 {
63                         internalURLs:   map[string]string{"http://localhost:" + unusedPort + "/": ""},
64                         expectListen:   "http://localhost:" + unusedPort + "/",
65                         expectInternal: "http://localhost:" + unusedPort + "/",
66                 },
67                 { // implicit port 80 in InternalURLs
68                         internalURLs:     map[string]string{"http://localhost/": ""},
69                         expectErrorMatch: `.*:80: bind: permission denied`,
70                 },
71                 { // implicit port 443 in InternalURLs
72                         internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + unusedPort + "/"},
73                         expectListen:   "http://localhost:" + unusedPort + "/",
74                         expectInternal: "https://host.example/",
75                 },
76                 { // implicit port 443 in ListenURL
77                         internalURLs:     map[string]string{"wss://host.example/": "wss://localhost/"},
78                         expectErrorMatch: `.*:443: bind: permission denied`,
79                 },
80                 {
81                         internalURLs:   map[string]string{"https://hostname.example/": "http://localhost:8000/"},
82                         expectListen:   "http://localhost:8000/",
83                         expectInternal: "https://hostname.example/",
84                 },
85                 {
86                         internalURLs: map[string]string{
87                                 "https://hostname1.example/": "http://localhost:12435/",
88                                 "https://hostname2.example/": "http://localhost:" + unusedPort + "/",
89                         },
90                         envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
91                         expectListen:   "http://localhost:" + unusedPort + "/",
92                         expectInternal: "https://hostname2.example/",
93                 },
94                 { // cannot listen on any of the ListenURLs
95                         internalURLs: map[string]string{
96                                 "https://hostname1.example/": "http://1.2.3.4:" + unusedPort + "/",
97                                 "https://hostname2.example/": "http://1.2.3.4:" + unusedPort + "/",
98                         },
99                         expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
100                 },
101                 { // cannot listen on any of the (implied) ListenURLs
102                         internalURLs: map[string]string{
103                                 "https://1.2.3.4/": "",
104                                 "https://1.2.3.5/": "",
105                         },
106                         expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
107                 },
108                 { // impossible port number
109                         internalURLs: map[string]string{
110                                 "https://host.example/": "http://0.0.0.0:1234567",
111                         },
112                         expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
113                 },
114                 {
115                         // env var URL not mentioned in config = obey env var, with warning
116                         internalURLs:    map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
117                         envVar:          "https://hostname2.example",
118                         expectListen:    "https://hostname2.example/",
119                         expectInternal:  "https://hostname2.example/",
120                         expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
121                 },
122                 {
123                         // env var + empty config = obey env var, with warning
124                         envVar:          "https://hostname.example",
125                         expectListen:    "https://hostname.example/",
126                         expectInternal:  "https://hostname.example/",
127                         expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
128                 },
129         } {
130                 c.Logf("trial %d %+v", idx, trial)
131                 os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
132                 var logbuf bytes.Buffer
133                 log := ctxlog.New(&logbuf, "text", "info")
134                 services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
135                 for k, v := range trial.internalURLs {
136                         u, err := url.Parse(k)
137                         c.Assert(err, check.IsNil)
138                         si := arvados.ServiceInstance{}
139                         if v != "" {
140                                 u, err := url.Parse(v)
141                                 c.Assert(err, check.IsNil)
142                                 si.ListenURL = arvados.URL(*u)
143                         }
144                         services.Controller.InternalURLs[arvados.URL(*u)] = si
145                 }
146                 listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
147                 if trial.expectLogsMatch != "" {
148                         c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
149                 }
150                 if trial.expectErrorMatch != "" {
151                         c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
152                         continue
153                 }
154                 if !c.Check(err, check.IsNil) {
155                         continue
156                 }
157                 c.Check(listenURL.String(), check.Equals, trial.expectListen)
158                 c.Check(internalURL.String(), check.Equals, trial.expectInternal)
159         }
160 }
161
162 func (*Suite) TestCommand(c *check.C) {
163         cf, err := ioutil.TempFile("", "cmd_test.")
164         c.Assert(err, check.IsNil)
165         defer os.Remove(cf.Name())
166         defer cf.Close()
167         fmt.Fprintf(cf, "Clusters:\n zzzzz:\n  SystemRootToken: abcde\n  NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
168
169         healthCheck := make(chan bool, 1)
170         ctx, cancel := context.WithCancel(context.Background())
171         defer cancel()
172
173         cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
174                 c.Check(ctx.Value(contextKey), check.Equals, "bar")
175                 c.Check(token, check.Equals, "abcde")
176                 return &testHandler{ctx: ctx, healthCheck: healthCheck}
177         })
178         cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
179
180         done := make(chan bool)
181         var stdin, stdout, stderr bytes.Buffer
182
183         go func() {
184                 cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
185                 close(done)
186         }()
187         select {
188         case <-healthCheck:
189         case <-done:
190                 c.Error("command exited without health check")
191         }
192         cancel()
193         c.Check(stdout.String(), check.Equals, "")
194         c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
195 }
196
197 func (*Suite) TestDumpRequests(c *check.C) {
198         defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
199         requestQueueDumpCheckInterval = time.Second / 10
200
201         tmpdir := c.MkDir()
202         cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
203         c.Assert(err, check.IsNil)
204         defer os.Remove(cf.Name())
205         defer cf.Close()
206
207         max := 24
208         fmt.Fprintf(cf, `
209 Clusters:
210  zzzzz:
211   SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
212   ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
213   API:
214    MaxConcurrentRequests: %d
215    MaxQueuedRequests: 0
216   SystemLogs: {RequestQueueDumpDirectory: %q}
217   Services:
218    Controller:
219     ExternalURL: "http://localhost:12345"
220     InternalURLs: {"http://localhost:12345": {}}
221 `, max, tmpdir)
222
223         started := make(chan bool, max+1)
224         hold := make(chan bool)
225         handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
226                 started <- true
227                 <-hold
228         })
229         healthCheck := make(chan bool, 1)
230         ctx, cancel := context.WithCancel(context.Background())
231         defer cancel()
232
233         cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
234                 return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
235         })
236         cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
237
238         exited := make(chan bool)
239         var stdin, stdout, stderr bytes.Buffer
240
241         go func() {
242                 cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
243                 close(exited)
244         }()
245         select {
246         case <-healthCheck:
247         case <-exited:
248                 c.Logf("%s", stderr.String())
249                 c.Error("command exited without health check")
250         }
251         client := http.Client{}
252         deadline := time.Now().Add(time.Second * 2)
253         for i := 0; i < max+1; i++ {
254                 go func() {
255                         resp, err := client.Get("http://localhost:12345/testpath")
256                         for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
257                                 time.Sleep(time.Second / 100)
258                                 resp, err = client.Get("http://localhost:12345/testpath")
259                         }
260                         if c.Check(err, check.IsNil) {
261                                 c.Logf("resp StatusCode %d", resp.StatusCode)
262                         }
263                 }()
264         }
265         for i := 0; i < max; i++ {
266                 select {
267                 case <-started:
268                 case <-time.After(time.Second):
269                         c.Logf("%s", stderr.String())
270                         panic("timed out")
271                 }
272         }
273         for delay := time.Second / 100; ; delay = delay * 2 {
274                 time.Sleep(delay)
275                 j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
276                 if os.IsNotExist(err) && deadline.After(time.Now()) {
277                         continue
278                 }
279                 c.Assert(err, check.IsNil)
280                 c.Logf("stderr:\n%s", stderr.String())
281                 c.Logf("json:\n%s", string(j))
282
283                 var loaded []struct{ URL string }
284                 err = json.Unmarshal(j, &loaded)
285                 c.Check(err, check.IsNil)
286                 if len(loaded) < max {
287                         // Dumped when #requests was >90% but <100% of
288                         // limit. If we stop now, we won't be able to
289                         // confirm (below) that management endpoints
290                         // are still accessible when normal requests
291                         // are at 100%.
292                         c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
293                         continue
294                 }
295                 c.Check(loaded, check.HasLen, max)
296                 c.Check(loaded[0].URL, check.Equals, "/testpath")
297                 break
298         }
299
300         for _, path := range []string{"/_inspect/requests", "/metrics"} {
301                 req, err := http.NewRequest("GET", "http://localhost:12345"+path, nil)
302                 c.Assert(err, check.IsNil)
303                 req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
304                 resp, err := client.Do(req)
305                 if !c.Check(err, check.IsNil) {
306                         break
307                 }
308                 c.Logf("got response for %s", path)
309                 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
310                 buf, err := ioutil.ReadAll(resp.Body)
311                 c.Check(err, check.IsNil)
312                 switch path {
313                 case "/metrics":
314                         c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
315                 case "/_inspect/requests":
316                         c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
317                 default:
318                         c.Error("oops, testing bug")
319                 }
320         }
321         close(hold)
322         cancel()
323
324 }
325
326 func (*Suite) TestTLS(c *check.C) {
327         cwd, err := os.Getwd()
328         c.Assert(err, check.IsNil)
329
330         stdin := bytes.NewBufferString(`
331 Clusters:
332  zzzzz:
333   SystemRootToken: abcde
334   Services:
335    Controller:
336     ExternalURL: "https://localhost:12345"
337     InternalURLs: {"https://localhost:12345": {}}
338   TLS:
339    Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
340    Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
341 `)
342
343         called := make(chan bool)
344         cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
345                 return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
346                         w.Write([]byte("ok"))
347                         close(called)
348                 })}
349         })
350
351         exited := make(chan bool)
352         var stdout, stderr bytes.Buffer
353         go func() {
354                 cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
355                 close(exited)
356         }()
357         got := make(chan bool)
358         go func() {
359                 defer close(got)
360                 client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
361                 for range time.NewTicker(time.Millisecond).C {
362                         resp, err := client.Get("https://localhost:12345")
363                         if err != nil {
364                                 c.Log(err)
365                                 continue
366                         }
367                         body, err := ioutil.ReadAll(resp.Body)
368                         c.Check(err, check.IsNil)
369                         c.Logf("status %d, body %s", resp.StatusCode, string(body))
370                         c.Check(resp.StatusCode, check.Equals, http.StatusOK)
371                         break
372                 }
373         }()
374         select {
375         case <-called:
376         case <-exited:
377                 c.Error("command exited without calling handler")
378         case <-time.After(time.Second):
379                 c.Error("timed out")
380         }
381         select {
382         case <-got:
383         case <-exited:
384                 c.Error("command exited before client received response")
385         case <-time.After(time.Second):
386                 c.Error("timed out")
387         }
388         c.Log(stderr.String())
389 }
390
391 type testHandler struct {
392         ctx         context.Context
393         handler     http.Handler
394         healthCheck chan bool
395 }
396
397 func (th *testHandler) Done() <-chan struct{}                            { return nil }
398 func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
399 func (th *testHandler) CheckHealth() error {
400         ctxlog.FromContext(th.ctx).Info("CheckHealth called")
401         select {
402         case th.healthCheck <- true:
403         default:
404         }
405         return nil
406 }