"os"
"os/exec"
"path/filepath"
+ "sort"
"strings"
"syscall"
"time"
func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
f := flag.NewFlagSet(prog, flag.ContinueOnError)
+ follow := f.Bool("f", false, "follow: poll for new data until the container finishes")
pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
- if ok, code := cmd.ParseFlags(f, prog, args, "container-uuid", stderr); !ok {
+ if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
return code
} else if f.NArg() < 1 {
- fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
return 2
} else if f.NArg() > 1 {
- fmt.Fprintf(stderr, "encountered extra arguments after container-uuid (try -help)\n")
+ fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
return 2
}
target := f.Args()[0]
InsecureSkipVerify: true}}
}
- err := lc.tailf(target, stdout, stderr, *pollInterval)
+ err := lc.tail(target, stdout, stderr, *follow, *pollInterval)
if err != nil {
fmt.Fprintln(stderr, err)
return 1
return 0
}
-func (lc *logsCommand) tailf(target string, stdout, stderr io.Writer, pollInterval time.Duration) error {
+func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- rpcconn := rpcFromEnv()
- ctrUUID, err := resolveToContainerUUID(rpcconn, target)
+ rpcconn, err := rpcFromEnv()
if err != nil {
return err
}
- err = lc.checkAPISupport(ctx, ctrUUID)
+ err = lc.checkAPISupport(ctx, crUUID)
if err != nil {
return err
}
- fmt.Fprintln(stderr, "connecting to container", ctrUUID)
var (
// files to display
mark = map[string]int64{}
// fnm => current size of file reported by api
size = map[string]int64{}
- // exit after fetching next log chunk
- containerFinished = false
// has anything worked? (if so, retry after errors)
anySuccess = false
+ // container UUID whose logs we are displaying
+ displayingUUID = ""
+ // container UUID we last showed in a "connected,
+ // polling" message
+ reportedUUID = ""
)
+ cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
+ if err != nil {
+ return fmt.Errorf("error retrieving %s: %w", crUUID, err)
+ }
+ displayingUUID = cr.ContainerUUID
poll:
for delay := pollInterval; ; time.Sleep(delay) {
- // When /arvados/v1/containers/{uuid}/log_events is
+ if cr.ContainerUUID == "" {
+ return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
+ }
+ if delay < pollInterval {
+ delay = pollInterval
+ }
+ // When .../container_requests/{uuid}/log_events is
// implemented, we'll wait here for the next
// server-sent event to tell us some updated file
// sizes. For now, we poll.
for _, fnm := range watching {
- currentsize, _, err := lc.copyRange(ctx, ctrUUID, fnm, "-0", nil)
+ currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
if err != nil {
if !anySuccess {
return err
delay = pollInterval
continue poll
}
+ if reportedUUID != displayingUUID {
+ fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
+ reportedUUID = displayingUUID
+ }
size[fnm] = currentsize
if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
mark[fnm] = currentsize - 10000
// Log collection must have been
// emptied and reset.
fmt.Fprintln(stderr, "--- log restarted ---")
- for fnm := range mark {
- delete(mark, fnm)
- }
+ mark = map[string]int64{}
delay = pollInterval
continue poll
}
for _, fnm := range watching {
if size[fnm] > mark[fnm] {
newData[fnm] = &bytes.Buffer{}
- _, n, err := lc.copyRange(ctx, ctrUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
+ _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
if err != nil {
fmt.Fprintln(stderr, err)
}
}
}
checkState := lc.display(stdout, stderr, watching, newData)
- if containerFinished {
- // If the caller specified a container request
- // UUID and the container we were watching has
- // been replaced by a new one, start watching
- // logs from the new one. Otherwise, we're
- // done.
- if target == ctrUUID {
- // caller specified container UUID
- return nil
- }
- newUUID, err := resolveToContainerUUID(rpcconn, target)
- if err != nil {
- return err
- }
- if newUUID == ctrUUID {
- // no further attempts
- return nil
- }
- ctrUUID = newUUID
- containerFinished = false
+ if displayingUUID != cr.ContainerUUID {
+ // A different container had already been
+ // assigned when we started fetching the
+ // latest batch of logs. We can now safely
+ // start displaying logs from the new
+ // container, without missing any of the
+ // previous container's logs.
+ displayingUUID = cr.ContainerUUID
delay = 0
continue
- }
- if len(newData) > 0 {
+ } else if cr.State == arvados.ContainerRequestStateFinal || !follow {
+ break
+ } else if len(newData) > 0 {
delay = pollInterval
- }
- if len(newData) == 0 || checkState {
+ } else {
delay = delay * 2
if delay > pollInterval*5 {
delay = pollInterval * 5
}
- ctr, err := rpcconn.ContainerGet(ctx, arvados.GetOptions{UUID: ctrUUID, Select: []string{"state"}})
+ checkState = true
+ }
+ if checkState {
+ cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
if err != nil {
if !anySuccess {
- return err
+ return fmt.Errorf("error retrieving %s: %w", crUUID, err)
}
fmt.Fprintln(stderr, err)
delay = pollInterval
continue
}
- if ctr.State == arvados.ContainerStateCancelled || ctr.State == arvados.ContainerStateComplete {
- containerFinished = true
- delay = 0
- }
}
}
return nil
}
-func (lc *logsCommand) srcURL(uuid, fnm string) string {
+func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
u := url.URL{
Scheme: "https",
Host: lc.ac.APIHost,
- Path: "/arvados/v1/containers/" + uuid + "/log/" + fnm,
+ Path: "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
}
return u.String()
}
// Check whether the API is new enough to support the
-// .../containers/{uuid}/log endpoint.
+// .../container_requests/{uuid}/log/ endpoint.
//
-// Older versions return 200 for an OPTIONS request at the .../logs/
+// Older versions return 200 for an OPTIONS request at the .../log/
// API endpoint, but the response header does not have a "Dav" header.
//
// Note an error response with no "Dav" header is not taken to
// indicate lack of API support. It may come from a new server that
// has a configuration or networking problem.
-func (lc *logsCommand) checkAPISupport(ctx context.Context, uuid string) error {
+func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
defer cancel()
- req, err := http.NewRequestWithContext(ctx, "OPTIONS", lc.srcURL(uuid, ""), nil)
+ req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
if err != nil {
return err
}
// Return values are current file size, bytes copied, error.
//
// If the file does not exist, return values are 0, 0, nil.
-func (lc *logsCommand) copyRange(ctx context.Context, uuid, fnm, byterange string, out io.Writer) (int64, int64, error) {
+func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
defer cancel()
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(uuid, fnm), nil)
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
if err != nil {
return 0, 0, err
}
// exit.
func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
checkState := false
+ var sorted []string
for _, fnm := range watching {
buf := received[fnm]
if buf == nil || buf.Len() == 0 {
continue
}
for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
- _, err := fmt.Fprintf(out, "%-14s %s\n", fnm, line)
- if err != nil {
- fmt.Fprintln(stderr, err)
- }
+ sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
if fnm == "crunch-run.txt" {
checkState = checkState ||
bytes.HasSuffix(line, []byte("Complete")) ||
}
}
}
+ // Stable sort: lines are keyed by the timestamped content after
+ // the 15-char "%-14s " filename column, so entries with equal
+ // timestamps must keep their original per-file append order.
+ sort.SliceStable(sorted, func(i, j int) bool {
+ return sorted[i][15:] < sorted[j][15:]
+ })
+ for _, s := range sorted {
+ _, err := fmt.Fprint(out, s)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ }
+ }
return checkState
}
loginUsername = targetUUID[:i]
targetUUID = targetUUID[i+1:]
}
- if os.Getenv("ARVADOS_API_HOST") == "" || os.Getenv("ARVADOS_API_TOKEN") == "" {
- fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
+ rpcconn, err := rpcFromEnv()
+ if err != nil {
+ fmt.Fprintln(stderr, err)
return 1
}
- rpcconn := rpcFromEnv()
- targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
+ targetUUID, err = resolveToContainerUUID(rpcconn, targetUUID)
if err != nil {
fmt.Fprintln(stderr, err)
return 1
return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
}
-func rpcFromEnv() *rpc.Conn {
- insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
+func rpcFromEnv() (*rpc.Conn, error) {
+ ac := arvados.NewClientFromEnv()
+ if ac.APIHost == "" || ac.AuthToken == "" {
+ return nil, fmt.Errorf("fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set, and ~/.config/arvados/settings.conf is not readable")
+ }
return rpc.NewConn("",
&url.URL{
Scheme: "https",
- Host: os.Getenv("ARVADOS_API_HOST"),
+ Host: ac.APIHost,
},
- insecure == "1" || insecure == "yes" || insecure == "true",
+ ac.Insecure,
func(context.Context) ([]string, error) {
- return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
- })
+ return []string{ac.AuthToken}, nil
+ }), nil
}
func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {