+// logsCommand displays logs from a running container.
+type logsCommand struct {
+ ac *arvados.Client
+}
+
+func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ f := flag.NewFlagSet(prog, flag.ContinueOnError)
+ pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
+ if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok {
+ return code
+ } else if f.NArg() < 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ return 2
+ } else if f.NArg() > 1 {
+ fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n")
+ return 2
+ }
+ target := f.Args()[0]
+
+ lc.ac = arvados.NewClientFromEnv()
+ lc.ac.Client = &http.Client{}
+ if lc.ac.Insecure {
+ lc.ac.Client.Transport = &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}
+ }
+
+ err := lc.tailf(target, stdout, stderr, *pollInterval)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ return 1
+ }
+ return 0
+}
+
+func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ rpcconn := rpcFromEnv()
+ ctrUUID, err := resolveToContainerUUID(rpcconn, target)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintln(stderr, "connecting to container", ctrUUID)
+
+ var (
+ // files to display
+ watching = []string{"crunch-run.txt", "stderr.txt"}
+ // fnm => file offset of next byte to display
+ mark = map[string]int64{}
+ // fnm => current size of file reported by api
+ size = map[string]int64{}
+ // exit after fetching next log chunk
+ containerFinished = false
+ )
+
+poll:
+ for delay := pollInterval; ; time.Sleep(delay) {
+ // When /arvados/v1/containers/{uuid}/log_events is
+ // implemented, we'll wait here for the next
+ // server-sent event to tell us some updated file
+ // sizes. For now, we poll.
+ for _, fnm := range watching {
+ currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "0-0", nil)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ delay = pollInterval
+ continue poll
+ }
+ size[fnm] = currentsize
+ if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
+ mark[fnm] = currentsize - 10000
+ } else if !seen {
+ mark[fnm] = 0
+ } else if currentsize < oldsize {
+ // Log collection must have been
+ // emptied and reset.
+ fmt.Fprintln(stderr, "--- log restarted ---")
+ for fnm := range mark {
+ delete(mark, fnm)
+ }
+ delay = pollInterval
+ continue poll
+ }
+ }
+ anyNewData := false
+ for _, fnm := range watching {
+ if size[fnm] > mark[fnm] {
+ anyNewData = true
+ _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), stdout)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ }
+ mark[fnm] += n
+ }
+ }
+ if containerFinished {
+ // If the caller specified a container request
+ // UUID and the container we were watching has
+ // been replaced by a new one, start watching
+ // logs from the new one. Otherwise, we're
+ // done.
+ if target == ctrUUID {
+ // caller specified container UUID
+ return nil
+ }
+ newUUID, err := resolveToContainerUUID(rpcconn, target)
+ if err != nil {
+ return err
+ }
+ if newUUID == ctrUUID {
+ // no further attempts
+ return nil
+ }
+ ctrUUID = newUUID
+ containerFinished = false
+ delay = 0
+ } else if anyNewData {
+ delay = pollInterval
+ } else {
+ delay = delay * 2
+ if delay > pollInterval*5 {
+ delay = pollInterval * 5
+ }
+ ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ delay = pollInterval
+ continue
+ }
+ if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
+ containerFinished = true
+ delay = 0
+ }
+ }
+ }
+ return nil
+}
+
+// Retrieve specified byte range (e.g., "12-34", "1234-") from given
+// fnm and write to out.
+//
+// If range is empty ("0-0"), out can be nil.
+//
+// Return values are current file size, bytes copied, error.
+//
+// If the file does not exist, return values are 0, 0, nil.
+func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
+ defer cancel()
+ srcURL := url.URL{
+ Scheme: "https",
+ Host: lc.ac.APIHost,
+ Path: "/arvados/v1/containers/" + uuid + "/log/" + fnm,
+ }
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, srcURL.String(), nil)
+ if err != nil {
+ return 0, 0, err
+ }
+ req.Header.Set("Range", "bytes="+byterange)
+ req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
+ resp, err := lc.ac.Client.Do(req)
+ if err != nil {
+ return 0, 0, err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusNotFound {
+ return 0, 0, nil
+ }
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
+ return 0, 0, fmt.Errorf("error getting %s: %s", fnm, resp.Status)
+ }
+ var rstart, rend, rsize int64
+ _, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize)
+ if err != nil {
+ return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err)
+ }
+ if out == nil {
+ return rsize, 0, nil
+ }
+ n, err := io.Copy(out, resp.Body)
+ return rsize, n, err
+}
+