12538: Refactor crunch-run shutdown
[arvados.git] / services / keepstore / handler_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 // Tests for Keep HTTP handlers:
6 //
7 //     GetBlockHandler
8 //     PutBlockHandler
9 //     IndexHandler
10 //
11 // The HTTP handlers are responsible for enforcing permission policy,
12 // so these tests must exercise all possible permission permutations.
13
14 package main
15
16 import (
17         "bytes"
18         "context"
19         "encoding/json"
20         "fmt"
21         "net/http"
22         "net/http/httptest"
23         "os"
24         "regexp"
25         "strings"
26         "testing"
27         "time"
28
29         "git.curoverse.com/arvados.git/sdk/go/arvados"
30         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
31 )
32
// RequestTester describes one HTTP request that a unit test wants to
// send through the router.
//
// Field order matters: tests in this file construct values with
// positional composite literals, in the order uri, apiToken, method,
// requestBody.
type RequestTester struct {
	// uri is the request target, apiToken (when non-empty) is sent in
	// the Authorization header, and method is the HTTP verb.
	uri, apiToken, method string
	// requestBody is the raw payload to send; nil yields an empty body.
	requestBody []byte
}
41
42 // Test GetBlockHandler on the following situations:
43 //   - permissions off, unauthenticated request, unsigned locator
44 //   - permissions on, authenticated request, signed locator
45 //   - permissions on, authenticated request, unsigned locator
46 //   - permissions on, unauthenticated request, signed locator
47 //   - permissions on, authenticated request, expired locator
48 //
49 func TestGetHandler(t *testing.T) {
50         defer teardown()
51
52         // Prepare two test Keep volumes. Our block is stored on the second volume.
53         KeepVM = MakeTestVolumeManager(2)
54         defer KeepVM.Close()
55
56         vols := KeepVM.AllWritable()
57         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
58                 t.Error(err)
59         }
60
61         // Create locators for testing.
62         // Turn on permission settings so we can generate signed locators.
63         theConfig.RequireSignatures = true
64         theConfig.blobSigningKey = []byte(knownKey)
65         theConfig.BlobSignatureTTL.Set("5m")
66
67         var (
68                 unsignedLocator  = "/" + TestHash
69                 validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
70                 expiredTimestamp = time.Now().Add(-time.Hour)
71                 signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
72                 expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
73         )
74
75         // -----------------
76         // Test unauthenticated request with permissions off.
77         theConfig.RequireSignatures = false
78
79         // Unauthenticated request, unsigned locator
80         // => OK
81         response := IssueRequest(
82                 &RequestTester{
83                         method: "GET",
84                         uri:    unsignedLocator,
85                 })
86         ExpectStatusCode(t,
87                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
88         ExpectBody(t,
89                 "Unauthenticated request, unsigned locator",
90                 string(TestBlock),
91                 response)
92
93         receivedLen := response.Header().Get("Content-Length")
94         expectedLen := fmt.Sprintf("%d", len(TestBlock))
95         if receivedLen != expectedLen {
96                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
97         }
98
99         // ----------------
100         // Permissions: on.
101         theConfig.RequireSignatures = true
102
103         // Authenticated request, signed locator
104         // => OK
105         response = IssueRequest(&RequestTester{
106                 method:   "GET",
107                 uri:      signedLocator,
108                 apiToken: knownToken,
109         })
110         ExpectStatusCode(t,
111                 "Authenticated request, signed locator", http.StatusOK, response)
112         ExpectBody(t,
113                 "Authenticated request, signed locator", string(TestBlock), response)
114
115         receivedLen = response.Header().Get("Content-Length")
116         expectedLen = fmt.Sprintf("%d", len(TestBlock))
117         if receivedLen != expectedLen {
118                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
119         }
120
121         // Authenticated request, unsigned locator
122         // => PermissionError
123         response = IssueRequest(&RequestTester{
124                 method:   "GET",
125                 uri:      unsignedLocator,
126                 apiToken: knownToken,
127         })
128         ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
129
130         // Unauthenticated request, signed locator
131         // => PermissionError
132         response = IssueRequest(&RequestTester{
133                 method: "GET",
134                 uri:    signedLocator,
135         })
136         ExpectStatusCode(t,
137                 "Unauthenticated request, signed locator",
138                 PermissionError.HTTPCode, response)
139
140         // Authenticated request, expired locator
141         // => ExpiredError
142         response = IssueRequest(&RequestTester{
143                 method:   "GET",
144                 uri:      expiredLocator,
145                 apiToken: knownToken,
146         })
147         ExpectStatusCode(t,
148                 "Authenticated request, expired locator",
149                 ExpiredError.HTTPCode, response)
150 }
151
152 // Test PutBlockHandler on the following situations:
153 //   - no server key
154 //   - with server key, authenticated request, unsigned locator
155 //   - with server key, unauthenticated request, unsigned locator
156 //
157 func TestPutHandler(t *testing.T) {
158         defer teardown()
159
160         // Prepare two test Keep volumes.
161         KeepVM = MakeTestVolumeManager(2)
162         defer KeepVM.Close()
163
164         // --------------
165         // No server key.
166
167         // Unauthenticated request, no server key
168         // => OK (unsigned response)
169         unsignedLocator := "/" + TestHash
170         response := IssueRequest(
171                 &RequestTester{
172                         method:      "PUT",
173                         uri:         unsignedLocator,
174                         requestBody: TestBlock,
175                 })
176
177         ExpectStatusCode(t,
178                 "Unauthenticated request, no server key", http.StatusOK, response)
179         ExpectBody(t,
180                 "Unauthenticated request, no server key",
181                 TestHashPutResp, response)
182
183         // ------------------
184         // With a server key.
185
186         theConfig.blobSigningKey = []byte(knownKey)
187         theConfig.BlobSignatureTTL.Set("5m")
188
189         // When a permission key is available, the locator returned
190         // from an authenticated PUT request will be signed.
191
192         // Authenticated PUT, signed locator
193         // => OK (signed response)
194         response = IssueRequest(
195                 &RequestTester{
196                         method:      "PUT",
197                         uri:         unsignedLocator,
198                         requestBody: TestBlock,
199                         apiToken:    knownToken,
200                 })
201
202         ExpectStatusCode(t,
203                 "Authenticated PUT, signed locator, with server key",
204                 http.StatusOK, response)
205         responseLocator := strings.TrimSpace(response.Body.String())
206         if VerifySignature(responseLocator, knownToken) != nil {
207                 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
208                         "response '%s' does not contain a valid signature",
209                         responseLocator)
210         }
211
212         // Unauthenticated PUT, unsigned locator
213         // => OK
214         response = IssueRequest(
215                 &RequestTester{
216                         method:      "PUT",
217                         uri:         unsignedLocator,
218                         requestBody: TestBlock,
219                 })
220
221         ExpectStatusCode(t,
222                 "Unauthenticated PUT, unsigned locator, with server key",
223                 http.StatusOK, response)
224         ExpectBody(t,
225                 "Unauthenticated PUT, unsigned locator, with server key",
226                 TestHashPutResp, response)
227 }
228
229 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
230         defer teardown()
231         theConfig.systemAuthToken = "fake-data-manager-token"
232         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
233         vols[0].Readonly = true
234         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
235         defer KeepVM.Close()
236         IssueRequest(
237                 &RequestTester{
238                         method:      "PUT",
239                         uri:         "/" + TestHash,
240                         requestBody: TestBlock,
241                 })
242         defer func(orig bool) {
243                 theConfig.EnableDelete = orig
244         }(theConfig.EnableDelete)
245         theConfig.EnableDelete = true
246         IssueRequest(
247                 &RequestTester{
248                         method:      "DELETE",
249                         uri:         "/" + TestHash,
250                         requestBody: TestBlock,
251                         apiToken:    theConfig.systemAuthToken,
252                 })
253         type expect struct {
254                 volnum    int
255                 method    string
256                 callcount int
257         }
258         for _, e := range []expect{
259                 {0, "Get", 0},
260                 {0, "Compare", 0},
261                 {0, "Touch", 0},
262                 {0, "Put", 0},
263                 {0, "Delete", 0},
264                 {1, "Get", 0},
265                 {1, "Compare", 1},
266                 {1, "Touch", 1},
267                 {1, "Put", 1},
268                 {1, "Delete", 1},
269         } {
270                 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
271                         t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
272                 }
273         }
274 }
275
276 // Test /index requests:
277 //   - unauthenticated /index request
278 //   - unauthenticated /index/prefix request
279 //   - authenticated   /index request        | non-superuser
280 //   - authenticated   /index/prefix request | non-superuser
281 //   - authenticated   /index request        | superuser
282 //   - authenticated   /index/prefix request | superuser
283 //
284 // The only /index requests that should succeed are those issued by the
285 // superuser. They should pass regardless of the value of RequireSignatures.
286 //
287 func TestIndexHandler(t *testing.T) {
288         defer teardown()
289
290         // Set up Keep volumes and populate them.
291         // Include multiple blocks on different volumes, and
292         // some metadata files (which should be omitted from index listings)
293         KeepVM = MakeTestVolumeManager(2)
294         defer KeepVM.Close()
295
296         vols := KeepVM.AllWritable()
297         vols[0].Put(context.Background(), TestHash, TestBlock)
298         vols[1].Put(context.Background(), TestHash2, TestBlock2)
299         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
300         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
301
302         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
303
304         unauthenticatedReq := &RequestTester{
305                 method: "GET",
306                 uri:    "/index",
307         }
308         authenticatedReq := &RequestTester{
309                 method:   "GET",
310                 uri:      "/index",
311                 apiToken: knownToken,
312         }
313         superuserReq := &RequestTester{
314                 method:   "GET",
315                 uri:      "/index",
316                 apiToken: theConfig.systemAuthToken,
317         }
318         unauthPrefixReq := &RequestTester{
319                 method: "GET",
320                 uri:    "/index/" + TestHash[0:3],
321         }
322         authPrefixReq := &RequestTester{
323                 method:   "GET",
324                 uri:      "/index/" + TestHash[0:3],
325                 apiToken: knownToken,
326         }
327         superuserPrefixReq := &RequestTester{
328                 method:   "GET",
329                 uri:      "/index/" + TestHash[0:3],
330                 apiToken: theConfig.systemAuthToken,
331         }
332         superuserNoSuchPrefixReq := &RequestTester{
333                 method:   "GET",
334                 uri:      "/index/abcd",
335                 apiToken: theConfig.systemAuthToken,
336         }
337         superuserInvalidPrefixReq := &RequestTester{
338                 method:   "GET",
339                 uri:      "/index/xyz",
340                 apiToken: theConfig.systemAuthToken,
341         }
342
343         // -------------------------------------------------------------
344         // Only the superuser should be allowed to issue /index requests.
345
346         // ---------------------------
347         // RequireSignatures enabled
348         // This setting should not affect tests passing.
349         theConfig.RequireSignatures = true
350
351         // unauthenticated /index request
352         // => UnauthorizedError
353         response := IssueRequest(unauthenticatedReq)
354         ExpectStatusCode(t,
355                 "RequireSignatures on, unauthenticated request",
356                 UnauthorizedError.HTTPCode,
357                 response)
358
359         // unauthenticated /index/prefix request
360         // => UnauthorizedError
361         response = IssueRequest(unauthPrefixReq)
362         ExpectStatusCode(t,
363                 "permissions on, unauthenticated /index/prefix request",
364                 UnauthorizedError.HTTPCode,
365                 response)
366
367         // authenticated /index request, non-superuser
368         // => UnauthorizedError
369         response = IssueRequest(authenticatedReq)
370         ExpectStatusCode(t,
371                 "permissions on, authenticated request, non-superuser",
372                 UnauthorizedError.HTTPCode,
373                 response)
374
375         // authenticated /index/prefix request, non-superuser
376         // => UnauthorizedError
377         response = IssueRequest(authPrefixReq)
378         ExpectStatusCode(t,
379                 "permissions on, authenticated /index/prefix request, non-superuser",
380                 UnauthorizedError.HTTPCode,
381                 response)
382
383         // superuser /index request
384         // => OK
385         response = IssueRequest(superuserReq)
386         ExpectStatusCode(t,
387                 "permissions on, superuser request",
388                 http.StatusOK,
389                 response)
390
391         // ----------------------------
392         // RequireSignatures disabled
393         // Valid Request should still pass.
394         theConfig.RequireSignatures = false
395
396         // superuser /index request
397         // => OK
398         response = IssueRequest(superuserReq)
399         ExpectStatusCode(t,
400                 "permissions on, superuser request",
401                 http.StatusOK,
402                 response)
403
404         expected := `^` + TestHash + `\+\d+ \d+\n` +
405                 TestHash2 + `\+\d+ \d+\n\n$`
406         match, _ := regexp.MatchString(expected, response.Body.String())
407         if !match {
408                 t.Errorf(
409                         "permissions on, superuser request: expected %s, got:\n%s",
410                         expected, response.Body.String())
411         }
412
413         // superuser /index/prefix request
414         // => OK
415         response = IssueRequest(superuserPrefixReq)
416         ExpectStatusCode(t,
417                 "permissions on, superuser request",
418                 http.StatusOK,
419                 response)
420
421         expected = `^` + TestHash + `\+\d+ \d+\n\n$`
422         match, _ = regexp.MatchString(expected, response.Body.String())
423         if !match {
424                 t.Errorf(
425                         "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
426                         expected, response.Body.String())
427         }
428
429         // superuser /index/{no-such-prefix} request
430         // => OK
431         response = IssueRequest(superuserNoSuchPrefixReq)
432         ExpectStatusCode(t,
433                 "permissions on, superuser request",
434                 http.StatusOK,
435                 response)
436
437         if "\n" != response.Body.String() {
438                 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
439         }
440
441         // superuser /index/{invalid-prefix} request
442         // => StatusBadRequest
443         response = IssueRequest(superuserInvalidPrefixReq)
444         ExpectStatusCode(t,
445                 "permissions on, superuser request",
446                 http.StatusBadRequest,
447                 response)
448 }
449
450 // TestDeleteHandler
451 //
452 // Cases tested:
453 //
454 //   With no token and with a non-data-manager token:
455 //   * Delete existing block
456 //     (test for 403 Forbidden, confirm block not deleted)
457 //
458 //   With data manager token:
459 //
460 //   * Delete existing block
461 //     (test for 200 OK, response counts, confirm block deleted)
462 //
463 //   * Delete nonexistent block
464 //     (test for 200 OK, response counts)
465 //
466 //   TODO(twp):
467 //
468 //   * Delete block on read-only and read-write volume
469 //     (test for 200 OK, response with copies_deleted=1,
470 //     copies_failed=1, confirm block deleted only on r/w volume)
471 //
472 //   * Delete block on read-only volume only
473 //     (test for 200 OK, response with copies_deleted=0, copies_failed=1,
474 //     confirm block not deleted)
475 //
476 func TestDeleteHandler(t *testing.T) {
477         defer teardown()
478
479         // Set up Keep volumes and populate them.
480         // Include multiple blocks on different volumes, and
481         // some metadata files (which should be omitted from index listings)
482         KeepVM = MakeTestVolumeManager(2)
483         defer KeepVM.Close()
484
485         vols := KeepVM.AllWritable()
486         vols[0].Put(context.Background(), TestHash, TestBlock)
487
488         // Explicitly set the BlobSignatureTTL to 0 for these
489         // tests, to ensure the MockVolume deletes the blocks
490         // even though they have just been created.
491         theConfig.BlobSignatureTTL = arvados.Duration(0)
492
493         var userToken = "NOT DATA MANAGER TOKEN"
494         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
495
496         theConfig.EnableDelete = true
497
498         unauthReq := &RequestTester{
499                 method: "DELETE",
500                 uri:    "/" + TestHash,
501         }
502
503         userReq := &RequestTester{
504                 method:   "DELETE",
505                 uri:      "/" + TestHash,
506                 apiToken: userToken,
507         }
508
509         superuserExistingBlockReq := &RequestTester{
510                 method:   "DELETE",
511                 uri:      "/" + TestHash,
512                 apiToken: theConfig.systemAuthToken,
513         }
514
515         superuserNonexistentBlockReq := &RequestTester{
516                 method:   "DELETE",
517                 uri:      "/" + TestHash2,
518                 apiToken: theConfig.systemAuthToken,
519         }
520
521         // Unauthenticated request returns PermissionError.
522         var response *httptest.ResponseRecorder
523         response = IssueRequest(unauthReq)
524         ExpectStatusCode(t,
525                 "unauthenticated request",
526                 PermissionError.HTTPCode,
527                 response)
528
529         // Authenticated non-admin request returns PermissionError.
530         response = IssueRequest(userReq)
531         ExpectStatusCode(t,
532                 "authenticated non-admin request",
533                 PermissionError.HTTPCode,
534                 response)
535
536         // Authenticated admin request for nonexistent block.
537         type deletecounter struct {
538                 Deleted int `json:"copies_deleted"`
539                 Failed  int `json:"copies_failed"`
540         }
541         var responseDc, expectedDc deletecounter
542
543         response = IssueRequest(superuserNonexistentBlockReq)
544         ExpectStatusCode(t,
545                 "data manager request, nonexistent block",
546                 http.StatusNotFound,
547                 response)
548
549         // Authenticated admin request for existing block while EnableDelete is false.
550         theConfig.EnableDelete = false
551         response = IssueRequest(superuserExistingBlockReq)
552         ExpectStatusCode(t,
553                 "authenticated request, existing block, method disabled",
554                 MethodDisabledError.HTTPCode,
555                 response)
556         theConfig.EnableDelete = true
557
558         // Authenticated admin request for existing block.
559         response = IssueRequest(superuserExistingBlockReq)
560         ExpectStatusCode(t,
561                 "data manager request, existing block",
562                 http.StatusOK,
563                 response)
564         // Expect response {"copies_deleted":1,"copies_failed":0}
565         expectedDc = deletecounter{1, 0}
566         json.NewDecoder(response.Body).Decode(&responseDc)
567         if responseDc != expectedDc {
568                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
569                         expectedDc, responseDc)
570         }
571         // Confirm the block has been deleted
572         buf := make([]byte, BlockSize)
573         _, err := vols[0].Get(context.Background(), TestHash, buf)
574         var blockDeleted = os.IsNotExist(err)
575         if !blockDeleted {
576                 t.Error("superuserExistingBlockReq: block not deleted")
577         }
578
579         // A DELETE request on a block newer than BlobSignatureTTL
580         // should return success but leave the block on the volume.
581         vols[0].Put(context.Background(), TestHash, TestBlock)
582         theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
583
584         response = IssueRequest(superuserExistingBlockReq)
585         ExpectStatusCode(t,
586                 "data manager request, existing block",
587                 http.StatusOK,
588                 response)
589         // Expect response {"copies_deleted":1,"copies_failed":0}
590         expectedDc = deletecounter{1, 0}
591         json.NewDecoder(response.Body).Decode(&responseDc)
592         if responseDc != expectedDc {
593                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
594                         expectedDc, responseDc)
595         }
596         // Confirm the block has NOT been deleted.
597         _, err = vols[0].Get(context.Background(), TestHash, buf)
598         if err != nil {
599                 t.Errorf("testing delete on new block: %s\n", err)
600         }
601 }
602
603 // TestPullHandler
604 //
605 // Test handling of the PUT /pull statement.
606 //
607 // Cases tested: syntactically valid and invalid pull lists, from the
608 // data manager and from unprivileged users:
609 //
610 //   1. Valid pull list from an ordinary user
611 //      (expected result: 401 Unauthorized)
612 //
613 //   2. Invalid pull request from an ordinary user
614 //      (expected result: 401 Unauthorized)
615 //
616 //   3. Valid pull request from the data manager
617 //      (expected result: 200 OK with request body "Received 3 pull
618 //      requests"
619 //
620 //   4. Invalid pull request from the data manager
621 //      (expected result: 400 Bad Request)
622 //
623 // Test that in the end, the pull manager received a good pull list with
624 // the expected number of requests.
625 //
626 // TODO(twp): test concurrency: launch 100 goroutines to update the
627 // pull list simultaneously.  Make sure that none of them return 400
628 // Bad Request and that pullq.GetList() returns a valid list.
629 //
630 func TestPullHandler(t *testing.T) {
631         defer teardown()
632
633         var userToken = "USER TOKEN"
634         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
635
636         pullq = NewWorkQueue()
637
638         goodJSON := []byte(`[
639                 {
640                         "locator":"locator_with_two_servers",
641                         "servers":[
642                                 "server1",
643                                 "server2"
644                         ]
645                 },
646                 {
647                         "locator":"locator_with_no_servers",
648                         "servers":[]
649                 },
650                 {
651                         "locator":"",
652                         "servers":["empty_locator"]
653                 }
654         ]`)
655
656         badJSON := []byte(`{ "key":"I'm a little teapot" }`)
657
658         type pullTest struct {
659                 name         string
660                 req          RequestTester
661                 responseCode int
662                 responseBody string
663         }
664         var testcases = []pullTest{
665                 {
666                         "Valid pull list from an ordinary user",
667                         RequestTester{"/pull", userToken, "PUT", goodJSON},
668                         http.StatusUnauthorized,
669                         "Unauthorized\n",
670                 },
671                 {
672                         "Invalid pull request from an ordinary user",
673                         RequestTester{"/pull", userToken, "PUT", badJSON},
674                         http.StatusUnauthorized,
675                         "Unauthorized\n",
676                 },
677                 {
678                         "Valid pull request from the data manager",
679                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
680                         http.StatusOK,
681                         "Received 3 pull requests\n",
682                 },
683                 {
684                         "Invalid pull request from the data manager",
685                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
686                         http.StatusBadRequest,
687                         "",
688                 },
689         }
690
691         for _, tst := range testcases {
692                 response := IssueRequest(&tst.req)
693                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
694                 ExpectBody(t, tst.name, tst.responseBody, response)
695         }
696
697         // The Keep pull manager should have received one good list with 3
698         // requests on it.
699         for i := 0; i < 3; i++ {
700                 item := <-pullq.NextItem
701                 if _, ok := item.(PullRequest); !ok {
702                         t.Errorf("item %v could not be parsed as a PullRequest", item)
703                 }
704         }
705
706         expectChannelEmpty(t, pullq.NextItem)
707 }
708
709 // TestTrashHandler
710 //
711 // Test cases:
712 //
713 // Cases tested: syntactically valid and invalid trash lists, from the
714 // data manager and from unprivileged users:
715 //
716 //   1. Valid trash list from an ordinary user
717 //      (expected result: 401 Unauthorized)
718 //
719 //   2. Invalid trash list from an ordinary user
720 //      (expected result: 401 Unauthorized)
721 //
722 //   3. Valid trash list from the data manager
723 //      (expected result: 200 OK with request body "Received 3 trash
724 //      requests"
725 //
726 //   4. Invalid trash list from the data manager
727 //      (expected result: 400 Bad Request)
728 //
729 // Test that in the end, the trash collector received a good list
730 // trash list with the expected number of requests.
731 //
732 // TODO(twp): test concurrency: launch 100 goroutines to update the
733 // pull list simultaneously.  Make sure that none of them return 400
734 // Bad Request and that replica.Dump() returns a valid list.
735 //
736 func TestTrashHandler(t *testing.T) {
737         defer teardown()
738
739         var userToken = "USER TOKEN"
740         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
741
742         trashq = NewWorkQueue()
743
744         goodJSON := []byte(`[
745                 {
746                         "locator":"block1",
747                         "block_mtime":1409082153
748                 },
749                 {
750                         "locator":"block2",
751                         "block_mtime":1409082153
752                 },
753                 {
754                         "locator":"block3",
755                         "block_mtime":1409082153
756                 }
757         ]`)
758
759         badJSON := []byte(`I am not a valid JSON string`)
760
761         type trashTest struct {
762                 name         string
763                 req          RequestTester
764                 responseCode int
765                 responseBody string
766         }
767
768         var testcases = []trashTest{
769                 {
770                         "Valid trash list from an ordinary user",
771                         RequestTester{"/trash", userToken, "PUT", goodJSON},
772                         http.StatusUnauthorized,
773                         "Unauthorized\n",
774                 },
775                 {
776                         "Invalid trash list from an ordinary user",
777                         RequestTester{"/trash", userToken, "PUT", badJSON},
778                         http.StatusUnauthorized,
779                         "Unauthorized\n",
780                 },
781                 {
782                         "Valid trash list from the data manager",
783                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
784                         http.StatusOK,
785                         "Received 3 trash requests\n",
786                 },
787                 {
788                         "Invalid trash list from the data manager",
789                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
790                         http.StatusBadRequest,
791                         "",
792                 },
793         }
794
795         for _, tst := range testcases {
796                 response := IssueRequest(&tst.req)
797                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
798                 ExpectBody(t, tst.name, tst.responseBody, response)
799         }
800
801         // The trash collector should have received one good list with 3
802         // requests on it.
803         for i := 0; i < 3; i++ {
804                 item := <-trashq.NextItem
805                 if _, ok := item.(TrashRequest); !ok {
806                         t.Errorf("item %v could not be parsed as a TrashRequest", item)
807                 }
808         }
809
810         expectChannelEmpty(t, trashq.NextItem)
811 }
812
813 // ====================
814 // Helper functions
815 // ====================
816
817 // IssueTestRequest executes an HTTP request described by rt, to a
818 // REST router.  It returns the HTTP response to the request.
819 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
820         response := httptest.NewRecorder()
821         body := bytes.NewReader(rt.requestBody)
822         req, _ := http.NewRequest(rt.method, rt.uri, body)
823         if rt.apiToken != "" {
824                 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
825         }
826         loggingRouter := MakeRESTRouter()
827         loggingRouter.ServeHTTP(response, req)
828         return response
829 }
830
831 func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
832         response := httptest.NewRecorder()
833         body := bytes.NewReader(rt.requestBody)
834         req, _ := http.NewRequest(rt.method, rt.uri, body)
835         if rt.apiToken != "" {
836                 req.Header.Set("Authorization", "Bearer "+rt.apiToken)
837         }
838         loggingRouter := MakeRESTRouter()
839         loggingRouter.ServeHTTP(response, req)
840         return response
841 }
842
// ExpectStatusCode reports a test failure, labeled with testname,
// unless the recorded response carries the status code
// expectedStatus.
func ExpectStatusCode(t *testing.T, testname string, expectedStatus int, response *httptest.ResponseRecorder) {
	if response.Code == expectedStatus {
		return
	}
	// The whole recorder, not just its status code, goes into the
	// failure message.
	t.Errorf("%s: expected status %d, got %+v",
		testname, expectedStatus, response)
}
855
// ExpectBody reports a test failure, labeled with testname, if the
// recorded response body differs from expectedBody. An empty
// expectedBody disables the check: no comparison is made at all.
func ExpectBody(t *testing.T, testname string, expectedBody string, response *httptest.ResponseRecorder) {
	if expectedBody == "" {
		return
	}
	if got := response.Body.String(); got != expectedBody {
		// The whole recorder, not just its body, goes into the
		// failure message.
		t.Errorf("%s: expected response body '%s', got %+v",
			testname, expectedBody, response)
	}
}
866
867 // See #7121
868 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
869         defer teardown()
870         KeepVM = MakeTestVolumeManager(1)
871         defer KeepVM.Close()
872
873         defer func(orig *bufferPool) {
874                 bufs = orig
875         }(bufs)
876         bufs = newBufferPool(1, BlockSize)
877
878         ok := make(chan struct{})
879         go func() {
880                 for i := 0; i < 2; i++ {
881                         response := IssueRequest(
882                                 &RequestTester{
883                                         method:      "PUT",
884                                         uri:         "/" + TestHash,
885                                         requestBody: TestBlock,
886                                 })
887                         ExpectStatusCode(t,
888                                 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
889                 }
890                 ok <- struct{}{}
891         }()
892
893         select {
894         case <-ok:
895         case <-time.After(time.Second):
896                 t.Fatal("PUT deadlocks with MaxBuffers==1")
897         }
898 }
899
900 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
901 // leak.
902 func TestPutHandlerNoBufferleak(t *testing.T) {
903         defer teardown()
904
905         // Prepare two test Keep volumes.
906         KeepVM = MakeTestVolumeManager(2)
907         defer KeepVM.Close()
908
909         ok := make(chan bool)
910         go func() {
911                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
912                         // Unauthenticated request, no server key
913                         // => OK (unsigned response)
914                         unsignedLocator := "/" + TestHash
915                         response := IssueRequest(
916                                 &RequestTester{
917                                         method:      "PUT",
918                                         uri:         unsignedLocator,
919                                         requestBody: TestBlock,
920                                 })
921                         ExpectStatusCode(t,
922                                 "TestPutHandlerBufferleak", http.StatusOK, response)
923                         ExpectBody(t,
924                                 "TestPutHandlerBufferleak",
925                                 TestHashPutResp, response)
926                 }
927                 ok <- true
928         }()
929         select {
930         case <-time.After(20 * time.Second):
931                 // If the buffer pool leaks, the test goroutine hangs.
932                 t.Fatal("test did not finish, assuming pool leaked")
933         case <-ok:
934         }
935 }
936
// notifyingResponseRecorder is an httptest.ResponseRecorder that
// additionally has a CloseNotify method, backed by a channel that
// the test controls.
type notifyingResponseRecorder struct {
	*httptest.ResponseRecorder
	// closer is the channel handed out by CloseNotify.
	closer chan bool
}

