9570: Use special path mapper for staging files to output dir.
[arvados.git] / services / keepproxy / keepproxy_test.go
1 package main
2
3 import (
4         "bytes"
5         "crypto/md5"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "net/http/httptest"
14         "os"
15         "strings"
16         "testing"
17         "time"
18
19         . "gopkg.in/check.v1"
20 )
21
22 // Gocheck boilerplate
23 func Test(t *testing.T) {
24         TestingT(t)
25 }
26
27 // Gocheck boilerplate
28 var _ = Suite(&ServerRequiredSuite{})
29
30 // Tests that require the Keep server running
31 type ServerRequiredSuite struct{}
32
33 // Gocheck boilerplate
34 var _ = Suite(&NoKeepServerSuite{})
35
36 // Test with no keepserver to simulate errors
37 type NoKeepServerSuite struct{}
38
39 var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
40
41 // Wait (up to 1 second) for keepproxy to listen on a port. This
42 // avoids a race condition where we hit a "connection refused" error
43 // because we start testing the proxy too soon.
44 func waitForListener() {
45         const (
46                 ms = 5
47         )
48         for i := 0; listener == nil && i < 10000; i += ms {
49                 time.Sleep(ms * time.Millisecond)
50         }
51         if listener == nil {
52                 log.Fatalf("Timed out waiting for listener to start")
53         }
54 }
55
56 func closeListener() {
57         if listener != nil {
58                 listener.Close()
59         }
60 }
61
62 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
63         arvadostest.StartAPI()
64         arvadostest.StartKeep(2, false)
65 }
66
67 func (s *ServerRequiredSuite) SetUpTest(c *C) {
68         arvadostest.ResetEnv()
69 }
70
71 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
72         arvadostest.StopKeep(2)
73         arvadostest.StopAPI()
74 }
75
76 func (s *NoKeepServerSuite) SetUpSuite(c *C) {
77         arvadostest.StartAPI()
78         // We need API to have some keep services listed, but the
79         // services themselves should be unresponsive.
80         arvadostest.StartKeep(2, false)
81         arvadostest.StopKeep(2)
82 }
83
84 func (s *NoKeepServerSuite) SetUpTest(c *C) {
85         arvadostest.ResetEnv()
86 }
87
88 func (s *NoKeepServerSuite) TearDownSuite(c *C) {
89         arvadostest.StopAPI()
90 }
91
92 func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient {
93         args = append([]string{"keepproxy"}, args...)
94         os.Args = append(args, "-listen=:0")
95         listener = nil
96         go main()
97         waitForListener()
98
99         arv, err := arvadosclient.MakeArvadosClient()
100         c.Assert(err, Equals, nil)
101         if bogusClientToken {
102                 arv.ApiToken = "bogus-token"
103         }
104         kc := keepclient.New(&arv)
105         sr := map[string]string{
106                 TestProxyUUID: "http://" + listener.Addr().String(),
107         }
108         kc.SetServiceRoots(sr, sr, sr)
109         kc.Arvados.External = true
110
111         return kc
112 }
113
114 func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
115         kc := runProxy(c, nil, false)
116         defer closeListener()
117
118         content := []byte("TestDesiredReplicas")
119         hash := fmt.Sprintf("%x", md5.Sum(content))
120
121         for _, kc.Want_replicas = range []int{0, 1, 2} {
122                 locator, rep, err := kc.PutB(content)
123                 c.Check(err, Equals, nil)
124                 c.Check(rep, Equals, kc.Want_replicas)
125                 if rep > 0 {
126                         c.Check(locator, Matches, fmt.Sprintf(`^%s\+%d(\+.+)?$`, hash, len(content)))
127                 }
128         }
129 }
130
131 func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
132         kc := runProxy(c, nil, false)
133         defer closeListener()
134
135         content := []byte("TestPutWrongContentLength")
136         hash := fmt.Sprintf("%x", md5.Sum(content))
137
138         // If we use http.Client to send these requests to the network
139         // server we just started, the Go http library automatically
140         // fixes the invalid Content-Length header. In order to test
141         // our server behavior, we have to call the handler directly
142         // using an httptest.ResponseRecorder.
143         rtr := MakeRESTRouter(true, true, kc)
144
145         type testcase struct {
146                 sendLength   string
147                 expectStatus int
148         }
149
150         for _, t := range []testcase{
151                 {"1", http.StatusBadRequest},
152                 {"", http.StatusLengthRequired},
153                 {"-1", http.StatusLengthRequired},
154                 {"abcdef", http.StatusLengthRequired},
155         } {
156                 req, err := http.NewRequest("PUT",
157                         fmt.Sprintf("http://%s/%s+%d", listener.Addr().String(), hash, len(content)),
158                         bytes.NewReader(content))
159                 c.Assert(err, IsNil)
160                 req.Header.Set("Content-Length", t.sendLength)
161                 req.Header.Set("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
162                 req.Header.Set("Content-Type", "application/octet-stream")
163
164                 resp := httptest.NewRecorder()
165                 rtr.ServeHTTP(resp, req)
166                 c.Check(resp.Code, Equals, t.expectStatus)
167         }
168 }
169
170 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
171         kc := runProxy(c, nil, false)
172         defer closeListener()
173
174         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
175         var hash2 string
176
177         {
178                 _, _, err := kc.Ask(hash)
179                 c.Check(err, Equals, keepclient.BlockNotFound)
180                 log.Print("Finished Ask (expected BlockNotFound)")
181         }
182
183         {
184                 reader, _, _, err := kc.Get(hash)
185                 c.Check(reader, Equals, nil)
186                 c.Check(err, Equals, keepclient.BlockNotFound)
187                 log.Print("Finished Get (expected BlockNotFound)")
188         }
189
190         // Note in bug #5309 among other errors keepproxy would set
191         // Content-Length incorrectly on the 404 BlockNotFound response, this
192         // would result in a protocol violation that would prevent reuse of the
193         // connection, which would manifest by the next attempt to use the
194         // connection (in this case the PutB below) failing.  So to test for
195         // that bug it's necessary to trigger an error response (such as
196         // BlockNotFound) and then do something else with the same httpClient
197         // connection.
198
199         {
200                 var rep int
201                 var err error
202                 hash2, rep, err = kc.PutB([]byte("foo"))
203                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
204                 c.Check(rep, Equals, 2)
205                 c.Check(err, Equals, nil)
206                 log.Print("Finished PutB (expected success)")
207         }
208
209         {
210                 blocklen, _, err := kc.Ask(hash2)
211                 c.Assert(err, Equals, nil)
212                 c.Check(blocklen, Equals, int64(3))
213                 log.Print("Finished Ask (expected success)")
214         }
215
216         {
217                 reader, blocklen, _, err := kc.Get(hash2)
218                 c.Assert(err, Equals, nil)
219                 all, err := ioutil.ReadAll(reader)
220                 c.Check(all, DeepEquals, []byte("foo"))
221                 c.Check(blocklen, Equals, int64(3))
222                 log.Print("Finished Get (expected success)")
223         }
224
225         {
226                 var rep int
227                 var err error
228                 hash2, rep, err = kc.PutB([]byte(""))
229                 c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
230                 c.Check(rep, Equals, 2)
231                 c.Check(err, Equals, nil)
232                 log.Print("Finished PutB zero block")
233         }
234
235         {
236                 reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
237                 c.Assert(err, Equals, nil)
238                 all, err := ioutil.ReadAll(reader)
239                 c.Check(all, DeepEquals, []byte(""))
240                 c.Check(blocklen, Equals, int64(0))
241                 log.Print("Finished Get zero block")
242         }
243 }
244
245 func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
246         kc := runProxy(c, nil, true)
247         defer closeListener()
248
249         hash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
250
251         {
252                 _, _, err := kc.Ask(hash)
253                 errNotFound, _ := err.(keepclient.ErrNotFound)
254                 c.Check(errNotFound, NotNil)
255                 c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
256                 log.Print("Ask 1")
257         }
258
259         {
260                 hash2, rep, err := kc.PutB([]byte("bar"))
261                 c.Check(hash2, Equals, "")
262                 c.Check(rep, Equals, 0)
263                 c.Check(err, Equals, keepclient.InsufficientReplicasError)
264                 log.Print("PutB")
265         }
266
267         {
268                 blocklen, _, err := kc.Ask(hash)
269                 errNotFound, _ := err.(keepclient.ErrNotFound)
270                 c.Check(errNotFound, NotNil)
271                 c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
272                 c.Check(blocklen, Equals, int64(0))
273                 log.Print("Ask 2")
274         }
275
276         {
277                 _, blocklen, _, err := kc.Get(hash)
278                 errNotFound, _ := err.(keepclient.ErrNotFound)
279                 c.Check(errNotFound, NotNil)
280                 c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
281                 c.Check(blocklen, Equals, int64(0))
282                 log.Print("Get")
283         }
284 }
285
286 func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
287         kc := runProxy(c, []string{"-no-get"}, false)
288         defer closeListener()
289
290         hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
291
292         {
293                 _, _, err := kc.Ask(hash)
294                 errNotFound, _ := err.(keepclient.ErrNotFound)
295                 c.Check(errNotFound, NotNil)
296                 c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
297                 log.Print("Ask 1")
298         }
299
300         {
301                 hash2, rep, err := kc.PutB([]byte("baz"))
302                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
303                 c.Check(rep, Equals, 2)
304                 c.Check(err, Equals, nil)
305                 log.Print("PutB")
306         }
307
308         {
309                 blocklen, _, err := kc.Ask(hash)
310                 errNotFound, _ := err.(keepclient.ErrNotFound)
311                 c.Check(errNotFound, NotNil)
312                 c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
313                 c.Check(blocklen, Equals, int64(0))
314                 log.Print("Ask 2")
315         }
316
317         {
318                 _, blocklen, _, err := kc.Get(hash)
319                 errNotFound, _ := err.(keepclient.ErrNotFound)
320                 c.Check(errNotFound, NotNil)
321                 c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
322                 c.Check(blocklen, Equals, int64(0))
323                 log.Print("Get")
324         }
325 }
326
327 func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
328         kc := runProxy(c, []string{"-no-put"}, false)
329         defer closeListener()
330
331         hash2, rep, err := kc.PutB([]byte("quux"))
332         c.Check(hash2, Equals, "")
333         c.Check(rep, Equals, 0)
334         c.Check(err, Equals, keepclient.InsufficientReplicasError)
335 }
336
337 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
338         runProxy(c, nil, false)
339         defer closeListener()
340
341         {
342                 client := http.Client{}
343                 req, err := http.NewRequest("OPTIONS",
344                         fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
345                         nil)
346                 req.Header.Add("Access-Control-Request-Method", "PUT")
347                 req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
348                 resp, err := client.Do(req)
349                 c.Check(err, Equals, nil)
350                 c.Check(resp.StatusCode, Equals, 200)
351                 body, err := ioutil.ReadAll(resp.Body)
352                 c.Check(string(body), Equals, "")
353                 c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
354                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
355         }
356
357         {
358                 resp, err := http.Get(
359                         fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))))
360                 c.Check(err, Equals, nil)
361                 c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
362                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
363         }
364 }
365
366 func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
367         runProxy(c, nil, false)
368         defer closeListener()
369
370         {
371                 client := http.Client{}
372                 req, err := http.NewRequest("POST",
373                         "http://"+listener.Addr().String()+"/",
374                         strings.NewReader("qux"))
375                 req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
376                 req.Header.Add("Content-Type", "application/octet-stream")
377                 resp, err := client.Do(req)
378                 c.Check(err, Equals, nil)
379                 body, err := ioutil.ReadAll(resp.Body)
380                 c.Check(err, Equals, nil)
381                 c.Check(string(body), Matches,
382                         fmt.Sprintf(`^%x\+3(\+.+)?$`, md5.Sum([]byte("qux"))))
383         }
384 }
385
386 func (s *ServerRequiredSuite) TestStripHint(c *C) {
387         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz", "$1"),
388                 Equals,
389                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
390         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
391                 Equals,
392                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
393         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz", "$1"),
394                 Equals,
395                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz")
396         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
397                 Equals,
398                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
399
400 }
401
402 // Test GetIndex
403 //   Put one block, with 2 replicas
404 //   With no prefix (expect the block locator, twice)
405 //   With an existing prefix (expect the block locator, twice)
406 //   With a valid but non-existing prefix (expect "\n")
407 //   With an invalid prefix (expect error)
408 func (s *ServerRequiredSuite) TestGetIndex(c *C) {
409         kc := runProxy(c, nil, false)
410         defer closeListener()
411
412         // Put "index-data" blocks
413         data := []byte("index-data")
414         hash := fmt.Sprintf("%x", md5.Sum(data))
415
416         hash2, rep, err := kc.PutB(data)
417         c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
418         c.Check(rep, Equals, 2)
419         c.Check(err, Equals, nil)
420
421         reader, blocklen, _, err := kc.Get(hash)
422         c.Assert(err, Equals, nil)
423         c.Check(blocklen, Equals, int64(10))
424         all, err := ioutil.ReadAll(reader)
425         c.Check(all, DeepEquals, data)
426
427         // Put some more blocks
428         _, rep, err = kc.PutB([]byte("some-more-index-data"))
429         c.Check(err, Equals, nil)
430
431         kc.Arvados.ApiToken = arvadostest.DataManagerToken
432
433         // Invoke GetIndex
434         for _, spec := range []struct {
435                 prefix         string
436                 expectTestHash bool
437                 expectOther    bool
438         }{
439                 {"", true, true},         // with no prefix
440                 {hash[:3], true, false},  // with matching prefix
441                 {"abcdef", false, false}, // with no such prefix
442         } {
443                 indexReader, err := kc.GetIndex(TestProxyUUID, spec.prefix)
444                 c.Assert(err, Equals, nil)
445                 indexResp, err := ioutil.ReadAll(indexReader)
446                 c.Assert(err, Equals, nil)
447                 locators := strings.Split(string(indexResp), "\n")
448                 gotTestHash := 0
449                 gotOther := 0
450                 for _, locator := range locators {
451                         if locator == "" {
452                                 continue
453                         }
454                         c.Check(locator[:len(spec.prefix)], Equals, spec.prefix)
455                         if locator[:32] == hash {
456                                 gotTestHash++
457                         } else {
458                                 gotOther++
459                         }
460                 }
461                 c.Check(gotTestHash == 2, Equals, spec.expectTestHash)
462                 c.Check(gotOther > 0, Equals, spec.expectOther)
463         }
464
465         // GetIndex with invalid prefix
466         _, err = kc.GetIndex(TestProxyUUID, "xyz")
467         c.Assert((err != nil), Equals, true)
468 }
469
470 func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
471         kc := runProxy(c, nil, false)
472         defer closeListener()
473
474         // Put a test block
475         hash, rep, err := kc.PutB([]byte("foo"))
476         c.Check(err, Equals, nil)
477         c.Check(rep, Equals, 2)
478
479         for _, token := range []string{
480                 "nosuchtoken",
481                 "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
482         } {
483                 // Change token to given bad token
484                 kc.Arvados.ApiToken = token
485
486                 // Ask should result in error
487                 _, _, err = kc.Ask(hash)
488                 c.Check(err, NotNil)
489                 errNotFound, _ := err.(keepclient.ErrNotFound)
490                 c.Check(errNotFound.Temporary(), Equals, false)
491                 c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
492
493                 // Get should result in error
494                 _, _, _, err = kc.Get(hash)
495                 c.Check(err, NotNil)
496                 errNotFound, _ = err.(keepclient.ErrNotFound)
497                 c.Check(errNotFound.Temporary(), Equals, false)
498                 c.Assert(strings.Contains(err.Error(), "HTTP 403 \"Missing or invalid Authorization header\""), Equals, true)
499         }
500 }
501
502 func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
503         arv, err := arvadosclient.MakeArvadosClient()
504         c.Assert(err, Equals, nil)
505
506         // keepclient with no such keep server
507         kc := keepclient.New(&arv)
508         locals := map[string]string{
509                 TestProxyUUID: "http://localhost:12345",
510         }
511         kc.SetServiceRoots(locals, nil, nil)
512
513         // Ask should result in temporary connection refused error
514         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
515         _, _, err = kc.Ask(hash)
516         c.Check(err, NotNil)
517         errNotFound, _ := err.(*keepclient.ErrNotFound)
518         c.Check(errNotFound.Temporary(), Equals, true)
519         c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
520
521         // Get should result in temporary connection refused error
522         _, _, _, err = kc.Get(hash)
523         c.Check(err, NotNil)
524         errNotFound, _ = err.(*keepclient.ErrNotFound)
525         c.Check(errNotFound.Temporary(), Equals, true)
526         c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
527 }
528
529 func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
530         kc := runProxy(c, nil, false)
531         defer closeListener()
532
533         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
534         for _, f := range []func() error{
535                 func() error {
536                         _, _, err := kc.Ask(hash)
537                         return err
538                 },
539                 func() error {
540                         _, _, _, err := kc.Get(hash)
541                         return err
542                 },
543         } {
544                 err := f()
545                 c.Assert(err, NotNil)
546                 errNotFound, _ := err.(*keepclient.ErrNotFound)
547                 c.Check(errNotFound.Temporary(), Equals, true)
548                 c.Check(err, ErrorMatches, `.*HTTP 502.*`)
549         }
550 }