// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

package main

import (
	"bytes"
	"context"
	"crypto/tls"
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"path/filepath"
	"sort"
	"strings"
	"syscall"
	"time"

	"git.arvados.org/arvados.git/lib/cmd"
	"git.arvados.org/arvados.git/lib/controller/rpc"
	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// logsCommand displays logs from a running container.
type logsCommand struct {
	ac *arvados.Client
}

func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
	f := flag.NewFlagSet(prog, flag.ContinueOnError)
	follow := f.Bool("f", false, "follow: poll for new data until the container finishes")
	pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
	if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
		return code
	} else if f.NArg() < 1 {
		fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
		return 2
	} else if f.NArg() > 1 {
		fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
		return 2
	}
	target := f.Args()[0]

	lc.ac = arvados.NewClientFromEnv()
	lc.ac.Client = &http.Client{}
	if lc.ac.Insecure {
		lc.ac.Client.Transport = &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true}}
	}

	err := lc.tail(target, stdout, stderr, *follow, *pollInterval)
	if err != nil {
		fmt.Fprintln(stderr, err)
		return 1
	}
	return 0
}

func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	rpcconn, err := rpcFromEnv()
	if err != nil {
		return err
	}
	err = lc.checkAPISupport(ctx, crUUID)
	if err != nil {
		return err
	}

	var (
		// files to display
		watching = []string{"crunch-run.txt", "stderr.txt"}
		// fnm => file offset of next byte to display
		mark = map[string]int64{}
		// fnm => current size of file reported by api
		size = map[string]int64{}
		// has anything worked? (if so, retry after errors)
		anySuccess = false
		// container UUID whose logs we are displaying
		displayingUUID = ""
		// container UUID we last showed in a "connected,
		// polling" message
		reportedUUID = ""
	)

	cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
	if err != nil {
		return fmt.Errorf("error retrieving %s: %w", crUUID, err)
	}
	displayingUUID = cr.ContainerUUID
poll:
	for delay := pollInterval; ; time.Sleep(delay) {
		if cr.ContainerUUID == "" {
			return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
		}
		if delay < pollInterval {
			delay = pollInterval
		}
		// When .../container_requests/{uuid}/log_events is
		// implemented, we'll wait here for the next
		// server-sent event to tell us some updated file
		// sizes. For now, we poll.
		for _, fnm := range watching {
			currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
			if err != nil {
				if !anySuccess {
					return err
				}
				fmt.Fprintln(stderr, err)
				delay = pollInterval
				continue poll
			}
			if reportedUUID != displayingUUID {
				fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
				reportedUUID = displayingUUID
			}
			size[fnm] = currentsize
			if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
				mark[fnm] = currentsize - 10000
			} else if !seen {
				mark[fnm] = 0
			} else if currentsize < oldsize {
				// Log collection must have been
				// emptied and reset.
				fmt.Fprintln(stderr, "--- log restarted ---")
				mark = map[string]int64{}
				delay = pollInterval
				continue poll
			}
		}
		newData := map[string]*bytes.Buffer{}
		for _, fnm := range watching {
			if size[fnm] > mark[fnm] {
				newData[fnm] = &bytes.Buffer{}
				_, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
				if err != nil {
					fmt.Fprintln(stderr, err)
				}
				if n > 0 {
					mark[fnm] += n
					anySuccess = true
				}
			}
		}
		checkState := lc.display(stdout, stderr, watching, newData)
		if displayingUUID != cr.ContainerUUID {
			// A different container had already been
			// assigned when we started fetching the
			// latest batch of logs. We can now safely
			// start displaying logs from the new
			// container, without missing any of the
			// previous container's logs.
			displayingUUID = cr.ContainerUUID
			delay = 0
			continue
		} else if cr.State == arvados.ContainerRequestStateFinal || !follow {
			break
		} else if len(newData) > 0 {
			delay = pollInterval
		} else {
			delay = delay * 2
			if delay > pollInterval*5 {
				delay = pollInterval * 5
			}
			checkState = true
		}
		if checkState {
			cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
			if err != nil {
				if !anySuccess {
					return fmt.Errorf("error retrieving %s: %w", crUUID, err)
				}
				fmt.Fprintln(stderr, err)
				delay = pollInterval
				continue
			}
		}
	}
	return nil
}

func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
	u := url.URL{
		Scheme: "https",
		Host:   lc.ac.APIHost,
		Path:   "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
	}
	return u.String()
}

