+ return 1
+ }
+ return 0
+}
+
+func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ rpcconn, err := rpcFromEnv()
+ if err != nil {
+ return err
+ }
+ err = lc.checkAPISupport(ctx, crUUID)
+ if err != nil {
+ return err
+ }
+
+ var (
+ // files to display
+ watching = []string{"crunch-run.txt", "stderr.txt"}
+ // fnm => file offset of next byte to display
+ mark = map[string]int64{}
+ // fnm => current size of file reported by api
+ size = map[string]int64{}
+ // has anything worked? (if so, retry after errors)
+ anySuccess = false
+ // container UUID whose logs we are displaying
+ displayingUUID = ""
+ // container UUID we last showed in a "connected,
+ // polling" message
+ reportedUUID = ""
+ )
+
+ cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
+ if err != nil {
+ return fmt.Errorf("error retrieving %s: %w", crUUID, err)
+ }
+ displayingUUID = cr.ContainerUUID
+poll:
+ for delay := pollInterval; ; time.Sleep(delay) {
+ if cr.ContainerUUID == "" {
+ return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
+ }
+ if delay < pollInterval {
+ delay = pollInterval
+ }
+ // When .../container_requests/{uuid}/log_events is
+ // implemented, we'll wait here for the next
+ // server-sent event to tell us some updated file
+ // sizes. For now, we poll.
+ for _, fnm := range watching {
+ currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
+ if err != nil {
+ if !anySuccess {
+ return err
+ }
+ fmt.Fprintln(stderr, err)
+ delay = pollInterval
+ continue poll
+ }
+ if reportedUUID != displayingUUID {
+ fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
+ reportedUUID = displayingUUID
+ }
+ size[fnm] = currentsize
+ if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
+ mark[fnm] = currentsize - 10000
+ } else if !seen {
+ mark[fnm] = 0
+ } else if currentsize < oldsize {
+ // Log collection must have been
+ // emptied and reset.
+ fmt.Fprintln(stderr, "--- log restarted ---")
+ mark = map[string]int64{}
+ delay = pollInterval
+ continue poll
+ }
+ }
+ newData := map[string]*bytes.Buffer{}
+ for _, fnm := range watching {
+ if size[fnm] > mark[fnm] {
+ newData[fnm] = &bytes.Buffer{}
+ _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ }
+ if n > 0 {
+ mark[fnm] += n
+ anySuccess = true
+ }
+ }
+ }
+ checkState := lc.display(stdout, stderr, watching, newData)
+ if displayingUUID != cr.ContainerUUID {
+ // A different container had already been
+ // assigned when we started fetching the
+ // latest batch of logs. We can now safely
+ // start displaying logs from the new
+ // container, without missing any of the
+ // previous container's logs.
+ displayingUUID = cr.ContainerUUID
+ delay = 0
+ continue
+ } else if cr.State == arvados.ContainerRequestStateFinal || !follow {
+ break
+ } else if len(newData) > 0 {
+ delay = pollInterval
+ } else {
+ delay = delay * 2
+ if delay > pollInterval*5 {
+ delay = pollInterval * 5
+ }
+ checkState = true
+ }
+ if checkState {
+ cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
+ if err != nil {
+ if !anySuccess {
+ return fmt.Errorf("error retrieving %s: %w", crUUID, err)
+ }
+ fmt.Fprintln(stderr, err)
+ delay = pollInterval
+ continue
+ }
+ }
+ }
+ return nil
+}
+
+func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
+ u := url.URL{
+ Scheme: "https",
+ Host: lc.ac.APIHost,
+ Path: "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
+ }
+ return u.String()
+}
+
+// Check whether the API is new enough to support the
+// .../container_requests/{uuid}/log/ endpoint.
+//
+// Older versions return 200 for an OPTIONS request at the .../log/
+// API endpoint, but the response header does not have a "Dav" header.
+//
+// Note an error response with no "Dav" header is not taken to
+// indicate lack of API support. It may come from a new server that
+// has a configuration or networking problem.
+func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
+ defer cancel()
+ req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
+ resp, err := lc.ac.Client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusOK && resp.Header.Get("Dav") == "" {
+ return fmt.Errorf("server does not support container logs API (OPTIONS request returned HTTP %s, Dav: %q)", resp.Status, resp.Header.Get("Dav"))
+ }
+ return nil
+}
+
+// Retrieve specified byte range (e.g., "12-34", "1234-") from given
+// fnm and write to out.
+//
+// If range is empty ("-0"), out can be nil.
+//
+// Return values are current file size, bytes copied, error.
+//
+// If the file does not exist, return values are 0, 0, nil.
+func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
+ defer cancel()
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
+ if err != nil {
+ return 0, 0, err
+ }
+ req.Header.Set("Range", "bytes="+byterange)
+ req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
+ resp, err := lc.ac.Client.Do(req)
+ if err != nil {
+ return 0, 0, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusNotFound {
+ return 0, 0, nil