// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

// Package service provides a cmd.Handler that brings up a system service.
package service

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/prometheus/client_golang/prometheus"
	check "gopkg.in/check.v1"
)

// Test is the "go test" entry point: it hands control to gocheck, which
// runs the suites registered with check.Suite in this package.
func Test(t *testing.T) {
	check.TestingT(t)
}

// Register Suite with gocheck so its Test* methods are run by Test.
var _ = check.Suite(&Suite{})

// Suite groups the test cases in this file. It carries no state.
type Suite struct{}

// key is a private type for context value keys used by these tests, so
// they cannot collide with context keys defined in other packages.
type key int

const (
	// contextKey is the key under which tests store a marker value in
	// the context handed to the command, to confirm that the context is
	// passed through to the handler factory.
	contextKey key = iota
)

// unusedPort returns (as a decimal string) a TCP port number that was
// free on the testing host at the time of the call.
//
// It works by asking the OS for an arbitrary port, noting which one was
// assigned, and releasing it again, so test cases don't get confused by
// "already in use" errors. Any failure aborts the calling test.
func unusedPort(c *check.C) string {
	ln, listenErr := net.Listen("tcp", ":")
	c.Assert(listenErr, check.IsNil)
	// Record the assigned address, then release the port so the
	// caller can bind to it.
	addr := ln.Addr().String()
	ln.Close()
	_, portStr, splitErr := net.SplitHostPort(addr)
	c.Assert(splitErr, check.IsNil)
	return portStr
}

// TestGetListenAddress runs getListenAddr against a table of
// Controller InternalURLs configurations and
// $ARVADOS_SERVICE_INTERNAL_URL values, and checks the returned listen
// URL, internal URL, error, and log output against each trial's
// expectations.
func (*Suite) TestGetListenAddress(c *check.C) {
	port := unusedPort(c)
	// The loop below sets the env var for every trial; make sure it
	// does not leak into other tests.
	defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
	for idx, trial := range []struct {
		// internalURL => listenURL, both with trailing "/"
		// because config loader always adds it
		internalURLs     map[string]string
		// value for $ARVADOS_SERVICE_INTERNAL_URL ("" in most trials)
		envVar           string
		// if non-empty, getListenAddr must return an error matching this regexp
		expectErrorMatch string
		// if non-empty, the captured log output must match this regexp
		expectLogsMatch  string
		// expected listen URL / internal URL when no error is expected
		expectListen     string
		expectInternal   string
	}{
		{
			internalURLs:   map[string]string{"http://localhost:" + port + "/": ""},
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "http://localhost:" + port + "/",
		},
		{ // implicit port 80 in InternalURLs
			// NOTE(review): expects bind to fail with "permission
			// denied" -- presumably assumes the test process may not
			// bind privileged ports; confirm for your environment.
			internalURLs:     map[string]string{"http://localhost/": ""},
			expectErrorMatch: `.*:80: bind: permission denied`,
		},
		{ // implicit port 443 in InternalURLs
			internalURLs:   map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "https://host.example/",
		},
		{ // implicit port 443 in ListenURL
			internalURLs:     map[string]string{"wss://host.example/": "wss://localhost/"},
			expectErrorMatch: `.*:443: bind: permission denied`,
		},
		{
			internalURLs:   map[string]string{"https://hostname.example/": "http://localhost:8000/"},
			expectListen:   "http://localhost:8000/",
			expectInternal: "https://hostname.example/",
		},
		{
			internalURLs: map[string]string{
				"https://hostname1.example/": "http://localhost:12435/",
				"https://hostname2.example/": "http://localhost:" + port + "/",
			},
			envVar:         "https://hostname2.example", // note this works despite missing trailing "/"
			expectListen:   "http://localhost:" + port + "/",
			expectInternal: "https://hostname2.example/",
		},
		{ // cannot listen on any of the ListenURLs
			internalURLs: map[string]string{
				"https://hostname1.example/": "http://1.2.3.4:" + port + "/",
				"https://hostname2.example/": "http://1.2.3.4:" + port + "/",
			},
			expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
		},
		{ // cannot listen on any of the (implied) ListenURLs
			internalURLs: map[string]string{
				"https://1.2.3.4/": "",
				"https://1.2.3.5/": "",
			},
			expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
		},
		{ // impossible port number
			internalURLs: map[string]string{
				"https://host.example/": "http://0.0.0.0:1234567",
			},
			expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
		},
		{
			// env var URL not mentioned in config = obey env var, with warning
			internalURLs:    map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
			envVar:          "https://hostname2.example",
			expectListen:    "https://hostname2.example/",
			expectInternal:  "https://hostname2.example/",
			expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
		},
		{
			// env var + empty config = obey env var, with warning
			envVar:          "https://hostname.example",
			expectListen:    "https://hostname.example/",
			expectInternal:  "https://hostname.example/",
			expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
		},
	} {
		c.Logf("trial %d %+v", idx, trial)
		os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
		// Capture log output per trial so expectLogsMatch only sees
		// messages from this call.
		var logbuf bytes.Buffer
		log := ctxlog.New(&logbuf, "text", "info")
		// Build the Controller service config from the trial's
		// internalURL => listenURL map. An empty listenURL string
		// leaves ServiceInstance.ListenURL at its zero value.
		services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
		for k, v := range trial.internalURLs {
			u, err := url.Parse(k)
			c.Assert(err, check.IsNil)
			si := arvados.ServiceInstance{}
			if v != "" {
				// This u/err pair shadows the outer one and is
				// only used to fill in ListenURL.
				u, err := url.Parse(v)
				c.Assert(err, check.IsNil)
				si.ListenURL = arvados.URL(*u)
			}
			services.Controller.InternalURLs[arvados.URL(*u)] = si
		}
		listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
		if trial.expectLogsMatch != "" {
			c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
		}
		if trial.expectErrorMatch != "" {
			c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
			continue
		}
		// Don't inspect the returned URLs after an unexpected error.
		if !c.Check(err, check.IsNil) {
			continue
		}
		c.Check(listenURL.String(), check.Equals, trial.expectListen)
		c.Check(internalURL.String(), check.Equals, trial.expectInternal)
	}
}