// Check whether the API is new enough to support the
// .../container_requests/{uuid}/log/ endpoint.
//
// Older versions return 200 for an OPTIONS request at the .../log/
// API endpoint, but the response header does not have a "Dav" header.
//
// Note an error response with no "Dav" header is not taken to
// indicate lack of API support. It may come from a new server that
// has a configuration or networking problem.
func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
	resp, err := lc.ac.Client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK && resp.Header.Get("Dav") == "" {
		return fmt.Errorf("server does not support container logs API (OPTIONS request returned HTTP %s, Dav: %q)", resp.Status, resp.Header.Get("Dav"))
	}
	return nil
}

// Retrieve specified byte range (e.g., "12-34", "1234-") from given
// fnm and write to out.
//
// If range is empty ("-0"), out can be nil.
//
// Return values are current file size, bytes copied, error.
//
// If the file does not exist, return values are 0, 0, nil.
func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
	if err != nil {
		return 0, 0, err
	}
	req.Header.Set("Range", "bytes="+byterange)
	req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
	resp, err := lc.ac.Client.Do(req)
	if err != nil {
		return 0, 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusNotFound {
		return 0, 0, nil
	}
	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 10000))
		return 0, 0, fmt.Errorf("error getting %s: HTTP %s -- %s", fnm, resp.Status, bytes.TrimSuffix(body, []byte{'\n'}))
	}
	var rstart, rend, rsize int64
	_, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize)
	if err != nil {
		return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err)
	}
	if out == nil {
		return rsize, 0, nil
	}
	n, err := io.Copy(out, resp.Body)
	return rsize, n, err
}

// display some log data, formatted as desired (prefixing each line
// with a tag indicating which file it came from, etc.).
//
// Return value is true if the log data contained a hint that it's a
// good time to check whether the container is finished so we can
// exit.
func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
	checkState := false
	var sorted []string
	for _, fnm := range watching {
		buf := received[fnm]
		if buf == nil || buf.Len() == 0 {
			continue
		}
		for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
			sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
			if fnm == "crunch-run.txt" {
				checkState = checkState ||
					bytes.HasSuffix(line, []byte("Complete")) ||
					bytes.HasSuffix(line, []byte("Cancelled")) ||
					bytes.HasSuffix(line, []byte("Queued"))
			}
		}
	}
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i][15:] < sorted[j][15:]
	})
	for _, s := range sorted {
		_, err := fmt.Fprint(out, s)
		if err != nil {
			fmt.Fprintln(stderr, err)
		}
	}
	return checkState
}

// shellCommand connects the terminal to an interactive shell on a
// running container.
type shellCommand struct{}

func (shellCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
	f := flag.NewFlagSet(prog, flag.ContinueOnError)
	detachKeys := f.String("detach-keys", "ctrl-],ctrl-]", "set detach key sequence, as in docker-attach(1)")
	if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid [ssh-options] [remote-command [args...]]", stderr); !ok {
		return code
	} else if f.NArg() < 1 {
		fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
		return 2
	}
	target := f.Args()[0]
	if !strings.Contains(target, "@") {
		target = "root@" + target
	}
	sshargs := f.Args()[1:]

	// Try setting up a tunnel, and exit right away if it
	// fails. This tunnel won't get used -- we'll set up a new
	// tunnel when running as SSH client's ProxyCommand child --
	// but in most cases where the real tunnel setup would fail,
	// we catch the problem earlier here. This makes it less
	// likely that an error message about tunnel setup will get
	// hidden behind noisy errors from SSH client like this:
	//
	// [useful tunnel setup error message here]
	// kex_exchange_identification: Connection closed by remote host
	// Connection closed by UNKNOWN port 65535
	// exit status 255
	//
	// In case our target is a container request, the probe also
	// resolves it to a container, so we don't connect to two
	// different containers in a race.
	var probetarget bytes.Buffer
	exitcode := connectSSHCommand{}.RunCommand(
		"arvados-client connect-ssh",
		[]string{"-detach-keys=" + *detachKeys, "-probe-only=true", target},
		&bytes.Buffer{}, &probetarget, stderr)
	if exitcode != 0 {
		return exitcode
	}
	target = strings.Trim(probetarget.String(), "\n")

	selfbin, err := os.Readlink("/proc/self/exe")
	if err != nil {
		fmt.Fprintln(stderr, err)
		return 2
	}
	sshargs = append([]string{
		"ssh",
		"-o", "ProxyCommand " + selfbin + " connect-ssh -detach-keys=" + shellescape(*detachKeys) + " " + shellescape(target),
		"-o", "StrictHostKeyChecking no",
		target},
		sshargs...)
	sshbin, err := exec.LookPath("ssh")
	if err != nil {
		fmt.Fprintln(stderr, err)
		return 1
	}
	err = syscall.Exec(sshbin, sshargs, os.Environ())
	fmt.Fprintf(stderr, "exec(%q) failed: %s\n", sshbin, err)
	return 1
}

