18790: Fix usage messages.
[arvados.git] / cmd / arvados-client / container_gateway.go
index 806bcd277adcd1c8764b71f9672c57f077bc0734..94375075c0fd6b68425d49b8351cfe8f1a026b0a 100644 (file)
@@ -16,6 +16,7 @@ import (
        "os"
        "os/exec"
        "path/filepath"
+       "sort"
        "strings"
        "syscall"
        "time"
@@ -33,13 +34,13 @@ type logsCommand struct {
 func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        f := flag.NewFlagSet(prog, flag.ContinueOnError)
        pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
-       if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok {
+       if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
                return code
        } else if f.NArg() < 1 {
-               fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+               fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
                return 2
        } else if f.NArg() > 1 {
-               fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n")
+               fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
                return 2
        }
        target := f.Args()[0]
@@ -60,20 +61,15 @@ func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, st
        return 0
 }
 
-func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+func (lc *logsCommand) tailf(crUUID string, stdout, stderr io.Writer, pollInterval time.Duration) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
        rpcconn := rpcFromEnv()
-       ctrUUID, err := resolveToContainerUUID(rpcconn, target)
-       if err != nil {
-               return err
-       }
-       err = lc.checkAPISupport(ctx, ctrUUID)
+       err := lc.checkAPISupport(ctx, crUUID)
        if err != nil {
                return err
        }
-       fmt.Fprintln(stderr, "connecting to container", ctrUUID)
 
        var (
                // files to display
@@ -82,20 +78,34 @@ func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterv
                mark = map[string]int64{}
                // fnm => current size of file reported by api
                size = map[string]int64{}
-               // exit after fetching next log chunk
-               containerFinished = false
                // has anything worked? (if so, retry after errors)
                anySuccess = false
+               // container UUID whose logs we are displaying
+               displayingUUID = ""
+               // container UUID we last showed in a "connected,
+               // polling" message
+               reportedUUID = ""
        )
 
+       cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
+       if err != nil {
+               return fmt.Errorf("error retrieving %s: %w", crUUID, err)
+       }
+       displayingUUID = cr.ContainerUUID
 poll:
        for delay := pollInterval; ; time.Sleep(delay) {
-               // When /arvados/v1/containers/{uuid}/log_events is
+               if cr.ContainerUUID == "" {
+                       return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
+               }
+               if delay < pollInterval {
+                       delay = pollInterval
+               }
+               // When .../container_requests/{uuid}/log_events is
                // implemented, we'll wait here for the next
                // server-sent event to tell us some updated file
                // sizes. For now, we poll.
                for _, fnm := range watching {
-                       currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "-0", nil)
+                       currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
                        if err != nil {
                                if !anySuccess {
                                        return err
@@ -104,6 +114,10 @@ poll:
                                delay = pollInterval
                                continue poll
                        }
+                       if reportedUUID != displayingUUID {
+                               fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
+                               reportedUUID = displayingUUID
+                       }
                        size[fnm] = currentsize
                        if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
                                mark[fnm] = currentsize - 10000
@@ -113,9 +127,7 @@ poll:
                                // Log collection must have been
                                // emptied and reset.
                                fmt.Fprintln(stderr, "--- log restarted ---")
-                               for fnm := range mark {
-                                       delete(mark, fnm)
-                               }
+                               mark = map[string]int64{}
                                delay = pollInterval
                                continue poll
                        }
@@ -124,7 +136,7 @@ poll:
                for _, fnm := range watching {
                        if size[fnm] > mark[fnm] {
                                newData[fnm] = &bytes.Buffer{}
-                               _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
+                               _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
                                if err != nil {
                                        fmt.Fprintln(stderr, err)
                                }
@@ -135,77 +147,64 @@ poll:
                        }
                }
                checkState := lc.display(stdout, stderr, watching, newData)
-               if containerFinished {
-                       // If the caller specified a container request
-                       // UUID and the container we were watching has
-                       // been replaced by a new one, start watching
-                       // logs from the new one. Otherwise, we're
-                       // done.
-                       if target == ctrUUID {
-                               // caller specified container UUID
-                               return nil
-                       }
-                       newUUID, err := resolveToContainerUUID(rpcconn, target)
-                       if err != nil {
-                               return err
-                       }
-                       if newUUID == ctrUUID {
-                               // no further attempts
-                               return nil
-                       }
-                       ctrUUID = newUUID
-                       containerFinished = false
+               if displayingUUID != cr.ContainerUUID {
+                       // A different container had already been
+                       // assigned when we started fetching the
+                       // latest batch of logs. We can now safely
+                       // start displaying logs from the new
+                       // container, without missing any of the
+                       // previous container's logs.
+                       displayingUUID = cr.ContainerUUID
                        delay = 0
                        continue
-               }
-               if len(newData) > 0 {
+               } else if cr.State == arvados.ContainerRequestStateFinal {
+                       break
+               } else if len(newData) > 0 {
                        delay = pollInterval
-               }
-               if len(newData) == 0 || checkState {
+               } else {
                        delay = delay * 2
                        if delay > pollInterval*5 {
                                delay = pollInterval * 5
                        }
-                       ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+                       checkState = true
+               }
+               if checkState {
+                       cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
                        if err != nil {
                                if !anySuccess {
-                                       return err
+                                       return fmt.Errorf("error retrieving %s: %w", crUUID, err)
                                }
                                fmt.Fprintln(stderr, err)
                                delay = pollInterval
                                continue
                        }
-                       if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
-                               containerFinished = true
-                               delay = 0
-                       }
                }
        }
        return nil
 }
 