// TestCommand starts an arvados-controller command with a minimal
// config file and a test handler, and checks that:
//
//   - the handler factory receives the context installed on the command
//     (identified by the contextKey value) and the cluster's
//     SystemRootToken;
//   - the handler's CheckHealth method gets called before the command
//     exits;
//   - nothing is written to stdout, and the "CheckHealth called" log
//     message shows up on stderr.
func (*Suite) TestCommand(c *check.C) {
	cf, err := ioutil.TempFile("", "cmd_test.")
	c.Assert(err, check.IsNil)
	defer os.Remove(cf.Name())
	defer cf.Close()
	// A failed write would otherwise surface later as a confusing
	// config-loading failure, so check it here.
	_, err = fmt.Fprintf(cf, "Clusters:\n zzzzz:\n  SystemRootToken: abcde\n  NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
	c.Assert(err, check.IsNil)

	// Buffered so CheckHealth's non-blocking send succeeds even if
	// it happens before we start receiving.
	healthCheck := make(chan bool, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
		c.Check(ctx.Value(contextKey), check.Equals, "bar")
		c.Check(token, check.Equals, "abcde")
		return &testHandler{ctx: ctx, healthCheck: healthCheck}
	})
	// Tag the command's context so the factory above can verify that
	// it receives a context derived from this one.
	cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")

	done := make(chan bool)
	var stdin, stdout, stderr bytes.Buffer

	go func() {
		cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
		close(done)
	}()
	select {
	case <-healthCheck:
	case <-done:
		// The command has exited, so stderr is no longer being
		// written to; log it to show why it exited.
		c.Logf("%s", stderr.String())
		c.Error("command exited without health check")
	}
	cancel()
	c.Check(stdout.String(), check.Equals, "")
	c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
}

// TestTunnelPathRegexp checks which request paths reTunnelPath
// matches: the "ssh" and "gateway_tunnel" endpoints under both the
// "connect" and "containers" prefixes match, while a path with extra
// leading components or without an endpoint suffix does not.
func (s *Suite) TestTunnelPathRegexp(c *check.C) {
	for _, trial := range []struct {
		path  string
		match bool
	}{
		{`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`, true},
		{`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`, true},
		{`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`, true},
		{`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`, true},
		{`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`, false},
		{`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`, false},
	} {
		// The comment identifies the offending path if a check fails.
		c.Check(reTunnelPath.MatchString(trial.path), check.Equals, trial.match, check.Commentf("path %q", trial.path))
	}
}