// connectSSHCommand connects stdin/stdout to a container's gateway
// server (see lib/crunchrun/ssh.go).
//
// It is intended to be invoked with OpenSSH client's ProxyCommand
// config.
type connectSSHCommand struct{}

func (connectSSHCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
	f := flag.NewFlagSet(prog, flag.ContinueOnError)
	f.SetOutput(stderr)
	f.Usage = func() {
		_, prog := filepath.Split(prog)
		fmt.Fprint(stderr, prog+`: connect to the gateway service for a running container.

NOTE: You almost certainly don't want to use this command directly. It
is meant to be used internally. Use "arvados-client shell" instead.

Usage: `+prog+` [options] [username@]container-uuid

Options:
`)
		f.PrintDefaults()
	}
	probeOnly := f.Bool("probe-only", false, "do not transfer IO, just setup tunnel, print target UUID, and exit")
	detachKeys := f.String("detach-keys", "", "set detach key sequence, as in docker-attach(1)")
	if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid", stderr); !ok {
		return code
	} else if f.NArg() != 1 {
		fmt.Fprintf(stderr, "missing required argument: [username@]container-uuid\n")
		return 2
	}
	targetUUID := f.Args()[0]
	loginUsername := "root"
	if i := strings.Index(targetUUID, "@"); i >= 0 {
		loginUsername = targetUUID[:i]
		targetUUID = targetUUID[i+1:]
	}
	rpcconn, err := rpcFromEnv()
	if err != nil {
		fmt.Fprintln(stderr, err)
		return 1
	}
	targetUUID, err = resolveToContainerUUID(rpcconn, targetUUID)
	if err != nil {
		fmt.Fprintln(stderr, err)
		return 1
	}
	fmt.Fprintln(stderr, "connecting to container", targetUUID)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
		UUID:          targetUUID,
		DetachKeys:    *detachKeys,
		LoginUsername: loginUsername,
	})
	if err != nil {
		fmt.Fprintln(stderr, "error setting up tunnel:", err)
		return 1
	}
	defer sshconn.Conn.Close()

	if *probeOnly {
		fmt.Fprintln(stdout, targetUUID)
		return 0
	}

	go func() {
		defer cancel()
		_, err := io.Copy(stdout, sshconn.Conn)
		if err != nil && ctx.Err() == nil {
			fmt.Fprintf(stderr, "receive: %v\n", err)
		}
	}()
	go func() {
		defer cancel()
		_, err := io.Copy(sshconn.Conn, stdin)
		if err != nil && ctx.Err() == nil {
			fmt.Fprintf(stderr, "send: %v\n", err)
		}
	}()
	<-ctx.Done()
	return 0
}

func shellescape(s string) string {
	return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
}

func rpcFromEnv() (*rpc.Conn, error) {
	ac := arvados.NewClientFromEnv()
	if ac.APIHost == "" || ac.AuthToken == "" {
		return nil, fmt.Errorf("fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set, and ~/.config/arvados/settings.conf is not readable")
	}
	return rpc.NewConn("",
		&url.URL{
			Scheme: "https",
			Host:   ac.APIHost,
		},
		ac.Insecure,
		func(context.Context) ([]string, error) {
			return []string{ac.AuthToken}, nil
		}), nil
}

func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
	switch {
	case strings.Contains(targetUUID, "-dz642-") && len(targetUUID) == 27:
		return targetUUID, nil
	case strings.Contains(targetUUID, "-xvhdp-") && len(targetUUID) == 27:
		crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
		if err != nil {
			return "", err
		}
		if len(crs.Items) < 1 {
			return "", fmt.Errorf("container request %q not found", targetUUID)
		}
		cr := crs.Items[0]
		if cr.ContainerUUID == "" {
			return "", fmt.Errorf("no container assigned, container request state is %s", strings.ToLower(string(cr.State)))
		}
		return cr.ContainerUUID, nil
	default:
		return "", fmt.Errorf("target UUID is not a container or container request UUID: %s", targetUUID)
	}
}