1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
24 "git.arvados.org/arvados.git/lib/cmd"
25 "git.arvados.org/arvados.git/lib/controller/rpc"
26 "git.arvados.org/arvados.git/sdk/go/arvados"
29 // logsCommand displays logs from a running container.
// RunCommand fills in the ac field with an API client built by
// arvados.NewClientFromEnv before calling tail; checkAPISupport and
// copyRange read its AuthToken and Client when issuing HTTP requests.
30 type logsCommand struct {
// RunCommand parses its flags and exactly one positional
// container-request-uuid argument, builds an API client from the
// environment, and tails that container request's logs to stdout.
// Usage and runtime errors are written to stderr; the int result is
// the command's exit code.
//
// Flags: -f keeps polling until the container finishes; -poll sets the
// minimum wait between polls (default 2s).
34 func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35 f := flag.NewFlagSet(prog, flag.ContinueOnError)
36 follow := f.Bool("f", false, "follow: poll for new data until the container finishes")
37 pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
38 if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
40 } else if f.NArg() < 1 {
41 fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
43 } else if f.NArg() > 1 {
44 fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
49 lc.ac = arvados.NewClientFromEnv()
// A fresh http.Client with no Timeout; request deadlines are applied
// per call in checkAPISupport and copyRange instead.
50 lc.ac.Client = &http.Client{}
// NOTE(review): the condition guarding this transport (which disables
// TLS certificate verification) is not visible in this extract; confirm
// it applies only when the client was configured as insecure.
52 lc.ac.Client.Transport = &http.Transport{
53 TLSClientConfig: &tls.Config{
54 InsecureSkipVerify: true}}
57 err := lc.tail(target, stdout, stderr, *follow, *pollInterval)
59 fmt.Fprintln(stderr, err)
// tail writes the logs of container request crUUID to stdout, and
// diagnostics to stderr.
//
// It first probes the server with checkAPISupport. It then looks up the
// request's assigned container and, on each iteration of the poll loop:
//   - asks for the current size of each watched file (crunch-run.txt
//     and stderr.txt) using the zero-length byte range "-0";
//   - fetches every byte past the last displayed offset (mark) and
//     passes the new data to display.
//
// The first time a file is seen larger than 10000 bytes, only its last
// 10000 bytes are shown. If a file shrinks, the log is assumed to have
// been reset, all offsets are cleared and "--- log restarted ---" is
// printed.
//
// If the request is reassigned to a new container, tail switches to it
// only after draining the batch already being fetched from the old one.
// The loop's branch for a Final request or follow == false is
// presumably where tail stops; that branch's body is not visible here.
// The delay between polls starts at pollInterval and is capped at
// 5*pollInterval.
65 func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
66 ctx, cancel := context.WithCancel(context.Background())
69 rpcconn := rpcFromEnv()
70 err := lc.checkAPISupport(ctx, crUUID)
77 watching = []string{"crunch-run.txt", "stderr.txt"}
78 // fnm => file offset of next byte to display
79 mark = map[string]int64{}
80 // fnm => current size of file reported by api
81 size = map[string]int64{}
82 // has anything worked? (if so, retry after errors)
84 // container UUID whose logs we are displaying
86 // container UUID we last showed in a "connected,
91 cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
93 return fmt.Errorf("error retrieving %s: %w", crUUID, err)
95 displayingUUID = cr.ContainerUUID
// Poll loop: time.Sleep(delay) runs between iterations.
97 for delay := pollInterval; ; time.Sleep(delay) {
98 if cr.ContainerUUID == "" {
99 return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
101 if delay < pollInterval {
104 // When .../container_requests/{uuid}/log_events is
105 // implemented, we'll wait here for the next
106 // server-sent event to tell us some updated file
107 // sizes. For now, we poll.
108 for _, fnm := range watching {
// Range "-0" transfers no data (out is nil); only the reported
// current file size is used.
109 currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
114 fmt.Fprintln(stderr, err)
// Announce the container once, the first time we successfully poll it.
118 if reportedUUID != displayingUUID {
119 fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
120 reportedUUID = displayingUUID
122 size[fnm] = currentsize
// On first sight of a large file, start 10000 bytes before its end
// instead of replaying the whole log.
123 if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
124 mark[fnm] = currentsize - 10000
127 } else if currentsize < oldsize {
128 // Log collection must have been
129 // emptied and reset.
130 fmt.Fprintln(stderr, "--- log restarted ---")
131 mark = map[string]int64{}
136 newData := map[string]*bytes.Buffer{}
137 for _, fnm := range watching {
// Fetch only bytes not yet displayed: open-ended range "mark-".
138 if size[fnm] > mark[fnm] {
139 newData[fnm] = &bytes.Buffer{}
140 _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
142 fmt.Fprintln(stderr, err)
150 checkState := lc.display(stdout, stderr, watching, newData)
151 if displayingUUID != cr.ContainerUUID {
152 // A different container had already been
153 // assigned when we started fetching the
154 // latest batch of logs. We can now safely
155 // start displaying logs from the new
156 // container, without missing any of the
157 // previous container's logs.
158 displayingUUID = cr.ContainerUUID
161 } else if cr.State == arvados.ContainerRequestStateFinal || !follow {
163 } else if len(newData) > 0 {
// Never wait longer than five poll intervals between checks.
167 if delay > pollInterval*5 {
168 delay = pollInterval * 5
// Refresh the request record (state and assigned container). Whether
// a failure here is returned or only printed depends on conditions
// not visible in this extract (see the "has anything worked?" flag).
173 cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
176 return fmt.Errorf("error retrieving %s: %w", crUUID, err)
178 fmt.Fprintln(stderr, err)
// srcURL returns the URL of log file fnm for container cUUID under
// container request crUUID, i.e. path
// /arvados/v1/container_requests/{crUUID}/log/{cUUID}/{fnm}.
// Called with empty cUUID and fnm, the path ends in ".../log//";
// checkAPISupport trims one trailing slash from that form.
187 func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
191 Path: "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
196 // Check whether the API is new enough to support the
197 // .../container_requests/{uuid}/log/ endpoint.
199 // Older versions return 200 for an OPTIONS request at the .../log/
200 // API endpoint, but the response header does not have a "Dav" header.
202 // Note an error response with no "Dav" header is not taken to
203 // indicate lack of API support. It may come from a new server that
204 // has a configuration or networking problem.
//
// The probe is a bearer-token-authenticated OPTIONS request bounded by
// a 20-second deadline. It reports "unsupported" only for a 200
// response lacking "Dav"; failures to build or send the request are
// presumably returned as-is (those lines are not visible here).
205 func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
206 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
208 req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
212 req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
213 resp, err := lc.ac.Client.Do(req)
217 defer resp.Body.Close()
218 if resp.StatusCode == http.StatusOK && resp.Header.Get("Dav") == "" {
219 return fmt.Errorf("server does not support container logs API (OPTIONS request returned HTTP %s, Dav: %q)", resp.Status, resp.Header.Get("Dav"))
224 // Retrieve specified byte range (e.g., "12-34", "1234-") from given
225 // fnm and write to out.
227 // If range is empty ("-0"), out can be nil.
229 // Return values are current file size, bytes copied, error.
231 // If the file does not exist, return values are 0, 0, nil.
//
// byterange is sent verbatim as "Range: bytes=<byterange>"; the current
// file size is taken from the total in the response's Content-Range.
232 func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
// The deadline is attached to the request context, so it also bounds
// reading the response body below.
// NOTE(review): a large catch-up range on a slow link could hit this
// 20s limit; confirm that is acceptable.
233 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
235 req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
239 req.Header.Set("Range", "bytes="+byterange)
240 req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
241 resp, err := lc.ac.Client.Do(req)
245 defer resp.Body.Close()
// 404 means the file does not exist yet (see contract above).
246 if resp.StatusCode == http.StatusNotFound {
249 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
// Include at most 10000 bytes of the error body in the message.
250 body, _ := io.ReadAll(io.LimitReader(resp.Body, 10000))
251 return 0, 0, fmt.Errorf("error getting %s: HTTP %s -- %s", fnm, resp.Status, bytes.TrimSuffix(body, []byte{'\n'}))
// Expected form: "bytes <start>-<end>/<total size>".
253 var rstart, rend, rsize int64
254 _, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize)
256 return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err)
261 n, err := io.Copy(out, resp.Body)
265 // display some log data, formatted as desired (prefixing each line
266 // with a tag indicating which file it came from, etc.).
268 // Return value is true if the log data contained a hint that it's a
269 // good time to check whether the container is finished so we can
//
// Each line is prefixed with its file name left-justified to 14
// columns plus a space. Lines from all files are sorted together by
// the text after that prefix before being written to out; write
// errors go to stderr.
271 func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
274 for _, fnm := range watching {
276 if buf == nil || buf.Len() == 0 {
279 for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
280 sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
// crunch-run.txt lines ending in Complete, Cancelled or Queued hint
// that the container state may have changed.
281 if fnm == "crunch-run.txt" {
282 checkState = checkState ||
283 bytes.HasSuffix(line, []byte("Complete")) ||
284 bytes.HasSuffix(line, []byte("Cancelled")) ||
285 bytes.HasSuffix(line, []byte("Queued"))
// Compare text after the 15-byte "%-14s " prefix, presumably so
// lines that begin with timestamps interleave chronologically.
// NOTE(review): a file name longer than 14 bytes would not be padded
// to this width and would skew the comparison. sort.Slice is not a
// stable sort, so lines with identical text may be reordered.
289 sort.Slice(sorted, func(i, j int) bool {
290 return sorted[i][15:] < sorted[j][15:]
292 for _, s := range sorted {
293 _, err := fmt.Fprint(out, s)
295 fmt.Fprintln(stderr, err)
301 // shellCommand connects the terminal to an interactive shell on a
302 // running container.
// It carries no state; all behavior lives in RunCommand.
303 type shellCommand struct{}
// RunCommand takes a [username@]container-uuid target (username
// defaults to "root") followed by optional ssh arguments. It:
//   - runs connectSSHCommand once with -probe-only, to surface tunnel
//     errors early and to resolve the target to a container UUID;
//   - replaces this process with the system ssh client (syscall.Exec),
//     configured to reach the container through "<this binary>
//     connect-ssh" as its ProxyCommand.
//
// It returns only if setup or the exec itself fails.
305 func (shellCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
306 f := flag.NewFlagSet(prog, flag.ContinueOnError)
307 detachKeys := f.String("detach-keys", "ctrl-],ctrl-]", "set detach key sequence, as in docker-attach(1)")
308 if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid [ssh-options] [remote-command [args...]]", stderr); !ok {
310 } else if f.NArg() < 1 {
311 fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
314 target := f.Args()[0]
315 if !strings.Contains(target, "@") {
316 target = "root@" + target
318 sshargs := f.Args()[1:]
320 // Try setting up a tunnel, and exit right away if it
321 // fails. This tunnel won't get used -- we'll set up a new
322 // tunnel when running as SSH client's ProxyCommand child --
323 // but in most cases where the real tunnel setup would fail,
324 // we catch the problem earlier here. This makes it less
325 // likely that an error message about tunnel setup will get
326 // hidden behind noisy errors from SSH client like this:
328 // [useful tunnel setup error message here]
329 // kex_exchange_identification: Connection closed by remote host
330 // Connection closed by UNKNOWN port 65535
333 // In case our target is a container request, the probe also
334 // resolves it to a container, so we don't connect to two
335 // different containers in a race.
336 var probetarget bytes.Buffer
337 exitcode := connectSSHCommand{}.RunCommand(
338 "arvados-client connect-ssh",
339 []string{"-detach-keys=" + *detachKeys, "-probe-only=true", target},
340 &bytes.Buffer{}, &probetarget, stderr)
// NOTE(review): per its -probe-only flag description, the probe prints
// the target UUID, apparently without the "username@" part. Unless a
// line not visible here re-adds it, the username chosen above would be
// lost from the ProxyCommand target, and connect-ssh would then default
// to "root". Confirm.
344 target = strings.Trim(probetarget.String(), "\n")
// Locate our own executable so ssh's ProxyCommand re-invokes this binary.
// NOTE(review): /proc/self/exe exists only on Linux; os.Executable()
// would be the portable alternative.
346 selfbin, err := os.Readlink("/proc/self/exe")
348 fmt.Fprintln(stderr, err)
351 sshargs = append([]string{
// ssh hands ProxyCommand to a shell, so user-supplied values are
// single-quoted with shellescape.
353 "-o", "ProxyCommand " + selfbin + " connect-ssh -detach-keys=" + shellescape(*detachKeys) + " " + shellescape(target),
// NOTE(review): host key verification is disabled; presumably the
// authenticated gateway tunnel stands in for it — confirm.
354 "-o", "StrictHostKeyChecking no",
357 sshbin, err := exec.LookPath("ssh")
359 fmt.Fprintln(stderr, err)
// On success syscall.Exec does not return; reaching the next line
// means the exec failed.
362 err = syscall.Exec(sshbin, sshargs, os.Environ())
363 fmt.Fprintf(stderr, "exec(%q) failed: %s\n", sshbin, err)
367 // connectSSHCommand connects stdin/stdout to a container's gateway
368 // server (see lib/crunchrun/ssh.go).
370 // It is intended to be invoked with OpenSSH client's ProxyCommand
// shellCommand also runs it once in-process with -probe-only to
// validate the tunnel before exec'ing ssh. It carries no state.
372 type connectSSHCommand struct{}
// RunCommand opens a tunnel to a container's gateway with the
// ContainerSSH API and relays bytes between the tunnel and
// stdin/stdout.
//
// The single argument is [username@]uuid. The username defaults to
// "root", and a container request UUID is first resolved to its
// container. ARVADOS_API_HOST and ARVADOS_API_TOKEN must be set.
// With -probe-only it sets up the tunnel, prints the resolved container
// UUID to stdout and exits without transferring data.
374 func (connectSSHCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
375 f := flag.NewFlagSet(prog, flag.ContinueOnError)
378 _, prog := filepath.Split(prog)
379 fmt.Fprint(stderr, prog+`: connect to the gateway service for a running container.
381 NOTE: You almost certainly don't want to use this command directly. It
382 is meant to be used internally. Use "arvados-client shell" instead.
384 Usage: `+prog+` [options] [username@]container-uuid
390 probeOnly := f.Bool("probe-only", false, "do not transfer IO, just setup tunnel, print target UUID, and exit")
391 detachKeys := f.String("detach-keys", "", "set detach key sequence, as in docker-attach(1)")
392 if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid", stderr); !ok {
// NOTE(review): this branch is also taken when more than one argument
// is given, but the message only mentions a missing argument.
394 } else if f.NArg() != 1 {
395 fmt.Fprintf(stderr, "missing required argument: [username@]container-uuid\n")
// Split an optional "username@" prefix off the target.
398 targetUUID := f.Args()[0]
399 loginUsername := "root"
400 if i := strings.Index(targetUUID, "@"); i >= 0 {
401 loginUsername = targetUUID[:i]
402 targetUUID = targetUUID[i+1:]
404 if os.Getenv("ARVADOS_API_HOST") == "" || os.Getenv("ARVADOS_API_TOKEN") == "" {
405 fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
408 rpcconn := rpcFromEnv()
409 targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
411 fmt.Fprintln(stderr, err)
414 fmt.Fprintln(stderr, "connecting to container", targetUUID)
416 ctx, cancel := context.WithCancel(context.Background())
418 sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
420 DetachKeys: *detachKeys,
421 LoginUsername: loginUsername,
424 fmt.Fprintln(stderr, "error setting up tunnel:", err)
427 defer sshconn.Conn.Close()
// Probe output: only the resolved container UUID (loginUsername is
// not included).
430 fmt.Fprintln(stdout, targetUUID)
// Relay tunnel -> stdout. Errors after ctx has been cancelled are
// treated as normal shutdown and not reported.
436 _, err := io.Copy(stdout, sshconn.Conn)
437 if err != nil && ctx.Err() == nil {
438 fmt.Fprintf(stderr, "receive: %v\n", err)
// Relay stdin -> tunnel, with the same error reporting rule.
443 _, err := io.Copy(sshconn.Conn, stdin)
444 if err != nil && ctx.Err() == nil {
445 fmt.Fprintf(stderr, "send: %v\n", err)
// shellescape quotes s as a single POSIX shell word: it wraps s in
// single quotes and rewrites each embedded ' as '\'' (close quote,
// escaped quote, reopen quote), so the shell sees s literally.
452 func shellescape(s string) string {
453 return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
// rpcFromEnv builds an rpc.Conn for the API host named by
// ARVADOS_API_HOST. The insecure argument passed to rpc.NewConn is true
// when ARVADOS_API_HOST_INSECURE is exactly "1", "yes" or "true"
// (case-sensitive).
//
// The token callback re-reads ARVADOS_API_TOKEN each time it is
// invoked and never returns an error.
456 func rpcFromEnv() *rpc.Conn {
457 insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
458 return rpc.NewConn("",
461 Host: os.Getenv("ARVADOS_API_HOST"),
463 insecure == "1" || insecure == "yes" || insecure == "true",
464 func(context.Context) ([]string, error) {
465 return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
// resolveToContainerUUID maps targetUUID to a container UUID:
//   - a 27-character UUID containing "-dz642-" is returned unchanged;
//   - one containing "-xvhdp-" is looked up as a container request, and
//     that request's assigned container UUID is returned.
//
// An error is returned for any other input, for a container request
// that is not found, or for a request with no container assigned yet.
469 func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
471 case strings.Contains(targetUUID, "-dz642-") && len(targetUUID) == 27:
472 return targetUUID, nil
473 case strings.Contains(targetUUID, "-xvhdp-") && len(targetUUID) == 27:
// NOTE(review): context.TODO() gives this lookup no deadline or
// cancellation.
474 crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
478 if len(crs.Items) < 1 {
479 return "", fmt.Errorf("container request %q not found", targetUUID)
482 if cr.ContainerUUID == "" {
483 return "", fmt.Errorf("no container assigned, container request state is %s", strings.ToLower(string(cr.State)))
485 return cr.ContainerUUID, nil
487 return "", fmt.Errorf("target UUID is not a container or container request UUID: %s", targetUUID)