-func (lc *logsCommand) srcURL(uuid, fnm string) string {
+func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
        u := url.URL{
                Scheme: "https",
                Host:   lc.ac.APIHost,
-               Path:   "/arvados/v1/containers/" + uuid + "/log/" + fnm,
+               Path:   "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
        }
        return u.String()
 }
 
 // Check whether the API is new enough to support the
-// .../containers/{uuid}/log endpoint.
+// .../container_requests/{uuid}/log/ endpoint.
 //
-// Older versions return 200 for an OPTIONS request at the .../logs/
+// Older versions return 200 for an OPTIONS request at the .../log/
 // API endpoint, but the response header does not have a "Dav" header.
 //
 // Note an error response with no "Dav" header is not taken to
 // indicate lack of API support. It may come from a new server that
 // has a configuration or networking problem.
-func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error {
+func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
        ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
        defer cancel()
-       req, err := http.NewRequestWithContext(ctx, "OPTIONS", lc.srcURL(uuid, ""), nil)
+       req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
        if err != nil {
                return err
        }
@@ -229,10 +228,10 @@ func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error {
 // Return values are current file size, bytes copied, error.
 //
 // If the file does not exist, return values are 0, 0, nil.
-func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
        ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
        defer cancel()
-       req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(uuid, fnm), nil)
+       req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
        if err != nil {
                return 0, 0, err
        }
@@ -270,20 +269,29 @@ func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange strin
 // exit.
 func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
        checkState := false
+       var sorted []string
        for _, fnm := range watching {
                buf := received[fnm]
                if buf == nil || buf.Len() == 0 {
                        continue
                }
                for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
-                       _, err := fmt.Fprintf(out, "%-14s %s\n", fnm, line)
-                       if err != nil {
-                               fmt.Fprintln(stderr, err)
+                       sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
+                       if fnm == "crunch-run.txt" {
+                               checkState = checkState ||
+                                       bytes.HasSuffix(line, []byte("Complete")) ||
+                                       bytes.HasSuffix(line, []byte("Cancelled")) ||
+                                       bytes.HasSuffix(line, []byte("Queued"))
                        }
-                       checkState = checkState ||
-                               bytes.HasSuffix(line, []byte("Complete")) ||
-                               bytes.HasSuffix(line, []byte("Cancelled")) ||
-                               bytes.HasSuffix(line, []byte("Queued"))
+               }
+       }
+       sort.Slice(sorted, func(i, j int) bool {
+               return sorted[i][15:] < sorted[j][15:]
+       })
+       for _, s := range sorted {
+               _, err := fmt.Fprint(out, s)
+               if err != nil {
+                       fmt.Fprintln(stderr, err)
                }
        }
        return checkState