18790: Move to container_requests/*/log endpoint.
author Tom Clegg <tom@curii.com>
Thu, 20 Apr 2023 13:26:52 +0000 (09:26 -0400)
committer Tom Clegg <tom@curii.com>
Tue, 25 Apr 2023 14:49:59 +0000 (10:49 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

cmd/arvados-client/container_gateway.go
cmd/arvados-client/container_gateway_test.go

index 9436959afd8cdadf2a949c7b6f73110d6bc49c0b..08f37d56419d65d20f3a633149c26d745330e648 100644 (file)
@@ -61,19 +61,12 @@ func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, st
        return 0
 }
 
-func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+func (lc *logsCommand) tailf(crUUID string, stdout, stderr io.Writer, pollInterval time.Duration) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
        rpcconn := rpcFromEnv()
-       ctrUUID, err := resolveToContainerUUID(rpcconn, target)
-       if err != nil {
-               return err
-       }
-       if ctrUUID != target {
-               fmt.Fprintln(stderr, "target container UUID is", ctrUUID)
-       }
-       err = lc.checkAPISupport(ctx, ctrUUID)
+       err := lc.checkAPISupport(ctx, crUUID)
        if err != nil {
                return err
        }
@@ -85,23 +78,34 @@ func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterv
                mark = map[string]int64{}
                // fnm => current size of file reported by api
                size = map[string]int64{}
-               // exit after fetching next log chunk
-               containerFinished = false
                // has anything worked? (if so, retry after errors)
                anySuccess = false
-               // container UUID that we most recently displayed in a
-               // "connected, polling" message (if any)
-               reportedConnection = ""
+               // container UUID whose logs we are displaying
+               displayingUUID = ""
+               // container UUID we last showed in a "connected,
+               // polling" message
+               reportedUUID = ""
        )
 
+       cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
+       if err != nil {
+               return fmt.Errorf("error retrieving %s: %w", crUUID, err)
+       }
+       displayingUUID = cr.ContainerUUID
 poll:
        for delay := pollInterval; ; time.Sleep(delay) {
-               // When /arvados/v1/containers/{uuid}/log_events is
+               if cr.ContainerUUID == "" {
+                       return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
+               }
+               if delay < pollInterval {
+                       delay = pollInterval
+               }
+               // When .../container_requests/{uuid}/log_events is
                // implemented, we'll wait here for the next
                // server-sent event to tell us some updated file
                // sizes. For now, we poll.
                for _, fnm := range watching {
-                       currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "-0", nil)
+                       currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
                        if err != nil {
                                if !anySuccess {
                                        return err
@@ -110,9 +114,9 @@ poll:
                                delay = pollInterval
                                continue poll
                        }
-                       if reportedConnection != ctrUUID {
-                               reportedConnection = ctrUUID
-                               fmt.Fprintln(stderr, "connected, polling for log data from container", ctrUUID)
+                       if reportedUUID != displayingUUID {
+                               fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
+                               reportedUUID = displayingUUID
                        }
                        size[fnm] = currentsize
                        if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
@@ -123,9 +127,7 @@ poll:
                                // Log collection must have been
                                // emptied and reset.
                                fmt.Fprintln(stderr, "--- log restarted ---")
-                               for fnm := range mark {
-                                       delete(mark, fnm)
-                               }
+                               mark = map[string]int64{}
                                delay = pollInterval
                                continue poll
                        }
@@ -134,7 +136,7 @@ poll:
                for _, fnm := range watching {
                        if size[fnm] > mark[fnm] {
                                newData[fnm] = &bytes.Buffer{}
-                               _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
+                               _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
                                if err != nil {
                                        fmt.Fprintln(stderr, err)
                                }
@@ -145,77 +147,64 @@ poll:
                        }
                }
                checkState := lc.display(stdout, stderr, watching, newData)
-               if containerFinished {
-                       // If the caller specified a container request
-                       // UUID and the container we were watching has
-                       // been replaced by a new one, start watching
-                       // logs from the new one. Otherwise, we're
-                       // done.
-                       if target == ctrUUID {
-                               // caller specified container UUID
-                               return nil
-                       }
-                       newUUID, err := resolveToContainerUUID(rpcconn, target)
-                       if err != nil {
-                               return err
-                       }
-                       if newUUID == ctrUUID {
-                               // no further attempts
-                               return nil
-                       }
-                       ctrUUID = newUUID
-                       containerFinished = false
+               if displayingUUID != cr.ContainerUUID {
+                       // A different container had already been
+                       // assigned when we started fetching the
+                       // latest batch of logs. We can now safely
+                       // start displaying logs from the new
+                       // container, without missing any of the
+                       // previous container's logs.
+                       displayingUUID = cr.ContainerUUID
                        delay = 0
                        continue
-               }
-               if len(newData) > 0 {
+               } else if cr.State == arvados.ContainerRequestStateFinal {
+                       break
+               } else if len(newData) > 0 {
                        delay = pollInterval
-               }
-               if len(newData) == 0 || checkState {
+               } else {
                        delay = delay * 2
                        if delay > pollInterval*5 {
                                delay = pollInterval * 5
                        }
-                       ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+                       checkState = true
+               }
+               if checkState {
+                       cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
                        if err != nil {
                                if !anySuccess {
-                                       return err
+                                       return fmt.Errorf("error retrieving %s: %w", crUUID, err)
                                }
                                fmt.Fprintln(stderr, err)
                                delay = pollInterval
                                continue
                        }
-                       if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
-                               containerFinished = true
-                               delay = 0
-                       }
                }
        }
        return nil
 }
 
-func (lc *logsCommand) srcURL(uuid, fnm string) string {
+func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
        u := url.URL{
                Scheme: "https",
                Host:   lc.ac.APIHost,
-               Path:   "/arvados/v1/containers/" + uuid + "/log/" + fnm,
+               Path:   "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
        }
        return u.String()
 }
 
 // Check whether the API is new enough to support the
-// .../containers/{uuid}/log endpoint.
+// .../container_requests/{uuid}/log/ endpoint.
 //
-// Older versions return 200 for an OPTIONS request at the .../logs/
+// Older versions return 200 for an OPTIONS request at the .../log/
 // API endpoint, but the response header does not have a "Dav" header.
 //
 // Note an error response with no "Dav" header is not taken to
 // indicate lack of API support. It may come from a new server that
 // has a configuration or networking problem.
-func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error {
+func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
        ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
        defer cancel()
-       req, err := http.NewRequestWithContext(ctx, "OPTIONS", lc.srcURL(uuid, ""), nil)
+       req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
        if err != nil {
                return err
        }
@@ -239,10 +228,10 @@ func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error {
 // Return values are current file size, bytes copied, error.
 //
 // If the file does not exist, return values are 0, 0, nil.
-func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
        ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
        defer cancel()
-       req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(uuid, fnm), nil)
+       req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
        if err != nil {
                return 0, 0, err
        }
index 7c2c7d121a910bb861d5c11cd4ef675e5934e3da..25c794e83d5e81df9e756dc641531579ff510db3 100644 (file)
@@ -182,7 +182,7 @@ func (s *ClientSuite) TestShellGateway(c *check.C) {
        wg.Wait()
 }
 
-func (s *ClientSuite) TestContainerLog(c *check.C) {
+func (s *ClientSuite) TestContainerRequestLog(c *check.C) {
        arvadostest.StartKeep(2, true)
        ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
        defer cancel()