// CloseNotify returns the recorder's closer channel as a
// receive-only channel.
func (rec *notifyingResponseRecorder) CloseNotify() <-chan bool {
	var notify <-chan bool = rec.closer
	return notify
}
945
946 func TestGetHandlerClientDisconnect(t *testing.T) {
947         defer func(was bool) {
948                 theConfig.RequireSignatures = was
949         }(theConfig.RequireSignatures)
950         theConfig.RequireSignatures = false
951
952         defer func(orig *bufferPool) {
953                 bufs = orig
954         }(bufs)
955         bufs = newBufferPool(1, BlockSize)
956         defer bufs.Put(bufs.Get(BlockSize))
957
958         KeepVM = MakeTestVolumeManager(2)
959         defer KeepVM.Close()
960
961         if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
962                 t.Error(err)
963         }
964
965         resp := &notifyingResponseRecorder{
966                 ResponseRecorder: httptest.NewRecorder(),
967                 closer:           make(chan bool, 1),
968         }
969         if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
970                 t.Fatal("notifyingResponseRecorder is broken")
971         }
972         // If anyone asks, the client has disconnected.
973         resp.closer <- true
974
975         ok := make(chan struct{})
976         go func() {
977                 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
978                 (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
979                 ok <- struct{}{}
980         }()
981
982         select {
983         case <-time.After(20 * time.Second):
984                 t.Fatal("request took >20s, close notifier must be broken")
985         case <-ok:
986         }
987
988         ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
989         for i, v := range KeepVM.AllWritable() {
990                 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
991                         t.Errorf("volume %d got %d calls, expected 0", i, calls)
992                 }
993         }
994 }
995
996 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
997 // leak.
998 func TestGetHandlerNoBufferLeak(t *testing.T) {
999         defer teardown()
1000
1001         // Prepare two test Keep volumes. Our block is stored on the second volume.
1002         KeepVM = MakeTestVolumeManager(2)
1003         defer KeepVM.Close()
1004
1005         vols := KeepVM.AllWritable()
1006         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
1007                 t.Error(err)
1008         }
1009
1010         ok := make(chan bool)
1011         go func() {
1012                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
1013                         // Unauthenticated request, unsigned locator
1014                         // => OK
1015                         unsignedLocator := "/" + TestHash
1016                         response := IssueRequest(
1017                                 &RequestTester{
1018                                         method: "GET",
1019                                         uri:    unsignedLocator,
1020                                 })
1021                         ExpectStatusCode(t,
1022                                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1023                         ExpectBody(t,
1024                                 "Unauthenticated request, unsigned locator",
1025                                 string(TestBlock),
1026                                 response)
1027                 }
1028                 ok <- true
1029         }()
1030         select {
1031         case <-time.After(20 * time.Second):
1032                 // If the buffer pool leaks, the test goroutine hangs.
1033                 t.Fatal("test did not finish, assuming pool leaked")
1034         case <-ok:
1035         }
1036 }
1037
1038 func TestPutReplicationHeader(t *testing.T) {
1039         defer teardown()
1040
1041         KeepVM = MakeTestVolumeManager(2)
1042         defer KeepVM.Close()
1043
1044         resp := IssueRequest(&RequestTester{
1045                 method:      "PUT",
1046                 uri:         "/" + TestHash,
1047                 requestBody: TestBlock,
1048         })
1049         if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1050                 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1051         }
1052 }
1053
1054 func TestUntrashHandler(t *testing.T) {
1055         defer teardown()
1056
1057         // Set up Keep volumes
1058         KeepVM = MakeTestVolumeManager(2)
1059         defer KeepVM.Close()
1060         vols := KeepVM.AllWritable()
1061         vols[0].Put(context.Background(), TestHash, TestBlock)
1062
1063         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1064
1065         // unauthenticatedReq => UnauthorizedError
1066         unauthenticatedReq := &RequestTester{
1067                 method: "PUT",
1068                 uri:    "/untrash/" + TestHash,
1069         }
1070         response := IssueRequest(unauthenticatedReq)
1071         ExpectStatusCode(t,
1072                 "Unauthenticated request",
1073                 UnauthorizedError.HTTPCode,
1074                 response)
1075
1076         // notDataManagerReq => UnauthorizedError
1077         notDataManagerReq := &RequestTester{
1078                 method:   "PUT",
1079                 uri:      "/untrash/" + TestHash,
1080                 apiToken: knownToken,
1081         }
1082
1083         response = IssueRequest(notDataManagerReq)
1084         ExpectStatusCode(t,
1085                 "Non-datamanager token",
1086                 UnauthorizedError.HTTPCode,
1087                 response)
1088
1089         // datamanagerWithBadHashReq => StatusBadRequest
1090         datamanagerWithBadHashReq := &RequestTester{
1091                 method:   "PUT",
1092                 uri:      "/untrash/thisisnotalocator",
1093                 apiToken: theConfig.systemAuthToken,
1094         }
1095         response = IssueRequest(datamanagerWithBadHashReq)
1096         ExpectStatusCode(t,
1097                 "Bad locator in untrash request",
1098                 http.StatusBadRequest,
1099                 response)
1100
1101         // datamanagerWrongMethodReq => StatusBadRequest
1102         datamanagerWrongMethodReq := &RequestTester{
1103                 method:   "GET",
1104                 uri:      "/untrash/" + TestHash,
1105                 apiToken: theConfig.systemAuthToken,
1106         }
1107         response = IssueRequest(datamanagerWrongMethodReq)
1108         ExpectStatusCode(t,
1109                 "Only PUT method is supported for untrash",
1110                 http.StatusBadRequest,
1111                 response)
1112
1113         // datamanagerReq => StatusOK
1114         datamanagerReq := &RequestTester{
1115                 method:   "PUT",
1116                 uri:      "/untrash/" + TestHash,
1117                 apiToken: theConfig.systemAuthToken,
1118         }
1119         response = IssueRequest(datamanagerReq)
1120         ExpectStatusCode(t,
1121                 "",
1122                 http.StatusOK,
1123                 response)
1124         expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1125         if response.Body.String() != expected {
1126                 t.Errorf(
1127                         "Untrash response mismatched: expected %s, got:\n%s",
1128                         expected, response.Body.String())
1129         }
1130 }
1131
1132 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1133         defer teardown()
1134
1135         // Set up readonly Keep volumes
1136         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1137         vols[0].Readonly = true
1138         vols[1].Readonly = true
1139         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1140         defer KeepVM.Close()
1141
1142         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1143
1144         // datamanagerReq => StatusOK
1145         datamanagerReq := &RequestTester{
1146                 method:   "PUT",
1147                 uri:      "/untrash/" + TestHash,
1148                 apiToken: theConfig.systemAuthToken,
1149         }
1150         response := IssueRequest(datamanagerReq)
1151         ExpectStatusCode(t,
1152                 "No writable volumes",
1153                 http.StatusNotFound,
1154                 response)
1155 }
1156
1157 func TestHealthCheckPing(t *testing.T) {
1158         theConfig.ManagementToken = arvadostest.ManagementToken
1159         pingReq := &RequestTester{
1160                 method:   "GET",
1161                 uri:      "/_health/ping",
1162                 apiToken: arvadostest.ManagementToken,
1163         }
1164         response := IssueHealthCheckRequest(pingReq)
1165         ExpectStatusCode(t,
1166                 "",
1167                 http.StatusOK,
1168                 response)
1169         want := `{"health":"OK"}`
1170         if !strings.Contains(response.Body.String(), want) {
1171                 t.Errorf("expected response to include %s: got %s", want, response.Body.String())
1172         }
1173 }