Increase ping delay in WatchdogActorTest to try and reduce spurious test failures...
[arvados.git] / services / keepstore / handler_test.go
1 // Tests for Keep HTTP handlers:
2 //
3 //     GetBlockHandler
4 //     PutBlockHandler
5 //     IndexHandler
6 //
7 // The HTTP handlers are responsible for enforcing permission policy,
8 // so these tests must exercise all possible permission permutations.
9
10 package main
11
12 import (
13         "bytes"
14         "context"
15         "encoding/json"
16         "fmt"
17         "net/http"
18         "net/http/httptest"
19         "os"
20         "regexp"
21         "strings"
22         "testing"
23         "time"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26 )
27
28 // A RequestTester represents the parameters for an HTTP request to
29 // be issued on behalf of a unit test.
30 type RequestTester struct {
31         uri         string
32         apiToken    string
33         method      string
34         requestBody []byte
35 }
36
37 // Test GetBlockHandler on the following situations:
38 //   - permissions off, unauthenticated request, unsigned locator
39 //   - permissions on, authenticated request, signed locator
40 //   - permissions on, authenticated request, unsigned locator
41 //   - permissions on, unauthenticated request, signed locator
42 //   - permissions on, authenticated request, expired locator
43 //
44 func TestGetHandler(t *testing.T) {
45         defer teardown()
46
47         // Prepare two test Keep volumes. Our block is stored on the second volume.
48         KeepVM = MakeTestVolumeManager(2)
49         defer KeepVM.Close()
50
51         vols := KeepVM.AllWritable()
52         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
53                 t.Error(err)
54         }
55
56         // Create locators for testing.
57         // Turn on permission settings so we can generate signed locators.
58         theConfig.RequireSignatures = true
59         theConfig.blobSigningKey = []byte(knownKey)
60         theConfig.BlobSignatureTTL.Set("5m")
61
62         var (
63                 unsignedLocator  = "/" + TestHash
64                 validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
65                 expiredTimestamp = time.Now().Add(-time.Hour)
66                 signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
67                 expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
68         )
69
70         // -----------------
71         // Test unauthenticated request with permissions off.
72         theConfig.RequireSignatures = false
73
74         // Unauthenticated request, unsigned locator
75         // => OK
76         response := IssueRequest(
77                 &RequestTester{
78                         method: "GET",
79                         uri:    unsignedLocator,
80                 })
81         ExpectStatusCode(t,
82                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
83         ExpectBody(t,
84                 "Unauthenticated request, unsigned locator",
85                 string(TestBlock),
86                 response)
87
88         receivedLen := response.Header().Get("Content-Length")
89         expectedLen := fmt.Sprintf("%d", len(TestBlock))
90         if receivedLen != expectedLen {
91                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
92         }
93
94         // ----------------
95         // Permissions: on.
96         theConfig.RequireSignatures = true
97
98         // Authenticated request, signed locator
99         // => OK
100         response = IssueRequest(&RequestTester{
101                 method:   "GET",
102                 uri:      signedLocator,
103                 apiToken: knownToken,
104         })
105         ExpectStatusCode(t,
106                 "Authenticated request, signed locator", http.StatusOK, response)
107         ExpectBody(t,
108                 "Authenticated request, signed locator", string(TestBlock), response)
109
110         receivedLen = response.Header().Get("Content-Length")
111         expectedLen = fmt.Sprintf("%d", len(TestBlock))
112         if receivedLen != expectedLen {
113                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
114         }
115
116         // Authenticated request, unsigned locator
117         // => PermissionError
118         response = IssueRequest(&RequestTester{
119                 method:   "GET",
120                 uri:      unsignedLocator,
121                 apiToken: knownToken,
122         })
123         ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
124
125         // Unauthenticated request, signed locator
126         // => PermissionError
127         response = IssueRequest(&RequestTester{
128                 method: "GET",
129                 uri:    signedLocator,
130         })
131         ExpectStatusCode(t,
132                 "Unauthenticated request, signed locator",
133                 PermissionError.HTTPCode, response)
134
135         // Authenticated request, expired locator
136         // => ExpiredError
137         response = IssueRequest(&RequestTester{
138                 method:   "GET",
139                 uri:      expiredLocator,
140                 apiToken: knownToken,
141         })
142         ExpectStatusCode(t,
143                 "Authenticated request, expired locator",
144                 ExpiredError.HTTPCode, response)
145 }
146
147 // Test PutBlockHandler on the following situations:
148 //   - no server key
149 //   - with server key, authenticated request, unsigned locator
150 //   - with server key, unauthenticated request, unsigned locator
151 //
152 func TestPutHandler(t *testing.T) {
153         defer teardown()
154
155         // Prepare two test Keep volumes.
156         KeepVM = MakeTestVolumeManager(2)
157         defer KeepVM.Close()
158
159         // --------------
160         // No server key.
161
162         // Unauthenticated request, no server key
163         // => OK (unsigned response)
164         unsignedLocator := "/" + TestHash
165         response := IssueRequest(
166                 &RequestTester{
167                         method:      "PUT",
168                         uri:         unsignedLocator,
169                         requestBody: TestBlock,
170                 })
171
172         ExpectStatusCode(t,
173                 "Unauthenticated request, no server key", http.StatusOK, response)
174         ExpectBody(t,
175                 "Unauthenticated request, no server key",
176                 TestHashPutResp, response)
177
178         // ------------------
179         // With a server key.
180
181         theConfig.blobSigningKey = []byte(knownKey)
182         theConfig.BlobSignatureTTL.Set("5m")
183
184         // When a permission key is available, the locator returned
185         // from an authenticated PUT request will be signed.
186
187         // Authenticated PUT, signed locator
188         // => OK (signed response)
189         response = IssueRequest(
190                 &RequestTester{
191                         method:      "PUT",
192                         uri:         unsignedLocator,
193                         requestBody: TestBlock,
194                         apiToken:    knownToken,
195                 })
196
197         ExpectStatusCode(t,
198                 "Authenticated PUT, signed locator, with server key",
199                 http.StatusOK, response)
200         responseLocator := strings.TrimSpace(response.Body.String())
201         if VerifySignature(responseLocator, knownToken) != nil {
202                 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
203                         "response '%s' does not contain a valid signature",
204                         responseLocator)
205         }
206
207         // Unauthenticated PUT, unsigned locator
208         // => OK
209         response = IssueRequest(
210                 &RequestTester{
211                         method:      "PUT",
212                         uri:         unsignedLocator,
213                         requestBody: TestBlock,
214                 })
215
216         ExpectStatusCode(t,
217                 "Unauthenticated PUT, unsigned locator, with server key",
218                 http.StatusOK, response)
219         ExpectBody(t,
220                 "Unauthenticated PUT, unsigned locator, with server key",
221                 TestHashPutResp, response)
222 }
223
224 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
225         defer teardown()
226         theConfig.systemAuthToken = "fake-data-manager-token"
227         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
228         vols[0].Readonly = true
229         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
230         defer KeepVM.Close()
231         IssueRequest(
232                 &RequestTester{
233                         method:      "PUT",
234                         uri:         "/" + TestHash,
235                         requestBody: TestBlock,
236                 })
237         defer func(orig bool) {
238                 theConfig.EnableDelete = orig
239         }(theConfig.EnableDelete)
240         theConfig.EnableDelete = true
241         IssueRequest(
242                 &RequestTester{
243                         method:      "DELETE",
244                         uri:         "/" + TestHash,
245                         requestBody: TestBlock,
246                         apiToken:    theConfig.systemAuthToken,
247                 })
248         type expect struct {
249                 volnum    int
250                 method    string
251                 callcount int
252         }
253         for _, e := range []expect{
254                 {0, "Get", 0},
255                 {0, "Compare", 0},
256                 {0, "Touch", 0},
257                 {0, "Put", 0},
258                 {0, "Delete", 0},
259                 {1, "Get", 0},
260                 {1, "Compare", 1},
261                 {1, "Touch", 1},
262                 {1, "Put", 1},
263                 {1, "Delete", 1},
264         } {
265                 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
266                         t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
267                 }
268         }
269 }
270
271 // Test /index requests:
272 //   - unauthenticated /index request
273 //   - unauthenticated /index/prefix request
274 //   - authenticated   /index request        | non-superuser
275 //   - authenticated   /index/prefix request | non-superuser
276 //   - authenticated   /index request        | superuser
277 //   - authenticated   /index/prefix request | superuser
278 //
279 // The only /index requests that should succeed are those issued by the
280 // superuser. They should pass regardless of the value of RequireSignatures.
281 //
282 func TestIndexHandler(t *testing.T) {
283         defer teardown()
284
285         // Set up Keep volumes and populate them.
286         // Include multiple blocks on different volumes, and
287         // some metadata files (which should be omitted from index listings)
288         KeepVM = MakeTestVolumeManager(2)
289         defer KeepVM.Close()
290
291         vols := KeepVM.AllWritable()
292         vols[0].Put(context.Background(), TestHash, TestBlock)
293         vols[1].Put(context.Background(), TestHash2, TestBlock2)
294         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
295         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
296
297         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
298
299         unauthenticatedReq := &RequestTester{
300                 method: "GET",
301                 uri:    "/index",
302         }
303         authenticatedReq := &RequestTester{
304                 method:   "GET",
305                 uri:      "/index",
306                 apiToken: knownToken,
307         }
308         superuserReq := &RequestTester{
309                 method:   "GET",
310                 uri:      "/index",
311                 apiToken: theConfig.systemAuthToken,
312         }
313         unauthPrefixReq := &RequestTester{
314                 method: "GET",
315                 uri:    "/index/" + TestHash[0:3],
316         }
317         authPrefixReq := &RequestTester{
318                 method:   "GET",
319                 uri:      "/index/" + TestHash[0:3],
320                 apiToken: knownToken,
321         }
322         superuserPrefixReq := &RequestTester{
323                 method:   "GET",
324                 uri:      "/index/" + TestHash[0:3],
325                 apiToken: theConfig.systemAuthToken,
326         }
327         superuserNoSuchPrefixReq := &RequestTester{
328                 method:   "GET",
329                 uri:      "/index/abcd",
330                 apiToken: theConfig.systemAuthToken,
331         }
332         superuserInvalidPrefixReq := &RequestTester{
333                 method:   "GET",
334                 uri:      "/index/xyz",
335                 apiToken: theConfig.systemAuthToken,
336         }
337
338         // -------------------------------------------------------------
339         // Only the superuser should be allowed to issue /index requests.
340
341         // ---------------------------
342         // RequireSignatures enabled
343         // This setting should not affect tests passing.
344         theConfig.RequireSignatures = true
345
346         // unauthenticated /index request
347         // => UnauthorizedError
348         response := IssueRequest(unauthenticatedReq)
349         ExpectStatusCode(t,
350                 "RequireSignatures on, unauthenticated request",
351                 UnauthorizedError.HTTPCode,
352                 response)
353
354         // unauthenticated /index/prefix request
355         // => UnauthorizedError
356         response = IssueRequest(unauthPrefixReq)
357         ExpectStatusCode(t,
358                 "permissions on, unauthenticated /index/prefix request",
359                 UnauthorizedError.HTTPCode,
360                 response)
361
362         // authenticated /index request, non-superuser
363         // => UnauthorizedError
364         response = IssueRequest(authenticatedReq)
365         ExpectStatusCode(t,
366                 "permissions on, authenticated request, non-superuser",
367                 UnauthorizedError.HTTPCode,
368                 response)
369
370         // authenticated /index/prefix request, non-superuser
371         // => UnauthorizedError
372         response = IssueRequest(authPrefixReq)
373         ExpectStatusCode(t,
374                 "permissions on, authenticated /index/prefix request, non-superuser",
375                 UnauthorizedError.HTTPCode,
376                 response)
377
378         // superuser /index request
379         // => OK
380         response = IssueRequest(superuserReq)
381         ExpectStatusCode(t,
382                 "permissions on, superuser request",
383                 http.StatusOK,
384                 response)
385
386         // ----------------------------
387         // RequireSignatures disabled
388         // Valid Request should still pass.
389         theConfig.RequireSignatures = false
390
391         // superuser /index request
392         // => OK
393         response = IssueRequest(superuserReq)
394         ExpectStatusCode(t,
395                 "permissions on, superuser request",
396                 http.StatusOK,
397                 response)
398
399         expected := `^` + TestHash + `\+\d+ \d+\n` +
400                 TestHash2 + `\+\d+ \d+\n\n$`
401         match, _ := regexp.MatchString(expected, response.Body.String())
402         if !match {
403                 t.Errorf(
404                         "permissions on, superuser request: expected %s, got:\n%s",
405                         expected, response.Body.String())
406         }
407
408         // superuser /index/prefix request
409         // => OK
410         response = IssueRequest(superuserPrefixReq)
411         ExpectStatusCode(t,
412                 "permissions on, superuser request",
413                 http.StatusOK,
414                 response)
415
416         expected = `^` + TestHash + `\+\d+ \d+\n\n$`
417         match, _ = regexp.MatchString(expected, response.Body.String())
418         if !match {
419                 t.Errorf(
420                         "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
421                         expected, response.Body.String())
422         }
423
424         // superuser /index/{no-such-prefix} request
425         // => OK
426         response = IssueRequest(superuserNoSuchPrefixReq)
427         ExpectStatusCode(t,
428                 "permissions on, superuser request",
429                 http.StatusOK,
430                 response)
431
432         if "\n" != response.Body.String() {
433                 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
434         }
435
436         // superuser /index/{invalid-prefix} request
437         // => StatusBadRequest
438         response = IssueRequest(superuserInvalidPrefixReq)
439         ExpectStatusCode(t,
440                 "permissions on, superuser request",
441                 http.StatusBadRequest,
442                 response)
443 }
444
445 // TestDeleteHandler
446 //
447 // Cases tested:
448 //
449 //   With no token and with a non-data-manager token:
450 //   * Delete existing block
451 //     (test for 403 Forbidden, confirm block not deleted)
452 //
453 //   With data manager token:
454 //
455 //   * Delete existing block
456 //     (test for 200 OK, response counts, confirm block deleted)
457 //
458 //   * Delete nonexistent block
459 //     (test for 200 OK, response counts)
460 //
461 //   TODO(twp):
462 //
463 //   * Delete block on read-only and read-write volume
464 //     (test for 200 OK, response with copies_deleted=1,
465 //     copies_failed=1, confirm block deleted only on r/w volume)
466 //
467 //   * Delete block on read-only volume only
468 //     (test for 200 OK, response with copies_deleted=0, copies_failed=1,
469 //     confirm block not deleted)
470 //
471 func TestDeleteHandler(t *testing.T) {
472         defer teardown()
473
474         // Set up Keep volumes and populate them.
475         // Include multiple blocks on different volumes, and
476         // some metadata files (which should be omitted from index listings)
477         KeepVM = MakeTestVolumeManager(2)
478         defer KeepVM.Close()
479
480         vols := KeepVM.AllWritable()
481         vols[0].Put(context.Background(), TestHash, TestBlock)
482
483         // Explicitly set the BlobSignatureTTL to 0 for these
484         // tests, to ensure the MockVolume deletes the blocks
485         // even though they have just been created.
486         theConfig.BlobSignatureTTL = arvados.Duration(0)
487
488         var userToken = "NOT DATA MANAGER TOKEN"
489         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
490
491         theConfig.EnableDelete = true
492
493         unauthReq := &RequestTester{
494                 method: "DELETE",
495                 uri:    "/" + TestHash,
496         }
497
498         userReq := &RequestTester{
499                 method:   "DELETE",
500                 uri:      "/" + TestHash,
501                 apiToken: userToken,
502         }
503
504         superuserExistingBlockReq := &RequestTester{
505                 method:   "DELETE",
506                 uri:      "/" + TestHash,
507                 apiToken: theConfig.systemAuthToken,
508         }
509
510         superuserNonexistentBlockReq := &RequestTester{
511                 method:   "DELETE",
512                 uri:      "/" + TestHash2,
513                 apiToken: theConfig.systemAuthToken,
514         }
515
516         // Unauthenticated request returns PermissionError.
517         var response *httptest.ResponseRecorder
518         response = IssueRequest(unauthReq)
519         ExpectStatusCode(t,
520                 "unauthenticated request",
521                 PermissionError.HTTPCode,
522                 response)
523
524         // Authenticated non-admin request returns PermissionError.
525         response = IssueRequest(userReq)
526         ExpectStatusCode(t,
527                 "authenticated non-admin request",
528                 PermissionError.HTTPCode,
529                 response)
530
531         // Authenticated admin request for nonexistent block.
532         type deletecounter struct {
533                 Deleted int `json:"copies_deleted"`
534                 Failed  int `json:"copies_failed"`
535         }
536         var responseDc, expectedDc deletecounter
537
538         response = IssueRequest(superuserNonexistentBlockReq)
539         ExpectStatusCode(t,
540                 "data manager request, nonexistent block",
541                 http.StatusNotFound,
542                 response)
543
544         // Authenticated admin request for existing block while EnableDelete is false.
545         theConfig.EnableDelete = false
546         response = IssueRequest(superuserExistingBlockReq)
547         ExpectStatusCode(t,
548                 "authenticated request, existing block, method disabled",
549                 MethodDisabledError.HTTPCode,
550                 response)
551         theConfig.EnableDelete = true
552
553         // Authenticated admin request for existing block.
554         response = IssueRequest(superuserExistingBlockReq)
555         ExpectStatusCode(t,
556                 "data manager request, existing block",
557                 http.StatusOK,
558                 response)
559         // Expect response {"copies_deleted":1,"copies_failed":0}
560         expectedDc = deletecounter{1, 0}
561         json.NewDecoder(response.Body).Decode(&responseDc)
562         if responseDc != expectedDc {
563                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
564                         expectedDc, responseDc)
565         }
566         // Confirm the block has been deleted
567         buf := make([]byte, BlockSize)
568         _, err := vols[0].Get(context.Background(), TestHash, buf)
569         var blockDeleted = os.IsNotExist(err)
570         if !blockDeleted {
571                 t.Error("superuserExistingBlockReq: block not deleted")
572         }
573
574         // A DELETE request on a block newer than BlobSignatureTTL
575         // should return success but leave the block on the volume.
576         vols[0].Put(context.Background(), TestHash, TestBlock)
577         theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
578
579         response = IssueRequest(superuserExistingBlockReq)
580         ExpectStatusCode(t,
581                 "data manager request, existing block",
582                 http.StatusOK,
583                 response)
584         // Expect response {"copies_deleted":1,"copies_failed":0}
585         expectedDc = deletecounter{1, 0}
586         json.NewDecoder(response.Body).Decode(&responseDc)
587         if responseDc != expectedDc {
588                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
589                         expectedDc, responseDc)
590         }
591         // Confirm the block has NOT been deleted.
592         _, err = vols[0].Get(context.Background(), TestHash, buf)
593         if err != nil {
594                 t.Errorf("testing delete on new block: %s\n", err)
595         }
596 }
597
598 // TestPullHandler
599 //
600 // Test handling of the PUT /pull statement.
601 //
602 // Cases tested: syntactically valid and invalid pull lists, from the
603 // data manager and from unprivileged users:
604 //
605 //   1. Valid pull list from an ordinary user
606 //      (expected result: 401 Unauthorized)
607 //
608 //   2. Invalid pull request from an ordinary user
609 //      (expected result: 401 Unauthorized)
610 //
611 //   3. Valid pull request from the data manager
612 //      (expected result: 200 OK with request body "Received 3 pull
613 //      requests"
614 //
615 //   4. Invalid pull request from the data manager
616 //      (expected result: 400 Bad Request)
617 //
618 // Test that in the end, the pull manager received a good pull list with
619 // the expected number of requests.
620 //
621 // TODO(twp): test concurrency: launch 100 goroutines to update the
622 // pull list simultaneously.  Make sure that none of them return 400
623 // Bad Request and that pullq.GetList() returns a valid list.
624 //
625 func TestPullHandler(t *testing.T) {
626         defer teardown()
627
628         var userToken = "USER TOKEN"
629         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
630
631         pullq = NewWorkQueue()
632
633         goodJSON := []byte(`[
634                 {
635                         "locator":"locator_with_two_servers",
636                         "servers":[
637                                 "server1",
638                                 "server2"
639                         ]
640                 },
641                 {
642                         "locator":"locator_with_no_servers",
643                         "servers":[]
644                 },
645                 {
646                         "locator":"",
647                         "servers":["empty_locator"]
648                 }
649         ]`)
650
651         badJSON := []byte(`{ "key":"I'm a little teapot" }`)
652
653         type pullTest struct {
654                 name         string
655                 req          RequestTester
656                 responseCode int
657                 responseBody string
658         }
659         var testcases = []pullTest{
660                 {
661                         "Valid pull list from an ordinary user",
662                         RequestTester{"/pull", userToken, "PUT", goodJSON},
663                         http.StatusUnauthorized,
664                         "Unauthorized\n",
665                 },
666                 {
667                         "Invalid pull request from an ordinary user",
668                         RequestTester{"/pull", userToken, "PUT", badJSON},
669                         http.StatusUnauthorized,
670                         "Unauthorized\n",
671                 },
672                 {
673                         "Valid pull request from the data manager",
674                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
675                         http.StatusOK,
676                         "Received 3 pull requests\n",
677                 },
678                 {
679                         "Invalid pull request from the data manager",
680                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
681                         http.StatusBadRequest,
682                         "",
683                 },
684         }
685
686         for _, tst := range testcases {
687                 response := IssueRequest(&tst.req)
688                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
689                 ExpectBody(t, tst.name, tst.responseBody, response)
690         }
691
692         // The Keep pull manager should have received one good list with 3
693         // requests on it.
694         for i := 0; i < 3; i++ {
695                 item := <-pullq.NextItem
696                 if _, ok := item.(PullRequest); !ok {
697                         t.Errorf("item %v could not be parsed as a PullRequest", item)
698                 }
699         }
700
701         expectChannelEmpty(t, pullq.NextItem)
702 }
703
704 // TestTrashHandler
705 //
706 // Test cases:
707 //
708 // Cases tested: syntactically valid and invalid trash lists, from the
709 // data manager and from unprivileged users:
710 //
711 //   1. Valid trash list from an ordinary user
712 //      (expected result: 401 Unauthorized)
713 //
714 //   2. Invalid trash list from an ordinary user
715 //      (expected result: 401 Unauthorized)
716 //
717 //   3. Valid trash list from the data manager
718 //      (expected result: 200 OK with request body "Received 3 trash
719 //      requests"
720 //
721 //   4. Invalid trash list from the data manager
722 //      (expected result: 400 Bad Request)
723 //
724 // Test that in the end, the trash collector received a good list
725 // trash list with the expected number of requests.
726 //
727 // TODO(twp): test concurrency: launch 100 goroutines to update the
728 // pull list simultaneously.  Make sure that none of them return 400
729 // Bad Request and that replica.Dump() returns a valid list.
730 //
731 func TestTrashHandler(t *testing.T) {
732         defer teardown()
733
734         var userToken = "USER TOKEN"
735         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
736
737         trashq = NewWorkQueue()
738
739         goodJSON := []byte(`[
740                 {
741                         "locator":"block1",
742                         "block_mtime":1409082153
743                 },
744                 {
745                         "locator":"block2",
746                         "block_mtime":1409082153
747                 },
748                 {
749                         "locator":"block3",
750                         "block_mtime":1409082153
751                 }
752         ]`)
753
754         badJSON := []byte(`I am not a valid JSON string`)
755
756         type trashTest struct {
757                 name         string
758                 req          RequestTester
759                 responseCode int
760                 responseBody string
761         }
762
763         var testcases = []trashTest{
764                 {
765                         "Valid trash list from an ordinary user",
766                         RequestTester{"/trash", userToken, "PUT", goodJSON},
767                         http.StatusUnauthorized,
768                         "Unauthorized\n",
769                 },
770                 {
771                         "Invalid trash list from an ordinary user",
772                         RequestTester{"/trash", userToken, "PUT", badJSON},
773                         http.StatusUnauthorized,
774                         "Unauthorized\n",
775                 },
776                 {
777                         "Valid trash list from the data manager",
778                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
779                         http.StatusOK,
780                         "Received 3 trash requests\n",
781                 },
782                 {
783                         "Invalid trash list from the data manager",
784                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
785                         http.StatusBadRequest,
786                         "",
787                 },
788         }
789
790         for _, tst := range testcases {
791                 response := IssueRequest(&tst.req)
792                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
793                 ExpectBody(t, tst.name, tst.responseBody, response)
794         }
795
796         // The trash collector should have received one good list with 3
797         // requests on it.
798         for i := 0; i < 3; i++ {
799                 item := <-trashq.NextItem
800                 if _, ok := item.(TrashRequest); !ok {
801                         t.Errorf("item %v could not be parsed as a TrashRequest", item)
802                 }
803         }
804
805         expectChannelEmpty(t, trashq.NextItem)
806 }
807
808 // ====================
809 // Helper functions
810 // ====================
811
812 // IssueTestRequest executes an HTTP request described by rt, to a
813 // REST router.  It returns the HTTP response to the request.
814 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
815         response := httptest.NewRecorder()
816         body := bytes.NewReader(rt.requestBody)
817         req, _ := http.NewRequest(rt.method, rt.uri, body)
818         if rt.apiToken != "" {
819                 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
820         }
821         loggingRouter := MakeRESTRouter()
822         loggingRouter.ServeHTTP(response, req)
823         return response
824 }
825
826 // ExpectStatusCode checks whether a response has the specified status code,
827 // and reports a test failure if not.
828 func ExpectStatusCode(
829         t *testing.T,
830         testname string,
831         expectedStatus int,
832         response *httptest.ResponseRecorder) {
833         if response.Code != expectedStatus {
834                 t.Errorf("%s: expected status %d, got %+v",
835                         testname, expectedStatus, response)
836         }
837 }
838
839 func ExpectBody(
840         t *testing.T,
841         testname string,
842         expectedBody string,
843         response *httptest.ResponseRecorder) {
844         if expectedBody != "" && response.Body.String() != expectedBody {
845                 t.Errorf("%s: expected response body '%s', got %+v",
846                         testname, expectedBody, response)
847         }
848 }
849
850 // See #7121
851 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
852         defer teardown()
853         KeepVM = MakeTestVolumeManager(1)
854         defer KeepVM.Close()
855
856         defer func(orig *bufferPool) {
857                 bufs = orig
858         }(bufs)
859         bufs = newBufferPool(1, BlockSize)
860
861         ok := make(chan struct{})
862         go func() {
863                 for i := 0; i < 2; i++ {
864                         response := IssueRequest(
865                                 &RequestTester{
866                                         method:      "PUT",
867                                         uri:         "/" + TestHash,
868                                         requestBody: TestBlock,
869                                 })
870                         ExpectStatusCode(t,
871                                 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
872                 }
873                 ok <- struct{}{}
874         }()
875
876         select {
877         case <-ok:
878         case <-time.After(time.Second):
879                 t.Fatal("PUT deadlocks with MaxBuffers==1")
880         }
881 }
882
883 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
884 // leak.
885 func TestPutHandlerNoBufferleak(t *testing.T) {
886         defer teardown()
887
888         // Prepare two test Keep volumes.
889         KeepVM = MakeTestVolumeManager(2)
890         defer KeepVM.Close()
891
892         ok := make(chan bool)
893         go func() {
894                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
895                         // Unauthenticated request, no server key
896                         // => OK (unsigned response)
897                         unsignedLocator := "/" + TestHash
898                         response := IssueRequest(
899                                 &RequestTester{
900                                         method:      "PUT",
901                                         uri:         unsignedLocator,
902                                         requestBody: TestBlock,
903                                 })
904                         ExpectStatusCode(t,
905                                 "TestPutHandlerBufferleak", http.StatusOK, response)
906                         ExpectBody(t,
907                                 "TestPutHandlerBufferleak",
908                                 TestHashPutResp, response)
909                 }
910                 ok <- true
911         }()
912         select {
913         case <-time.After(20 * time.Second):
914                 // If the buffer pool leaks, the test goroutine hangs.
915                 t.Fatal("test did not finish, assuming pool leaked")
916         case <-ok:
917         }
918 }
919
920 type notifyingResponseRecorder struct {
921         *httptest.ResponseRecorder
922         closer chan bool
923 }
924
925 func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
926         return r.closer
927 }
928
929 func TestGetHandlerClientDisconnect(t *testing.T) {
930         defer func(was bool) {
931                 theConfig.RequireSignatures = was
932         }(theConfig.RequireSignatures)
933         theConfig.RequireSignatures = false
934
935         defer func(orig *bufferPool) {
936                 bufs = orig
937         }(bufs)
938         bufs = newBufferPool(1, BlockSize)
939         defer bufs.Put(bufs.Get(BlockSize))
940
941         KeepVM = MakeTestVolumeManager(2)
942         defer KeepVM.Close()
943
944         if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
945                 t.Error(err)
946         }
947
948         resp := &notifyingResponseRecorder{
949                 ResponseRecorder: httptest.NewRecorder(),
950                 closer:           make(chan bool, 1),
951         }
952         if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
953                 t.Fatal("notifyingResponseRecorder is broken")
954         }
955         // If anyone asks, the client has disconnected.
956         resp.closer <- true
957
958         ok := make(chan struct{})
959         go func() {
960                 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
961                 (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
962                 ok <- struct{}{}
963         }()
964
965         select {
966         case <-time.After(20 * time.Second):
967                 t.Fatal("request took >20s, close notifier must be broken")
968         case <-ok:
969         }
970
971         ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
972         for i, v := range KeepVM.AllWritable() {
973                 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
974                         t.Errorf("volume %d got %d calls, expected 0", i, calls)
975                 }
976         }
977 }
978
979 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
980 // leak.
981 func TestGetHandlerNoBufferLeak(t *testing.T) {
982         defer teardown()
983
984         // Prepare two test Keep volumes. Our block is stored on the second volume.
985         KeepVM = MakeTestVolumeManager(2)
986         defer KeepVM.Close()
987
988         vols := KeepVM.AllWritable()
989         if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
990                 t.Error(err)
991         }
992
993         ok := make(chan bool)
994         go func() {
995                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
996                         // Unauthenticated request, unsigned locator
997                         // => OK
998                         unsignedLocator := "/" + TestHash
999                         response := IssueRequest(
1000                                 &RequestTester{
1001                                         method: "GET",
1002                                         uri:    unsignedLocator,
1003                                 })
1004                         ExpectStatusCode(t,
1005                                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1006                         ExpectBody(t,
1007                                 "Unauthenticated request, unsigned locator",
1008                                 string(TestBlock),
1009                                 response)
1010                 }
1011                 ok <- true
1012         }()
1013         select {
1014         case <-time.After(20 * time.Second):
1015                 // If the buffer pool leaks, the test goroutine hangs.
1016                 t.Fatal("test did not finish, assuming pool leaked")
1017         case <-ok:
1018         }
1019 }
1020
1021 func TestPutReplicationHeader(t *testing.T) {
1022         defer teardown()
1023
1024         KeepVM = MakeTestVolumeManager(2)
1025         defer KeepVM.Close()
1026
1027         resp := IssueRequest(&RequestTester{
1028                 method:      "PUT",
1029                 uri:         "/" + TestHash,
1030                 requestBody: TestBlock,
1031         })
1032         if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1033                 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1034         }
1035 }
1036
1037 func TestUntrashHandler(t *testing.T) {
1038         defer teardown()
1039
1040         // Set up Keep volumes
1041         KeepVM = MakeTestVolumeManager(2)
1042         defer KeepVM.Close()
1043         vols := KeepVM.AllWritable()
1044         vols[0].Put(context.Background(), TestHash, TestBlock)
1045
1046         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1047
1048         // unauthenticatedReq => UnauthorizedError
1049         unauthenticatedReq := &RequestTester{
1050                 method: "PUT",
1051                 uri:    "/untrash/" + TestHash,
1052         }
1053         response := IssueRequest(unauthenticatedReq)
1054         ExpectStatusCode(t,
1055                 "Unauthenticated request",
1056                 UnauthorizedError.HTTPCode,
1057                 response)
1058
1059         // notDataManagerReq => UnauthorizedError
1060         notDataManagerReq := &RequestTester{
1061                 method:   "PUT",
1062                 uri:      "/untrash/" + TestHash,
1063                 apiToken: knownToken,
1064         }
1065
1066         response = IssueRequest(notDataManagerReq)
1067         ExpectStatusCode(t,
1068                 "Non-datamanager token",
1069                 UnauthorizedError.HTTPCode,
1070                 response)
1071
1072         // datamanagerWithBadHashReq => StatusBadRequest
1073         datamanagerWithBadHashReq := &RequestTester{
1074                 method:   "PUT",
1075                 uri:      "/untrash/thisisnotalocator",
1076                 apiToken: theConfig.systemAuthToken,
1077         }
1078         response = IssueRequest(datamanagerWithBadHashReq)
1079         ExpectStatusCode(t,
1080                 "Bad locator in untrash request",
1081                 http.StatusBadRequest,
1082                 response)
1083
1084         // datamanagerWrongMethodReq => StatusBadRequest
1085         datamanagerWrongMethodReq := &RequestTester{
1086                 method:   "GET",
1087                 uri:      "/untrash/" + TestHash,
1088                 apiToken: theConfig.systemAuthToken,
1089         }
1090         response = IssueRequest(datamanagerWrongMethodReq)
1091         ExpectStatusCode(t,
1092                 "Only PUT method is supported for untrash",
1093                 http.StatusBadRequest,
1094                 response)
1095
1096         // datamanagerReq => StatusOK
1097         datamanagerReq := &RequestTester{
1098                 method:   "PUT",
1099                 uri:      "/untrash/" + TestHash,
1100                 apiToken: theConfig.systemAuthToken,
1101         }
1102         response = IssueRequest(datamanagerReq)
1103         ExpectStatusCode(t,
1104                 "",
1105                 http.StatusOK,
1106                 response)
1107         expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1108         if response.Body.String() != expected {
1109                 t.Errorf(
1110                         "Untrash response mismatched: expected %s, got:\n%s",
1111                         expected, response.Body.String())
1112         }
1113 }
1114
1115 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1116         defer teardown()
1117
1118         // Set up readonly Keep volumes
1119         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1120         vols[0].Readonly = true
1121         vols[1].Readonly = true
1122         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1123         defer KeepVM.Close()
1124
1125         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1126
1127         // datamanagerReq => StatusOK
1128         datamanagerReq := &RequestTester{
1129                 method:   "PUT",
1130                 uri:      "/untrash/" + TestHash,
1131                 apiToken: theConfig.systemAuthToken,
1132         }
1133         response := IssueRequest(datamanagerReq)
1134         ExpectStatusCode(t,
1135                 "No writable volumes",
1136                 http.StatusNotFound,
1137                 response)
1138 }