import (
"bytes"
"context"
+ "crypto/tls"
+ "encoding/json"
"fmt"
"io/ioutil"
+ "net"
"net/http"
+ "net/url"
"os"
+ "strings"
"testing"
+ "time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&Suite{})
type Suite struct{}
+type key int
+
+const (
+ contextKey key = iota
+)
+
+func (*Suite) TestGetListenAddress(c *check.C) {
+ // Find an available port on the testing host, so the test
+ // cases don't get confused by "already in use" errors.
+ listener, err := net.Listen("tcp", ":")
+ c.Assert(err, check.IsNil)
+ _, unusedPort, err := net.SplitHostPort(listener.Addr().String())
+ c.Assert(err, check.IsNil)
+ listener.Close()
+
+ defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
+ for idx, trial := range []struct {
+ // internalURL => listenURL, both with trailing "/"
+ // because config loader always adds it
+ internalURLs map[string]string
+ envVar string
+ expectErrorMatch string
+ expectLogsMatch string
+ expectListen string
+ expectInternal string
+ }{
+ {
+ internalURLs: map[string]string{"http://localhost:" + unusedPort + "/": ""},
+ expectListen: "http://localhost:" + unusedPort + "/",
+ expectInternal: "http://localhost:" + unusedPort + "/",
+ },
+ { // implicit port 80 in InternalURLs
+ internalURLs: map[string]string{"http://localhost/": ""},
+ expectErrorMatch: `.*:80: bind: permission denied`,
+ },
+ { // implicit port 443 in InternalURLs
+ internalURLs: map[string]string{"https://host.example/": "http://localhost:" + unusedPort + "/"},
+ expectListen: "http://localhost:" + unusedPort + "/",
+ expectInternal: "https://host.example/",
+ },
+ { // implicit port 443 in ListenURL
+ internalURLs: map[string]string{"wss://host.example/": "wss://localhost/"},
+ expectErrorMatch: `.*:443: bind: permission denied`,
+ },
+ {
+ internalURLs: map[string]string{"https://hostname.example/": "http://localhost:8000/"},
+ expectListen: "http://localhost:8000/",
+ expectInternal: "https://hostname.example/",
+ },
+ {
+ internalURLs: map[string]string{
+ "https://hostname1.example/": "http://localhost:12435/",
+ "https://hostname2.example/": "http://localhost:" + unusedPort + "/",
+ },
+ envVar: "https://hostname2.example", // note this works despite missing trailing "/"
+ expectListen: "http://localhost:" + unusedPort + "/",
+ expectInternal: "https://hostname2.example/",
+ },
+ { // cannot listen on any of the ListenURLs
+ internalURLs: map[string]string{
+ "https://hostname1.example/": "http://1.2.3.4:" + unusedPort + "/",
+ "https://hostname2.example/": "http://1.2.3.4:" + unusedPort + "/",
+ },
+ expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
+ },
+ { // cannot listen on any of the (implied) ListenURLs
+ internalURLs: map[string]string{
+ "https://1.2.3.4/": "",
+ "https://1.2.3.5/": "",
+ },
+ expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
+ },
+ { // impossible port number
+ internalURLs: map[string]string{
+ "https://host.example/": "http://0.0.0.0:1234567",
+ },
+ expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
+ },
+ {
+ // env var URL not mentioned in config = obey env var, with warning
+ internalURLs: map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
+ envVar: "https://hostname2.example",
+ expectListen: "https://hostname2.example/",
+ expectInternal: "https://hostname2.example/",
+ expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
+ },
+ {
+ // env var + empty config = obey env var, with warning
+ envVar: "https://hostname.example",
+ expectListen: "https://hostname.example/",
+ expectInternal: "https://hostname.example/",
+ expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
+ },
+ } {
+ c.Logf("trial %d %+v", idx, trial)
+ os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
+ var logbuf bytes.Buffer
+ log := ctxlog.New(&logbuf, "text", "info")
+ services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
+ for k, v := range trial.internalURLs {
+ u, err := url.Parse(k)
+ c.Assert(err, check.IsNil)
+ si := arvados.ServiceInstance{}
+ if v != "" {
+ u, err := url.Parse(v)
+ c.Assert(err, check.IsNil)
+ si.ListenURL = arvados.URL(*u)
+ }
+ services.Controller.InternalURLs[arvados.URL(*u)] = si
+ }
+ listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
+ if trial.expectLogsMatch != "" {
+ c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
+ }
+ if trial.expectErrorMatch != "" {
+ c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
+ continue
+ }
+ if !c.Check(err, check.IsNil) {
+ continue
+ }
+ c.Check(listenURL.String(), check.Equals, trial.expectListen)
+ c.Check(internalURL.String(), check.Equals, trial.expectInternal)
+ }
+}
func (*Suite) TestCommand(c *check.C) {
cf, err := ioutil.TempFile("", "cmd_test.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string) Handler {
- c.Check(ctx.Value("foo"), check.Equals, "bar")
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+ c.Check(ctx.Value(contextKey), check.Equals, "bar")
c.Check(token, check.Equals, "abcde")
return &testHandler{ctx: ctx, healthCheck: healthCheck}
})
- cmd.(*command).ctx = context.WithValue(ctx, "foo", "bar")
+ cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
done := make(chan bool)
var stdin, stdout, stderr bytes.Buffer
c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
}
+func (*Suite) TestDumpRequests(c *check.C) {
+ defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
+ requestQueueDumpCheckInterval = time.Second / 10
+
+ tmpdir := c.MkDir()
+ cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(cf.Name())
+ defer cf.Close()
+
+ max := 24
+ fmt.Fprintf(cf, `
+Clusters:
+ zzzzz:
+ SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+ ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
+ API:
+ MaxConcurrentRequests: %d
+ MaxQueuedRequests: 0
+ SystemLogs: {RequestQueueDumpDirectory: %q}
+ Services:
+ Controller:
+ ExternalURL: "http://localhost:12345"
+ InternalURLs: {"http://localhost:12345": {}}
+`, max, tmpdir)
+
+ started := make(chan bool, max+1)
+ hold := make(chan bool)
+ handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ started <- true
+ <-hold
+ })
+ healthCheck := make(chan bool, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+ return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
+ })
+ cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
+
+ exited := make(chan bool)
+ var stdin, stdout, stderr bytes.Buffer
+
+ go func() {
+ cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
+ close(exited)
+ }()
+ select {
+ case <-healthCheck:
+ case <-exited:
+ c.Logf("%s", stderr.String())
+ c.Error("command exited without health check")
+ }
+ client := http.Client{}
+ deadline := time.Now().Add(time.Second * 2)
+ for i := 0; i < max+1; i++ {
+ go func() {
+ resp, err := client.Get("http://localhost:12345/testpath")
+ for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+ time.Sleep(time.Second / 100)
+ resp, err = client.Get("http://localhost:12345/testpath")
+ }
+ if c.Check(err, check.IsNil) {
+ c.Logf("resp StatusCode %d", resp.StatusCode)
+ }
+ }()
+ }
+ for i := 0; i < max; i++ {
+ select {
+ case <-started:
+ case <-time.After(time.Second):
+ c.Logf("%s", stderr.String())
+ panic("timed out")
+ }
+ }
+ for delay := time.Second / 100; ; delay = delay * 2 {
+ time.Sleep(delay)
+ j, err := os.ReadFile(tmpdir + "/arvados-controller-requests.json")
+ if os.IsNotExist(err) && deadline.After(time.Now()) {
+ continue
+ }
+ c.Assert(err, check.IsNil)
+ c.Logf("stderr:\n%s", stderr.String())
+ c.Logf("json:\n%s", string(j))
+
+ var loaded []struct{ URL string }
+ err = json.Unmarshal(j, &loaded)
+ c.Check(err, check.IsNil)
+ if len(loaded) < max {
+ // Dumped when #requests was >90% but <100% of
+ // limit. If we stop now, we won't be able to
+ // confirm (below) that management endpoints
+ // are still accessible when normal requests
+ // are at 100%.
+ c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
+ continue
+ }
+ c.Check(loaded, check.HasLen, max)
+ c.Check(loaded[0].URL, check.Equals, "/testpath")
+ break
+ }
+
+ for _, path := range []string{"/_inspect/requests", "/metrics"} {
+ req, err := http.NewRequest("GET", "http://localhost:12345"+path, nil)
+ c.Assert(err, check.IsNil)
+ req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
+ resp, err := client.Do(req)
+ if !c.Check(err, check.IsNil) {
+ break
+ }
+ c.Logf("got response for %s", path)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, check.IsNil)
+ switch path {
+ case "/metrics":
+ c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
+ case "/_inspect/requests":
+ c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
+ default:
+ c.Error("oops, testing bug")
+ }
+ }
+ close(hold)
+ cancel()
+
+}
+
+func (*Suite) TestTLS(c *check.C) {
+ cwd, err := os.Getwd()
+ c.Assert(err, check.IsNil)
+
+ stdin := bytes.NewBufferString(`
+Clusters:
+ zzzzz:
+ SystemRootToken: abcde
+ Services:
+ Controller:
+ ExternalURL: "https://localhost:12345"
+ InternalURLs: {"https://localhost:12345": {}}
+ TLS:
+ Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
+ Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
+`)
+
+ called := make(chan bool)
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
+ return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Write([]byte("ok"))
+ close(called)
+ })}
+ })
+
+ exited := make(chan bool)
+ var stdout, stderr bytes.Buffer
+ go func() {
+ cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
+ close(exited)
+ }()
+ got := make(chan bool)
+ go func() {
+ defer close(got)
+ client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+ for range time.NewTicker(time.Millisecond).C {
+ resp, err := client.Get("https://localhost:12345")
+ if err != nil {
+ c.Log(err)
+ continue
+ }
+ body, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, check.IsNil)
+ c.Logf("status %d, body %s", resp.StatusCode, string(body))
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ break
+ }
+ }()
+ select {
+ case <-called:
+ case <-exited:
+ c.Error("command exited without calling handler")
+ case <-time.After(time.Second):
+ c.Error("timed out")
+ }
+ select {
+ case <-got:
+ case <-exited:
+ c.Error("command exited before client received response")
+ case <-time.After(time.Second):
+ c.Error("timed out")
+ }
+ c.Log(stderr.String())
+}
+
type testHandler struct {
ctx context.Context
+ handler http.Handler
healthCheck chan bool
}
-func (th *testHandler) ServeHTTP(http.ResponseWriter, *http.Request) {}
+func (th *testHandler) Done() <-chan struct{} { return nil }
+func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
func (th *testHandler) CheckHealth() error {
ctxlog.FromContext(th.ctx).Info("CheckHealth called")
select {