9956: Load volume config from YAML file
[arvados.git] / services / keepstore / handler_test.go
1 // Tests for Keep HTTP handlers:
2 //
3 //     GetBlockHandler
4 //     PutBlockHandler
5 //     IndexHandler
6 //
7 // The HTTP handlers are responsible for enforcing permission policy,
8 // so these tests must exercise all possible permission permutations.
9
10 package main
11
12 import (
13         "bytes"
14         "encoding/json"
15         "fmt"
16         "net/http"
17         "net/http/httptest"
18         "os"
19         "regexp"
20         "strings"
21         "testing"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25 )
26
// A RequestTester represents the parameters for an HTTP request to
// be issued on behalf of a unit test.
//
// Tests in this file also construct RequestTester values with
// positional (unkeyed) composite literals, so the order of the fields
// must not be changed.
type RequestTester struct {
	uri         string // request URI handed to http.NewRequest
	apiToken    string // if non-empty, sent as "Authorization: OAuth2 <apiToken>"
	method      string // HTTP method handed to http.NewRequest
	requestBody []byte // bytes sent as the request body
}
35
36 // Test GetBlockHandler on the following situations:
37 //   - permissions off, unauthenticated request, unsigned locator
38 //   - permissions on, authenticated request, signed locator
39 //   - permissions on, authenticated request, unsigned locator
40 //   - permissions on, unauthenticated request, signed locator
41 //   - permissions on, authenticated request, expired locator
42 //
43 func TestGetHandler(t *testing.T) {
44         defer teardown()
45
46         // Prepare two test Keep volumes. Our block is stored on the second volume.
47         KeepVM = MakeTestVolumeManager(2)
48         defer KeepVM.Close()
49
50         vols := KeepVM.AllWritable()
51         if err := vols[0].Put(TestHash, TestBlock); err != nil {
52                 t.Error(err)
53         }
54
55         // Create locators for testing.
56         // Turn on permission settings so we can generate signed locators.
57         theConfig.RequireSignatures = true
58         theConfig.blobSigningKey = []byte(knownKey)
59         theConfig.BlobSignatureTTL.Set("5m")
60
61         var (
62                 unsignedLocator  = "/" + TestHash
63                 validTimestamp   = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
64                 expiredTimestamp = time.Now().Add(-time.Hour)
65                 signedLocator    = "/" + SignLocator(TestHash, knownToken, validTimestamp)
66                 expiredLocator   = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
67         )
68
69         // -----------------
70         // Test unauthenticated request with permissions off.
71         theConfig.RequireSignatures = false
72
73         // Unauthenticated request, unsigned locator
74         // => OK
75         response := IssueRequest(
76                 &RequestTester{
77                         method: "GET",
78                         uri:    unsignedLocator,
79                 })
80         ExpectStatusCode(t,
81                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
82         ExpectBody(t,
83                 "Unauthenticated request, unsigned locator",
84                 string(TestBlock),
85                 response)
86
87         receivedLen := response.Header().Get("Content-Length")
88         expectedLen := fmt.Sprintf("%d", len(TestBlock))
89         if receivedLen != expectedLen {
90                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
91         }
92
93         // ----------------
94         // Permissions: on.
95         theConfig.RequireSignatures = true
96
97         // Authenticated request, signed locator
98         // => OK
99         response = IssueRequest(&RequestTester{
100                 method:   "GET",
101                 uri:      signedLocator,
102                 apiToken: knownToken,
103         })
104         ExpectStatusCode(t,
105                 "Authenticated request, signed locator", http.StatusOK, response)
106         ExpectBody(t,
107                 "Authenticated request, signed locator", string(TestBlock), response)
108
109         receivedLen = response.Header().Get("Content-Length")
110         expectedLen = fmt.Sprintf("%d", len(TestBlock))
111         if receivedLen != expectedLen {
112                 t.Errorf("expected Content-Length %s, got %s", expectedLen, receivedLen)
113         }
114
115         // Authenticated request, unsigned locator
116         // => PermissionError
117         response = IssueRequest(&RequestTester{
118                 method:   "GET",
119                 uri:      unsignedLocator,
120                 apiToken: knownToken,
121         })
122         ExpectStatusCode(t, "unsigned locator", PermissionError.HTTPCode, response)
123
124         // Unauthenticated request, signed locator
125         // => PermissionError
126         response = IssueRequest(&RequestTester{
127                 method: "GET",
128                 uri:    signedLocator,
129         })
130         ExpectStatusCode(t,
131                 "Unauthenticated request, signed locator",
132                 PermissionError.HTTPCode, response)
133
134         // Authenticated request, expired locator
135         // => ExpiredError
136         response = IssueRequest(&RequestTester{
137                 method:   "GET",
138                 uri:      expiredLocator,
139                 apiToken: knownToken,
140         })
141         ExpectStatusCode(t,
142                 "Authenticated request, expired locator",
143                 ExpiredError.HTTPCode, response)
144 }
145
146 // Test PutBlockHandler on the following situations:
147 //   - no server key
148 //   - with server key, authenticated request, unsigned locator
149 //   - with server key, unauthenticated request, unsigned locator
150 //
151 func TestPutHandler(t *testing.T) {
152         defer teardown()
153
154         // Prepare two test Keep volumes.
155         KeepVM = MakeTestVolumeManager(2)
156         defer KeepVM.Close()
157
158         // --------------
159         // No server key.
160
161         // Unauthenticated request, no server key
162         // => OK (unsigned response)
163         unsignedLocator := "/" + TestHash
164         response := IssueRequest(
165                 &RequestTester{
166                         method:      "PUT",
167                         uri:         unsignedLocator,
168                         requestBody: TestBlock,
169                 })
170
171         ExpectStatusCode(t,
172                 "Unauthenticated request, no server key", http.StatusOK, response)
173         ExpectBody(t,
174                 "Unauthenticated request, no server key",
175                 TestHashPutResp, response)
176
177         // ------------------
178         // With a server key.
179
180         theConfig.blobSigningKey = []byte(knownKey)
181         theConfig.BlobSignatureTTL.Set("5m")
182
183         // When a permission key is available, the locator returned
184         // from an authenticated PUT request will be signed.
185
186         // Authenticated PUT, signed locator
187         // => OK (signed response)
188         response = IssueRequest(
189                 &RequestTester{
190                         method:      "PUT",
191                         uri:         unsignedLocator,
192                         requestBody: TestBlock,
193                         apiToken:    knownToken,
194                 })
195
196         ExpectStatusCode(t,
197                 "Authenticated PUT, signed locator, with server key",
198                 http.StatusOK, response)
199         responseLocator := strings.TrimSpace(response.Body.String())
200         if VerifySignature(responseLocator, knownToken) != nil {
201                 t.Errorf("Authenticated PUT, signed locator, with server key:\n"+
202                         "response '%s' does not contain a valid signature",
203                         responseLocator)
204         }
205
206         // Unauthenticated PUT, unsigned locator
207         // => OK
208         response = IssueRequest(
209                 &RequestTester{
210                         method:      "PUT",
211                         uri:         unsignedLocator,
212                         requestBody: TestBlock,
213                 })
214
215         ExpectStatusCode(t,
216                 "Unauthenticated PUT, unsigned locator, with server key",
217                 http.StatusOK, response)
218         ExpectBody(t,
219                 "Unauthenticated PUT, unsigned locator, with server key",
220                 TestHashPutResp, response)
221 }
222
223 func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
224         defer teardown()
225         theConfig.systemAuthToken = "fake-data-manager-token"
226         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
227         vols[0].Readonly = true
228         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
229         defer KeepVM.Close()
230         IssueRequest(
231                 &RequestTester{
232                         method:      "PUT",
233                         uri:         "/" + TestHash,
234                         requestBody: TestBlock,
235                 })
236         defer func(orig bool) {
237                 theConfig.EnableDelete = orig
238         }(theConfig.EnableDelete)
239         theConfig.EnableDelete = true
240         IssueRequest(
241                 &RequestTester{
242                         method:      "DELETE",
243                         uri:         "/" + TestHash,
244                         requestBody: TestBlock,
245                         apiToken:    theConfig.systemAuthToken,
246                 })
247         type expect struct {
248                 volnum    int
249                 method    string
250                 callcount int
251         }
252         for _, e := range []expect{
253                 {0, "Get", 0},
254                 {0, "Compare", 0},
255                 {0, "Touch", 0},
256                 {0, "Put", 0},
257                 {0, "Delete", 0},
258                 {1, "Get", 0},
259                 {1, "Compare", 1},
260                 {1, "Touch", 1},
261                 {1, "Put", 1},
262                 {1, "Delete", 1},
263         } {
264                 if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
265                         t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
266                 }
267         }
268 }
269
270 // Test /index requests:
271 //   - unauthenticated /index request
272 //   - unauthenticated /index/prefix request
273 //   - authenticated   /index request        | non-superuser
274 //   - authenticated   /index/prefix request | non-superuser
275 //   - authenticated   /index request        | superuser
276 //   - authenticated   /index/prefix request | superuser
277 //
278 // The only /index requests that should succeed are those issued by the
279 // superuser. They should pass regardless of the value of RequireSignatures.
280 //
281 func TestIndexHandler(t *testing.T) {
282         defer teardown()
283
284         // Set up Keep volumes and populate them.
285         // Include multiple blocks on different volumes, and
286         // some metadata files (which should be omitted from index listings)
287         KeepVM = MakeTestVolumeManager(2)
288         defer KeepVM.Close()
289
290         vols := KeepVM.AllWritable()
291         vols[0].Put(TestHash, TestBlock)
292         vols[1].Put(TestHash2, TestBlock2)
293         vols[0].Put(TestHash+".meta", []byte("metadata"))
294         vols[1].Put(TestHash2+".meta", []byte("metadata"))
295
296         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
297
298         unauthenticatedReq := &RequestTester{
299                 method: "GET",
300                 uri:    "/index",
301         }
302         authenticatedReq := &RequestTester{
303                 method:   "GET",
304                 uri:      "/index",
305                 apiToken: knownToken,
306         }
307         superuserReq := &RequestTester{
308                 method:   "GET",
309                 uri:      "/index",
310                 apiToken: theConfig.systemAuthToken,
311         }
312         unauthPrefixReq := &RequestTester{
313                 method: "GET",
314                 uri:    "/index/" + TestHash[0:3],
315         }
316         authPrefixReq := &RequestTester{
317                 method:   "GET",
318                 uri:      "/index/" + TestHash[0:3],
319                 apiToken: knownToken,
320         }
321         superuserPrefixReq := &RequestTester{
322                 method:   "GET",
323                 uri:      "/index/" + TestHash[0:3],
324                 apiToken: theConfig.systemAuthToken,
325         }
326         superuserNoSuchPrefixReq := &RequestTester{
327                 method:   "GET",
328                 uri:      "/index/abcd",
329                 apiToken: theConfig.systemAuthToken,
330         }
331         superuserInvalidPrefixReq := &RequestTester{
332                 method:   "GET",
333                 uri:      "/index/xyz",
334                 apiToken: theConfig.systemAuthToken,
335         }
336
337         // -------------------------------------------------------------
338         // Only the superuser should be allowed to issue /index requests.
339
340         // ---------------------------
341         // RequireSignatures enabled
342         // This setting should not affect tests passing.
343         theConfig.RequireSignatures = true
344
345         // unauthenticated /index request
346         // => UnauthorizedError
347         response := IssueRequest(unauthenticatedReq)
348         ExpectStatusCode(t,
349                 "RequireSignatures on, unauthenticated request",
350                 UnauthorizedError.HTTPCode,
351                 response)
352
353         // unauthenticated /index/prefix request
354         // => UnauthorizedError
355         response = IssueRequest(unauthPrefixReq)
356         ExpectStatusCode(t,
357                 "permissions on, unauthenticated /index/prefix request",
358                 UnauthorizedError.HTTPCode,
359                 response)
360
361         // authenticated /index request, non-superuser
362         // => UnauthorizedError
363         response = IssueRequest(authenticatedReq)
364         ExpectStatusCode(t,
365                 "permissions on, authenticated request, non-superuser",
366                 UnauthorizedError.HTTPCode,
367                 response)
368
369         // authenticated /index/prefix request, non-superuser
370         // => UnauthorizedError
371         response = IssueRequest(authPrefixReq)
372         ExpectStatusCode(t,
373                 "permissions on, authenticated /index/prefix request, non-superuser",
374                 UnauthorizedError.HTTPCode,
375                 response)
376
377         // superuser /index request
378         // => OK
379         response = IssueRequest(superuserReq)
380         ExpectStatusCode(t,
381                 "permissions on, superuser request",
382                 http.StatusOK,
383                 response)
384
385         // ----------------------------
386         // RequireSignatures disabled
387         // Valid Request should still pass.
388         theConfig.RequireSignatures = false
389
390         // superuser /index request
391         // => OK
392         response = IssueRequest(superuserReq)
393         ExpectStatusCode(t,
394                 "permissions on, superuser request",
395                 http.StatusOK,
396                 response)
397
398         expected := `^` + TestHash + `\+\d+ \d+\n` +
399                 TestHash2 + `\+\d+ \d+\n\n$`
400         match, _ := regexp.MatchString(expected, response.Body.String())
401         if !match {
402                 t.Errorf(
403                         "permissions on, superuser request: expected %s, got:\n%s",
404                         expected, response.Body.String())
405         }
406
407         // superuser /index/prefix request
408         // => OK
409         response = IssueRequest(superuserPrefixReq)
410         ExpectStatusCode(t,
411                 "permissions on, superuser request",
412                 http.StatusOK,
413                 response)
414
415         expected = `^` + TestHash + `\+\d+ \d+\n\n$`
416         match, _ = regexp.MatchString(expected, response.Body.String())
417         if !match {
418                 t.Errorf(
419                         "permissions on, superuser /index/prefix request: expected %s, got:\n%s",
420                         expected, response.Body.String())
421         }
422
423         // superuser /index/{no-such-prefix} request
424         // => OK
425         response = IssueRequest(superuserNoSuchPrefixReq)
426         ExpectStatusCode(t,
427                 "permissions on, superuser request",
428                 http.StatusOK,
429                 response)
430
431         if "\n" != response.Body.String() {
432                 t.Errorf("Expected empty response for %s. Found %s", superuserNoSuchPrefixReq.uri, response.Body.String())
433         }
434
435         // superuser /index/{invalid-prefix} request
436         // => StatusBadRequest
437         response = IssueRequest(superuserInvalidPrefixReq)
438         ExpectStatusCode(t,
439                 "permissions on, superuser request",
440                 http.StatusBadRequest,
441                 response)
442 }
443
444 // TestDeleteHandler
445 //
446 // Cases tested:
447 //
448 //   With no token and with a non-data-manager token:
449 //   * Delete existing block
450 //     (test for 403 Forbidden, confirm block not deleted)
451 //
452 //   With data manager token:
453 //
454 //   * Delete existing block
455 //     (test for 200 OK, response counts, confirm block deleted)
456 //
457 //   * Delete nonexistent block
458 //     (test for 200 OK, response counts)
459 //
460 //   TODO(twp):
461 //
462 //   * Delete block on read-only and read-write volume
463 //     (test for 200 OK, response with copies_deleted=1,
464 //     copies_failed=1, confirm block deleted only on r/w volume)
465 //
466 //   * Delete block on read-only volume only
467 //     (test for 200 OK, response with copies_deleted=0, copies_failed=1,
468 //     confirm block not deleted)
469 //
470 func TestDeleteHandler(t *testing.T) {
471         defer teardown()
472
473         // Set up Keep volumes and populate them.
474         // Include multiple blocks on different volumes, and
475         // some metadata files (which should be omitted from index listings)
476         KeepVM = MakeTestVolumeManager(2)
477         defer KeepVM.Close()
478
479         vols := KeepVM.AllWritable()
480         vols[0].Put(TestHash, TestBlock)
481
482         // Explicitly set the BlobSignatureTTL to 0 for these
483         // tests, to ensure the MockVolume deletes the blocks
484         // even though they have just been created.
485         theConfig.BlobSignatureTTL = arvados.Duration(0)
486
487         var userToken = "NOT DATA MANAGER TOKEN"
488         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
489
490         theConfig.EnableDelete = true
491
492         unauthReq := &RequestTester{
493                 method: "DELETE",
494                 uri:    "/" + TestHash,
495         }
496
497         userReq := &RequestTester{
498                 method:   "DELETE",
499                 uri:      "/" + TestHash,
500                 apiToken: userToken,
501         }
502
503         superuserExistingBlockReq := &RequestTester{
504                 method:   "DELETE",
505                 uri:      "/" + TestHash,
506                 apiToken: theConfig.systemAuthToken,
507         }
508
509         superuserNonexistentBlockReq := &RequestTester{
510                 method:   "DELETE",
511                 uri:      "/" + TestHash2,
512                 apiToken: theConfig.systemAuthToken,
513         }
514
515         // Unauthenticated request returns PermissionError.
516         var response *httptest.ResponseRecorder
517         response = IssueRequest(unauthReq)
518         ExpectStatusCode(t,
519                 "unauthenticated request",
520                 PermissionError.HTTPCode,
521                 response)
522
523         // Authenticated non-admin request returns PermissionError.
524         response = IssueRequest(userReq)
525         ExpectStatusCode(t,
526                 "authenticated non-admin request",
527                 PermissionError.HTTPCode,
528                 response)
529
530         // Authenticated admin request for nonexistent block.
531         type deletecounter struct {
532                 Deleted int `json:"copies_deleted"`
533                 Failed  int `json:"copies_failed"`
534         }
535         var responseDc, expectedDc deletecounter
536
537         response = IssueRequest(superuserNonexistentBlockReq)
538         ExpectStatusCode(t,
539                 "data manager request, nonexistent block",
540                 http.StatusNotFound,
541                 response)
542
543         // Authenticated admin request for existing block while EnableDelete is false.
544         theConfig.EnableDelete = false
545         response = IssueRequest(superuserExistingBlockReq)
546         ExpectStatusCode(t,
547                 "authenticated request, existing block, method disabled",
548                 MethodDisabledError.HTTPCode,
549                 response)
550         theConfig.EnableDelete = true
551
552         // Authenticated admin request for existing block.
553         response = IssueRequest(superuserExistingBlockReq)
554         ExpectStatusCode(t,
555                 "data manager request, existing block",
556                 http.StatusOK,
557                 response)
558         // Expect response {"copies_deleted":1,"copies_failed":0}
559         expectedDc = deletecounter{1, 0}
560         json.NewDecoder(response.Body).Decode(&responseDc)
561         if responseDc != expectedDc {
562                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
563                         expectedDc, responseDc)
564         }
565         // Confirm the block has been deleted
566         buf := make([]byte, BlockSize)
567         _, err := vols[0].Get(TestHash, buf)
568         var blockDeleted = os.IsNotExist(err)
569         if !blockDeleted {
570                 t.Error("superuserExistingBlockReq: block not deleted")
571         }
572
573         // A DELETE request on a block newer than BlobSignatureTTL
574         // should return success but leave the block on the volume.
575         vols[0].Put(TestHash, TestBlock)
576         theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
577
578         response = IssueRequest(superuserExistingBlockReq)
579         ExpectStatusCode(t,
580                 "data manager request, existing block",
581                 http.StatusOK,
582                 response)
583         // Expect response {"copies_deleted":1,"copies_failed":0}
584         expectedDc = deletecounter{1, 0}
585         json.NewDecoder(response.Body).Decode(&responseDc)
586         if responseDc != expectedDc {
587                 t.Errorf("superuserExistingBlockReq\nexpected: %+v\nreceived: %+v",
588                         expectedDc, responseDc)
589         }
590         // Confirm the block has NOT been deleted.
591         _, err = vols[0].Get(TestHash, buf)
592         if err != nil {
593                 t.Errorf("testing delete on new block: %s\n", err)
594         }
595 }
596
597 // TestPullHandler
598 //
599 // Test handling of the PUT /pull statement.
600 //
601 // Cases tested: syntactically valid and invalid pull lists, from the
602 // data manager and from unprivileged users:
603 //
604 //   1. Valid pull list from an ordinary user
605 //      (expected result: 401 Unauthorized)
606 //
607 //   2. Invalid pull request from an ordinary user
608 //      (expected result: 401 Unauthorized)
609 //
610 //   3. Valid pull request from the data manager
611 //      (expected result: 200 OK with request body "Received 3 pull
612 //      requests"
613 //
614 //   4. Invalid pull request from the data manager
615 //      (expected result: 400 Bad Request)
616 //
617 // Test that in the end, the pull manager received a good pull list with
618 // the expected number of requests.
619 //
620 // TODO(twp): test concurrency: launch 100 goroutines to update the
621 // pull list simultaneously.  Make sure that none of them return 400
622 // Bad Request and that pullq.GetList() returns a valid list.
623 //
624 func TestPullHandler(t *testing.T) {
625         defer teardown()
626
627         var userToken = "USER TOKEN"
628         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
629
630         pullq = NewWorkQueue()
631
632         goodJSON := []byte(`[
633                 {
634                         "locator":"locator_with_two_servers",
635                         "servers":[
636                                 "server1",
637                                 "server2"
638                         ]
639                 },
640                 {
641                         "locator":"locator_with_no_servers",
642                         "servers":[]
643                 },
644                 {
645                         "locator":"",
646                         "servers":["empty_locator"]
647                 }
648         ]`)
649
650         badJSON := []byte(`{ "key":"I'm a little teapot" }`)
651
652         type pullTest struct {
653                 name         string
654                 req          RequestTester
655                 responseCode int
656                 responseBody string
657         }
658         var testcases = []pullTest{
659                 {
660                         "Valid pull list from an ordinary user",
661                         RequestTester{"/pull", userToken, "PUT", goodJSON},
662                         http.StatusUnauthorized,
663                         "Unauthorized\n",
664                 },
665                 {
666                         "Invalid pull request from an ordinary user",
667                         RequestTester{"/pull", userToken, "PUT", badJSON},
668                         http.StatusUnauthorized,
669                         "Unauthorized\n",
670                 },
671                 {
672                         "Valid pull request from the data manager",
673                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
674                         http.StatusOK,
675                         "Received 3 pull requests\n",
676                 },
677                 {
678                         "Invalid pull request from the data manager",
679                         RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
680                         http.StatusBadRequest,
681                         "",
682                 },
683         }
684
685         for _, tst := range testcases {
686                 response := IssueRequest(&tst.req)
687                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
688                 ExpectBody(t, tst.name, tst.responseBody, response)
689         }
690
691         // The Keep pull manager should have received one good list with 3
692         // requests on it.
693         for i := 0; i < 3; i++ {
694                 item := <-pullq.NextItem
695                 if _, ok := item.(PullRequest); !ok {
696                         t.Errorf("item %v could not be parsed as a PullRequest", item)
697                 }
698         }
699
700         expectChannelEmpty(t, pullq.NextItem)
701 }
702
703 // TestTrashHandler
704 //
705 // Test cases:
706 //
707 // Cases tested: syntactically valid and invalid trash lists, from the
708 // data manager and from unprivileged users:
709 //
710 //   1. Valid trash list from an ordinary user
711 //      (expected result: 401 Unauthorized)
712 //
713 //   2. Invalid trash list from an ordinary user
714 //      (expected result: 401 Unauthorized)
715 //
716 //   3. Valid trash list from the data manager
717 //      (expected result: 200 OK with request body "Received 3 trash
718 //      requests"
719 //
720 //   4. Invalid trash list from the data manager
721 //      (expected result: 400 Bad Request)
722 //
723 // Test that in the end, the trash collector received a good list
724 // trash list with the expected number of requests.
725 //
726 // TODO(twp): test concurrency: launch 100 goroutines to update the
727 // pull list simultaneously.  Make sure that none of them return 400
728 // Bad Request and that replica.Dump() returns a valid list.
729 //
730 func TestTrashHandler(t *testing.T) {
731         defer teardown()
732
733         var userToken = "USER TOKEN"
734         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
735
736         trashq = NewWorkQueue()
737
738         goodJSON := []byte(`[
739                 {
740                         "locator":"block1",
741                         "block_mtime":1409082153
742                 },
743                 {
744                         "locator":"block2",
745                         "block_mtime":1409082153
746                 },
747                 {
748                         "locator":"block3",
749                         "block_mtime":1409082153
750                 }
751         ]`)
752
753         badJSON := []byte(`I am not a valid JSON string`)
754
755         type trashTest struct {
756                 name         string
757                 req          RequestTester
758                 responseCode int
759                 responseBody string
760         }
761
762         var testcases = []trashTest{
763                 {
764                         "Valid trash list from an ordinary user",
765                         RequestTester{"/trash", userToken, "PUT", goodJSON},
766                         http.StatusUnauthorized,
767                         "Unauthorized\n",
768                 },
769                 {
770                         "Invalid trash list from an ordinary user",
771                         RequestTester{"/trash", userToken, "PUT", badJSON},
772                         http.StatusUnauthorized,
773                         "Unauthorized\n",
774                 },
775                 {
776                         "Valid trash list from the data manager",
777                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
778                         http.StatusOK,
779                         "Received 3 trash requests\n",
780                 },
781                 {
782                         "Invalid trash list from the data manager",
783                         RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
784                         http.StatusBadRequest,
785                         "",
786                 },
787         }
788
789         for _, tst := range testcases {
790                 response := IssueRequest(&tst.req)
791                 ExpectStatusCode(t, tst.name, tst.responseCode, response)
792                 ExpectBody(t, tst.name, tst.responseBody, response)
793         }
794
795         // The trash collector should have received one good list with 3
796         // requests on it.
797         for i := 0; i < 3; i++ {
798                 item := <-trashq.NextItem
799                 if _, ok := item.(TrashRequest); !ok {
800                         t.Errorf("item %v could not be parsed as a TrashRequest", item)
801                 }
802         }
803
804         expectChannelEmpty(t, trashq.NextItem)
805 }
806
807 // ====================
808 // Helper functions
809 // ====================
810
811 // IssueTestRequest executes an HTTP request described by rt, to a
812 // REST router.  It returns the HTTP response to the request.
813 func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
814         response := httptest.NewRecorder()
815         body := bytes.NewReader(rt.requestBody)
816         req, _ := http.NewRequest(rt.method, rt.uri, body)
817         if rt.apiToken != "" {
818                 req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
819         }
820         loggingRouter := MakeRESTRouter()
821         loggingRouter.ServeHTTP(response, req)
822         return response
823 }
824
// ExpectStatusCode compares the status code recorded in response with
// expectedStatus. On a mismatch it reports an error through t, prefixed
// with testname; on a match it does nothing.
func ExpectStatusCode(
	t *testing.T,
	testname string,
	expectedStatus int,
	response *httptest.ResponseRecorder) {
	if response.Code == expectedStatus {
		return
	}
	t.Errorf("%s: expected status %d, got %+v",
		testname, expectedStatus, response)
}
837
// ExpectBody compares the body recorded in response with expectedBody.
// On a mismatch it reports an error through t, prefixed with testname.
// An empty expectedBody turns the check off: nothing is compared and
// nothing is reported.
func ExpectBody(
	t *testing.T,
	testname string,
	expectedBody string,
	response *httptest.ResponseRecorder) {
	if expectedBody == "" {
		return
	}
	if got := response.Body.String(); got != expectedBody {
		t.Errorf("%s: expected response body '%s', got %+v",
			testname, expectedBody, response)
	}
}
848
849 // See #7121
850 func TestPutNeedsOnlyOneBuffer(t *testing.T) {
851         defer teardown()
852         KeepVM = MakeTestVolumeManager(1)
853         defer KeepVM.Close()
854
855         defer func(orig *bufferPool) {
856                 bufs = orig
857         }(bufs)
858         bufs = newBufferPool(1, BlockSize)
859
860         ok := make(chan struct{})
861         go func() {
862                 for i := 0; i < 2; i++ {
863                         response := IssueRequest(
864                                 &RequestTester{
865                                         method:      "PUT",
866                                         uri:         "/" + TestHash,
867                                         requestBody: TestBlock,
868                                 })
869                         ExpectStatusCode(t,
870                                 "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
871                 }
872                 ok <- struct{}{}
873         }()
874
875         select {
876         case <-ok:
877         case <-time.After(time.Second):
878                 t.Fatal("PUT deadlocks with MaxBuffers==1")
879         }
880 }
881
882 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
883 // leak.
884 func TestPutHandlerNoBufferleak(t *testing.T) {
885         defer teardown()
886
887         // Prepare two test Keep volumes.
888         KeepVM = MakeTestVolumeManager(2)
889         defer KeepVM.Close()
890
891         ok := make(chan bool)
892         go func() {
893                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
894                         // Unauthenticated request, no server key
895                         // => OK (unsigned response)
896                         unsignedLocator := "/" + TestHash
897                         response := IssueRequest(
898                                 &RequestTester{
899                                         method:      "PUT",
900                                         uri:         unsignedLocator,
901                                         requestBody: TestBlock,
902                                 })
903                         ExpectStatusCode(t,
904                                 "TestPutHandlerBufferleak", http.StatusOK, response)
905                         ExpectBody(t,
906                                 "TestPutHandlerBufferleak",
907                                 TestHashPutResp, response)
908                 }
909                 ok <- true
910         }()
911         select {
912         case <-time.After(20 * time.Second):
913                 // If the buffer pool leaks, the test goroutine hangs.
914                 t.Fatal("test did not finish, assuming pool leaked")
915         case <-ok:
916         }
917 }
918
// notifyingResponseRecorder wraps an httptest.ResponseRecorder and adds
// a CloseNotify method, letting a test decide what a handler sees on
// the close-notification channel.
type notifyingResponseRecorder struct {
	*httptest.ResponseRecorder
	// closer is the channel handed out by CloseNotify.
	closer chan bool
}

