X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1c534c7835db9be61f76d0a68fdaedf640848f68..dc16046dbfdca4a9c0d94971730d220b27e80620:/cmd/arvados-client/container_gateway.go diff --git a/cmd/arvados-client/container_gateway.go b/cmd/arvados-client/container_gateway.go index 36c376e5c0..2baa8012ea 100644 --- a/cmd/arvados-client/container_gateway.go +++ b/cmd/arvados-client/container_gateway.go @@ -16,6 +16,7 @@ import ( "os" "os/exec" "path/filepath" + "sort" "strings" "syscall" "time" @@ -32,14 +33,15 @@ type logsCommand struct { func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { f := flag.NewFlagSet(prog, flag.ContinueOnError) + follow := f.Bool("f", false, "follow: poll for new data until the container finishes") pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data") - if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok { + if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok { return code } else if f.NArg() < 1 { - fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n") + fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n") return 2 } else if f.NArg() > 1 { - fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n") + fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n") return 2 } target := f.Args()[0] @@ -52,7 +54,7 @@ func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, st InsecureSkipVerify: true}} } - err := lc.tailf(target, stdout, stderr, *pollInterval) + err := lc.tail(target, stdout, stderr, *follow, *pollInterval) if err != nil { fmt.Fprintln(stderr, err) return 1 @@ -60,20 +62,18 @@ func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, st return 0 } -func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error { +func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rpcconn := rpcFromEnv() - ctrUUID, err := resolveToContainerUUID(rpcconn, target) + rpcconn, err := rpcFromEnv() if err != nil { return err } - err = lc.checkAPISupport(ctx, ctrUUID) + err = lc.checkAPISupport(ctx, crUUID) if err != nil { return err } - fmt.Fprintln(stderr, "connecting to container", ctrUUID) var ( // files to display @@ -82,20 +82,34 @@ func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterv mark = map[string]int64{} // fnm => current size of file reported by api size = map[string]int64{} - // exit after fetching next log chunk - containerFinished = false // has anything worked? (if so, retry after errors) anySuccess = false + // container UUID whose logs we are displaying + displayingUUID = "" + // container UUID we last showed in a "connected, + // polling" message + reportedUUID = "" ) + cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}}) + if err != nil { + return fmt.Errorf("error retrieving %s: %w", crUUID, err) + } + displayingUUID = cr.ContainerUUID poll: for delay := pollInterval; ; time.Sleep(delay) { - // When /arvados/v1/containers/{uuid}/log_events is + if cr.ContainerUUID == "" { + return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State) + } + if delay < pollInterval { + delay = pollInterval + } + // When .../container_requests/{uuid}/log_events is // implemented, we'll wait here for the next // server-sent event to tell us some updated file // sizes. For now, we poll. for _, fnm := range watching { - currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "-0", nil) + currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil) if err != nil { if !anySuccess { return err @@ -104,6 +118,10 @@ poll: delay = pollInterval continue poll } + if reportedUUID != displayingUUID { + fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID) + reportedUUID = displayingUUID + } size[fnm] = currentsize if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 { mark[fnm] = currentsize - 10000 @@ -113,9 +131,7 @@ poll: // Log collection must have been // emptied and reset. fmt.Fprintln(stderr, "--- log restarted ---") - for fnm := range mark { - delete(mark, fnm) - } + mark = map[string]int64{} delay = pollInterval continue poll } @@ -124,7 +140,7 @@ poll: for _, fnm := range watching { if size[fnm] > mark[fnm] { newData[fnm] = &bytes.Buffer{} - _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm]) + _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm]) if err != nil { fmt.Fprintln(stderr, err) } @@ -135,77 +151,64 @@ poll: } } checkState := lc.display(stdout, stderr, watching, newData) - if containerFinished { - // If the caller specified a container request - // UUID and the container we were watching has - // been replaced by a new one, start watching - // logs from the new one. Otherwise, we're - // done. - if target == ctrUUID { - // caller specified container UUID - return nil - } - newUUID, err := resolveToContainerUUID(rpcconn, target) - if err != nil { - return err - } - if newUUID == ctrUUID { - // no further attempts - return nil - } - ctrUUID = newUUID - containerFinished = false + if displayingUUID != cr.ContainerUUID { + // A different container had already been + // assigned when we started fetching the + // latest batch of logs. We can now safely + // start displaying logs from the new + // container, without missing any of the + // previous container's logs. + displayingUUID = cr.ContainerUUID delay = 0 continue - } - if len(newData) > 0 { + } else if cr.State == arvados.ContainerRequestStateFinal || !follow { + break + } else if len(newData) > 0 { delay = pollInterval - } - if len(newData) == 0 || checkState { + } else { delay = delay * 2 if delay > pollInterval*5 { delay = pollInterval * 5 } - ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}}) + checkState = true + } + if checkState { + cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}}) if err != nil { if !anySuccess { - return err + return fmt.Errorf("error retrieving %s: %w", crUUID, err) } fmt.Fprintln(stderr, err) delay = pollInterval continue } - if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete { - containerFinished = true - delay = 0 - } } } return nil } -func (lc *logsCommand) srcURL(uuid, fnm string) string { +func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string { u := url.URL{ Scheme: "https", Host: lc.ac.APIHost, - Path: "/arvados/v1/containers/" + uuid + "/log/" + fnm, + Path: "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm, } return u.String() } // Check whether the API is new enough to support the -// .../containers/{uuid}/log endpoint. +// .../container_requests/{uuid}/log/ endpoint. // -// Older versions return 200 for an OPTIONS request at the .../logs/ +// Older versions return 200 for an OPTIONS request at the .../log/ // API endpoint, but the response header does not have a "Dav" header. // // Note an error response with no "Dav" header is not taken to // indicate lack of API support. It may come from a new server that // has a configuration or networking problem. -func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error { +func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error { ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second)) defer cancel() - req, err := http.NewRequestWithContext(ctx, "OPTIONS", lc.srcURL(uuid, ""), nil) + req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil) if err != nil { return err } @@ -229,10 +232,10 @@ func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error { // Return values are current file size, bytes copied, error. // // If the file does not exist, return values are 0, 0, nil. -func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) { +func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) { ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second)) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(uuid, fnm), nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil) if err != nil { return 0, 0, err } @@ -270,16 +273,14 @@ func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange strin // exit. func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool { checkState := false + var sorted []string for _, fnm := range watching { buf := received[fnm] if buf == nil || buf.Len() == 0 { continue } for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) { - _, err := fmt.Fprintf(out, "%-14s %s\n", fnm, line) - if err != nil { - fmt.Fprintln(stderr, err) - } + sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line)) if fnm == "crunch-run.txt" { checkState = checkState || bytes.HasSuffix(line, []byte("Complete")) || @@ -288,6 +289,15 @@ func (lc *logsCommand) display(out, stderr io.Writer, watching []string, receive } } } + sort.Slice(sorted, func(i, j int) bool { + return sorted[i][15:] < sorted[j][15:] + }) + for _, s := range sorted { + _, err := fmt.Fprint(out, s) + if err != nil { + fmt.Fprintln(stderr, err) + } + } return checkState } @@ -394,12 +404,12 @@ Options: loginUsername = targetUUID[:i] targetUUID = targetUUID[i+1:] } - if os.Getenv("ARVADOS_API_HOST") == "" || os.Getenv("ARVADOS_API_TOKEN") == "" { - fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set") + rpcconn, err := rpcFromEnv() + if err != nil { + fmt.Fprintln(stderr, err) return 1 } - rpcconn := rpcFromEnv() - targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID) + targetUUID, err = resolveToContainerUUID(rpcconn, targetUUID) if err != nil { fmt.Fprintln(stderr, err) return 1 @@ -446,17 +456,20 @@ func shellescape(s string) string { return "'" + strings.Replace(s, "'", "'\\''", -1) + "'" } -func rpcFromEnv() *rpc.Conn { - insecure := os.Getenv("ARVADOS_API_HOST_INSECURE") +func rpcFromEnv() (*rpc.Conn, error) { + ac := arvados.NewClientFromEnv() + if ac.APIHost == "" || ac.AuthToken == "" { + return nil, fmt.Errorf("fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set, and ~/.config/arvados/settings.conf is not readable") + } return rpc.NewConn("", &url.URL{ Scheme: "https", - Host: os.Getenv("ARVADOS_API_HOST"), + Host: ac.APIHost, }, - insecure == "1" || insecure == "yes" || insecure == "true", + ac.Insecure, func(context.Context) ([]string, error) { - return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil - }) + return []string{ac.AuthToken}, nil + }), nil } func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {