Merge branch '20610-installer-load-balancer'. Refs #20610
[arvados.git] / cmd / arvados-client / container_gateway_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/hmac"
11         "crypto/sha256"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net"
16         "net/http"
17         "net/url"
18         "os"
19         "os/exec"
20         "strings"
21         "sync"
22         "syscall"
23         "time"
24
25         "git.arvados.org/arvados.git/lib/controller/rpc"
26         "git.arvados.org/arvados.git/lib/crunchrun"
27         "git.arvados.org/arvados.git/sdk/go/arvados"
28         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
29         "git.arvados.org/arvados.git/sdk/go/arvadostest"
30         "git.arvados.org/arvados.git/sdk/go/ctxlog"
31         "git.arvados.org/arvados.git/sdk/go/httpserver"
32         "git.arvados.org/arvados.git/sdk/go/keepclient"
33         check "gopkg.in/check.v1"
34 )
35
36 func (s *ClientSuite) TestShellGatewayNotAvailable(c *check.C) {
37         var stdout, stderr bytes.Buffer
38         cmd := exec.Command("go", "run", ".", "shell", arvadostest.QueuedContainerUUID, "-o", "controlpath=none", "echo", "ok")
39         cmd.Env = append(cmd.Env, os.Environ()...)
40         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
41         cmd.Stdout = &stdout
42         cmd.Stderr = &stderr
43         c.Check(cmd.Run(), check.NotNil)
44         c.Log(stderr.String())
45         c.Check(stderr.String(), check.Matches, `(?ms).*container is not running yet \(state is "Queued"\).*`)
46 }
47
48 func (s *ClientSuite) TestShellGateway(c *check.C) {
49         defer func() {
50                 c.Check(arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil), check.IsNil)
51         }()
52         uuid := arvadostest.QueuedContainerUUID
53         h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
54         fmt.Fprint(h, uuid)
55         authSecret := fmt.Sprintf("%x", h.Sum(nil))
56         gw := crunchrun.Gateway{
57                 ContainerUUID: uuid,
58                 Address:       "0.0.0.0:0",
59                 AuthSecret:    authSecret,
60                 Log:           ctxlog.TestLogger(c),
61                 // Just forward connections to localhost instead of a
62                 // container, so we can test without running a
63                 // container.
64                 Target: crunchrun.GatewayTargetStub{},
65         }
66         err := gw.Start()
67         c.Assert(err, check.IsNil)
68
69         rpcconn := rpc.NewConn("",
70                 &url.URL{
71                         Scheme: "https",
72                         Host:   os.Getenv("ARVADOS_API_HOST"),
73                 },
74                 true,
75                 func(context.Context) ([]string, error) {
76                         return []string{arvadostest.SystemRootToken}, nil
77                 })
78         _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: uuid, Attrs: map[string]interface{}{
79                 "state": arvados.ContainerStateLocked,
80         }})
81         c.Assert(err, check.IsNil)
82         _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: uuid, Attrs: map[string]interface{}{
83                 "state":           arvados.ContainerStateRunning,
84                 "gateway_address": gw.Address,
85         }})
86         c.Assert(err, check.IsNil)
87
88         var stdout, stderr bytes.Buffer
89         cmd := exec.Command("go", "run", ".", "shell", uuid, "-o", "controlpath=none", "-o", "userknownhostsfile="+c.MkDir()+"/known_hosts", "echo", "ok")
90         cmd.Env = append(cmd.Env, os.Environ()...)
91         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
92         cmd.Stdout = &stdout
93         cmd.Stderr = &stderr
94         stdin, err := cmd.StdinPipe()
95         c.Assert(err, check.IsNil)
96         go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
97         time.AfterFunc(5*time.Second, func() {
98                 c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
99                 stdin.Close()
100         })
101         c.Check(cmd.Run(), check.IsNil)
102         c.Check(stdout.String(), check.Equals, "ok\n")
103
104         // Set up an http server, and try using "arvados-client shell"
105         // to forward traffic to it.
106         httpTarget := &httpserver.Server{}
107         httpTarget.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
108                 c.Logf("httpTarget.Handler: incoming request: %s %s", r.Method, r.URL)
109                 if r.URL.Path == "/foo" {
110                         fmt.Fprintln(w, "bar baz")
111                 } else {
112                         w.WriteHeader(http.StatusNotFound)
113                 }
114         })
115         err = httpTarget.Start()
116         c.Assert(err, check.IsNil)
117
118         ln, err := net.Listen("tcp", ":0")
119         c.Assert(err, check.IsNil)
120         _, forwardedPort, _ := net.SplitHostPort(ln.Addr().String())
121         ln.Close()
122
123         stdout.Reset()
124         stderr.Reset()
125         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
126         defer cancel()
127         cmd = exec.CommandContext(ctx,
128                 "go", "run", ".", "shell", uuid,
129                 "-L", forwardedPort+":"+httpTarget.Addr,
130                 "-o", "controlpath=none",
131                 "-o", "userknownhostsfile="+c.MkDir()+"/known_hosts",
132                 "-N",
133         )
134         c.Logf("cmd.Args: %s", cmd.Args)
135         cmd.Env = append(cmd.Env, os.Environ()...)
136         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
137         cmd.Stdout = &stdout
138         cmd.Stderr = &stderr
139         cmd.Start()
140
141         forwardedURL := fmt.Sprintf("http://localhost:%s/foo", forwardedPort)
142
143         for range time.NewTicker(time.Second / 20).C {
144                 resp, err := http.Get(forwardedURL)
145                 if err != nil {
146                         if !strings.Contains(err.Error(), "connect") {
147                                 c.Fatal(err)
148                         } else if ctx.Err() != nil {
149                                 if cmd.Process.Signal(syscall.Signal(0)) != nil {
150                                         c.Error("OpenSSH exited")
151                                 } else {
152                                         c.Errorf("timed out trying to connect: %s", err)
153                                 }
154                                 c.Logf("OpenSSH stdout:\n%s", stdout.String())
155                                 c.Logf("OpenSSH stderr:\n%s", stderr.String())
156                                 c.FailNow()
157                         }
158                         // Retry until OpenSSH starts listening
159                         continue
160                 }
161                 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
162                 body, err := ioutil.ReadAll(resp.Body)
163                 c.Check(err, check.IsNil)
164                 c.Check(string(body), check.Equals, "bar baz\n")
165                 break
166         }
167
168         var wg sync.WaitGroup
169         for i := 0; i < 10; i++ {
170                 wg.Add(1)
171                 go func() {
172                         defer wg.Done()
173                         resp, err := http.Get(forwardedURL)
174                         if !c.Check(err, check.IsNil) {
175                                 return
176                         }
177                         body, err := ioutil.ReadAll(resp.Body)
178                         c.Check(err, check.IsNil)
179                         c.Check(string(body), check.Equals, "bar baz\n")
180                 }()
181         }
182         wg.Wait()
183 }
184
185 func (s *ClientSuite) TestContainerRequestLog(c *check.C) {
186         arvadostest.StartKeep(2, true)
187         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
188         defer cancel()
189
190         rpcconn := rpc.NewConn("",
191                 &url.URL{
192                         Scheme: "https",
193                         Host:   os.Getenv("ARVADOS_API_HOST"),
194                 },
195                 true,
196                 func(context.Context) ([]string, error) {
197                         return []string{arvadostest.SystemRootToken}, nil
198                 })
199         imageColl, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
200                 "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar\n",
201         }})
202         c.Assert(err, check.IsNil)
203         c.Logf("imageColl %+v", imageColl)
204         cr, err := rpcconn.ContainerRequestCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
205                 "state":           "Committed",
206                 "command":         []string{"echo", fmt.Sprintf("%d", time.Now().Unix())},
207                 "container_image": imageColl.PortableDataHash,
208                 "cwd":             "/",
209                 "output_path":     "/",
210                 "priority":        1,
211                 "runtime_constraints": arvados.RuntimeConstraints{
212                         VCPUs: 1,
213                         RAM:   1000000000,
214                 },
215                 "container_count_max": 1,
216         }})
217         c.Assert(err, check.IsNil)
218         h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
219         fmt.Fprint(h, cr.ContainerUUID)
220         authSecret := fmt.Sprintf("%x", h.Sum(nil))
221
222         coll := arvados.Collection{}
223         client := arvados.NewClientFromEnv()
224         ac, err := arvadosclient.New(client)
225         c.Assert(err, check.IsNil)
226         kc, err := keepclient.MakeKeepClient(ac)
227         c.Assert(err, check.IsNil)
228         cfs, err := coll.FileSystem(client, kc)
229         c.Assert(err, check.IsNil)
230
231         c.Log("running logs command on queued container")
232         var stdout, stderr bytes.Buffer
233         cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", "-f", "-poll=250ms", cr.UUID)
234         cmd.Env = append(cmd.Env, os.Environ()...)
235         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
236         cmd.Stdout = io.MultiWriter(&stdout, os.Stderr)
237         cmd.Stderr = io.MultiWriter(&stderr, os.Stderr)
238         err = cmd.Start()
239         c.Assert(err, check.Equals, nil)
240
241         c.Log("changing container state to Locked")
242         _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
243                 "state": arvados.ContainerStateLocked,
244         }})
245         c.Assert(err, check.IsNil)
246         c.Log("starting gateway")
247         gw := crunchrun.Gateway{
248                 ContainerUUID: cr.ContainerUUID,
249                 Address:       "0.0.0.0:0",
250                 AuthSecret:    authSecret,
251                 Log:           ctxlog.TestLogger(c),
252                 Target:        crunchrun.GatewayTargetStub{},
253                 LogCollection: cfs,
254         }
255         err = gw.Start()
256         c.Assert(err, check.IsNil)
257         c.Log("updating container gateway address")
258         _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
259                 "gateway_address": gw.Address,
260                 "state":           arvados.ContainerStateRunning,
261         }})
262         c.Assert(err, check.IsNil)
263
264         const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
265         fCrunchrun, err := cfs.OpenFile("crunch-run.txt", os.O_CREATE|os.O_WRONLY, 0777)
266         c.Assert(err, check.IsNil)
267         _, err = fmt.Fprintf(fCrunchrun, "%s line 1 of crunch-run.txt\n", time.Now().UTC().Format(rfc3339NanoFixed))
268         c.Assert(err, check.IsNil)
269         fStderr, err := cfs.OpenFile("stderr.txt", os.O_CREATE|os.O_WRONLY, 0777)
270         c.Assert(err, check.IsNil)
271         _, err = fmt.Fprintf(fStderr, "%s line 1 of stderr\n", time.Now().UTC().Format(rfc3339NanoFixed))
272         c.Assert(err, check.IsNil)
273
274         {
275                 // Without "-f", just show the existing logs and
276                 // exit. Timeout needs to be long enough for "go run".
277                 ctxNoFollow, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5))
278                 defer cancel()
279                 cmdNoFollow := exec.CommandContext(ctxNoFollow, "go", "run", ".", "logs", "-poll=250ms", cr.UUID)
280                 buf, err := cmdNoFollow.CombinedOutput()
281                 c.Check(err, check.IsNil)
282                 c.Check(string(buf), check.Matches, `(?ms).*line 1 of stderr\n`)
283         }
284
285         time.Sleep(time.Second * 2)
286         _, err = fmt.Fprintf(fCrunchrun, "%s line 2 of crunch-run.txt", time.Now().UTC().Format(rfc3339NanoFixed))
287         c.Assert(err, check.IsNil)
288         _, err = fmt.Fprintf(fStderr, "%s --end--", time.Now().UTC().Format(rfc3339NanoFixed))
289         c.Assert(err, check.IsNil)
290
291         for deadline := time.Now().Add(20 * time.Second); time.Now().Before(deadline) && !strings.Contains(stdout.String(), "--end--"); time.Sleep(time.Second / 10) {
292         }
293         c.Check(stdout.String(), check.Matches, `(?ms).*stderr\.txt +20\S+Z --end--\n.*`)
294
295         mtxt, err := cfs.MarshalManifest(".")
296         c.Assert(err, check.IsNil)
297         savedLog, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
298                 "manifest_text": mtxt,
299         }})
300         c.Assert(err, check.IsNil)
301         _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
302                 "state":     arvados.ContainerStateComplete,
303                 "log":       savedLog.PortableDataHash,
304                 "output":    "d41d8cd98f00b204e9800998ecf8427e+0",
305                 "exit_code": 0,
306         }})
307         c.Assert(err, check.IsNil)
308
309         err = cmd.Wait()
310         c.Check(err, check.IsNil)
311         // Ensure controller doesn't cheat by fetching data from the
312         // gateway after the container is complete.
313         gw.LogCollection = nil
314
315         c.Logf("re-running logs command on completed container")
316         {
317                 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5))
318                 defer cancel()
319                 cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", "-f", cr.UUID)
320                 cmd.Env = append(cmd.Env, os.Environ()...)
321                 cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
322                 buf, err := cmd.CombinedOutput()
323                 c.Check(err, check.Equals, nil)
324                 c.Check(string(buf), check.Matches, `(?ms).*--end--\n`)
325         }
326 }