// CloseNotify returns the recorder's closer channel as a receive-only
// channel.
func (rec *notifyingResponseRecorder) CloseNotify() <-chan bool {
	var notify <-chan bool = rec.closer
	return notify
}
927
928 func TestGetHandlerClientDisconnect(t *testing.T) {
929         defer func(was bool) {
930                 theConfig.RequireSignatures = was
931         }(theConfig.RequireSignatures)
932         theConfig.RequireSignatures = false
933
934         defer func(orig *bufferPool) {
935                 bufs = orig
936         }(bufs)
937         bufs = newBufferPool(1, BlockSize)
938         defer bufs.Put(bufs.Get(BlockSize))
939
940         KeepVM = MakeTestVolumeManager(2)
941         defer KeepVM.Close()
942
943         if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
944                 t.Error(err)
945         }
946
947         resp := &notifyingResponseRecorder{
948                 ResponseRecorder: httptest.NewRecorder(),
949                 closer:           make(chan bool, 1),
950         }
951         if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
952                 t.Fatal("notifyingResponseRecorder is broken")
953         }
954         // If anyone asks, the client has disconnected.
955         resp.closer <- true
956
957         ok := make(chan struct{})
958         go func() {
959                 req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
960                 (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
961                 ok <- struct{}{}
962         }()
963
964         select {
965         case <-time.After(20 * time.Second):
966                 t.Fatal("request took >20s, close notifier must be broken")
967         case <-ok:
968         }
969
970         ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
971         for i, v := range KeepVM.AllWritable() {
972                 if calls := v.(*MockVolume).called["GET"]; calls != 0 {
973                         t.Errorf("volume %d got %d calls, expected 0", i, calls)
974                 }
975         }
976 }
977
978 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
979 // leak.
980 func TestGetHandlerNoBufferLeak(t *testing.T) {
981         defer teardown()
982
983         // Prepare two test Keep volumes. Our block is stored on the second volume.
984         KeepVM = MakeTestVolumeManager(2)
985         defer KeepVM.Close()
986
987         vols := KeepVM.AllWritable()
988         if err := vols[0].Put(TestHash, TestBlock); err != nil {
989                 t.Error(err)
990         }
991
992         ok := make(chan bool)
993         go func() {
994                 for i := 0; i < theConfig.MaxBuffers+1; i++ {
995                         // Unauthenticated request, unsigned locator
996                         // => OK
997                         unsignedLocator := "/" + TestHash
998                         response := IssueRequest(
999                                 &RequestTester{
1000                                         method: "GET",
1001                                         uri:    unsignedLocator,
1002                                 })
1003                         ExpectStatusCode(t,
1004                                 "Unauthenticated request, unsigned locator", http.StatusOK, response)
1005                         ExpectBody(t,
1006                                 "Unauthenticated request, unsigned locator",
1007                                 string(TestBlock),
1008                                 response)
1009                 }
1010                 ok <- true
1011         }()
1012         select {
1013         case <-time.After(20 * time.Second):
1014                 // If the buffer pool leaks, the test goroutine hangs.
1015                 t.Fatal("test did not finish, assuming pool leaked")
1016         case <-ok:
1017         }
1018 }
1019
1020 func TestPutReplicationHeader(t *testing.T) {
1021         defer teardown()
1022
1023         KeepVM = MakeTestVolumeManager(2)
1024         defer KeepVM.Close()
1025
1026         resp := IssueRequest(&RequestTester{
1027                 method:      "PUT",
1028                 uri:         "/" + TestHash,
1029                 requestBody: TestBlock,
1030         })
1031         if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
1032                 t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
1033         }
1034 }
1035
1036 func TestUntrashHandler(t *testing.T) {
1037         defer teardown()
1038
1039         // Set up Keep volumes
1040         KeepVM = MakeTestVolumeManager(2)
1041         defer KeepVM.Close()
1042         vols := KeepVM.AllWritable()
1043         vols[0].Put(TestHash, TestBlock)
1044
1045         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1046
1047         // unauthenticatedReq => UnauthorizedError
1048         unauthenticatedReq := &RequestTester{
1049                 method: "PUT",
1050                 uri:    "/untrash/" + TestHash,
1051         }
1052         response := IssueRequest(unauthenticatedReq)
1053         ExpectStatusCode(t,
1054                 "Unauthenticated request",
1055                 UnauthorizedError.HTTPCode,
1056                 response)
1057
1058         // notDataManagerReq => UnauthorizedError
1059         notDataManagerReq := &RequestTester{
1060                 method:   "PUT",
1061                 uri:      "/untrash/" + TestHash,
1062                 apiToken: knownToken,
1063         }
1064
1065         response = IssueRequest(notDataManagerReq)
1066         ExpectStatusCode(t,
1067                 "Non-datamanager token",
1068                 UnauthorizedError.HTTPCode,
1069                 response)
1070
1071         // datamanagerWithBadHashReq => StatusBadRequest
1072         datamanagerWithBadHashReq := &RequestTester{
1073                 method:   "PUT",
1074                 uri:      "/untrash/thisisnotalocator",
1075                 apiToken: theConfig.systemAuthToken,
1076         }
1077         response = IssueRequest(datamanagerWithBadHashReq)
1078         ExpectStatusCode(t,
1079                 "Bad locator in untrash request",
1080                 http.StatusBadRequest,
1081                 response)
1082
1083         // datamanagerWrongMethodReq => StatusBadRequest
1084         datamanagerWrongMethodReq := &RequestTester{
1085                 method:   "GET",
1086                 uri:      "/untrash/" + TestHash,
1087                 apiToken: theConfig.systemAuthToken,
1088         }
1089         response = IssueRequest(datamanagerWrongMethodReq)
1090         ExpectStatusCode(t,
1091                 "Only PUT method is supported for untrash",
1092                 http.StatusBadRequest,
1093                 response)
1094
1095         // datamanagerReq => StatusOK
1096         datamanagerReq := &RequestTester{
1097                 method:   "PUT",
1098                 uri:      "/untrash/" + TestHash,
1099                 apiToken: theConfig.systemAuthToken,
1100         }
1101         response = IssueRequest(datamanagerReq)
1102         ExpectStatusCode(t,
1103                 "",
1104                 http.StatusOK,
1105                 response)
1106         expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
1107         if response.Body.String() != expected {
1108                 t.Errorf(
1109                         "Untrash response mismatched: expected %s, got:\n%s",
1110                         expected, response.Body.String())
1111         }
1112 }
1113
1114 func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
1115         defer teardown()
1116
1117         // Set up readonly Keep volumes
1118         vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
1119         vols[0].Readonly = true
1120         vols[1].Readonly = true
1121         KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
1122         defer KeepVM.Close()
1123
1124         theConfig.systemAuthToken = "DATA MANAGER TOKEN"
1125
1126         // datamanagerReq => StatusOK
1127         datamanagerReq := &RequestTester{
1128                 method:   "PUT",
1129                 uri:      "/untrash/" + TestHash,
1130                 apiToken: theConfig.systemAuthToken,
1131         }
1132         response := IssueRequest(datamanagerReq)
1133         ExpectStatusCode(t,
1134                 "No writable volumes",
1135                 http.StatusNotFound,
1136                 response)
1137 }