// TestRequestLimitsAndDumpRequests_Keepweb runs the shared
// request-limit / request-dump scenario for the Keepweb service, with
// the concurrency limit set through the MaxConcurrentRequests config
// key.
func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
	const limitKey = "MaxConcurrentRequests"
	s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, limitKey)
}

// TestRequestLimitsAndDumpRequests_Controller runs the shared
// request-limit / request-dump scenario for the Controller service,
// with the concurrency limit set through the
// MaxConcurrentRailsRequests config key.
func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
	const limitKey = "MaxConcurrentRailsRequests"
	s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, limitKey)
}

// testRequestLimitAndDumpRequests starts the named service with a
// handler that blocks every request until released, then checks the
// request-limiting and request-dumping behavior.
//
// serviceName selects the service to run; maxReqsConfigKey is the name
// of the API config key used to set the concurrent request limit for
// that service.
//
// The scenario:
//
//   - max+1 ordinary API requests and maxTunnels+extraTunnelReqs
//     gateway tunnel (ssh / gateway_tunnel) requests are sent
//     concurrently.
//   - Once max API requests are being handled, the test waits for the
//     "<serviceName>-requests.json" dump file in
//     RequestQueueDumpDirectory and expects it to list max+1
//     non-tunnel requests.
//   - While the API requests are still held, the management endpoints
//     /_inspect/requests and /metrics must still respond with 200.
//   - After the handlers are released, all max+1 API requests must
//     have received 200, exactly maxTunnels tunnel requests 200, and
//     the remaining extraTunnelReqs tunnel requests 503.
func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
	// Poll the request queue more often than usual so the dump file
	// shows up quickly; restore the original interval afterwards.
	defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
	requestQueueDumpCheckInterval = time.Second / 10

	port := unusedPort(c)
	tmpdir := c.MkDir()
	cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
	c.Assert(err, check.IsNil)
	defer os.Remove(cf.Name())
	// Safety net for early exits; after the explicit Close below this
	// is a harmless no-op returning an ignored error.
	defer cf.Close()

	max := 24
	maxTunnels := 30
	_, err = fmt.Fprintf(cf, `
Clusters:
 zzzzz:
  SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
  ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
  API:
   `+maxReqsConfigKey+`: %d
   MaxQueuedRequests: 1
   MaxGatewayTunnels: %d
  SystemLogs: {RequestQueueDumpDirectory: %q}
  Services:
   Controller:
    ExternalURL: "http://localhost:`+port+`"
    InternalURLs: {"http://localhost:`+port+`": {}}
   WebDAV:
    ExternalURL: "http://localhost:`+port+`"
    InternalURLs: {"http://localhost:`+port+`": {}}
`, max, maxTunnels, tmpdir)
	c.Assert(err, check.IsNil)
	c.Assert(cf.Close(), check.IsNil)

	// started receives one value per non-tunnel request that reaches
	// the handler; hold keeps every handler blocked until closed.
	started := make(chan bool, max+1)
	hold := make(chan bool)
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") {
			<-hold
		} else {
			started <- true
			<-hold
		}
	})
	healthCheck := make(chan bool, 1)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
		return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
	})
	cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")

	exited := make(chan bool)
	var stdin, stdout, stderr bytes.Buffer

	go func() {
		cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
		close(exited)
	}()
	select {
	case <-healthCheck:
	case <-exited:
		c.Logf("%s", stderr.String())
		c.Error("command exited without health check")
	}
	client := http.Client{}
	// Requests that fail to connect are retried until this deadline.
	deadline := time.Now().Add(time.Second * 2)
	var activeReqs sync.WaitGroup

	// Start some API reqs
	var apiResp200, apiResp503 int64
	for i := 0; i < max+1; i++ {
		activeReqs.Add(1)
		go func() {
			defer activeReqs.Done()
			target := "http://localhost:" + port + "/testpath"
			resp, err := client.Get(target)
			for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
				time.Sleep(time.Second / 100)
				resp, err = client.Get(target)
			}
			if c.Check(err, check.IsNil) {
				// Only the status code matters here, but the
				// body must still be closed to release the
				// connection.
				defer resp.Body.Close()
				if resp.StatusCode == http.StatusOK {
					atomic.AddInt64(&apiResp200, 1)
				} else if resp.StatusCode == http.StatusServiceUnavailable {
					atomic.AddInt64(&apiResp503, 1)
				}
			}
		}()
	}

	// Start some gateway tunnel reqs that don't count toward our
	// API req limit
	extraTunnelReqs := 20
	var tunnelResp200, tunnelResp503 int64
	var paths = []string{
		"/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
		"/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
		"/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
		"/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
	}
	for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
		i := i
		activeReqs.Add(1)
		go func() {
			defer activeReqs.Done()
			target := "http://localhost:" + port + paths[i%len(paths)]
			resp, err := client.Post(target, "application/octet-stream", nil)
			for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
				time.Sleep(time.Second / 100)
				resp, err = client.Post(target, "application/octet-stream", nil)
			}
			if c.Check(err, check.IsNil) {
				defer resp.Body.Close()
				if resp.StatusCode == http.StatusOK {
					atomic.AddInt64(&tunnelResp200, 1)
				} else if resp.StatusCode == http.StatusServiceUnavailable {
					atomic.AddInt64(&tunnelResp503, 1)
				} else {
					c.Errorf("tunnel response code %d", resp.StatusCode)
				}
			}
		}()
	}
	// Wait until max API requests have reached the handler.
	for i := 0; i < max; i++ {
		select {
		case <-started:
		case <-time.After(time.Second):
			c.Logf("%s", stderr.String())
			// The request goroutines may still be updating the
			// counters, so read them atomically.
			c.Logf("apiResp200 %d", atomic.LoadInt64(&apiResp200))
			c.Logf("apiResp503 %d", atomic.LoadInt64(&apiResp503))
			c.Logf("tunnelResp200 %d", atomic.LoadInt64(&tunnelResp200))
			c.Logf("tunnelResp503 %d", atomic.LoadInt64(&tunnelResp503))
			c.Fatal("timed out")
		}
	}
	// Poll (with exponential backoff) for the request dump file.
	for delay := time.Second / 100; ; delay = delay * 2 {
		time.Sleep(delay)
		j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
		if os.IsNotExist(err) && deadline.After(time.Now()) {
			continue
		}
		c.Assert(err, check.IsNil)
		c.Logf("stderr:\n%s", stderr.String())
		c.Logf("json:\n%s", string(j))

		var loaded []struct{ URL string }
		err = json.Unmarshal(j, &loaded)
		c.Check(err, check.IsNil)

		// Filter out (in place, preserving order) the gateway
		// tunnel reqs, which don't count toward our API req limit.
		apiReqs := loaded[:0]
		for _, ent := range loaded {
			if strings.Contains(ent.URL, "/ssh") || strings.Contains(ent.URL, "/gateway_tunnel") {
				continue
			}
			apiReqs = append(apiReqs, ent)
		}
		loaded = apiReqs

		if len(loaded) < max {
			// Dumped when #requests was >90% but <100% of
			// limit. If we stop now, we won't be able to
			// confirm (below) that management endpoints
			// are still accessible when normal requests
			// are at 100%.
			c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
			continue
		}
		c.Check(loaded, check.HasLen, max+1)
		c.Check(loaded[0].URL, check.Equals, "/testpath")
		break
	}

	// Management endpoints must remain accessible while normal
	// requests are at the limit.
	for _, path := range []string{"/_inspect/requests", "/metrics"} {
		req, err := http.NewRequest("GET", "http://localhost:"+port+path, nil)
		c.Assert(err, check.IsNil)
		req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
		resp, err := client.Do(req)
		if !c.Check(err, check.IsNil) {
			break
		}
		c.Logf("got response for %s", path)
		c.Check(resp.StatusCode, check.Equals, http.StatusOK)
		buf, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		c.Check(err, check.IsNil)
		switch path {
		case "/metrics":
			c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
			c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
		case "/_inspect/requests":
			c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
		default:
			c.Error("oops, testing bug")
		}
	}
	// Release all held handlers and wait for every client request
	// to finish before inspecting the counters.
	close(hold)
	activeReqs.Wait()
	c.Check(int(apiResp200), check.Equals, max+1)
	c.Check(int(apiResp503), check.Equals, 0)
	c.Check(int(tunnelResp200), check.Equals, maxTunnels)
	c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
	cancel()
}

