Merge branch '20457-logs-and-mem-usage'
[arvados.git] / services / keepproxy / keepproxy_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepproxy
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "fmt"
12         "io/ioutil"
13         "math/rand"
14         "net"
15         "net/http"
16         "net/http/httptest"
17         "strings"
18         "sync"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/config"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/arvadostest"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "git.arvados.org/arvados.git/sdk/go/httpserver"
28         "git.arvados.org/arvados.git/sdk/go/keepclient"
29         log "github.com/sirupsen/logrus"
30
31         "gopkg.in/check.v1"
32         . "gopkg.in/check.v1"
33 )
34
35 // Gocheck boilerplate
36 func Test(t *testing.T) {
37         TestingT(t)
38 }
39
40 // Gocheck boilerplate
41 var _ = Suite(&ServerRequiredSuite{})
42
43 // Tests that require the Keep server running
44 type ServerRequiredSuite struct{}
45
46 // Gocheck boilerplate
47 var _ = Suite(&ServerRequiredConfigYmlSuite{})
48
49 // Tests that require the Keep servers running as defined in config.yml
50 type ServerRequiredConfigYmlSuite struct{}
51
52 // Gocheck boilerplate
53 var _ = Suite(&NoKeepServerSuite{})
54
55 // Test with no keepserver to simulate errors
56 type NoKeepServerSuite struct{}
57
58 var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
59
60 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
61         arvadostest.StartKeep(2, false)
62 }
63
64 func (s *ServerRequiredSuite) SetUpTest(c *C) {
65         arvadostest.ResetEnv()
66 }
67
68 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
69         arvadostest.StopKeep(2)
70 }
71
72 func (s *ServerRequiredConfigYmlSuite) SetUpSuite(c *C) {
73         // config.yml defines 4 keepstores
74         arvadostest.StartKeep(4, false)
75 }
76
77 func (s *ServerRequiredConfigYmlSuite) SetUpTest(c *C) {
78         arvadostest.ResetEnv()
79 }
80
81 func (s *ServerRequiredConfigYmlSuite) TearDownSuite(c *C) {
82         arvadostest.StopKeep(4)
83 }
84
85 func (s *NoKeepServerSuite) SetUpSuite(c *C) {
86         // We need API to have some keep services listed, but the
87         // services themselves should be unresponsive.
88         arvadostest.StartKeep(2, false)
89         arvadostest.StopKeep(2)
90 }
91
92 func (s *NoKeepServerSuite) SetUpTest(c *C) {
93         arvadostest.ResetEnv()
94 }
95
96 type testServer struct {
97         *httpserver.Server
98         proxyHandler *proxyHandler
99 }
100
101 func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool, kp *arvados.UploadDownloadRolePermissions) (*testServer, *keepclient.KeepClient, *bytes.Buffer) {
102         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
103         c.Assert(err, Equals, nil)
104         cluster, err := cfg.GetCluster("")
105         c.Assert(err, Equals, nil)
106
107         if !loadKeepstoresFromConfig {
108                 // Do not load Keepstore InternalURLs from the config file
109                 cluster.Services.Keepstore.InternalURLs = make(map[arvados.URL]arvados.ServiceInstance)
110         }
111
112         cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{{Host: ":0"}: {}}
113
114         if kp != nil {
115                 cluster.Collections.KeepproxyPermission = *kp
116         }
117
118         logbuf := &bytes.Buffer{}
119         logger := log.New()
120         logger.Out = logbuf
121         ctx := ctxlog.Context(context.Background(), logger)
122
123         handler := newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*proxyHandler)
124         srv := &testServer{
125                 Server: &httpserver.Server{
126                         Server: http.Server{
127                                 BaseContext: func(net.Listener) context.Context { return ctx },
128                                 Handler: httpserver.AddRequestIDs(
129                                         httpserver.LogRequests(handler)),
130                         },
131                         Addr: ":",
132                 },
133                 proxyHandler: handler,
134         }
135         err = srv.Start()
136         c.Assert(err, IsNil)
137
138         client := arvados.NewClientFromEnv()
139         arv, err := arvadosclient.New(client)
140         c.Assert(err, IsNil)
141         if bogusClientToken {
142                 arv.ApiToken = "bogus-token"
143         }
144         kc := keepclient.New(arv)
145         sr := map[string]string{
146                 TestProxyUUID: "http://" + srv.Addr,
147         }
148         kc.SetServiceRoots(sr, sr, sr)
149         return srv, kc, logbuf
150 }
151
152 func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
153         srv, _, _ := runProxy(c, false, false, nil)
154         defer srv.Close()
155
156         req, err := http.NewRequest("POST",
157                 "http://"+srv.Addr+"/",
158                 strings.NewReader("TestViaHeader"))
159         c.Assert(err, Equals, nil)
160         req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
161         resp, err := (&http.Client{}).Do(req)
162         c.Assert(err, Equals, nil)
163         c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
164         c.Assert(resp.StatusCode, Equals, http.StatusOK)
165         locator, err := ioutil.ReadAll(resp.Body)
166         c.Assert(err, Equals, nil)
167         resp.Body.Close()
168
169         req, err = http.NewRequest("GET",
170                 "http://"+srv.Addr+"/"+string(locator),
171                 nil)
172         c.Assert(err, Equals, nil)
173         resp, err = (&http.Client{}).Do(req)
174         c.Assert(err, Equals, nil)
175         c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
176         resp.Body.Close()
177 }
178
179 func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
180         srv, kc, _ := runProxy(c, false, false, nil)
181         defer srv.Close()
182
183         sr := map[string]string{
184                 TestProxyUUID: "http://" + srv.Addr,
185         }
186         srv.proxyHandler.KeepClient.SetServiceRoots(sr, sr, sr)
187
188         content := []byte("TestLoopDetection")
189         _, _, err := kc.PutB(content)
190         c.Check(err, ErrorMatches, `.*loop detected.*`)
191
192         hash := fmt.Sprintf("%x", md5.Sum(content))
193         _, _, _, err = kc.Get(hash)
194         c.Check(err, ErrorMatches, `.*loop detected.*`)
195 }
196
197 func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
198         srv, kc, _ := runProxy(c, false, false, nil)
199         defer srv.Close()
200
201         // Set up fake keepstore to record request headers
202         var hdr http.Header
203         ts := httptest.NewServer(http.HandlerFunc(
204                 func(w http.ResponseWriter, r *http.Request) {
205                         hdr = r.Header
206                         http.Error(w, "Error", http.StatusInternalServerError)
207                 }))
208         defer ts.Close()
209
210         // Point keepproxy router's keepclient to the fake keepstore
211         sr := map[string]string{
212                 TestProxyUUID: ts.URL,
213         }
214         srv.proxyHandler.KeepClient.SetServiceRoots(sr, sr, sr)
215
216         // Set up client to ask for storage classes to keepproxy
217         kc.StorageClasses = []string{"secure"}
218         content := []byte("Very important data")
219         _, _, err := kc.PutB(content)
220         c.Check(err, NotNil)
221         c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
222 }
223
224 func (s *ServerRequiredSuite) TestStorageClassesConfirmedHeader(c *C) {
225         srv, _, _ := runProxy(c, false, false, nil)
226         defer srv.Close()
227
228         content := []byte("foo")
229         hash := fmt.Sprintf("%x", md5.Sum(content))
230         client := &http.Client{}
231
232         req, err := http.NewRequest("PUT",
233                 fmt.Sprintf("http://%s/%s", srv.Addr, hash),
234                 bytes.NewReader(content))
235         c.Assert(err, IsNil)
236         req.Header.Set("X-Keep-Storage-Classes", "default")
237         req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
238         req.Header.Set("Content-Type", "application/octet-stream")
239
240         resp, err := client.Do(req)
241         c.Assert(err, IsNil)
242         c.Assert(resp.StatusCode, Equals, http.StatusOK)
243         c.Assert(resp.Header.Get("X-Keep-Storage-Classes-Confirmed"), Equals, "default=2")
244 }
245
246 func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
247         srv, kc, _ := runProxy(c, false, false, nil)
248         defer srv.Close()
249
250         content := []byte("TestDesiredReplicas")
251         hash := fmt.Sprintf("%x", md5.Sum(content))
252
253         for _, kc.Want_replicas = range []int{0, 1, 2, 3} {
254                 locator, rep, err := kc.PutB(content)
255                 if kc.Want_replicas < 3 {
256                         c.Check(err, Equals, nil)
257                         c.Check(rep, Equals, kc.Want_replicas)
258                         if rep > 0 {
259                                 c.Check(locator, Matches, fmt.Sprintf(`^%s\+%d(\+.+)?$`, hash, len(content)))
260                         }
261                 } else {
262                         c.Check(err, ErrorMatches, ".*503.*")
263                 }
264         }
265 }
266
267 func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
268         srv, kc, _ := runProxy(c, false, false, nil)
269         defer srv.Close()
270
271         content := []byte("TestPutWrongContentLength")
272         hash := fmt.Sprintf("%x", md5.Sum(content))
273
274         // If we use http.Client to send these requests to the network
275         // server we just started, the Go http library automatically
276         // fixes the invalid Content-Length header. In order to test
277         // our server behavior, we have to call the handler directly
278         // using an httptest.ResponseRecorder.
279         rtr, err := newHandler(context.Background(), kc, 10*time.Second, &arvados.Cluster{})
280         c.Assert(err, check.IsNil)
281
282         type testcase struct {
283                 sendLength   string
284                 expectStatus int
285         }
286
287         for _, t := range []testcase{
288                 {"1", http.StatusBadRequest},
289                 {"", http.StatusLengthRequired},
290                 {"-1", http.StatusLengthRequired},
291                 {"abcdef", http.StatusLengthRequired},
292         } {
293                 req, err := http.NewRequest("PUT",
294                         fmt.Sprintf("http://%s/%s+%d", srv.Addr, hash, len(content)),
295                         bytes.NewReader(content))
296                 c.Assert(err, IsNil)
297                 req.Header.Set("Content-Length", t.sendLength)
298                 req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
299                 req.Header.Set("Content-Type", "application/octet-stream")
300
301                 resp := httptest.NewRecorder()
302                 rtr.ServeHTTP(resp, req)
303                 c.Check(resp.Code, Equals, t.expectStatus)
304         }
305 }
306
307 func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
308         srv, kc, _ := runProxy(c, false, false, nil)
309         defer srv.Close()
310         srv.proxyHandler.timeout = time.Nanosecond
311
312         buf := make([]byte, 1<<20)
313         rand.Read(buf)
314         var wg sync.WaitGroup
315         for i := 0; i < 128; i++ {
316                 wg.Add(1)
317                 go func() {
318                         defer wg.Done()
319                         kc.PutB(buf)
320                 }()
321         }
322         done := make(chan bool)
323         go func() {
324                 wg.Wait()
325                 close(done)
326         }()
327         select {
328         case <-done:
329         case <-time.After(10 * time.Second):
330                 c.Error("timeout")
331         }
332 }
333
334 func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
335         srv, kc, logbuf := runProxy(c, false, false, nil)
336         defer srv.Close()
337
338         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
339         var hash2 string
340
341         {
342                 _, _, err := kc.Ask(hash)
343                 c.Check(err, Equals, keepclient.BlockNotFound)
344                 c.Log("Finished Ask (expected BlockNotFound)")
345         }
346
347         {
348                 reader, _, _, err := kc.Get(hash)
349                 c.Check(reader, Equals, nil)
350                 c.Check(err, Equals, keepclient.BlockNotFound)
351                 c.Log("Finished Get (expected BlockNotFound)")
352         }
353
354         // Note in bug #5309 among other errors keepproxy would set
355         // Content-Length incorrectly on the 404 BlockNotFound response, this
356         // would result in a protocol violation that would prevent reuse of the
357         // connection, which would manifest by the next attempt to use the
358         // connection (in this case the PutB below) failing.  So to test for
359         // that bug it's necessary to trigger an error response (such as
360         // BlockNotFound) and then do something else with the same httpClient
361         // connection.
362
363         {
364                 var rep int
365                 var err error
366                 hash2, rep, err = kc.PutB([]byte("foo"))
367                 c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
368                 c.Check(rep, Equals, 2)
369                 c.Check(err, Equals, nil)
370                 c.Log("Finished PutB (expected success)")
371
372                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
373                 logbuf.Reset()
374         }
375
376         {
377                 blocklen, _, err := kc.Ask(hash2)
378                 c.Assert(err, Equals, nil)
379                 c.Check(blocklen, Equals, int64(3))
380                 c.Log("Finished Ask (expected success)")
381                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
382                 logbuf.Reset()
383         }
384
385         {
386                 reader, blocklen, _, err := kc.Get(hash2)
387                 c.Assert(err, Equals, nil)
388                 all, err := ioutil.ReadAll(reader)
389                 c.Check(err, IsNil)
390                 c.Check(all, DeepEquals, []byte("foo"))
391                 c.Check(blocklen, Equals, int64(3))
392                 c.Log("Finished Get (expected success)")
393                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
394                 logbuf.Reset()
395         }
396
397         {
398                 var rep int
399                 var err error
400                 hash2, rep, err = kc.PutB([]byte(""))
401                 c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
402                 c.Check(rep, Equals, 2)
403                 c.Check(err, Equals, nil)
404                 c.Log("Finished PutB zero block")
405         }
406
407         {
408                 reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
409                 c.Assert(err, Equals, nil)
410                 all, err := ioutil.ReadAll(reader)
411                 c.Check(err, IsNil)
412                 c.Check(all, DeepEquals, []byte(""))
413                 c.Check(blocklen, Equals, int64(0))
414                 c.Log("Finished Get zero block")
415         }
416 }
417
418 func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
419         srv, kc, _ := runProxy(c, true, false, nil)
420         defer srv.Close()
421
422         hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
423
424         _, _, err := kc.Ask(hash)
425         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
426
427         hash2, rep, err := kc.PutB([]byte("bar"))
428         c.Check(hash2, Equals, "")
429         c.Check(rep, Equals, 0)
430         c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError{})
431
432         blocklen, _, err := kc.Ask(hash)
433         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
434         c.Check(err, ErrorMatches, ".*HTTP 403.*")
435         c.Check(blocklen, Equals, int64(0))
436
437         _, blocklen, _, err = kc.Get(hash)
438         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
439         c.Check(err, ErrorMatches, ".*HTTP 403.*")
440         c.Check(blocklen, Equals, int64(0))
441 }
442
443 func testPermission(c *C, admin bool, perm arvados.UploadDownloadPermission) {
444         kp := arvados.UploadDownloadRolePermissions{}
445         if admin {
446                 kp.Admin = perm
447                 kp.User = arvados.UploadDownloadPermission{Upload: true, Download: true}
448         } else {
449                 kp.Admin = arvados.UploadDownloadPermission{Upload: true, Download: true}
450                 kp.User = perm
451         }
452
453         srv, kc, logbuf := runProxy(c, false, false, &kp)
454         defer srv.Close()
455         if admin {
456                 kc.Arvados.ApiToken = arvadostest.AdminToken
457         } else {
458                 kc.Arvados.ApiToken = arvadostest.ActiveToken
459         }
460
461         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
462         var hash2 string
463
464         {
465                 var rep int
466                 var err error
467                 hash2, rep, err = kc.PutB([]byte("foo"))
468
469                 if perm.Upload {
470                         c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
471                         c.Check(rep, Equals, 2)
472                         c.Check(err, Equals, nil)
473                         c.Log("Finished PutB (expected success)")
474                         if admin {
475                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
476                         } else {
477
478                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="Active User".* userUUID=zzzzz-tpzed-xurymjxw79nv3jz.*`)
479                         }
480                 } else {
481                         c.Check(hash2, Equals, "")
482                         c.Check(rep, Equals, 0)
483                         c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError{})
484                 }
485                 logbuf.Reset()
486         }
487         if perm.Upload {
488                 // can't test download without upload.
489
490                 reader, blocklen, _, err := kc.Get(hash2)
491                 if perm.Download {
492                         c.Assert(err, Equals, nil)
493                         all, err := ioutil.ReadAll(reader)
494                         c.Check(err, IsNil)
495                         c.Check(all, DeepEquals, []byte("foo"))
496                         c.Check(blocklen, Equals, int64(3))
497                         c.Log("Finished Get (expected success)")
498                         if admin {
499                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
500                         } else {
501                                 c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="Active User".* userUUID=zzzzz-tpzed-xurymjxw79nv3jz.*`)
502                         }
503                 } else {
504                         c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
505                         c.Check(err, ErrorMatches, ".*Missing or invalid Authorization header, or method not allowed.*")
506                         c.Check(blocklen, Equals, int64(0))
507                 }
508                 logbuf.Reset()
509         }
510
511 }
512
513 func (s *ServerRequiredSuite) TestPutGetPermission(c *C) {
514
515         for _, adminperm := range []bool{true, false} {
516                 for _, userperm := range []bool{true, false} {
517
518                         testPermission(c, true,
519                                 arvados.UploadDownloadPermission{
520                                         Upload:   adminperm,
521                                         Download: true,
522                                 })
523                         testPermission(c, true,
524                                 arvados.UploadDownloadPermission{
525                                         Upload:   true,
526                                         Download: adminperm,
527                                 })
528                         testPermission(c, false,
529                                 arvados.UploadDownloadPermission{
530                                         Upload:   true,
531                                         Download: userperm,
532                                 })
533                         testPermission(c, false,
534                                 arvados.UploadDownloadPermission{
535                                         Upload:   true,
536                                         Download: userperm,
537                                 })
538                 }
539         }
540 }
541
542 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
543         srv, _, _ := runProxy(c, false, false, nil)
544         defer srv.Close()
545
546         {
547                 client := http.Client{}
548                 req, err := http.NewRequest("OPTIONS",
549                         fmt.Sprintf("http://%s/%x+3", srv.Addr, md5.Sum([]byte("foo"))),
550                         nil)
551                 c.Assert(err, IsNil)
552                 req.Header.Add("Access-Control-Request-Method", "PUT")
553                 req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
554                 resp, err := client.Do(req)
555                 c.Check(err, Equals, nil)
556                 c.Check(resp.StatusCode, Equals, 200)
557                 body, err := ioutil.ReadAll(resp.Body)
558                 c.Check(err, IsNil)
559                 c.Check(string(body), Equals, "")
560                 c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
561                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
562         }
563
564         {
565                 resp, err := http.Get(fmt.Sprintf("http://%s/%x+3", srv.Addr, md5.Sum([]byte("foo"))))
566                 c.Check(err, Equals, nil)
567                 c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
568                 c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
569         }
570 }
571
572 func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
573         srv, _, _ := runProxy(c, false, false, nil)
574         defer srv.Close()
575
576         {
577                 client := http.Client{}
578                 req, err := http.NewRequest("POST",
579                         "http://"+srv.Addr+"/",
580                         strings.NewReader("qux"))
581                 c.Check(err, IsNil)
582                 req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
583                 req.Header.Add("Content-Type", "application/octet-stream")
584                 resp, err := client.Do(req)
585                 c.Check(err, Equals, nil)
586                 body, err := ioutil.ReadAll(resp.Body)
587                 c.Check(err, Equals, nil)
588                 c.Check(string(body), Matches,
589                         fmt.Sprintf(`^%x\+3(\+.+)?$`, md5.Sum([]byte("qux"))))
590         }
591 }
592
593 func (s *ServerRequiredSuite) TestStripHint(c *C) {
594         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz", "$1"),
595                 Equals,
596                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
597         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
598                 Equals,
599                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
600         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz", "$1"),
601                 Equals,
602                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz")
603         c.Check(removeHint.ReplaceAllString("http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73", "$1"),
604                 Equals,
605                 "http://keep.zzzzz.arvadosapi.com:25107/2228819a18d3727630fa30c81853d23f+67108864+K@zzzzz-zzzzz-zzzzzzzzzzzzzzz+A37b6ab198qqqq28d903b975266b23ee711e1852c@55635f73")
606
607 }
608
609 // Test GetIndex
610 //   Put one block, with 2 replicas
611 //   With no prefix (expect the block locator, twice)
612 //   With an existing prefix (expect the block locator, twice)
613 //   With a valid but non-existing prefix (expect "\n")
614 //   With an invalid prefix (expect error)
615 func (s *ServerRequiredSuite) TestGetIndex(c *C) {
616         getIndexWorker(c, false)
617 }
618
619 // Test GetIndex
620 //   Uses config.yml
621 //   Put one block, with 2 replicas
622 //   With no prefix (expect the block locator, twice)
623 //   With an existing prefix (expect the block locator, twice)
624 //   With a valid but non-existing prefix (expect "\n")
625 //   With an invalid prefix (expect error)
626 func (s *ServerRequiredConfigYmlSuite) TestGetIndex(c *C) {
627         getIndexWorker(c, true)
628 }
629
630 func getIndexWorker(c *C, useConfig bool) {
631         srv, kc, _ := runProxy(c, false, useConfig, nil)
632         defer srv.Close()
633
634         // Put "index-data" blocks
635         data := []byte("index-data")
636         hash := fmt.Sprintf("%x", md5.Sum(data))
637
638         hash2, rep, err := kc.PutB(data)
639         c.Check(hash2, Matches, fmt.Sprintf(`^%s\+10(\+.+)?$`, hash))
640         c.Check(rep, Equals, 2)
641         c.Check(err, Equals, nil)
642
643         reader, blocklen, _, err := kc.Get(hash)
644         c.Assert(err, IsNil)
645         c.Check(blocklen, Equals, int64(10))
646         all, err := ioutil.ReadAll(reader)
647         c.Assert(err, IsNil)
648         c.Check(all, DeepEquals, data)
649
650         // Put some more blocks
651         _, _, err = kc.PutB([]byte("some-more-index-data"))
652         c.Check(err, IsNil)
653
654         kc.Arvados.ApiToken = arvadostest.SystemRootToken
655
656         // Invoke GetIndex
657         for _, spec := range []struct {
658                 prefix         string
659                 expectTestHash bool
660                 expectOther    bool
661         }{
662                 {"", true, true},         // with no prefix
663                 {hash[:3], true, false},  // with matching prefix
664                 {"abcdef", false, false}, // with no such prefix
665         } {
666                 indexReader, err := kc.GetIndex(TestProxyUUID, spec.prefix)
667                 c.Assert(err, Equals, nil)
668                 indexResp, err := ioutil.ReadAll(indexReader)
669                 c.Assert(err, Equals, nil)
670                 locators := strings.Split(string(indexResp), "\n")
671                 gotTestHash := 0
672                 gotOther := 0
673                 for _, locator := range locators {
674                         if locator == "" {
675                                 continue
676                         }
677                         c.Check(locator[:len(spec.prefix)], Equals, spec.prefix)
678                         if locator[:32] == hash {
679                                 gotTestHash++
680                         } else {
681                                 gotOther++
682                         }
683                 }
684                 c.Check(gotTestHash == 2, Equals, spec.expectTestHash)
685                 c.Check(gotOther > 0, Equals, spec.expectOther)
686         }
687
688         // GetIndex with invalid prefix
689         _, err = kc.GetIndex(TestProxyUUID, "xyz")
690         c.Assert((err != nil), Equals, true)
691 }
692
693 func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
694         srv, kc, _ := runProxy(c, false, false, nil)
695         defer srv.Close()
696         hash, _, err := kc.PutB([]byte("shareddata"))
697         c.Check(err, IsNil)
698         kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
699         rdr, _, _, err := kc.Get(hash)
700         c.Assert(err, IsNil)
701         data, err := ioutil.ReadAll(rdr)
702         c.Check(err, IsNil)
703         c.Check(data, DeepEquals, []byte("shareddata"))
704 }
705
706 func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
707         srv, kc, _ := runProxy(c, false, false, nil)
708         defer srv.Close()
709
710         // Put a test block
711         hash, rep, err := kc.PutB([]byte("foo"))
712         c.Check(err, IsNil)
713         c.Check(rep, Equals, 2)
714
715         for _, badToken := range []string{
716                 "nosuchtoken",
717                 "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
718         } {
719                 kc.Arvados.ApiToken = badToken
720
721                 // Ask and Get will fail only if the upstream
722                 // keepstore server checks for valid signatures.
723                 // Without knowing the blob signing key, there is no
724                 // way for keepproxy to know whether a given token is
725                 // permitted to read a block.  So these tests fail:
726                 if false {
727                         _, _, err = kc.Ask(hash)
728                         c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
729                         c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
730                         c.Check(err, ErrorMatches, ".*HTTP 403.*")
731
732                         _, _, _, err = kc.Get(hash)
733                         c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
734                         c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
735                         c.Check(err, ErrorMatches, ".*HTTP 403 \"Missing or invalid Authorization header, or method not allowed\".*")
736                 }
737
738                 _, _, err = kc.PutB([]byte("foo"))
739                 c.Check(err, ErrorMatches, ".*403.*Missing or invalid Authorization header, or method not allowed")
740         }
741 }
742
743 func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
744         srv, kc, _ := runProxy(c, false, false, nil)
745         defer srv.Close()
746
747         // Point keepproxy at a non-existent keepstore
748         locals := map[string]string{
749                 TestProxyUUID: "http://localhost:12345",
750         }
751         srv.proxyHandler.KeepClient.SetServiceRoots(locals, nil, nil)
752
753         // Ask should result in temporary bad gateway error
754         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
755         _, _, err := kc.Ask(hash)
756         c.Check(err, NotNil)
757         errNotFound, _ := err.(*keepclient.ErrNotFound)
758         c.Check(errNotFound.Temporary(), Equals, true)
759         c.Assert(err, ErrorMatches, ".*HTTP 502.*")
760
761         // Get should result in temporary bad gateway error
762         _, _, _, err = kc.Get(hash)
763         c.Check(err, NotNil)
764         errNotFound, _ = err.(*keepclient.ErrNotFound)
765         c.Check(errNotFound.Temporary(), Equals, true)
766         c.Assert(err, ErrorMatches, ".*HTTP 502.*")
767 }
768
769 func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
770         srv, kc, _ := runProxy(c, false, false, nil)
771         defer srv.Close()
772
773         hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
774         for _, f := range []func() error{
775                 func() error {
776                         _, _, err := kc.Ask(hash)
777                         return err
778                 },
779                 func() error {
780                         _, _, _, err := kc.Get(hash)
781                         return err
782                 },
783         } {
784                 err := f()
785                 c.Assert(err, NotNil)
786                 errNotFound, _ := err.(*keepclient.ErrNotFound)
787                 c.Check(errNotFound.Temporary(), Equals, true)
788                 c.Check(err, ErrorMatches, `.*HTTP 502.*`)
789         }
790 }
791
792 func (s *ServerRequiredSuite) TestPing(c *C) {
793         srv, kc, _ := runProxy(c, false, false, nil)
794         defer srv.Close()
795
796         rtr, err := newHandler(context.Background(), kc, 10*time.Second, &arvados.Cluster{ManagementToken: arvadostest.ManagementToken})
797         c.Assert(err, check.IsNil)
798
799         req, err := http.NewRequest("GET",
800                 "http://"+srv.Addr+"/_health/ping",
801                 nil)
802         c.Assert(err, IsNil)
803         req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
804
805         resp := httptest.NewRecorder()
806         rtr.ServeHTTP(resp, req)
807         c.Check(resp.Code, Equals, 200)
808         c.Assert(resp.Body.String(), Matches, `{"health":"OK"}\n?`)
809 }