18790: Enable follow mode with -f flag.
[arvados.git] / cmd / arvados-client / container_gateway.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/tls"
11         "flag"
12         "fmt"
13         "io"
14         "net/http"
15         "net/url"
16         "os"
17         "os/exec"
18         "path/filepath"
19         "sort"
20         "strings"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/lib/controller/rpc"
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27 )
28
29 // logsCommand displays logs from a running container.
30 type logsCommand struct {
31         ac *arvados.Client
32 }
33
34 func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         f := flag.NewFlagSet(prog, flag.ContinueOnError)
36         follow := f.Bool("f", false, "follow: poll for new data until the container finishes")
37         pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
38         if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
39                 return code
40         } else if f.NArg() < 1 {
41                 fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
42                 return 2
43         } else if f.NArg() > 1 {
44                 fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
45                 return 2
46         }
47         target := f.Args()[0]
48
49         lc.ac = arvados.NewClientFromEnv()
50         lc.ac.Client = &http.Client{}
51         if lc.ac.Insecure {
52                 lc.ac.Client.Transport = &http.Transport{
53                         TLSClientConfig: &tls.Config{
54                                 InsecureSkipVerify: true}}
55         }
56
57         err := lc.tail(target, stdout, stderr, *follow, *pollInterval)
58         if err != nil {
59                 fmt.Fprintln(stderr, err)
60                 return 1
61         }
62         return 0
63 }
64
65 func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
66         ctx, cancel := context.WithCancel(context.Background())
67         defer cancel()
68
69         rpcconn := rpcFromEnv()
70         err := lc.checkAPISupport(ctx, crUUID)
71         if err != nil {
72                 return err
73         }
74
75         var (
76                 // files to display
77                 watching = []string{"crunch-run.txt", "stderr.txt"}
78                 // fnm => file offset of next byte to display
79                 mark = map[string]int64{}
80                 // fnm => current size of file reported by api
81                 size = map[string]int64{}
82                 // has anything worked? (if so, retry after errors)
83                 anySuccess = false
84                 // container UUID whose logs we are displaying
85                 displayingUUID = ""
86                 // container UUID we last showed in a "connected,
87                 // polling" message
88                 reportedUUID = ""
89         )
90
91         cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
92         if err != nil {
93                 return fmt.Errorf("error retrieving %s: %w", crUUID, err)
94         }
95         displayingUUID = cr.ContainerUUID
96 poll:
97         for delay := pollInterval; ; time.Sleep(delay) {
98                 if cr.ContainerUUID == "" {
99                         return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
100                 }
101                 if delay < pollInterval {
102                         delay = pollInterval
103                 }
104                 // When .../container_requests/{uuid}/log_events is
105                 // implemented, we'll wait here for the next
106                 // server-sent event to tell us some updated file
107                 // sizes. For now, we poll.
108                 for _, fnm := range watching {
109                         currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
110                         if err != nil {
111                                 if !anySuccess {
112                                         return err
113                                 }
114                                 fmt.Fprintln(stderr, err)
115                                 delay = pollInterval
116                                 continue poll
117                         }
118                         if reportedUUID != displayingUUID {
119                                 fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
120                                 reportedUUID = displayingUUID
121                         }
122                         size[fnm] = currentsize
123                         if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
124                                 mark[fnm] = currentsize - 10000
125                         } else if !seen {
126                                 mark[fnm] = 0
127                         } else if currentsize < oldsize {
128                                 // Log collection must have been
129                                 // emptied and reset.
130                                 fmt.Fprintln(stderr, "--- log restarted ---")
131                                 mark = map[string]int64{}
132                                 delay = pollInterval
133                                 continue poll
134                         }
135                 }
136                 newData := map[string]*bytes.Buffer{}
137                 for _, fnm := range watching {
138                         if size[fnm] > mark[fnm] {
139                                 newData[fnm] = &bytes.Buffer{}
140                                 _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
141                                 if err != nil {
142                                         fmt.Fprintln(stderr, err)
143                                 }
144                                 if n > 0 {
145                                         mark[fnm] += n
146                                         anySuccess = true
147                                 }
148                         }
149                 }
150                 checkState := lc.display(stdout, stderr, watching, newData)
151                 if displayingUUID != cr.ContainerUUID {
152                         // A different container had already been
153                         // assigned when we started fetching the
154                         // latest batch of logs. We can now safely
155                         // start displaying logs from the new
156                         // container, without missing any of the
157                         // previous container's logs.
158                         displayingUUID = cr.ContainerUUID
159                         delay = 0
160                         continue
161                 } else if cr.State == arvados.ContainerRequestStateFinal || !follow {
162                         break
163                 } else if len(newData) > 0 {
164                         delay = pollInterval
165                 } else {
166                         delay = delay * 2
167                         if delay > pollInterval*5 {
168                                 delay = pollInterval * 5
169                         }
170                         checkState = true
171                 }
172                 if checkState {
173                         cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
174                         if err != nil {
175                                 if !anySuccess {
176                                         return fmt.Errorf("error retrieving %s: %w", crUUID, err)
177                                 }
178                                 fmt.Fprintln(stderr, err)
179                                 delay = pollInterval
180                                 continue
181                         }
182                 }
183         }
184         return nil
185 }
186
187 func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
188         u := url.URL{
189                 Scheme: "https",
190                 Host:   lc.ac.APIHost,
191                 Path:   "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
192         }
193         return u.String()
194 }
195
196 // Check whether the API is new enough to support the
197 // .../container_requests/{uuid}/log/ endpoint.
198 //
199 // Older versions return 200 for an OPTIONS request at the .../log/
200 // API endpoint, but the response header does not have a "Dav" header.
201 //
202 // Note an error response with no "Dav" header is not taken to
203 // indicate lack of API support. It may come from a new server that
204 // has a configuration or networking problem.
205 func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
206         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
207         defer cancel()
208         req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
209         if err != nil {
210                 return err
211         }
212         req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
213         resp, err := lc.ac.Client.Do(req)
214         if err != nil {
215                 return err
216         }
217         defer resp.Body.Close()
218         if resp.StatusCode == http.StatusOK && resp.Header.Get("Dav") == "" {
219                 return fmt.Errorf("server does not support container logs API (OPTIONS request returned HTTP %s, Dav: %q)", resp.Status, resp.Header.Get("Dav"))
220         }
221         return nil
222 }
223
224 // Retrieve specified byte range (e.g., "12-34", "1234-") from given
225 // fnm and write to out.
226 //
227 // If range is empty ("-0"), out can be nil.
228 //
229 // Return values are current file size, bytes copied, error.
230 //
231 // If the file does not exist, return values are 0, 0, nil.
232 func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
233         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
234         defer cancel()
235         req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
236         if err != nil {
237                 return 0, 0, err
238         }
239         req.Header.Set("Range", "bytes="+byterange)
240         req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
241         resp, err := lc.ac.Client.Do(req)
242         if err != nil {
243                 return 0, 0, err
244         }
245         defer resp.Body.Close()
246         if resp.StatusCode == http.StatusNotFound {
247                 return 0, 0, nil
248         }
249         if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
250                 body, _ := io.ReadAll(io.LimitReader(resp.Body, 10000))
251                 return 0, 0, fmt.Errorf("error getting %s: HTTP %s -- %s", fnm, resp.Status, bytes.TrimSuffix(body, []byte{'\n'}))
252         }
253         var rstart, rend, rsize int64
254         _, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize)
255         if err != nil {
256                 return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err)
257         }
258         if out == nil {
259                 return rsize, 0, nil
260         }
261         n, err := io.Copy(out, resp.Body)
262         return rsize, n, err
263 }
264
265 // display some log data, formatted as desired (prefixing each line
266 // with a tag indicating which file it came from, etc.).
267 //
268 // Return value is true if the log data contained a hint that it's a
269 // good time to check whether the container is finished so we can
270 // exit.
271 func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
272         checkState := false
273         var sorted []string
274         for _, fnm := range watching {
275                 buf := received[fnm]
276                 if buf == nil || buf.Len() == 0 {
277                         continue
278                 }
279                 for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
280                         sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
281                         if fnm == "crunch-run.txt" {
282                                 checkState = checkState ||
283                                         bytes.HasSuffix(line, []byte("Complete")) ||
284                                         bytes.HasSuffix(line, []byte("Cancelled")) ||
285                                         bytes.HasSuffix(line, []byte("Queued"))
286                         }
287                 }
288         }
289         sort.Slice(sorted, func(i, j int) bool {
290                 return sorted[i][15:] < sorted[j][15:]
291         })
292         for _, s := range sorted {
293                 _, err := fmt.Fprint(out, s)
294                 if err != nil {
295                         fmt.Fprintln(stderr, err)
296                 }
297         }
298         return checkState
299 }
300
301 // shellCommand connects the terminal to an interactive shell on a
302 // running container.
303 type shellCommand struct{}
304
305 func (shellCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
306         f := flag.NewFlagSet(prog, flag.ContinueOnError)
307         detachKeys := f.String("detach-keys", "ctrl-],ctrl-]", "set detach key sequence, as in docker-attach(1)")
308         if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid [ssh-options] [remote-command [args...]]", stderr); !ok {
309                 return code
310         } else if f.NArg() < 1 {
311                 fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
312                 return 2
313         }
314         target := f.Args()[0]
315         if !strings.Contains(target, "@") {
316                 target = "root@" + target
317         }
318         sshargs := f.Args()[1:]
319
320         // Try setting up a tunnel, and exit right away if it
321         // fails. This tunnel won't get used -- we'll set up a new
322         // tunnel when running as SSH client's ProxyCommand child --
323         // but in most cases where the real tunnel setup would fail,
324         // we catch the problem earlier here. This makes it less
325         // likely that an error message about tunnel setup will get
326         // hidden behind noisy errors from SSH client like this:
327         //
328         // [useful tunnel setup error message here]
329         // kex_exchange_identification: Connection closed by remote host
330         // Connection closed by UNKNOWN port 65535
331         // exit status 255
332         //
333         // In case our target is a container request, the probe also
334         // resolves it to a container, so we don't connect to two
335         // different containers in a race.
336         var probetarget bytes.Buffer
337         exitcode := connectSSHCommand{}.RunCommand(
338                 "arvados-client connect-ssh",
339                 []string{"-detach-keys=" + *detachKeys, "-probe-only=true", target},
340                 &bytes.Buffer{}, &probetarget, stderr)
341         if exitcode != 0 {
342                 return exitcode
343         }
344         target = strings.Trim(probetarget.String(), "\n")
345
346         selfbin, err := os.Readlink("/proc/self/exe")
347         if err != nil {
348                 fmt.Fprintln(stderr, err)
349                 return 2
350         }
351         sshargs = append([]string{
352                 "ssh",
353                 "-o", "ProxyCommand " + selfbin + " connect-ssh -detach-keys=" + shellescape(*detachKeys) + " " + shellescape(target),
354                 "-o", "StrictHostKeyChecking no",
355                 target},
356                 sshargs...)
357         sshbin, err := exec.LookPath("ssh")
358         if err != nil {
359                 fmt.Fprintln(stderr, err)
360                 return 1
361         }
362         err = syscall.Exec(sshbin, sshargs, os.Environ())
363         fmt.Fprintf(stderr, "exec(%q) failed: %s\n", sshbin, err)
364         return 1
365 }
366
367 // connectSSHCommand connects stdin/stdout to a container's gateway
368 // server (see lib/crunchrun/ssh.go).
369 //
370 // It is intended to be invoked with OpenSSH client's ProxyCommand
371 // config.
372 type connectSSHCommand struct{}
373
374 func (connectSSHCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
375         f := flag.NewFlagSet(prog, flag.ContinueOnError)
376         f.SetOutput(stderr)
377         f.Usage = func() {
378                 _, prog := filepath.Split(prog)
379                 fmt.Fprint(stderr, prog+`: connect to the gateway service for a running container.
380
381 NOTE: You almost certainly don't want to use this command directly. It
382 is meant to be used internally. Use "arvados-client shell" instead.
383
384 Usage: `+prog+` [options] [username@]container-uuid
385
386 Options:
387 `)
388                 f.PrintDefaults()
389         }
390         probeOnly := f.Bool("probe-only", false, "do not transfer IO, just setup tunnel, print target UUID, and exit")
391         detachKeys := f.String("detach-keys", "", "set detach key sequence, as in docker-attach(1)")
392         if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid", stderr); !ok {
393                 return code
394         } else if f.NArg() != 1 {
395                 fmt.Fprintf(stderr, "missing required argument: [username@]container-uuid\n")
396                 return 2
397         }
398         targetUUID := f.Args()[0]
399         loginUsername := "root"
400         if i := strings.Index(targetUUID, "@"); i >= 0 {
401                 loginUsername = targetUUID[:i]
402                 targetUUID = targetUUID[i+1:]
403         }
404         if os.Getenv("ARVADOS_API_HOST") == "" || os.Getenv("ARVADOS_API_TOKEN") == "" {
405                 fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
406                 return 1
407         }
408         rpcconn := rpcFromEnv()
409         targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
410         if err != nil {
411                 fmt.Fprintln(stderr, err)
412                 return 1
413         }
414         fmt.Fprintln(stderr, "connecting to container", targetUUID)
415
416         ctx, cancel := context.WithCancel(context.Background())
417         defer cancel()
418         sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
419                 UUID:          targetUUID,
420                 DetachKeys:    *detachKeys,
421                 LoginUsername: loginUsername,
422         })
423         if err != nil {
424                 fmt.Fprintln(stderr, "error setting up tunnel:", err)
425                 return 1
426         }
427         defer sshconn.Conn.Close()
428
429         if *probeOnly {
430                 fmt.Fprintln(stdout, targetUUID)
431                 return 0
432         }
433
434         go func() {
435                 defer cancel()
436                 _, err := io.Copy(stdout, sshconn.Conn)
437                 if err != nil && ctx.Err() == nil {
438                         fmt.Fprintf(stderr, "receive: %v\n", err)
439                 }
440         }()
441         go func() {
442                 defer cancel()
443                 _, err := io.Copy(sshconn.Conn, stdin)
444                 if err != nil && ctx.Err() == nil {
445                         fmt.Fprintf(stderr, "send: %v\n", err)
446                 }
447         }()
448         <-ctx.Done()
449         return 0
450 }
451
452 func shellescape(s string) string {
453         return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
454 }
455
456 func rpcFromEnv() *rpc.Conn {
457         insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
458         return rpc.NewConn("",
459                 &url.URL{
460                         Scheme: "https",
461                         Host:   os.Getenv("ARVADOS_API_HOST"),
462                 },
463                 insecure == "1" || insecure == "yes" || insecure == "true",
464                 func(context.Context) ([]string, error) {
465                         return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
466                 })
467 }
468
469 func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
470         switch {
471         case strings.Contains(targetUUID, "-dz642-") && len(targetUUID) == 27:
472                 return targetUUID, nil
473         case strings.Contains(targetUUID, "-xvhdp-") && len(targetUUID) == 27:
474                 crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
475                 if err != nil {
476                         return "", err
477                 }
478                 if len(crs.Items) < 1 {
479                         return "", fmt.Errorf("container request %q not found", targetUUID)
480                 }
481                 cr := crs.Items[0]
482                 if cr.ContainerUUID == "" {
483                         return "", fmt.Errorf("no container assigned, container request state is %s", strings.ToLower(string(cr.State)))
484                 }
485                 return cr.ContainerUUID, nil
486         default:
487                 return "", fmt.Errorf("target UUID is not a container or container request UUID: %s", targetUUID)
488         }
489 }