18790: Error out instead of retrying if server is too old.
[arvados.git] / cmd / arvados-client / container_gateway_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/hmac"
11         "crypto/sha256"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net"
16         "net/http"
17         "net/url"
18         "os"
19         "os/exec"
20         "strings"
21         "sync"
22         "syscall"
23         "time"
24
25         "git.arvados.org/arvados.git/lib/controller/rpc"
26         "git.arvados.org/arvados.git/lib/crunchrun"
27         "git.arvados.org/arvados.git/sdk/go/arvados"
28         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
29         "git.arvados.org/arvados.git/sdk/go/arvadostest"
30         "git.arvados.org/arvados.git/sdk/go/ctxlog"
31         "git.arvados.org/arvados.git/sdk/go/httpserver"
32         "git.arvados.org/arvados.git/sdk/go/keepclient"
33         check "gopkg.in/check.v1"
34 )
35
36 func (s *ClientSuite) TestShellGatewayNotAvailable(c *check.C) {
37         var stdout, stderr bytes.Buffer
38         cmd := exec.Command("go", "run", ".", "shell", arvadostest.QueuedContainerUUID, "-o", "controlpath=none", "echo", "ok")
39         cmd.Env = append(cmd.Env, os.Environ()...)
40         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
41         cmd.Stdout = &stdout
42         cmd.Stderr = &stderr
43         c.Check(cmd.Run(), check.NotNil)
44         c.Log(stderr.String())
45         c.Check(stderr.String(), check.Matches, `(?ms).*container is not running yet \(state is "Queued"\).*`)
46 }
47
48 func (s *ClientSuite) TestShellGateway(c *check.C) {
49         defer func() {
50                 c.Check(arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil), check.IsNil)
51         }()
52         uuid := arvadostest.QueuedContainerUUID
53         h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
54         fmt.Fprint(h, uuid)
55         authSecret := fmt.Sprintf("%x", h.Sum(nil))
56         gw := crunchrun.Gateway{
57                 ContainerUUID: uuid,
58                 Address:       "0.0.0.0:0",
59                 AuthSecret:    authSecret,
60                 Log:           ctxlog.TestLogger(c),
61                 // Just forward connections to localhost instead of a
62                 // container, so we can test without running a
63                 // container.
64                 Target: crunchrun.GatewayTargetStub{},
65         }
66         err := gw.Start()
67         c.Assert(err, check.IsNil)
68
69         rpcconn := rpc.NewConn("",
70                 &url.URL{
71                         Scheme: "https",
72                         Host:   os.Getenv("ARVADOS_API_HOST"),
73                 },
74                 true,
75                 func(context.Context) ([]string, error) {
76                         return []string{arvadostest.SystemRootToken}, nil
77                 })
78         _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: uuid, Attrs: map[string]interface{}{
79                 "state": arvados.ContainerStateLocked,
80         }})
81         c.Assert(err, check.IsNil)
82         _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: uuid, Attrs: map[string]interface{}{
83                 "state":           arvados.ContainerStateRunning,
84                 "gateway_address": gw.Address,
85         }})
86         c.Assert(err, check.IsNil)
87
88         var stdout, stderr bytes.Buffer
89         cmd := exec.Command("go", "run", ".", "shell", uuid, "-o", "controlpath=none", "-o", "userknownhostsfile="+c.MkDir()+"/known_hosts", "echo", "ok")
90         cmd.Env = append(cmd.Env, os.Environ()...)
91         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
92         cmd.Stdout = &stdout
93         cmd.Stderr = &stderr
94         stdin, err := cmd.StdinPipe()
95         c.Assert(err, check.IsNil)
96         go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
97         time.AfterFunc(5*time.Second, func() {
98                 c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
99                 stdin.Close()
100         })
101         c.Check(cmd.Run(), check.IsNil)
102         c.Check(stdout.String(), check.Equals, "ok\n")
103
104         // Set up an http server, and try using "arvados-client shell"
105         // to forward traffic to it.
106         httpTarget := &httpserver.Server{}
107         httpTarget.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
108                 c.Logf("httpTarget.Handler: incoming request: %s %s", r.Method, r.URL)
109                 if r.URL.Path == "/foo" {
110                         fmt.Fprintln(w, "bar baz")
111                 } else {
112                         w.WriteHeader(http.StatusNotFound)
113                 }
114         })
115         err = httpTarget.Start()
116         c.Assert(err, check.IsNil)
117
118         ln, err := net.Listen("tcp", ":0")
119         c.Assert(err, check.IsNil)
120         _, forwardedPort, _ := net.SplitHostPort(ln.Addr().String())
121         ln.Close()
122
123         stdout.Reset()
124         stderr.Reset()
125         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
126         defer cancel()
127         cmd = exec.CommandContext(ctx,
128                 "go", "run", ".", "shell", uuid,
129                 "-L", forwardedPort+":"+httpTarget.Addr,
130                 "-o", "controlpath=none",
131                 "-o", "userknownhostsfile="+c.MkDir()+"/known_hosts",
132                 "-N",
133         )
134         c.Logf("cmd.Args: %s", cmd.Args)
135         cmd.Env = append(cmd.Env, os.Environ()...)
136         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
137         cmd.Stdout = &stdout
138         cmd.Stderr = &stderr
139         cmd.Start()
140
141         forwardedURL := fmt.Sprintf("http://localhost:%s/foo", forwardedPort)
142
143         for range time.NewTicker(time.Second / 20).C {
144                 resp, err := http.Get(forwardedURL)
145                 if err != nil {
146                         if !strings.Contains(err.Error(), "connect") {
147                                 c.Fatal(err)
148                         } else if ctx.Err() != nil {
149                                 if cmd.Process.Signal(syscall.Signal(0)) != nil {
150                                         c.Error("OpenSSH exited")
151                                 } else {
152                                         c.Errorf("timed out trying to connect: %s", err)
153                                 }
154                                 c.Logf("OpenSSH stdout:\n%s", stdout.String())
155                                 c.Logf("OpenSSH stderr:\n%s", stderr.String())
156                                 c.FailNow()
157                         }
158                         // Retry until OpenSSH starts listening
159                         continue
160                 }
161                 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
162                 body, err := ioutil.ReadAll(resp.Body)
163                 c.Check(err, check.IsNil)
164                 c.Check(string(body), check.Equals, "bar baz\n")
165                 break
166         }
167
168         var wg sync.WaitGroup
169         for i := 0; i < 10; i++ {
170                 wg.Add(1)
171                 go func() {
172                         defer wg.Done()
173                         resp, err := http.Get(forwardedURL)
174                         if !c.Check(err, check.IsNil) {
175                                 return
176                         }
177                         body, err := ioutil.ReadAll(resp.Body)
178                         c.Check(err, check.IsNil)
179                         c.Check(string(body), check.Equals, "bar baz\n")
180                 }()
181         }
182         wg.Wait()
183 }
184
185 func (s *ClientSuite) TestContainerLog(c *check.C) {
186         arvadostest.StartKeep(2, true)
187         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
188         defer cancel()
189
190         rpcconn := rpc.NewConn("",
191                 &url.URL{
192                         Scheme: "https",
193                         Host:   os.Getenv("ARVADOS_API_HOST"),
194                 },
195                 true,
196                 func(context.Context) ([]string, error) {
197                         return []string{arvadostest.SystemRootToken}, nil
198                 })
199         imageColl, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
200                 "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.tar\n",
201         }})
202         c.Assert(err, check.IsNil)
203         c.Logf("imageColl %+v", imageColl)
204         cr, err := rpcconn.ContainerRequestCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
205                 "state":           "Committed",
206                 "command":         []string{"echo", fmt.Sprintf("%d", time.Now().Unix())},
207                 "container_image": imageColl.PortableDataHash,
208                 "cwd":             "/",
209                 "output_path":     "/",
210                 "priority":        1,
211                 "runtime_constraints": arvados.RuntimeConstraints{
212                         VCPUs: 1,
213                         RAM:   1000000000,
214                 },
215                 "container_count_max": 1,
216         }})
217         c.Assert(err, check.IsNil)
218         h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
219         fmt.Fprint(h, cr.ContainerUUID)
220         authSecret := fmt.Sprintf("%x", h.Sum(nil))
221
222         coll := arvados.Collection{}
223         client := arvados.NewClientFromEnv()
224         ac, err := arvadosclient.New(client)
225         c.Assert(err, check.IsNil)
226         kc, err := keepclient.MakeKeepClient(ac)
227         c.Assert(err, check.IsNil)
228         cfs, err := coll.FileSystem(client, kc)
229         c.Assert(err, check.IsNil)
230
231         c.Log("running logs command on queued container")
232         var stdout, stderr bytes.Buffer
233         cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", "-poll=250ms", cr.UUID)
234         cmd.Env = append(cmd.Env, os.Environ()...)
235         cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
236         cmd.Stdout = io.MultiWriter(&stdout, os.Stderr)
237         cmd.Stderr = io.MultiWriter(&stderr, os.Stderr)
238         err = cmd.Start()
239         c.Assert(err, check.Equals, nil)
240
241         c.Log("changing container state to Locked")
242         _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
243                 "state": arvados.ContainerStateLocked,
244         }})
245         c.Assert(err, check.IsNil)
246         c.Log("starting gateway")
247         gw := crunchrun.Gateway{
248                 ContainerUUID: cr.ContainerUUID,
249                 Address:       "0.0.0.0:0",
250                 AuthSecret:    authSecret,
251                 Log:           ctxlog.TestLogger(c),
252                 Target:        crunchrun.GatewayTargetStub{},
253                 LogCollection: cfs,
254         }
255         err = gw.Start()
256         c.Assert(err, check.IsNil)
257         c.Log("updating container gateway address")
258         _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
259                 "gateway_address": gw.Address,
260                 "state":           arvados.ContainerStateRunning,
261         }})
262         c.Assert(err, check.IsNil)
263
264         fCrunchrun, err := cfs.OpenFile("crunch-run.txt", os.O_CREATE|os.O_WRONLY, 0777)
265         c.Assert(err, check.IsNil)
266         _, err = fmt.Fprintln(fCrunchrun, "line 1 of crunch-run.txt")
267         c.Assert(err, check.IsNil)
268         fStderr, err := cfs.OpenFile("stderr.txt", os.O_CREATE|os.O_WRONLY, 0777)
269         c.Assert(err, check.IsNil)
270         _, err = fmt.Fprintln(fStderr, "line 1 of stderr")
271         c.Assert(err, check.IsNil)
272         time.Sleep(time.Second * 2)
273         _, err = fmt.Fprintln(fCrunchrun, "line 2 of crunch-run.txt")
274         c.Assert(err, check.IsNil)
275         _, err = fmt.Fprintln(fStderr, "--end--")
276         c.Assert(err, check.IsNil)
277
278         for deadline := time.Now().Add(20 * time.Second); time.Now().Before(deadline) && !strings.Contains(stdout.String(), "--end--"); time.Sleep(time.Second / 10) {
279         }
280         c.Check(stdout.String(), check.Matches, `(?ms).*stderr\.txt +--end--\n.*`)
281
282         mtxt, err := cfs.MarshalManifest(".")
283         c.Assert(err, check.IsNil)
284         savedLog, err := rpcconn.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
285                 "manifest_text": mtxt,
286         }})
287         c.Assert(err, check.IsNil)
288         _, err = rpcconn.ContainerUpdate(ctx, arvados.UpdateOptions{UUID: cr.ContainerUUID, Attrs: map[string]interface{}{
289                 "state":     arvados.ContainerStateComplete,
290                 "log":       savedLog.PortableDataHash,
291                 "output":    "d41d8cd98f00b204e9800998ecf8427e+0",
292                 "exit_code": 0,
293         }})
294         c.Assert(err, check.IsNil)
295
296         err = cmd.Wait()
297         c.Check(err, check.IsNil)
298         // Ensure controller doesn't cheat by fetching data from the
299         // gateway after the container is complete.
300         gw.LogCollection = nil
301
302         c.Logf("re-running logs command on completed container")
303         {
304                 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5))
305                 defer cancel()
306                 cmd := exec.CommandContext(ctx, "go", "run", ".", "logs", cr.UUID)
307                 cmd.Env = append(cmd.Env, os.Environ()...)
308                 cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.SystemRootToken)
309                 buf, err := cmd.CombinedOutput()
310                 c.Check(err, check.Equals, nil)
311                 c.Check(string(buf), check.Matches, `(?ms).*--end--\n`)
312         }
313 }