21700: Install Bundler system-wide in Rails postinst
[arvados.git] / cmd / arvados-client / container_gateway.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/tls"
11         "flag"
12         "fmt"
13         "io"
14         "net/http"
15         "net/url"
16         "os"
17         "os/exec"
18         "path/filepath"
19         "sort"
20         "strings"
21         "syscall"
22         "time"
23
24         "git.arvados.org/arvados.git/lib/cmd"
25         "git.arvados.org/arvados.git/lib/controller/rpc"
26         "git.arvados.org/arvados.git/sdk/go/arvados"
27 )
28
29 // logsCommand displays logs from a running container.
30 type logsCommand struct {
31         ac *arvados.Client
32 }
33
34 func (lc logsCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         f := flag.NewFlagSet(prog, flag.ContinueOnError)
36         follow := f.Bool("f", false, "follow: poll for new data until the container finishes")
37         pollInterval := f.Duration("poll", time.Second*2, "minimum duration to wait before polling for new data")
38         if ok, code := cmd.ParseFlags(f, prog, args, "container-request-uuid", stderr); !ok {
39                 return code
40         } else if f.NArg() < 1 {
41                 fmt.Fprintf(stderr, "missing required argument: container-request-uuid (try -help)\n")
42                 return 2
43         } else if f.NArg() > 1 {
44                 fmt.Fprintf(stderr, "encountered extra arguments after container-request-uuid (try -help)\n")
45                 return 2
46         }
47         target := f.Args()[0]
48
49         lc.ac = arvados.NewClientFromEnv()
50         lc.ac.Client = &http.Client{}
51         if lc.ac.Insecure {
52                 lc.ac.Client.Transport = &http.Transport{
53                         TLSClientConfig: &tls.Config{
54                                 InsecureSkipVerify: true}}
55         }
56
57         err := lc.tail(target, stdout, stderr, *follow, *pollInterval)
58         if err != nil {
59                 fmt.Fprintln(stderr, err)
60                 return 1
61         }
62         return 0
63 }
64
65 func (lc *logsCommand) tail(crUUID string, stdout, stderr io.Writer, follow bool, pollInterval time.Duration) error {
66         ctx, cancel := context.WithCancel(context.Background())
67         defer cancel()
68
69         rpcconn, err := rpcFromEnv()
70         if err != nil {
71                 return err
72         }
73         err = lc.checkAPISupport(ctx, crUUID)
74         if err != nil {
75                 return err
76         }
77
78         var (
79                 // files to display
80                 watching = []string{"crunch-run.txt", "stderr.txt"}
81                 // fnm => file offset of next byte to display
82                 mark = map[string]int64{}
83                 // fnm => current size of file reported by api
84                 size = map[string]int64{}
85                 // has anything worked? (if so, retry after errors)
86                 anySuccess = false
87                 // container UUID whose logs we are displaying
88                 displayingUUID = ""
89                 // container UUID we last showed in a "connected,
90                 // polling" message
91                 reportedUUID = ""
92         )
93
94         cr, err := rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
95         if err != nil {
96                 return fmt.Errorf("error retrieving %s: %w", crUUID, err)
97         }
98         displayingUUID = cr.ContainerUUID
99 poll:
100         for delay := pollInterval; ; time.Sleep(delay) {
101                 if cr.ContainerUUID == "" {
102                         return fmt.Errorf("%s has no assigned container (state is %s)", crUUID, cr.State)
103                 }
104                 if delay < pollInterval {
105                         delay = pollInterval
106                 }
107                 // When .../container_requests/{uuid}/log_events is
108                 // implemented, we'll wait here for the next
109                 // server-sent event to tell us some updated file
110                 // sizes. For now, we poll.
111                 for _, fnm := range watching {
112                         currentsize, _, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, "-0", nil)
113                         if err != nil {
114                                 if !anySuccess {
115                                         return err
116                                 }
117                                 fmt.Fprintln(stderr, err)
118                                 delay = pollInterval
119                                 continue poll
120                         }
121                         if reportedUUID != displayingUUID {
122                                 fmt.Fprintln(stderr, "connected, polling for log data from container", displayingUUID)
123                                 reportedUUID = displayingUUID
124                         }
125                         size[fnm] = currentsize
126                         if oldsize, seen := mark[fnm]; !seen && currentsize > 10000 {
127                                 mark[fnm] = currentsize - 10000
128                         } else if !seen {
129                                 mark[fnm] = 0
130                         } else if currentsize < oldsize {
131                                 // Log collection must have been
132                                 // emptied and reset.
133                                 fmt.Fprintln(stderr, "--- log restarted ---")
134                                 mark = map[string]int64{}
135                                 delay = pollInterval
136                                 continue poll
137                         }
138                 }
139                 newData := map[string]*bytes.Buffer{}
140                 for _, fnm := range watching {
141                         if size[fnm] > mark[fnm] {
142                                 newData[fnm] = &bytes.Buffer{}
143                                 _, n, err := lc.copyRange(ctx, cr.UUID, displayingUUID, fnm, fmt.Sprintf("%d-", mark[fnm]), newData[fnm])
144                                 if err != nil {
145                                         fmt.Fprintln(stderr, err)
146                                 }
147                                 if n > 0 {
148                                         mark[fnm] += n
149                                         anySuccess = true
150                                 }
151                         }
152                 }
153                 checkState := lc.display(stdout, stderr, watching, newData)
154                 if displayingUUID != cr.ContainerUUID {
155                         // A different container had already been
156                         // assigned when we started fetching the
157                         // latest batch of logs. We can now safely
158                         // start displaying logs from the new
159                         // container, without missing any of the
160                         // previous container's logs.
161                         displayingUUID = cr.ContainerUUID
162                         delay = 0
163                         continue
164                 } else if cr.State == arvados.ContainerRequestStateFinal || !follow {
165                         break
166                 } else if len(newData) > 0 {
167                         delay = pollInterval
168                 } else {
169                         delay = delay * 2
170                         if delay > pollInterval*5 {
171                                 delay = pollInterval * 5
172                         }
173                         checkState = true
174                 }
175                 if checkState {
176                         cr, err = rpcconn.ContainerRequestGet(ctx, arvados.GetOptions{UUID: crUUID, Select: []string{"uuid", "container_uuid", "state"}})
177                         if err != nil {
178                                 if !anySuccess {
179                                         return fmt.Errorf("error retrieving %s: %w", crUUID, err)
180                                 }
181                                 fmt.Fprintln(stderr, err)
182                                 delay = pollInterval
183                                 continue
184                         }
185                 }
186         }
187         return nil
188 }
189
190 func (lc *logsCommand) srcURL(crUUID, cUUID, fnm string) string {
191         u := url.URL{
192                 Scheme: "https",
193                 Host:   lc.ac.APIHost,
194                 Path:   "/arvados/v1/container_requests/" + crUUID + "/log/" + cUUID + "/" + fnm,
195         }
196         return u.String()
197 }
198
199 // Check whether the API is new enough to support the
200 // .../container_requests/{uuid}/log/ endpoint.
201 //
202 // Older versions return 200 for an OPTIONS request at the .../log/
203 // API endpoint, but the response header does not have a "Dav" header.
204 //
205 // Note an error response with no "Dav" header is not taken to
206 // indicate lack of API support. It may come from a new server that
207 // has a configuration or networking problem.
208 func (lc *logsCommand) checkAPISupport(ctx context.Context, crUUID string) error {
209         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
210         defer cancel()
211         req, err := http.NewRequestWithContext(ctx, "OPTIONS", strings.TrimSuffix(lc.srcURL(crUUID, "", ""), "/"), nil)
212         if err != nil {
213                 return err
214         }
215         req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
216         resp, err := lc.ac.Client.Do(req)
217         if err != nil {
218                 return err
219         }
220         defer resp.Body.Close()
221         if resp.StatusCode == http.StatusOK && resp.Header.Get("Dav") == "" {
222                 return fmt.Errorf("server does not support container logs API (OPTIONS request returned HTTP %s, Dav: %q)", resp.Status, resp.Header.Get("Dav"))
223         }
224         return nil
225 }
226
227 // Retrieve specified byte range (e.g., "12-34", "1234-") from given
228 // fnm and write to out.
229 //
230 // If range is empty ("-0"), out can be nil.
231 //
232 // Return values are current file size, bytes copied, error.
233 //
234 // If the file does not exist, return values are 0, 0, nil.
235 func (lc *logsCommand) copyRange(ctx context.Context, crUUID, cUUID, fnm, byterange string, out io.Writer) (int64, int64, error) {
236         ctx, cancel := context.WithDeadline(ctx, time.Now().Add(20*time.Second))
237         defer cancel()
238         req, err := http.NewRequestWithContext(ctx, http.MethodGet, lc.srcURL(crUUID, cUUID, fnm), nil)
239         if err != nil {
240                 return 0, 0, err
241         }
242         req.Header.Set("Range", "bytes="+byterange)
243         req.Header.Set("Authorization", "Bearer "+lc.ac.AuthToken)
244         resp, err := lc.ac.Client.Do(req)
245         if err != nil {
246                 return 0, 0, err
247         }
248         defer resp.Body.Close()
249         if resp.StatusCode == http.StatusNotFound {
250                 return 0, 0, nil
251         }
252         if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
253                 body, _ := io.ReadAll(io.LimitReader(resp.Body, 10000))
254                 return 0, 0, fmt.Errorf("error getting %s: HTTP %s -- %s", fnm, resp.Status, bytes.TrimSuffix(body, []byte{'\n'}))
255         }
256         var rstart, rend, rsize int64
257         _, err = fmt.Sscanf(resp.Header.Get("Content-Range"), "bytes %d-%d/%d", &rstart, &rend, &rsize)
258         if err != nil {
259                 return 0, 0, fmt.Errorf("error parsing Content-Range header %q: %s", resp.Header.Get("Content-Range"), err)
260         }
261         if out == nil {
262                 return rsize, 0, nil
263         }
264         n, err := io.Copy(out, resp.Body)
265         return rsize, n, err
266 }
267
268 // display some log data, formatted as desired (prefixing each line
269 // with a tag indicating which file it came from, etc.).
270 //
271 // Return value is true if the log data contained a hint that it's a
272 // good time to check whether the container is finished so we can
273 // exit.
274 func (lc *logsCommand) display(out, stderr io.Writer, watching []string, received map[string]*bytes.Buffer) bool {
275         checkState := false
276         var sorted []string
277         for _, fnm := range watching {
278                 buf := received[fnm]
279                 if buf == nil || buf.Len() == 0 {
280                         continue
281                 }
282                 for _, line := range bytes.Split(bytes.TrimSuffix(buf.Bytes(), []byte{'\n'}), []byte{'\n'}) {
283                         sorted = append(sorted, fmt.Sprintf("%-14s %s\n", fnm, line))
284                         if fnm == "crunch-run.txt" {
285                                 checkState = checkState ||
286                                         bytes.HasSuffix(line, []byte("Complete")) ||
287                                         bytes.HasSuffix(line, []byte("Cancelled")) ||
288                                         bytes.HasSuffix(line, []byte("Queued"))
289                         }
290                 }
291         }
292         sort.Slice(sorted, func(i, j int) bool {
293                 return sorted[i][15:] < sorted[j][15:]
294         })
295         for _, s := range sorted {
296                 _, err := fmt.Fprint(out, s)
297                 if err != nil {
298                         fmt.Fprintln(stderr, err)
299                 }
300         }
301         return checkState
302 }
303
304 // shellCommand connects the terminal to an interactive shell on a
305 // running container.
306 type shellCommand struct{}
307
308 func (shellCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
309         f := flag.NewFlagSet(prog, flag.ContinueOnError)
310         detachKeys := f.String("detach-keys", "ctrl-],ctrl-]", "set detach key sequence, as in docker-attach(1)")
311         if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid [ssh-options] [remote-command [args...]]", stderr); !ok {
312                 return code
313         } else if f.NArg() < 1 {
314                 fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
315                 return 2
316         }
317         target := f.Args()[0]
318         if !strings.Contains(target, "@") {
319                 target = "root@" + target
320         }
321         sshargs := f.Args()[1:]
322
323         // Try setting up a tunnel, and exit right away if it
324         // fails. This tunnel won't get used -- we'll set up a new
325         // tunnel when running as SSH client's ProxyCommand child --
326         // but in most cases where the real tunnel setup would fail,
327         // we catch the problem earlier here. This makes it less
328         // likely that an error message about tunnel setup will get
329         // hidden behind noisy errors from SSH client like this:
330         //
331         // [useful tunnel setup error message here]
332         // kex_exchange_identification: Connection closed by remote host
333         // Connection closed by UNKNOWN port 65535
334         // exit status 255
335         //
336         // In case our target is a container request, the probe also
337         // resolves it to a container, so we don't connect to two
338         // different containers in a race.
339         var probetarget bytes.Buffer
340         exitcode := connectSSHCommand{}.RunCommand(
341                 "arvados-client connect-ssh",
342                 []string{"-detach-keys=" + *detachKeys, "-probe-only=true", target},
343                 &bytes.Buffer{}, &probetarget, stderr)
344         if exitcode != 0 {
345                 return exitcode
346         }
347         target = strings.Trim(probetarget.String(), "\n")
348
349         selfbin, err := os.Readlink("/proc/self/exe")
350         if err != nil {
351                 fmt.Fprintln(stderr, err)
352                 return 2
353         }
354         sshargs = append([]string{
355                 "ssh",
356                 "-o", "ProxyCommand " + selfbin + " connect-ssh -detach-keys=" + shellescape(*detachKeys) + " " + shellescape(target),
357                 "-o", "StrictHostKeyChecking no",
358                 target},
359                 sshargs...)
360         sshbin, err := exec.LookPath("ssh")
361         if err != nil {
362                 fmt.Fprintln(stderr, err)
363                 return 1
364         }
365         err = syscall.Exec(sshbin, sshargs, os.Environ())
366         fmt.Fprintf(stderr, "exec(%q) failed: %s\n", sshbin, err)
367         return 1
368 }
369
370 // connectSSHCommand connects stdin/stdout to a container's gateway
371 // server (see lib/crunchrun/ssh.go).
372 //
373 // It is intended to be invoked with OpenSSH client's ProxyCommand
374 // config.
375 type connectSSHCommand struct{}
376
377 func (connectSSHCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
378         f := flag.NewFlagSet(prog, flag.ContinueOnError)
379         f.SetOutput(stderr)
380         f.Usage = func() {
381                 _, prog := filepath.Split(prog)
382                 fmt.Fprint(stderr, prog+`: connect to the gateway service for a running container.
383
384 NOTE: You almost certainly don't want to use this command directly. It
385 is meant to be used internally. Use "arvados-client shell" instead.
386
387 Usage: `+prog+` [options] [username@]container-uuid
388
389 Options:
390 `)
391                 f.PrintDefaults()
392         }
393         probeOnly := f.Bool("probe-only", false, "do not transfer IO, just setup tunnel, print target UUID, and exit")
394         detachKeys := f.String("detach-keys", "", "set detach key sequence, as in docker-attach(1)")
395         if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid", stderr); !ok {
396                 return code
397         } else if f.NArg() != 1 {
398                 fmt.Fprintf(stderr, "missing required argument: [username@]container-uuid\n")
399                 return 2
400         }
401         targetUUID := f.Args()[0]
402         loginUsername := "root"
403         if i := strings.Index(targetUUID, "@"); i >= 0 {
404                 loginUsername = targetUUID[:i]
405                 targetUUID = targetUUID[i+1:]
406         }
407         rpcconn, err := rpcFromEnv()
408         if err != nil {
409                 fmt.Fprintln(stderr, err)
410                 return 1
411         }
412         targetUUID, err = resolveToContainerUUID(rpcconn, targetUUID)
413         if err != nil {
414                 fmt.Fprintln(stderr, err)
415                 return 1
416         }
417         fmt.Fprintln(stderr, "connecting to container", targetUUID)
418
419         ctx, cancel := context.WithCancel(context.Background())
420         defer cancel()
421         sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
422                 UUID:          targetUUID,
423                 DetachKeys:    *detachKeys,
424                 LoginUsername: loginUsername,
425         })
426         if err != nil {
427                 fmt.Fprintln(stderr, "error setting up tunnel:", err)
428                 return 1
429         }
430         defer sshconn.Conn.Close()
431
432         if *probeOnly {
433                 fmt.Fprintln(stdout, targetUUID)
434                 return 0
435         }
436
437         go func() {
438                 defer cancel()
439                 _, err := io.Copy(stdout, sshconn.Conn)
440                 if err != nil && ctx.Err() == nil {
441                         fmt.Fprintf(stderr, "receive: %v\n", err)
442                 }
443         }()
444         go func() {
445                 defer cancel()
446                 _, err := io.Copy(sshconn.Conn, stdin)
447                 if err != nil && ctx.Err() == nil {
448                         fmt.Fprintf(stderr, "send: %v\n", err)
449                 }
450         }()
451         <-ctx.Done()
452         return 0
453 }
454
455 func shellescape(s string) string {
456         return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
457 }
458
459 func rpcFromEnv() (*rpc.Conn, error) {
460         ac := arvados.NewClientFromEnv()
461         if ac.APIHost == "" || ac.AuthToken == "" {
462                 return nil, fmt.Errorf("fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set, and ~/.config/arvados/settings.conf is not readable")
463         }
464         return rpc.NewConn("",
465                 &url.URL{
466                         Scheme: "https",
467                         Host:   ac.APIHost,
468                 },
469                 ac.Insecure,
470                 func(context.Context) ([]string, error) {
471                         return []string{ac.AuthToken}, nil
472                 }), nil
473 }
474
475 func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
476         switch {
477         case strings.Contains(targetUUID, "-dz642-") && len(targetUUID) == 27:
478                 return targetUUID, nil
479         case strings.Contains(targetUUID, "-xvhdp-") && len(targetUUID) == 27:
480                 crs, err := rpcconn.ContainerRequestList(context.TODO(), arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"uuid", "=", targetUUID}}})
481                 if err != nil {
482                         return "", err
483                 }
484                 if len(crs.Items) < 1 {
485                         return "", fmt.Errorf("container request %q not found", targetUUID)
486                 }
487                 cr := crs.Items[0]
488                 if cr.ContainerUUID == "" {
489                         return "", fmt.Errorf("no container assigned, container request state is %s", strings.ToLower(string(cr.State)))
490                 }
491                 return cr.ContainerUUID, nil
492         default:
493                 return "", fmt.Errorf("target UUID is not a container or container request UUID: %s", targetUUID)
494         }
495 }