Merge branch '21666-provision-test-improvement'
[arvados.git] / cmd / arvados-client / container_gateway.go
index 2f45e75188b79baedd82f10f3983f701fccd330b..2baa8012eae6dcb49c9ac9f4bd06e29f46a948b1 100644 (file)
@@ -16,6 +16,7 @@ import (
        "os"
        "os/exec"
        "path/filepath"
+       "sort"
        "strings"
        "syscall"
        "time"
@@ -32,14 +33,15 @@ type logsCommand struct {
 
 func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        f := flag.NewFlagSet(prog, flag.ContinueOnError)
+       follow := f.Bool("f", false, "follow: poll for new data until the container finishes")
        pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
-       if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok {
+       if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
                return code
        } else if f.NArg() < 1 {
-               fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+               fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
                return 2
        } else if f.NArg() > 1 {
-               fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n")
+               fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
                return 2
        }
        target := f.Args()[0]
@@ -52,7 +54,7 @@ func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, st
                                InsecureSkipVerify: true}}
        }
 
-       err := lc.tailf(target, stdout, stderr, *pollInterval)
+       err := lc.tail(target, stdout, stderr, *follow, *pollInterval)
        if err != nil {
                fmt.Fprintln(stderr, err)
                return 1
@@ -60,16 +62,18 @@ func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, st
        return 0
 }
 
-func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
-       rpcconn := rpcFromEnv()
-       ctrUUID, err := resolveToContainerUUID(rpcconn, target)
+       rpcconn, err := rpcFromEnv()
+       if err != nil {
+               return err
+       }
+       err = lc.checkAPISupport(ctx, crUUID)
        if err != nil {
                return err
        }
-       fmt.Fprintln(stderr, "connecting to container", ctrUUID)
 
        var (
                // files to display
@@ -78,20 +82,34 @@ func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterv
                mark = map[string]int64{}
                // fnm => current size of file reported by api
                size = map[string]int64{}
-               // exit after fetching next log chunk
-               containerFinished = false
                // has anything worked? (if so, retry after errors)
                anySuccess = false
+               // container UUID whose logs we are displaying
+               displayingUUID = ""
+               // container UUID we last showed in a "connected,
+               // polling" message
+               reportedUUID = ""
        )
 
+       cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
+       if err != nil {
+               return fmt.Errorf("error retrieving %s: %w", crUUID, err)
+       }
+       displayingUUID = cr.ContainerUUID
 poll:
        for delay := pollInterval; ; time.Sleep(delay) {
-               // When /arvados/v1/containers/{uuid}/log_events is
+               if cr.ContainerUUID == "" {
+                       return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
+               }
+               if delay < pollInterval {
+                       delay = pollInterval
+               }
+               // When .../container_requests/{uuid}/log_events is
                // implemented, we'll wait here for the next
                // server-sent event to tell us some updated file
                // sizes. For now, we poll.
                for _, fnm := range watching {
-                       currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "-0", nil)
+                       currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
                        if err != nil {
                                if !anySuccess {
                                        return err
@@ -100,6 +118,10 @@ poll:
                                delay = pollInterval
                                continue poll
                        }
+                       if reportedUUID != displayingUUID {
+                               fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
+                               reportedUUID = displayingUUID
+                       }
                        size[fnm] = currentsize
                        if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
                                mark[fnm] = currentsize - 10000
@@ -109,9 +131,7 @@ poll:
                                // Log collection must have been
                                // emptied and reset.
                                fmt.Fprintln(stderr, "--- log restarted ---")
-                               for fnm := range mark {
-                                       delete(mark, fnm)
-                               }
+                               mark = map[string]int64{}
                                delay = pollInterval
                                continue poll
                        }
@@ -120,7 +140,7 @@ poll:
                for _, fnm := range watching {
                        if size[fnm] > mark[fnm] {
                                newData[fnm] = &bytes.Buffer{}
-                               _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
+                               _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
                                if err != nil {
                                        fmt.Fprintln(stderr, err)
                                }
@@ -131,55 +151,79 @@ poll:
                        }
                }
                checkState := lc.display(stdout, stderr, watching, newData)
-               if containerFinished {
-                       // If the caller specified a container request
-                       // UUID and the container we were watching has
-                       // been replaced by a new one, start watching
-                       // logs from the new one. Otherwise, we're
-                       // done.
-                       if target == ctrUUID {
-                               // caller specified container UUID
-                               return nil
-                       }
-                       newUUID, err := resolveToContainerUUID(rpcconn, target)
-                       if err != nil {
-                               return err
-                       }
-                       if newUUID == ctrUUID {
-                               // no further attempts
-                               return nil
-                       }
-                       ctrUUID = newUUID
-                       containerFinished = false
+               if displayingUUID != cr.ContainerUUID {
+                       // A different container had already been
+                       // assigned when we started fetching the
+                       // latest batch of logs. We can now safely
+                       // start displaying logs from the new
+                       // container, without missing any of the
+                       // previous container's logs.
+                       displayingUUID = cr.ContainerUUID
                        delay = 0
                        continue
-               }
-               if len(newData) > 0 {
+               } else if cr.State == arvados.ContainerRequestStateFinal || !follow {
+                       break
+               } else if len(newData) > 0 {
                        delay = pollInterval
-               }
-               if len(newData) == 0 || checkState {
+               } else {
                        delay = delay * 2
                        if delay > pollInterval*5 {
                                delay = pollInterval * 5
                        }
-                       ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+                       checkState = true
+               }
+               if checkState {
+                       cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
                        if err != nil {
                                if !anySuccess {
-                                       return err
+                                       return fmt.Errorf("error retrieving %s: %w", crUUID, err)
                                }
                                fmt.Fprintln(stderr, err)
                                delay = pollInterval
                                continue
                        }
-                       if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
-                               containerFinished = true
-                               delay = 0
-                       }
                }
        }
        return nil
 }
 