// TestTLS starts an arvados-controller command configured (via a
// config document on stdin) with an https InternalURL and a TLS
// key/certificate pair, and checks that an HTTPS client can reach the
// handler and receives a 200 response.
//
// The key and certificate are loaded from
// ../../services/api/tmp/self-signed.{key,pem} relative to the current
// working directory, so the client skips certificate verification.
func (*Suite) TestTLS(c *check.C) {
	port := unusedPort(c)
	cwd, err := os.Getwd()
	c.Assert(err, check.IsNil)

	stdin := bytes.NewBufferString(`
Clusters:
 zzzzz:
  SystemRootToken: abcde
  Services:
   Controller:
    ExternalURL: "https://localhost:` + port + `"
    InternalURLs: {"https://localhost:` + port + `": {}}
  TLS:
   Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
   Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
`)

	called := make(chan bool)
	// Guard the close so that an unexpected second request cannot
	// panic the handler with "close of closed channel".
	var calledOnce sync.Once
	cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
		return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Write([]byte("ok"))
			calledOnce.Do(func() { close(called) })
		})}
	})
	// Give the command a cancellable context, as the other tests in
	// this file do, so it is told to stop when the test returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cmd.(*command).ctx = ctx

	exited := make(chan bool)
	var stdout, stderr bytes.Buffer
	go func() {
		cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
		close(exited)
	}()
	// stop tells the polling goroutine below to give up once the
	// test is over, so it does not keep retrying (and logging to c)
	// after a failure or timeout.
	stop := make(chan struct{})
	defer close(stop)
	got := make(chan bool)
	go func() {
		defer close(got)
		client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		// Retry until the server is up and a response is received.
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
			}
			resp, err := client.Get("https://localhost:" + port)
			if err != nil {
				c.Log(err)
				continue
			}
			body, err := ioutil.ReadAll(resp.Body)
			resp.Body.Close()
			c.Check(err, check.IsNil)
			c.Logf("status %d, body %s", resp.StatusCode, string(body))
			c.Check(resp.StatusCode, check.Equals, http.StatusOK)
			return
		}
	}()
	// First wait for the server side to see the request...
	select {
	case <-called:
	case <-exited:
		c.Error("command exited without calling handler")
	case <-time.After(time.Second):
		c.Error("timed out")
	}
	// ...then for the client side to receive the response.
	select {
	case <-got:
	case <-exited:
		c.Error("command exited before client received response")
	case <-time.After(time.Second):
		c.Error("timed out")
	}
	c.Log(stderr.String())
}

// testHandler is the Handler implementation returned by the handler
// factories in these tests.
type testHandler struct {
	// ctx supplies the logger used by CheckHealth.
	ctx context.Context
	// handler receives every request passed to ServeHTTP.
	handler http.Handler
	// healthCheck, if non-nil, receives a value (when there is room)
	// each time CheckHealth is called.
	healthCheck chan bool
}

// Done returns a nil channel, which never becomes ready: the test
// handler never reports itself as finished.
func (th *testHandler) Done() <-chan struct{} {
	return nil
}

// ServeHTTP passes the request through to th.handler unchanged.
func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	th.handler.ServeHTTP(w, r)
}

// CheckHealth logs "CheckHealth called" using the logger from th.ctx,
// notifies th.healthCheck without blocking, and always reports
// healthy (nil).
func (th *testHandler) CheckHealth() error {
	logger := ctxlog.FromContext(th.ctx)
	logger.Info("CheckHealth called")
	select {
	case th.healthCheck <- true:
	default:
		// Nobody can receive right now (the channel is full or
		// nil); skip the notification rather than block.
	}
	return nil
}