From 02199566253fabd839e61d30e510f58ceb87a16f Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 4 Apr 2023 03:17:12 -0400 Subject: [PATCH] 18790: Add arvados-client logs command. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- cmd/arvados-client/cmd.go | 9 +- cmd/arvados-client/container_gateway.go | 258 ++++++++++++++++--- cmd/arvados-client/container_gateway_test.go | 133 ++++++++++ lib/controller/rpc/conn.go | 7 + 4 files changed, 374 insertions(+), 33 deletions(-) diff --git a/cmd/arvados-client/cmd.go b/cmd/arvados-client/cmd.go index c10783c978..19d13437c8 100644 --- a/cmd/arvados-client/cmd.go +++ b/cmd/arvados-client/cmd.go @@ -55,12 +55,13 @@ var ( "virtual_machine": cli.APICall, "workflow": cli.APICall, - "mount": mount.Command, - "deduplication-report": deduplicationreport.Command, - "costanalyzer": costanalyzer.Command, - "shell": shellCommand{}, "connect-ssh": connectSSHCommand{}, + "costanalyzer": costanalyzer.Command, + "deduplication-report": deduplicationreport.Command, "diagnostics": diagnostics.Command{}, + "logs": logsCommand{}, + "mount": mount.Command, + "shell": shellCommand{}, "sudo": sudoCommand{}, }) ) diff --git a/cmd/arvados-client/container_gateway.go b/cmd/arvados-client/container_gateway.go index 55f8c33bc7..7a35190427 100644 --- a/cmd/arvados-client/container_gateway.go +++ b/cmd/arvados-client/container_gateway.go @@ -7,21 +7,209 @@ package main import ( "bytes" "context" + "crypto/tls" "flag" "fmt" "io" + "net/http" "net/url" "os" "os/exec" "path/filepath" "strings" "syscall" + "time" "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/lib/controller/rpc" "git.arvados.org/arvados.git/sdk/go/arvados" ) +// logsCommand displays logs from a running container. +type logsCommand struct { + ac *arvados.Client +} + +func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + f := flag.NewFlagSet(prog, flag.ContinueOnError) + pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data") + if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok { + return code + } else if f.NArg() < 1 { + fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n") + return 2 + } else if f.NArg() > 1 { + fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n") + return 2 + } + target := f.Args()[0] + + lc.ac = arvados.NewClientFromEnv() + lc.ac.Client = &http.Client{} + if lc.ac.Insecure { + lc.ac.Client.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true}} + } + + err := lc.tailf(target, stdout, stderr, *pollInterval) + if err != nil { + fmt.Fprintln(stderr, err) + return 1 + } + return 0 +} + +func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rpcconn := rpcFromEnv() + ctrUUID, err := resolveToContainerUUID(rpcconn, target) + if err != nil { + return err + } + fmt.Fprintln(stderr, "connecting to container", ctrUUID) + + var ( + // files to display + watching = []string{"crunch-run.txt", "stderr.txt"} + // fnm => file offset of next byte to display + mark = map[string]int64{} + // fnm => current size of file reported by api + size = map[string]int64{} + // exit after fetching next log chunk + containerFinished = false + ) + +poll: + for delay := pollInterval; ; time.Sleep(delay) { + // When /arvados/v1/containers/{uuid}/log_events is + // implemented, we'll wait here for the next + // server-sent event to tell us some updated file + // sizes. For now, we poll. + for _, fnm := range watching { + currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "0-0", nil) + if err != nil { + fmt.Fprintln(stderr, err) + delay = pollInterval + continue poll + } + size[fnm] = currentsize + if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 { + mark[fnm] = currentsize - 10000 + } else if !seen { + mark[fnm] = 0 + } else if currentsize < oldsize { + // Log collection must have been + // emptied and reset. + fmt.Fprintln(stderr, "--- log restarted ---") + for fnm := range mark { + delete(mark, fnm) + } + delay = pollInterval + continue poll + } + } + anyNewData := false + for _, fnm := range watching { + if size[fnm] > mark[fnm] { + anyNewData = true + _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), stdout) + if err != nil { + fmt.Fprintln(stderr, err) + } + mark[fnm] += n + } + } + if containerFinished { + // If the caller specified a container request + // UUID and the container we were watching has + // been replaced by a new one, start watching + // logs from the new one. Otherwise, we're + // done. + if target == ctrUUID { + // caller specified container UUID + return nil + } + newUUID, err := resolveToContainerUUID(rpcconn, target) + if err != nil { + return err + } + if newUUID == ctrUUID { + // no further attempts + return nil + } + ctrUUID = newUUID + containerFinished = false + delay = 0 + } else if anyNewData { + delay = pollInterval + } else { + delay = delay * 2 + if delay > pollInterval*5 { + delay = pollInterval * 5 + } + ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}}) + if err != nil { + fmt.Fprintln(stderr, err) + delay = pollInterval + continue + } + if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete { + containerFinished = true + delay = 0 + } + } + } + return nil +} + +// Retrieve specified byte range (e.g., "12-34", "1234-") from given +// fnm and write to out. +// +// If range is empty ("0-0"), out can be nil. +// +// Return values are current file size, bytes copied, error. +// +// If the file does not exist, return values are 0, 0, nil. +func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) { + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second)) + defer cancel() + srcURL := url.URL{ + Scheme: "https", + Host: lc.ac.APIHost, + Path: "/arvados/v1/containers/" + uuid + "/log/" + fnm, + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, srcURL.String(), nil) + if err != nil { + return 0, 0, err + } + req.Header.Set("Range", "bytes="+byterange) + req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken) + resp, err := lc.ac.Client.Do(req) + if err != nil { + return 0, 0, err + } + defer resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return 0, 0, nil + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + return 0, 0, fmt.Errorf("error getting %s: %s", fnm, resp.Status) + } + var rstart, rend, rsize int64 + _, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize) + if err != nil { + return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err) + } + if out == nil { + return rsize, 0, nil + } + n, err := io.Copy(out, resp.Body) + return rsize, n, err +} + // shellCommand connects the terminal to an interactive shell on a // running container. type shellCommand struct{} @@ -129,37 +317,14 @@ Options: fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set") return 1 } - insecure := os.Getenv("ARVADOS_API_HOST_INSECURE") - rpcconn := rpc.NewConn("", - &url.URL{ - Scheme: "https", - Host: os.Getenv("ARVADOS_API_HOST"), - }, - insecure == "1" || insecure == "yes" || insecure == "true", - func(context.Context) ([]string, error) { - return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil - }) - if strings.Contains(targetUUID, "-xvhdp-") { - crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}}) - if err != nil { - fmt.Fprintln(stderr, err) - return 1 - } - if len(crs.Items) < 1 { - fmt.Fprintf(stderr, "container request %q not found\n", targetUUID) - return 1 - } - cr := crs.Items[0] - if cr.ContainerUUID == "" { - fmt.Fprintf(stderr, "no container assigned, container request state is %s\n", strings.ToLower(string(cr.State))) - return 1 - } - targetUUID = cr.ContainerUUID - fmt.Fprintln(stderr, "connecting to container", targetUUID) - } else if !strings.Contains(targetUUID, "-dz642-") { - fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID) + rpcconn := rpcFromEnv() + targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID) + if err != nil { + fmt.Fprintln(stderr, err) return 1 } + fmt.Fprintln(stderr, "connecting to container", targetUUID) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{ @@ -199,3 +364,38 @@ Options: func shellescape(s string) string { return "'" + strings.Replace(s, "'", "'\\''", -1) + "'" } + +func rpcFromEnv() *rpc.Conn { + insecure := os.Getenv("ARVADOS_API_HOST_INSECURE") + return rpc.NewConn("", + &url.URL{ + Scheme: "https", + Host: os.Getenv("ARVADOS_API_HOST"), + }, + insecure == "1" || insecure == "yes" || insecure == "true", + func(context.Context) ([]string, error) { + return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil + }) +} + +func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) { + switch { + case strings.Contains(targetUUID, "-dz642-"): + return targetUUID, nil + case strings.Contains(targetUUID, "-xvhdp-"): + crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}}) + if err != nil { + return "", err + } + if len(crs.Items) < 1 { + return "", fmt.Errorf("container request %q not found\n", targetUUID) + } + cr := crs.Items[0] + if cr.ContainerUUID == "" { + return "", fmt.Errorf("no container assigned, container request state is %s\n", strings.ToLower(string(cr.State))) + } + return cr.ContainerUUID, nil + default: + return "", fmt.Errorf("target UUID is not a container or container request UUID: %s\n", targetUUID) + } +} diff --git a/cmd/arvados-client/container_gateway_test.go b/cmd/arvados-client/container_gateway_test.go index 743b91d69b..0e5aad709e 100644 --- a/cmd/arvados-client/container_gateway_test.go +++ b/cmd/arvados-client/container_gateway_test.go @@ -10,6 +10,7 @@ import ( "crypto/hmac" "crypto/sha256" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -24,9 +25,11 @@ import ( "git.arvados.org/arvados.git/lib/controller/rpc" "git.arvados.org/arvados.git/lib/crunchrun" "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/arvadostest" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/keepclient" check "gopkg.in/check.v1" ) @@ -178,3 +181,133 @@ func (s *ClientSuite) TestShellGateway(c *check.C) { } wg.Wait() } + +func (s *ClientSuite) TestContainerLog(c *check.C) { + arvadostest.StartKeep(2, true) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second)) + defer cancel() + + rpcconn := rpc.NewConn("", + &url.URL{ + Scheme: "https", + Host: os.Getenv("ARVADOS_API_HOST"), + }, + true, + func(context.Context) ([]string, error) { + return []string{arvadostest.SystemRootToken}, nil + }) + imageColl, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{ + "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar\n", + }}) + c.Assert(err, check.IsNil) + c.Logf("imageColl %+v", imageColl) + cr, err := rpcconn.ContainerRequestCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{ + "state": "Committed", + "command": []string{"echo", fmt.Sprintf("%d", time.Now().Unix())}, + "container_image": imageColl.PortableDataHash, + "cwd": "/", + "output_path": "/", + "priority": 1, + "runtime_constraints": arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1000000000, + }, + "container_count_max": 1, + }}) + c.Assert(err, check.IsNil) + h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken)) + fmt.Fprint(h, cr.ContainerUUID) + authSecret := fmt.Sprintf("%x", h.Sum(nil)) + + coll := arvados.Collection{} + client := arvados.NewClientFromEnv() + ac, err := arvadosclient.New(client) + c.Assert(err, check.IsNil) + kc, err := keepclient.MakeKeepClient(ac) + c.Assert(err, check.IsNil) + cfs, err := coll.FileSystem(client, kc) + c.Assert(err, check.IsNil) + + c.Log("running logs command on queued container") + var stdout, stderr bytes.Buffer + cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", "-poll=250ms", cr.UUID) + cmd.Env = append(cmd.Env, os.Environ()...) + cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken) + cmd.Stdout = io.MultiWriter(&stdout, os.Stderr) + cmd.Stderr = io.MultiWriter(&stderr, os.Stderr) + err = cmd.Start() + c.Assert(err, check.Equals, nil) + + c.Log("changing container state to Locked") + _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{ + "state": arvados.ContainerStateLocked, + }}) + c.Assert(err, check.IsNil) + c.Log("starting gateway") + gw := crunchrun.Gateway{ + ContainerUUID: cr.ContainerUUID, + Address: "0.0.0.0:0", + AuthSecret: authSecret, + Log: ctxlog.TestLogger(c), + Target: crunchrun.GatewayTargetStub{}, + LogCollection: cfs, + } + err = gw.Start() + c.Assert(err, check.IsNil) + c.Log("updating container gateway address") + _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{ + "gateway_address": gw.Address, + "state": arvados.ContainerStateRunning, + }}) + c.Assert(err, check.IsNil) + + fCrunchrun, err := cfs.OpenFile("crunch-run.txt", os.O_CREATE|os.O_WRONLY, 0777) + c.Assert(err, check.IsNil) + _, err = fmt.Fprintln(fCrunchrun, "line 1 of crunch-run.txt") + c.Assert(err, check.IsNil) + fStderr, err := cfs.OpenFile("stderr.txt", os.O_CREATE|os.O_WRONLY, 0777) + c.Assert(err, check.IsNil) + _, err = fmt.Fprintln(fStderr, "line 1 of stderr") + c.Assert(err, check.IsNil) + time.Sleep(time.Second * 2) + _, err = fmt.Fprintln(fCrunchrun, "line 2 of crunch-run.txt") + c.Assert(err, check.IsNil) + _, err = fmt.Fprintln(fStderr, "--end--") + c.Assert(err, check.IsNil) + + for deadline := time.Now().Add(20 * time.Second); time.Now().Before(deadline) && !strings.Contains(stdout.String(), "--end--"); time.Sleep(time.Second / 10) { + } + c.Check(stdout.String(), check.Matches, `(?ms).*--end--\n.*`) + + mtxt, err := cfs.MarshalManifest(".") + c.Assert(err, check.IsNil) + savedLog, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{ + "manifest_text": mtxt, + }}) + c.Assert(err, check.IsNil) + _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{ + "state": arvados.ContainerStateComplete, + "log": savedLog.PortableDataHash, + "output": "d41d8cd98f00b204e9800998ecf8427e+0", + "exit_code": 0, + }}) + c.Assert(err, check.IsNil) + + err = cmd.Wait() + c.Check(err, check.IsNil) + // Ensure controller doesn't cheat by fetching data from the + // gateway after the container is complete. + gw.LogCollection = nil + + c.Logf("re-running logs command on completed container") + { + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5)) + defer cancel() + cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", cr.UUID) + cmd.Env = append(cmd.Env, os.Environ()...) + cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken) + buf, err := cmd.CombinedOutput() + c.Check(err, check.Equals, nil) + c.Check(string(buf), check.Matches, `(?ms).*--end--\n`) + } +} diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go index 70a936a6f6..9856eb5760 100644 --- a/lib/controller/rpc/conn.go +++ b/lib/controller/rpc/conn.go @@ -340,6 +340,12 @@ func (conn *Conn) ContainerUnlock(ctx context.Context, options arvados.GetOption } func (conn *Conn) ContainerLog(ctx context.Context, options arvados.ContainerLogOptions) (resp http.Handler, err error) { + tokens, err := conn.tokenProvider(ctx) + if err != nil { + return nil, err + } else if len(tokens) < 1 { + return nil, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized) + } proxy := &httputil.ReverseProxy{ Transport: conn.httpClient.Transport, Director: func(r *http.Request) { @@ -347,6 +353,7 @@ func (conn *Conn) ContainerLog(ctx context.Context, options arvados.ContainerLog u.Path = r.URL.Path u.RawQuery = fmt.Sprintf("no_forward=%v", options.NoForward) r.URL = &u + r.Header.Set("Authorization", "Bearer "+tokens[0]) }, } return proxy, nil -- 2.30.2