Merge branch '4269-no-collection-uuid-in-script-params'
[arvados.git] / services / keepproxy / keepproxy_test.go
1 package main
2
3 import (
4         "git.curoverse.com/arvados.git/sdk/go/keepclient"
5         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
6         "crypto/md5"
7         "crypto/tls"
8         "fmt"
9         . "gopkg.in/check.v1"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "net/url"
15         "os"
16         "os/exec"
17         "strings"
18         "testing"
19         "time"
20 )
21
22 // Gocheck boilerplate
23 func Test(t *testing.T) {
24         TestingT(t)
25 }
26
27 // Gocheck boilerplate
28 var _ = Suite(&ServerRequiredSuite{})
29
30 // Tests that require the Keep server running
31 type ServerRequiredSuite struct{}
32
// pythonDir returns the path of the Python SDK test directory (the one
// holding run_test_server.py), expressed relative to the current working
// directory. An error from os.Getwd is ignored, in which case the result
// is built from an empty working directory string.
func pythonDir() string {
	const relTests = "../../sdk/python/tests"
	wd, _ := os.Getwd()
	return fmt.Sprintf("%s/%s", wd, relTests)
}
37
38 // Wait (up to 1 second) for keepproxy to listen on a port. This
39 // avoids a race condition where we hit a "connection refused" error
40 // because we start testing the proxy too soon.
41 func waitForListener() {
42         const (ms = 5)
43         for i := 0; listener == nil && i < 1000; i += ms {
44                 time.Sleep(ms * time.Millisecond)
45         }
46         if listener == nil {
47                 log.Fatalf("Timed out waiting for listener to start")
48         }
49 }
50
51 func closeListener() {
52         if listener != nil {
53                 listener.Close()
54         }
55 }
56
57 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
58         cwd, _ := os.Getwd()
59         defer os.Chdir(cwd)
60
61         os.Chdir(pythonDir())
62         {
63                 cmd := exec.Command("python", "run_test_server.py", "start")
64                 stderr, err := cmd.StderrPipe()
65                 if err != nil {
66                         log.Fatalf("Setting up stderr pipe: %s", err)
67                 }
68                 go io.Copy(os.Stderr, stderr)
69                 if err := cmd.Run(); err != nil {
70                         panic(fmt.Sprintf("'python run_test_server.py start' returned error %s", err))
71                 }
72         }
73         {
74                 cmd := exec.Command("python", "run_test_server.py", "start_keep")
75                 stderr, err := cmd.StderrPipe()
76                 if err != nil {
77                         log.Fatalf("Setting up stderr pipe: %s", err)
78                 }
79                 go io.Copy(os.Stderr, stderr)
80                 if err := cmd.Run(); err != nil {
81                         panic(fmt.Sprintf("'python run_test_server.py start_keep' returned error %s", err))
82                 }
83         }
84
85         os.Setenv("ARVADOS_API_HOST", "localhost:3000")
86         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
87         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
88 }
89
90 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
91         cwd, _ := os.Getwd()
92         defer os.Chdir(cwd)
93
94         os.Chdir(pythonDir())
95         exec.Command("python", "run_test_server.py", "stop_keep").Run()
96         exec.Command("python", "run_test_server.py", "stop").Run()
97 }
98
// setupProxyService registers a keep service of type "proxy" at
// localhost:29950 with the API server named by ARVADOS_API_HOST, by POSTing
// a form-encoded keep_service record to /arvados/v1/keep_services and
// authenticating with ARVADOS_API_TOKEN.
//
// TLS certificate verification is disabled for this request. The function
// panics if the request cannot be built or sent, or if the server answers
// with anything other than status 200.
func setupProxyService() {
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}

	data := url.Values{}
	data.Set("keep_service", `{
  "service_host": "localhost",
  "service_port": 29950,
  "service_ssl_flag": false,
  "service_type": "proxy"
}`)

	// The payload is small and fully known up front, so hand it to the
	// request directly instead of streaming it through a pipe and goroutine.
	req, err := http.NewRequest("POST",
		fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")),
		strings.NewReader(data.Encode()))
	if err != nil {
		panic(err.Error())
	}
	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
	req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

	resp, err := client.Do(req)
	if err != nil {
		panic(err.Error())
	}
	// Drain and close the body so the connection is released; the response
	// content itself is not needed, so a read error here is ignored.
	defer resp.Body.Close()
	io.Copy(ioutil.Discard, resp.Body)

	if resp.StatusCode != 200 {
		panic(resp.Status)
	}
}
136
137 func runProxy(c *C, args []string, token string, port int) keepclient.KeepClient {
138         os.Args = append(args, fmt.Sprintf("-listen=:%v", port))
139         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
140
141         listener = nil
142         go main()
143         time.Sleep(100 * time.Millisecond)
144
145         os.Setenv("ARVADOS_KEEP_PROXY", fmt.Sprintf("http://localhost:%v", port))
146         os.Setenv("ARVADOS_API_TOKEN", token)
147         arv, err := arvadosclient.MakeArvadosClient()
148         c.Assert(err, Equals, nil)
149         kc, err := keepclient.MakeKeepClient(&arv)
150         c.Assert(err, Equals, nil)
151         c.Check(kc.Using_proxy, Equals, true)
152         c.Check(len(kc.ServiceRoots()), Equals, 1)
153         for _, root := range(kc.ServiceRoots()) {
154                 c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
155         }
156         os.Setenv("ARVADOS_KEEP_PROXY", "")
157         log.Print("keepclient created")
158         return kc
159 }
160
161 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
162         log.Print("TestPutAndGet start")
163
164         os.Args = []string{"keepproxy", "-listen=:29950"}
165         os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
166         listener = nil
167         go main()
168         time.Sleep(100 * time.Millisecond)
169
170         setupProxyService()
171
172         os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
173         arv, err := arvadosclient.MakeArvadosClient()
174         c.Assert(err, Equals, nil)
175         kc, err := keepclient.MakeKeepClient(&arv)
176         c.Assert(err, Equals, nil)
177         c.Check(kc.Arvados.External, Equals, true)
178         c.Check(kc.Using_proxy, Equals, true)
179         c.Check(len(kc.ServiceRoots()), Equals, 1)
180         for _, root := range kc.ServiceRoots() {
181                 c.Check(root, Equals, "http://localhost:29950")
182         }
183         os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
184         log.Print("keepclient created")
185
186         waitForListener()
187         defer closeListener()
188
189         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
190         var hash2 string
191
192         {
193                 _, _, err := kc.Ask(hash)
194                 c.Check(err, Equals, keepclient.BlockNotFound)
195                 log.Print("Ask 1")
196         }
197
198         {
199                 var rep int
200                 var err error
201                 hash2, rep, err = kc.PutB([]byte("foo"))
202                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
203                 c.Check(rep, Equals, 2)
204                 c.Check(err, Equals, nil)
205                 log.Print("PutB")
206         }
207
208         {
209                 blocklen, _, err := kc.Ask(hash2)
210                 c.Assert(err, Equals, nil)
211                 c.Check(blocklen, Equals, int64(3))
212                 log.Print("Ask 2")
213         }
214
215         {
216                 reader, blocklen, _, err := kc.Get(hash2)
217                 c.Assert(err, Equals, nil)
218                 all, err := ioutil.ReadAll(reader)
219                 c.Check(all, DeepEquals, []byte("foo"))
220                 c.Check(blocklen, Equals, int64(3))
221                 log.Print("Get")
222         }
223
224         log.Print("TestPutAndGet done")
225 }
226
227 func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
228         log.Print("TestPutAskGetForbidden start")
229
230         kc := runProxy(c, []string{"keepproxy"}, "123abc", 29951)
231         waitForListener()
232         defer closeListener()
233
234         log.Print("keepclient created")
235
236         hash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
237
238         {
239                 _, _, err := kc.Ask(hash)
240                 c.Check(err, Equals, keepclient.BlockNotFound)
241                 log.Print("Ask 1")
242         }
243
244         {
245                 hash2, rep, err := kc.PutB([]byte("bar"))
246                 c.Check(hash2, Equals, "")
247                 c.Check(rep, Equals, 0)
248                 c.Check(err, Equals, keepclient.InsufficientReplicasError)
249                 log.Print("PutB")
250         }
251
252         {
253                 blocklen, _, err := kc.Ask(hash)
254                 c.Assert(err, Equals, keepclient.BlockNotFound)
255                 c.Check(blocklen, Equals, int64(0))
256                 log.Print("Ask 2")
257         }
258
259         {
260                 _, blocklen, _, err := kc.Get(hash)
261                 c.Assert(err, Equals, keepclient.BlockNotFound)
262                 c.Check(blocklen, Equals, int64(0))
263                 log.Print("Get")
264         }
265
266         log.Print("TestPutAskGetForbidden done")
267 }
268
269 func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
270         log.Print("TestGetDisabled start")
271
272         kc := runProxy(c, []string{"keepproxy", "-no-get"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29952)
273         waitForListener()
274         defer closeListener()
275
276         hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
277
278         {
279                 _, _, err := kc.Ask(hash)
280                 c.Check(err, Equals, keepclient.BlockNotFound)
281                 log.Print("Ask 1")
282         }
283
284         {
285                 hash2, rep, err := kc.PutB([]byte("baz"))
286                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
287                 c.Check(rep, Equals, 2)
288                 c.Check(err, Equals, nil)
289                 log.Print("PutB")
290         }
291
292         {
293                 blocklen, _, err := kc.Ask(hash)
294                 c.Assert(err, Equals, keepclient.BlockNotFound)
295                 c.Check(blocklen, Equals, int64(0))
296                 log.Print("Ask 2")
297         }
298
299         {
300                 _, blocklen, _, err := kc.Get(hash)
301                 c.Assert(err, Equals, keepclient.BlockNotFound)
302                 c.Check(blocklen, Equals, int64(0))
303                 log.Print("Get")
304         }
305
306         log.Print("TestGetDisabled done")
307 }
308
309 func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
310         log.Print("TestPutDisabled start")
311
312         kc := runProxy(c, []string{"keepproxy", "-no-put"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29953)
313         waitForListener()
314         defer closeListener()
315
316         {
317                 hash2, rep, err := kc.PutB([]byte("quux"))
318                 c.Check(hash2, Equals, "")
319                 c.Check(rep, Equals, 0)
320                 c.Check(err, Equals, keepclient.InsufficientReplicasError)
321                 log.Print("PutB")
322         }
323
324         log.Print("TestPutDisabled done")
325 }
326
327 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
328         runProxy(c, []string{"keepproxy"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29954)
329         waitForListener()
330         defer closeListener()
331
332         {
333                 client := http.Client{}
334                 req, err := http.NewRequest("OPTIONS",
335                         fmt.Sprintf("http://localhost:29954/%x+3",
336                                 md5.Sum([]byte("foo"))),
337                         nil)
338                 req.Header.Add("Access-Control-Request-Method", "PUT")
339                 req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
340                 resp, err := client.Do(req)
341                 c.Check(err, Equals, nil)
342                 c.Check(resp.StatusCode, Equals, 200)
343                 body, err := ioutil.ReadAll(resp.Body)
344                 c.Check(string(body), Equals, "")
345                 c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
346                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
347         }
348
349         {
350                 resp, err := http.Get(
351                         fmt.Sprintf("http://localhost:29954/%x+3",
352                                 md5.Sum([]byte("foo"))))
353                 c.Check(err, Equals, nil)
354                 c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
355                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
356         }
357 }
358
359 func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
360         runProxy(c, []string{"keepproxy"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29955)
361         waitForListener()
362         defer closeListener()
363
364         {
365                 client := http.Client{}
366                 req, err := http.NewRequest("POST",
367                         "http://localhost:29955/",
368                         strings.NewReader("qux"))
369                 req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
370                 req.Header.Add("Content-Type", "application/octet-stream")
371                 resp, err := client.Do(req)
372                 c.Check(err, Equals, nil)
373                 body, err := ioutil.ReadAll(resp.Body)
374                 c.Check(err, Equals, nil)
375                 c.Check(string(body), Equals,
376                         fmt.Sprintf("%x+%d", md5.Sum([]byte("qux")), 3))
377         }
378 }