+func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
+       u := url.URL{
+               Scheme: "https",
+               Host:   lc.ac.APIHost,
+               Path:   "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
+       }
+       return u.String()
+}
+
+// Check whether the API is new enough to support the
+// .../container_requests/{uuid}/log/ endpoint.
+//
+// Older versions return 200 for an OPTIONS request at the .../log/
+// API endpoint, but the response does not include a "Dav" header.
+//
+// Note an error response with no "Dav" header is not taken to
+// indicate lack of API support. It may come from a new server that
+// has a configuration or networking problem.
+func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
+       ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
+       defer cancel()
+       req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
+       if err != nil {
+               return err
+       }
+       req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
+       resp, err := lc.ac.Client.Do(req)
+       if err != nil {
+               return err
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode == http.StatusOK && resp.Header.Get("Dav") == "" {
+               return fmt.Errorf("server does not support container logs API (OPTIONS request returned HTTP %s, Dav: %q)", resp.Status, resp.Header.Get("Dav"))
+       }
+       return nil
+}
+
 // Retrieve specified byte range (e.g., "12-34", "1234-") from given
 // fnm and write to out.
 //
@@ -188,15 +232,10 @@ poll:
 // Return values are current file size, bytes copied, error.
 //
 // If the file does not exist, return values are 0, 0, nil.
-func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
        ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
        defer cancel()
-       srcURL := url.URL{
-               Scheme: "https",
-               Host:   lc.ac.APIHost,
-               Path:   "/arvados/v1/containers/" + uuid + "/log/" + fnm,
-       }
-       req, err := http.NewRequestWithContext(ctx, http.MethodGet, srcURL.String(), nil)
+       req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
        if err != nil {
                return 0, 0, err
        }
@@ -234,20 +273,29 @@ func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange strin
 // exit.
 func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
        checkState := false
+       var sorted []string
        for _, fnm := range watching {
                buf := received[fnm]
                if buf == nil || buf.Len() == 0 {
                        continue
                }
                for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
-                       _, err := fmt.Fprintf(out, "%-14s %s\n", fnm, line)
-                       if err != nil {
-                               fmt.Fprintln(stderr, err)
+                       sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
+                       if fnm == "crunch-run.txt" {
+                               checkState = checkState ||
+                                       bytes.HasSuffix(line, []byte("Complete")) ||
+                                       bytes.HasSuffix(line, []byte("Cancelled")) ||
+                                       bytes.HasSuffix(line, []byte("Queued"))
                        }
-                       checkState = checkState ||
-                               bytes.HasSuffix(line, []byte("Complete")) ||
-                               bytes.HasSuffix(line, []byte("Cancelled")) ||
-                               bytes.HasSuffix(line, []byte("Queued"))
+               }
+       }
+       sort.Slice(sorted, func(i, j int) bool {
+               return sorted[i][15:] < sorted[j][15:]
+       })
+       for _, s := range sorted {
+               _, err := fmt.Fprint(out, s)
+               if err != nil {
+                       fmt.Fprintln(stderr, err)
                }
        }
        return checkState
@@ -356,12 +404,12 @@ Options:
                loginUsername = targetUUID[:i]
                targetUUID = targetUUID[i+1:]
        }
-       if os.Getenv("ARVADOS_API_HOST") == "" || os.Getenv("ARVADOS_API_TOKEN") == "" {
-               fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
+       rpcconn, err := rpcFromEnv()
+       if err != nil {
+               fmt.Fprintln(stderr, err)
                return 1
        }
-       rpcconn := rpcFromEnv()
-       targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
+       targetUUID, err = resolveToContainerUUID(rpcconn, targetUUID)
        if err != nil {
                fmt.Fprintln(stderr, err)
                return 1
@@ -408,17 +456,20 @@ func shellescape(s string) string {
        return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
 }
 
-func rpcFromEnv() *rpc.Conn {
-       insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
+func rpcFromEnv() (*rpc.Conn, error) {
+       ac := arvados.NewClientFromEnv()
+       if ac.APIHost == "" || ac.AuthToken == "" {
+               return nil, fmt.Errorf("fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set, and ~/.config/arvados/settings.conf is not readable")
+       }
        return rpc.NewConn("",
                &url.URL{
                        Scheme: "https",
-                       Host:   os.Getenv("ARVADOS_API_HOST"),
+                       Host:   ac.APIHost,
                },
-               insecure == "1" || insecure == "yes" || insecure == "true",
+               ac.Insecure,
                func(context.Context) ([]string, error) {
-                       return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
-               })
+                       return []string{ac.AuthToken}, nil
+               }), nil
 }
 
 func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {