13937: Refactors approach to pass volume metrics as curried vecs (WIP)
[arvados.git] / services / keepstore / handler_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Tests for Keep HTTP handlers:
6 //
7 //     GetBlockHandler
8 //     PutBlockHandler
9 //     IndexHandler
10 //
11 // The HTTP handlers are responsible for enforcing permission policy,
12 // so these tests must exercise all possible permission permutations.
13
14 package main
15
16 import (
17         "bytes"
18         "context"
19         "encoding/json"
20         "fmt"
21         "net/http"
22         "net/http/httptest"
23         "os"
24         "regexp"
25         "strings"
26         "testing"
27         "time"
28
29         "git.curoverse.com/arvados.git/sdk/go/arvados"
30         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
31         "github.com/prometheus/client_golang/prometheus"
32 )
33
34 var testCluster = &arvados.Cluster{
35         ClusterID: "zzzzz",
36 }
37
// RequestTester describes a single HTTP request that a unit test
// wants to issue.
//
// Tests in this file construct RequestTester values with positional
// literals, so the field order (uri, apiToken, method, requestBody)
// must not change.
type RequestTester struct {
	// uri is the request path, apiToken the credential placed in
	// the Authorization header when non-empty, and method the HTTP
	// verb.
	uri, apiToken, method string
	// requestBody is the raw payload sent with the request.
	requestBody []byte
}
46
47 // Test GetBlockHandler on the following situations:
48 //   - permissions off, unauthenticated request, unsigned locator
49 //   - permissions on, authenticated request, signed locator
50 //   - permissions on, authenticated request, unsigned locator
51 //   - permissions on, unauthenticated request, signed locator
52 //   - permissions on, authenticated request, expired locator
53 //
54 func TestGetHandler(t *testing.T) {
55         defer teardown()
56
57         // Prepare two test Keep volumes. Our block is stored on the second volume.
58         KeepVM = MakeTestVolumeManager(2)
59         defer KeepVM.Close()
60
61         vols := KeepVM.AllWritable()
62         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
63                 t.Error(err)
64         }
65
66         // Create locators for testing.
67         // Turn on permission settings so we can generate signed locators.
68         theConfig.RequireSignatures = true
69         theConfig.blobSigningKey = []byte(knownKey)
70         theConfig.BlobSignatureTTL.Set("5m")
71
72         var (
73                 unsignedLocator  = "/" + TestHash
74                 validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
75                 expiredTimestamp = time.Now().Add(-time.Hour)
76                 signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
77                 expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
78         )
79
80         // -----------------
81         // Test unauthenticated request with permissions off.
82         theConfig.RequireSignatures = false
83
84         // Unauthenticated request, unsigned locator
85         // => OK
86         response := IssueRequest(
87                 &RequestTester{
88                         method: "GET",
89                         uri:    unsignedLocator,
90                 })
91         ExpectStatusCode(t,
92                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
93         ExpectBody(t,
94                 "Unauthenticated request, unsigned locator",
95                 string(TestBlock),
96                 response)
97
98         receivedLen := response.Header().Get("Content-Length")
99         expectedLen := fmt.Sprintf("%d", len(TestBlock))
100         if receivedLen != expectedLen {
101                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
102         }
103
104         // ----------------
105         // Permissions: on.
106         theConfig.RequireSignatures = true
107
108         // Authenticated request, signed locator
109         // => OK
110         response = IssueRequest(&RequestTester{
111                 method:   "GET",
112                 uri:      signedLocator,
113                 apiToken: knownToken,
114         })
115         ExpectStatusCode(t,
116                 "Authenticated request, signed locator", http.StatusOK, response)
117         ExpectBody(t,
118                 "Authenticated request, signed locator", string(TestBlock), response)
119
120         receivedLen = response.Header().Get("Content-Length")
121         expectedLen = fmt.Sprintf("%d", len(TestBlock))
122         if receivedLen != expectedLen {
123                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
124         }
125
126         // Authenticated request, unsigned locator
127         // => PermissionError
128         response = IssueRequest(&RequestTester{
129                 method:   "GET",
130                 uri:      unsignedLocator,
131                 apiToken: knownToken,
132         })
133         ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
134
135         // Unauthenticated request, signed locator
136         // => PermissionError
137         response = IssueRequest(&RequestTester{
138                 method: "GET",
139                 uri:    signedLocator,
140         })
141         ExpectStatusCode(t,
142                 "Unauthenticated request, signed locator",
143                 PermissionError.HTTPCode, response)
144
145         // Authenticated request, expired locator
146         // => ExpiredError
147         response = IssueRequest(&RequestTester{
148                 method:   "GET",
149                 uri:      expiredLocator,
150                 apiToken: knownToken,
151         })
152         ExpectStatusCode(t,
153                 "Authenticated request, expired locator",
154                 ExpiredError.HTTPCode, response)
155 }
156
157 // Test PutBlockHandler on the following situations:
158 //   - no server key
159 //   - with server key, authenticated request, unsigned locator
160 //   - with server key, unauthenticated request, unsigned locator
161 //
162 func TestPutHandler(t *testing.T) {
163         defer teardown()
164
165         // Prepare two test Keep volumes.
166         KeepVM = MakeTestVolumeManager(2)
167         defer KeepVM.Close()
168
169         // --------------
170         // No server key.
171
172         // Unauthenticated request, no server key
173         // => OK (unsigned response)
174         unsignedLocator := "/" + TestHash
175         response := IssueRequest(
176                 &RequestTester{
177                         method:      "PUT",
178                         uri:         unsignedLocator,
179                         requestBody: TestBlock,
180                 })
181
182         ExpectStatusCode(t,
183                 "Unauthenticated request, no server key", http.StatusOK, response)
184         ExpectBody(t,
185                 "Unauthenticated request, no server key",
186                 TestHashPutResp, response)
187
188         // ------------------
189         // With a server key.
190
191         theConfig.blobSigningKey = []byte(knownKey)
192         theConfig.BlobSignatureTTL.Set("5m")
193
194         // When a permission key is available, the locator returned
195         // from an authenticated PUT request will be signed.
196
197         // Authenticated PUT, signed locator
198         // => OK (signed response)
199         response = IssueRequest(
200                 &RequestTester{
201                         method:      "PUT",
202                         uri:         unsignedLocator,
203                         requestBody: TestBlock,
204                         apiToken:    knownToken,
205                 })
206
207         ExpectStatusCode(t,
208                 "Authenticated PUT, signed locator, with server key",
209                 http.StatusOK, response)
210         responseLocator := strings.TrimSpace(response.Body.String())
211         if VerifySignature(responseLocator, knownToken) != nil {
212                 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
213                         "response '%s' does not contain a valid signature",
214                         responseLocator)
215         }
216
217         // Unauthenticated PUT, unsigned locator
218         // => OK
219         response = IssueRequest(
220                 &RequestTester{
221                         method:      "PUT",
222                         uri:         unsignedLocator,
223                         requestBody: TestBlock,
224                 })
225
226         ExpectStatusCode(t,
227                 "Unauthenticated PUT, unsigned locator, with server key",
228                 http.StatusOK, response)
229         ExpectBody(t,
230                 "Unauthenticated PUT, unsigned locator, with server key",
231                 TestHashPutResp, response)
232 }
233
234 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
235         defer teardown()
236         theConfig.systemAuthToken = "fake-data-manager-token"
237         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
238         vols[0].Readonly = true
239         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
240         defer KeepVM.Close()
241         IssueRequest(
242                 &RequestTester{
243                         method:      "PUT",
244                         uri:         "/" + TestHash,
245                         requestBody: TestBlock,
246                 })
247         defer func(orig bool) {
248                 theConfig.EnableDelete = orig
249         }(theConfig.EnableDelete)
250         theConfig.EnableDelete = true
251         IssueRequest(
252                 &RequestTester{
253                         method:      "DELETE",
254                         uri:         "/" + TestHash,
255                         requestBody: TestBlock,
256                         apiToken:    theConfig.systemAuthToken,
257                 })
258         type expect struct {
259                 volnum    int
260                 method    string
261                 callcount int
262         }
263         for _, e := range []expect{
264                 {0, "Get", 0},
265                 {0, "Compare", 0},
266                 {0, "Touch", 0},
267                 {0, "Put", 0},
268                 {0, "Delete", 0},
269                 {1, "Get", 0},
270                 {1, "Compare", 1},
271                 {1, "Touch", 1},
272                 {1, "Put", 1},
273                 {1, "Delete", 1},
274         } {
275                 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
276                         t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
277                 }
278         }
279 }
280
281 // Test /index requests:
282 //   - unauthenticated /index request
283 //   - unauthenticated /index/prefix request
284 //   - authenticated   /index request        | non-superuser
285 //   - authenticated   /index/prefix request | non-superuser
286 //   - authenticated   /index request        | superuser
287 //   - authenticated   /index/prefix request | superuser
288 //
289 // The only /index requests that should succeed are those issued by the
290 // superuser. They should pass regardless of the value of RequireSignatures.
291 //
292 func TestIndexHandler(t *testing.T) {
293         defer teardown()
294
295         // Set up Keep volumes and populate them.
296         // Include multiple blocks on different volumes, and
297         // some metadata files (which should be omitted from index listings)
298         KeepVM = MakeTestVolumeManager(2)
299         defer KeepVM.Close()
300
301         vols := KeepVM.AllWritable()
302         vols[0].Put(context.Background(), TestHash, TestBlock)
303         vols[1].Put(context.Background(), TestHash2, TestBlock2)
304         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
305         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
306
307         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
308
309         unauthenticatedReq := &RequestTester{
310                 method: "GET",
311                 uri:    "/index",
312         }
313         authenticatedReq := &RequestTester{
314                 method:   "GET",
315                 uri:      "/index",
316                 apiToken: knownToken,
317         }
318         superuserReq := &RequestTester{
319                 method:   "GET",
320                 uri:      "/index",
321                 apiToken: theConfig.systemAuthToken,
322         }
323         unauthPrefixReq := &RequestTester{
324                 method: "GET",
325                 uri:    "/index/" + TestHash[0:3],
326         }
327         authPrefixReq := &RequestTester{
328                 method:   "GET",
329                 uri:      "/index/" + TestHash[0:3],
330                 apiToken: knownToken,
331         }
332         superuserPrefixReq := &RequestTester{
333                 method:   "GET",
334                 uri:      "/index/" + TestHash[0:3],
335                 apiToken: theConfig.systemAuthToken,
336         }
337         superuserNoSuchPrefixReq := &RequestTester{
338                 method:   "GET",
339                 uri:      "/index/abcd",
340                 apiToken: theConfig.systemAuthToken,
341         }
342         superuserInvalidPrefixReq := &RequestTester{
343                 method:   "GET",
344                 uri:      "/index/xyz",
345                 apiToken: theConfig.systemAuthToken,
346         }
347
348         // -------------------------------------------------------------
349         // Only the superuser should be allowed to issue /index requests.
350
351         // ---------------------------
352         // RequireSignatures enabled
353         // This setting should not affect tests passing.
354         theConfig.RequireSignatures = true
355
356         // unauthenticated /index request
357         // => UnauthorizedError
358         response := IssueRequest(unauthenticatedReq)
359         ExpectStatusCode(t,
360                 "RequireSignatures on, unauthenticated request",
361                 UnauthorizedError.HTTPCode,
362                 response)
363
364         // unauthenticated /index/prefix request
365         // => UnauthorizedError
366         response = IssueRequest(unauthPrefixReq)
367         ExpectStatusCode(t,
368                 "permissions on, unauthenticated /index/prefix request",
369                 UnauthorizedError.HTTPCode,
370                 response)
371
372         // authenticated /index request, non-superuser
373         // => UnauthorizedError
374         response = IssueRequest(authenticatedReq)
375         ExpectStatusCode(t,
376                 "permissions on, authenticated request, non-superuser",
377                 UnauthorizedError.HTTPCode,
378                 response)
379
380         // authenticated /index/prefix request, non-superuser
381         // => UnauthorizedError
382         response = IssueRequest(authPrefixReq)
383         ExpectStatusCode(t,
384                 "permissions on, authenticated /index/prefix request, non-superuser",
385                 UnauthorizedError.HTTPCode,
386                 response)
387
388         // superuser /index request
389         // => OK
390         response = IssueRequest(superuserReq)
391         ExpectStatusCode(t,
392                 "permissions on, superuser request",
393                 http.StatusOK,
394                 response)
395
396         // ----------------------------
397         // RequireSignatures disabled
398         // Valid Request should still pass.
399         theConfig.RequireSignatures = false
400
401         // superuser /index request
402         // => OK
403         response = IssueRequest(superuserReq)
404         ExpectStatusCode(t,
405                 "permissions on, superuser request",
406                 http.StatusOK,
407                 response)
408
409         expected := `^` + TestHash + `\+\d+ \d+\n` +
410                 TestHash2 + `\+\d+ \d+\n\n$`
411         match, _ := regexp.MatchString(expected, response.Body.String())
412         if !match {
413                 t.Errorf(
414                         "permissions on, superuser request: expected %s, got:\n%s",
415                         expected, response.Body.String())
416         }
417
418         // superuser /index/prefix request
419         // => OK
420         response = IssueRequest(superuserPrefixReq)
421         ExpectStatusCode(t,
422                 "permissions on, superuser request",
423                 http.StatusOK,
424                 response)
425
426         expected = `^` + TestHash + `\+\d+ \d+\n\n$`
427         match, _ = regexp.MatchString(expected, response.Body.String())
428         if !match {
429                 t.Errorf(
430                         "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
431                         expected, response.Body.String())
432         }
433
434         // superuser /index/{no-such-prefix} request
435         // => OK
436         response = IssueRequest(superuserNoSuchPrefixReq)
437         ExpectStatusCode(t,
438                 "permissions on, superuser request",
439                 http.StatusOK,
440                 response)
441
442         if "\n" != response.Body.String() {
443                 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
444         }
445
446         // superuser /index/{invalid-prefix} request
447         // => StatusBadRequest
448         response = IssueRequest(superuserInvalidPrefixReq)
449         ExpectStatusCode(t,
450                 "permissions on, superuser request",
451                 http.StatusBadRequest,
452                 response)
453 }
454
455 // TestDeleteHandler
456 //
457 // Cases tested:
458 //
459 //   With no token and with a non-data-manager token:
460 //   * Delete existing block
461 //     (test for 403 Forbidden, confirm block not deleted)
462 //
463 //   With data manager token:
464 //
465 //   * Delete existing block
466 //     (test for 200 OK, response counts, confirm block deleted)
467 //
468 //   * Delete nonexistent block
469 //     (test for 200 OK, response counts)
470 //
471 //   TODO(twp):
472 //
473 //   * Delete block on read-only and read-write volume
474 //     (test for 200 OK, response with copies_deleted=1,
475 //     copies_failed=1, confirm block deleted only on r/w volume)
476 //
477 //   * Delete block on read-only volume only
478 //     (test for 200 OK, response with copies_deleted=0, copies_failed=1,
479 //     confirm block not deleted)
480 //
481 func TestDeleteHandler(t *testing.T) {
482         defer teardown()
483
484         // Set up Keep volumes and populate them.
485         // Include multiple blocks on different volumes, and
486         // some metadata files (which should be omitted from index listings)
487         KeepVM = MakeTestVolumeManager(2)
488         defer KeepVM.Close()
489
490         vols := KeepVM.AllWritable()
491         vols[0].Put(context.Background(), TestHash, TestBlock)
492
493         // Explicitly set the BlobSignatureTTL to 0 for these
494         // tests, to ensure the MockVolume deletes the blocks
495         // even though they have just been created.
496         theConfig.BlobSignatureTTL = arvados.Duration(0)
497
498         var userToken = "NOT DATA MANAGER TOKEN"
499         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
500
501         theConfig.EnableDelete = true
502
503         unauthReq := &RequestTester{
504                 method: "DELETE",
505                 uri:    "/" + TestHash,
506         }
507
508         userReq := &RequestTester{
509                 method:   "DELETE",
510                 uri:      "/" + TestHash,
511                 apiToken: userToken,
512         }
513
514         superuserExistingBlockReq := &RequestTester{
515                 method:   "DELETE",
516                 uri:      "/" + TestHash,
517                 apiToken: theConfig.systemAuthToken,
518         }
519
520         superuserNonexistentBlockReq := &RequestTester{
521                 method:   "DELETE",
522                 uri:      "/" + TestHash2,
523                 apiToken: theConfig.systemAuthToken,
524         }
525
526         // Unauthenticated request returns PermissionError.
527         var response *httptest.ResponseRecorder
528         response = IssueRequest(unauthReq)
529         ExpectStatusCode(t,
530                 "unauthenticated request",
531                 PermissionError.HTTPCode,
532                 response)
533
534         // Authenticated non-admin request returns PermissionError.
535         response = IssueRequest(userReq)
536         ExpectStatusCode(t,
537                 "authenticated non-admin request",
538                 PermissionError.HTTPCode,
539                 response)
540
541         // Authenticated admin request for nonexistent block.
542         type deletecounter struct {
543                 Deleted int `json:"copies_deleted"`
544                 Failed  int `json:"copies_failed"`
545         }
546         var responseDc, expectedDc deletecounter
547
548         response = IssueRequest(superuserNonexistentBlockReq)
549         ExpectStatusCode(t,
550                 "data manager request, nonexistent block",
551                 http.StatusNotFound,
552                 response)
553
554         // Authenticated admin request for existing block while EnableDelete is false.
555         theConfig.EnableDelete = false
556         response = IssueRequest(superuserExistingBlockReq)
557         ExpectStatusCode(t,
558                 "authenticated request, existing block, method disabled",
559                 MethodDisabledError.HTTPCode,
560                 response)
561         theConfig.EnableDelete = true
562
563         // Authenticated admin request for existing block.
564         response = IssueRequest(superuserExistingBlockReq)
565         ExpectStatusCode(t,
566                 "data manager request, existing block",
567                 http.StatusOK,
568                 response)
569         // Expect response {"copies_deleted":1,"copies_failed":0}
570         expectedDc = deletecounter{1, 0}
571         json.NewDecoder(response.Body).Decode(&responseDc)
572         if responseDc != expectedDc {
573                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
574                         expectedDc, responseDc)
575         }
576         // Confirm the block has been deleted
577         buf := make([]byte, BlockSize)
578         _, err := vols[0].Get(context.Background(), TestHash, buf)
579         var blockDeleted = os.IsNotExist(err)
580         if !blockDeleted {
581                 t.Error("superuserExistingBlockReq: block not deleted")
582         }
583
584         // A DELETE request on a block newer than BlobSignatureTTL
585         // should return success but leave the block on the volume.
586         vols[0].Put(context.Background(), TestHash, TestBlock)
587         theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
588
589         response = IssueRequest(superuserExistingBlockReq)
590         ExpectStatusCode(t,
591                 "data manager request, existing block",
592                 http.StatusOK,
593                 response)
594         // Expect response {"copies_deleted":1,"copies_failed":0}
595         expectedDc = deletecounter{1, 0}
596         json.NewDecoder(response.Body).Decode(&responseDc)
597         if responseDc != expectedDc {
598                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
599                         expectedDc, responseDc)
600         }
601         // Confirm the block has NOT been deleted.
602         _, err = vols[0].Get(context.Background(), TestHash, buf)
603         if err != nil {
604                 t.Errorf("testing delete on new block: %s\n", err)
605         }
606 }
607
608 // TestPullHandler
609 //
610 // Test handling of the PUT /pull statement.
611 //
612 // Cases tested: syntactically valid and invalid pull lists, from the
613 // data manager and from unprivileged users:
614 //
615 //   1. Valid pull list from an ordinary user
616 //      (expected result: 401 Unauthorized)
617 //
618 //   2. Invalid pull request from an ordinary user
619 //      (expected result: 401 Unauthorized)
620 //
621 //   3. Valid pull request from the data manager
622 //      (expected result: 200 OK with request body "Received 3 pull
623 //      requests"
624 //
625 //   4. Invalid pull request from the data manager
626 //      (expected result: 400 Bad Request)
627 //
628 // Test that in the end, the pull manager received a good pull list with
629 // the expected number of requests.
630 //
631 // TODO(twp): test concurrency: launch 100 goroutines to update the
632 // pull list simultaneously.  Make sure that none of them return 400
633 // Bad Request and that pullq.GetList() returns a valid list.
634 //
635 func TestPullHandler(t *testing.T) {
636         defer teardown()
637
638         var userToken = "USER TOKEN"
639         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
640
641         pullq = NewWorkQueue()
642
643         goodJSON := []byte(`[
644                 {
645                         "locator":"locator_with_two_servers",
646                         "servers":[
647                                 "server1",
648                                 "server2"
649                         ]
650                 },
651                 {
652                         "locator":"locator_with_no_servers",
653                         "servers":[]
654                 },
655                 {
656                         "locator":"",
657                         "servers":["empty_locator"]
658                 }
659         ]`)
660
661         badJSON := []byte(`{ "key":"I'm a little teapot" }`)
662
663         type pullTest struct {
664                 name         string
665                 req          RequestTester
666                 responseCode int
667                 responseBody string
668         }
669         var testcases = []pullTest{
670                 {
671                         "Valid pull list from an ordinary user",
672                         RequestTester{"/pull", userToken, "PUT", goodJSON},
673                         http.StatusUnauthorized,
674                         "Unauthorized\n",
675                 },
676                 {
677                         "Invalid pull request from an ordinary user",
678                         RequestTester{"/pull", userToken, "PUT", badJSON},
679                         http.StatusUnauthorized,
680                         "Unauthorized\n",
681                 },
682                 {
683                         "Valid pull request from the data manager",
684                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
685                         http.StatusOK,
686                         "Received 3 pull requests\n",
687                 },
688                 {
689                         "Invalid pull request from the data manager",
690                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
691                         http.StatusBadRequest,
692                         "",
693                 },
694         }
695
696         for _, tst := range testcases {
697                 response := IssueRequest(&tst.req)
698                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
699                 ExpectBody(t, tst.name, tst.responseBody, response)
700         }
701
702         // The Keep pull manager should have received one good list with 3
703         // requests on it.
704         for i := 0; i < 3; i++ {
705                 item := <-pullq.NextItem
706                 if _, ok := item.(PullRequest); !ok {
707                         t.Errorf("item %v could not be parsed as a PullRequest", item)
708                 }
709         }
710
711         expectChannelEmpty(t, pullq.NextItem)
712 }
713
714 // TestTrashHandler
715 //
716 // Test cases:
717 //
718 // Cases tested: syntactically valid and invalid trash lists, from the
719 // data manager and from unprivileged users:
720 //
721 //   1. Valid trash list from an ordinary user
722 //      (expected result: 401 Unauthorized)
723 //
724 //   2. Invalid trash list from an ordinary user
725 //      (expected result: 401 Unauthorized)
726 //
727 //   3. Valid trash list from the data manager
728 //      (expected result: 200 OK with request body "Received 3 trash
729 //      requests"
730 //
731 //   4. Invalid trash list from the data manager
732 //      (expected result: 400 Bad Request)
733 //
734 // Test that in the end, the trash collector received a good list
735 // trash list with the expected number of requests.
736 //
737 // TODO(twp): test concurrency: launch 100 goroutines to update the
738 // pull list simultaneously.  Make sure that none of them return 400
739 // Bad Request and that replica.Dump() returns a valid list.
740 //
741 func TestTrashHandler(t *testing.T) {
742         defer teardown()
743
744         var userToken = "USER TOKEN"
745         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
746
747         trashq = NewWorkQueue()
748
749         goodJSON := []byte(`[
750                 {
751                         "locator":"block1",
752                         "block_mtime":1409082153
753                 },
754                 {
755                         "locator":"block2",
756                         "block_mtime":1409082153
757                 },
758                 {
759                         "locator":"block3",
760                         "block_mtime":1409082153
761                 }
762         ]`)
763
764         badJSON := []byte(`I am not a valid JSON string`)
765
766         type trashTest struct {
767                 name         string
768                 req          RequestTester
769                 responseCode int
770                 responseBody string
771         }
772
773         var testcases = []trashTest{
774                 {
775                         "Valid trash list from an ordinary user",
776                         RequestTester{"/trash", userToken, "PUT", goodJSON},
777                         http.StatusUnauthorized,
778                         "Unauthorized\n",
779                 },
780                 {
781                         "Invalid trash list from an ordinary user",
782                         RequestTester{"/trash", userToken, "PUT", badJSON},
783                         http.StatusUnauthorized,
784                         "Unauthorized\n",
785                 },
786                 {
787                         "Valid trash list from the data manager",
788                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
789                         http.StatusOK,
790                         "Received 3 trash requests\n",
791                 },
792                 {
793                         "Invalid trash list from the data manager",
794                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
795                         http.StatusBadRequest,
796                         "",
797                 },
798         }
799
800         for _, tst := range testcases {
801                 response := IssueRequest(&tst.req)
802                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
803                 ExpectBody(t, tst.name, tst.responseBody, response)
804         }
805
806         // The trash collector should have received one good list with 3
807         // requests on it.
808         for i := 0; i < 3; i++ {
809                 item := <-trashq.NextItem
810                 if _, ok := item.(TrashRequest); !ok {
811                         t.Errorf("item %v could not be parsed as a TrashRequest", item)
812                 }
813         }
814
815         expectChannelEmpty(t, trashq.NextItem)
816 }
817
818 // ====================
819 // Helper functions
820 // ====================
821
822 // IssueTestRequest executes an HTTP request described by rt, to a
823 // REST router.  It returns the HTTP response to the request.
824 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
825         response := httptest.NewRecorder()
826         body := bytes.NewReader(rt.requestBody)
827         req, _ := http.NewRequest(rt.method, rt.uri, body)
828         if rt.apiToken != "" {
829                 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
830         }
831         loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
832         loggingRouter.ServeHTTP(response, req)
833         return response
834 }
835
836 func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
837         response := httptest.NewRecorder()
838         body := bytes.NewReader(rt.requestBody)
839         req, _ := http.NewRequest(rt.method, rt.uri, body)
840         if rt.apiToken != "" {
841                 req.Header.Set("Authorization", "Bearer "+rt.apiToken)
842         }
843         loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
844         loggingRouter.ServeHTTP(response, req)
845         return response
846 }
847
// ExpectStatusCode checks whether a response has the specified status code,
// and reports a test failure if not.  testname is used as a prefix for the
// failure message.
func ExpectStatusCode(
	t *testing.T,
	testname string,
	expectedStatus int,
	response *httptest.ResponseRecorder) {
	// Attribute failures to the calling test's line, not to this helper.
	t.Helper()
	if response.Code != expectedStatus {
		t.Errorf("%s: expected status %d, got %+v",
			testname, expectedStatus, response)
	}
}
860
// ExpectBody checks whether the response body equals expectedBody, and
// reports a test failure if not.  testname is used as a prefix for the
// failure message.
//
// An empty expectedBody disables the check: any response body is accepted.
func ExpectBody(
	t *testing.T,
	testname string,
	expectedBody string,
	response *httptest.ResponseRecorder) {
	// Attribute failures to the calling test's line, not to this helper.
	t.Helper()
	if expectedBody == "" {
		return
	}
	if response.Body.String() != expectedBody {
		t.Errorf("%s: expected response body '%s', got %+v",
			testname, expectedBody, response)
	}
}
871
872 // See #7121
873 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
874         defer teardown()
875         KeepVM = MakeTestVolumeManager(1)
876         defer KeepVM.Close()
877
878         defer func(orig *bufferPool) {
879                 bufs = orig
880         }(bufs)
881         bufs = newBufferPool(1, BlockSize)
882
883         ok := make(chan struct{})
884         go func() {
885                 for i := 0; i < 2; i++ {
886                         response := IssueRequest(
887                                 &RequestTester{
888                                         method:      "PUT",
889                                         uri:         "/" + TestHash,
890                                         requestBody: TestBlock,
891                                 })
892                         ExpectStatusCode(t,
893                                 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
894                 }
895                 ok <- struct{}{}
896         }()
897
898         select {
899         case <-ok:
900         case <-time.After(time.Second):
901                 t.Fatal("PUT deadlocks with MaxBuffers==1")
902         }
903 }
904
905 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
906 // leak.
907 func TestPutHandlerNoBufferleak(t *testing.T) {
908         defer teardown()
909
910         // Prepare two test Keep volumes.
911         KeepVM = MakeTestVolumeManager(2)
912         defer KeepVM.Close()
913
914         ok := make(chan bool)
915         go func() {
916                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
917                         // Unauthenticated request, no server key
918                         // => OK (unsigned response)
919                         unsignedLocator := "/" + TestHash
920                         response := IssueRequest(
921                                 &RequestTester{
922                                         method:      "PUT",
923                                         uri:         unsignedLocator,
924                                         requestBody: TestBlock,
925                                 })
926                         ExpectStatusCode(t,
927                                 "TestPutHandlerBufferleak", http.StatusOK, response)
928                         ExpectBody(t,
929                                 "TestPutHandlerBufferleak",
930                                 TestHashPutResp, response)
931                 }
932                 ok <- true
933         }()
934         select {
935         case <-time.After(20 * time.Second):
936                 // If the buffer pool leaks, the test goroutine hangs.
937                 t.Fatal("test did not finish, assuming pool leaked")
938         case <-ok:
939         }
940 }
941
// notifyingResponseRecorder embeds an httptest.ResponseRecorder and adds
// a CloseNotify method backed by the closer channel, so a test controls
// what that channel delivers.
type notifyingResponseRecorder struct {
	*httptest.ResponseRecorder
	// closer is the channel handed out by CloseNotify.
	closer chan bool
}

// CloseNotify returns the recorder's closer channel, receive-only.
func (rec *notifyingResponseRecorder) CloseNotify() <-chan bool {
	return rec.closer
}
950
951 func TestGetHandlerClientDisconnect(t *testing.T) {
952         defer func(was bool) {
953                 theConfig.RequireSignatures = was
954         }(theConfig.RequireSignatures)
955         theConfig.RequireSignatures = false
956
957         defer func(orig *bufferPool) {
958                 bufs = orig
959         }(bufs)
960         bufs = newBufferPool(1, BlockSize)
961         defer bufs.Put(bufs.Get(BlockSize))
962
963         KeepVM = MakeTestVolumeManager(2)
964         defer KeepVM.Close()
965
966         if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
967                 t.Error(err)
968         }
969
970         resp := &notifyingResponseRecorder{
971                 ResponseRecorder: httptest.NewRecorder(),
972                 closer:           make(chan bool, 1),
973         }
974         if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
975                 t.Fatal("notifyingResponseRecorder is broken")
976         }
977         // If anyone asks, the client has disconnected.
978         resp.closer <- true
979
980         ok := make(chan struct{})
981         go func() {
982                 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
983                 MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
984                 ok <- struct{}{}
985         }()
986
987         select {
988         case <-time.After(20 * time.Second):
989                 t.Fatal("request took >20s, close notifier must be broken")
990         case <-ok:
991         }
992
993         ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
994         for i, v := range KeepVM.AllWritable() {
995                 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
996                         t.Errorf("volume %d got %d calls, expected 0", i, calls)
997                 }
998         }
999 }
1000
1001 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
1002 // leak.
1003 func TestGetHandlerNoBufferLeak(t *testing.T) {
1004         defer teardown()
1005
1006         // Prepare two test Keep volumes. Our block is stored on the second volume.
1007         KeepVM = MakeTestVolumeManager(2)
1008         defer KeepVM.Close()
1009
1010         vols := KeepVM.AllWritable()
1011         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
1012                 t.Error(err)
1013         }
1014
1015         ok := make(chan bool)
1016         go func() {
1017                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
1018                         // Unauthenticated request, unsigned locator
1019                         // => OK
1020                         unsignedLocator := "/" + TestHash
1021                         response := IssueRequest(
1022                                 &RequestTester{
1023                                         method: "GET",
1024                                         uri:    unsignedLocator,
1025                                 })
1026                         ExpectStatusCode(t,
1027                                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1028                         ExpectBody(t,
1029                                 "Unauthenticated request, unsigned locator",
1030                                 string(TestBlock),
1031                                 response)
1032                 }
1033                 ok <- true
1034         }()
1035         select {
1036         case <-time.After(20 * time.Second):
1037                 // If the buffer pool leaks, the test goroutine hangs.
1038                 t.Fatal("test did not finish, assuming pool leaked")
1039         case <-ok:
1040         }
1041 }
1042
1043 func TestPutReplicationHeader(t *testing.T) {
1044         defer teardown()
1045
1046         KeepVM = MakeTestVolumeManager(2)
1047         defer KeepVM.Close()
1048
1049         resp := IssueRequest(&RequestTester{
1050                 method:      "PUT",
1051                 uri:         "/" + TestHash,
1052                 requestBody: TestBlock,
1053         })
1054         if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1055                 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1056         }
1057 }
1058
1059 func TestUntrashHandler(t *testing.T) {
1060         defer teardown()
1061
1062         // Set up Keep volumes
1063         KeepVM = MakeTestVolumeManager(2)
1064         defer KeepVM.Close()
1065         vols := KeepVM.AllWritable()
1066         vols[0].Put(context.Background(), TestHash, TestBlock)
1067
1068         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1069
1070         // unauthenticatedReq => UnauthorizedError
1071         unauthenticatedReq := &RequestTester{
1072                 method: "PUT",
1073                 uri:    "/untrash/" + TestHash,
1074         }
1075         response := IssueRequest(unauthenticatedReq)
1076         ExpectStatusCode(t,
1077                 "Unauthenticated request",
1078                 UnauthorizedError.HTTPCode,
1079                 response)
1080
1081         // notDataManagerReq => UnauthorizedError
1082         notDataManagerReq := &RequestTester{
1083                 method:   "PUT",
1084                 uri:      "/untrash/" + TestHash,
1085                 apiToken: knownToken,
1086         }
1087
1088         response = IssueRequest(notDataManagerReq)
1089         ExpectStatusCode(t,
1090                 "Non-datamanager token",
1091                 UnauthorizedError.HTTPCode,
1092                 response)
1093
1094         // datamanagerWithBadHashReq => StatusBadRequest
1095         datamanagerWithBadHashReq := &RequestTester{
1096                 method:   "PUT",
1097                 uri:      "/untrash/thisisnotalocator",
1098                 apiToken: theConfig.systemAuthToken,
1099         }
1100         response = IssueRequest(datamanagerWithBadHashReq)
1101         ExpectStatusCode(t,
1102                 "Bad locator in untrash request",
1103                 http.StatusBadRequest,
1104                 response)
1105
1106         // datamanagerWrongMethodReq => StatusBadRequest
1107         datamanagerWrongMethodReq := &RequestTester{
1108                 method:   "GET",
1109                 uri:      "/untrash/" + TestHash,
1110                 apiToken: theConfig.systemAuthToken,
1111         }
1112         response = IssueRequest(datamanagerWrongMethodReq)
1113         ExpectStatusCode(t,
1114                 "Only PUT method is supported for untrash",
1115                 http.StatusMethodNotAllowed,
1116                 response)
1117
1118         // datamanagerReq => StatusOK
1119         datamanagerReq := &RequestTester{
1120                 method:   "PUT",
1121                 uri:      "/untrash/" + TestHash,
1122                 apiToken: theConfig.systemAuthToken,
1123         }
1124         response = IssueRequest(datamanagerReq)
1125         ExpectStatusCode(t,
1126                 "",
1127                 http.StatusOK,
1128                 response)
1129         expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1130         if response.Body.String() != expected {
1131                 t.Errorf(
1132                         "Untrash response mismatched: expected %s, got:\n%s",
1133                         expected, response.Body.String())
1134         }
1135 }
1136
1137 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1138         defer teardown()
1139
1140         // Set up readonly Keep volumes
1141         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1142         vols[0].Readonly = true
1143         vols[1].Readonly = true
1144         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1145         defer KeepVM.Close()
1146
1147         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1148
1149         // datamanagerReq => StatusOK
1150         datamanagerReq := &RequestTester{
1151                 method:   "PUT",
1152                 uri:      "/untrash/" + TestHash,
1153                 apiToken: theConfig.systemAuthToken,
1154         }
1155         response := IssueRequest(datamanagerReq)
1156         ExpectStatusCode(t,
1157                 "No writable volumes",
1158                 http.StatusNotFound,
1159                 response)
1160 }
1161
1162 func TestHealthCheckPing(t *testing.T) {
1163         theConfig.ManagementToken = arvadostest.ManagementToken
1164         pingReq := &RequestTester{
1165                 method:   "GET",
1166                 uri:      "/_health/ping",
1167                 apiToken: arvadostest.ManagementToken,
1168         }
1169         response := IssueHealthCheckRequest(pingReq)
1170         ExpectStatusCode(t,
1171                 "",
1172                 http.StatusOK,
1173                 response)
1174         want := `{"health":"OK"}`
1175         if !strings.Contains(response.Body.String(), want) {
1176                 t.Errorf("expected response to include %s: got %s", want, response.Body.String())
1177         }
1178 }