7255: update TestPutAndGetCollectionsWithMultipleStreamsAndBlocks to verify that...
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "os/exec"
13         "regexp"
14         "strings"
15         "testing"
16         "time"
17 )
18
19 var arv arvadosclient.ArvadosClient
20 var keepClient *keepclient.KeepClient
21 var keepServers []string
22
23 func SetupDataManagerTest(t *testing.T) {
24         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
25
26         // start api and keep servers
27         arvadostest.ResetEnv()
28         arvadostest.StartAPI()
29         arvadostest.StartKeep(2, false)
30
31         arv = makeArvadosClient()
32         arv.ApiToken = arvadostest.DataManagerToken
33
34         // keep client
35         keepClient = &keepclient.KeepClient{
36                 Arvados:       &arv,
37                 Want_replicas: 2,
38                 Using_proxy:   true,
39                 Client:        &http.Client{},
40         }
41
42         // discover keep services
43         if err := keepClient.DiscoverKeepServers(); err != nil {
44                 t.Fatalf("Error discovering keep services: %s", err)
45         }
46         keepServers = []string{}
47         for _, host := range keepClient.LocalRoots() {
48                 keepServers = append(keepServers, host)
49         }
50 }
51
52 func TearDownDataManagerTest(t *testing.T) {
53         arvadostest.StopKeep(2)
54         arvadostest.StopAPI()
55 }
56
57 func putBlock(t *testing.T, data string) string {
58         locator, _, err := keepClient.PutB([]byte(data))
59         if err != nil {
60                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
61         }
62         if locator == "" {
63                 t.Fatalf("No locator found after putting test data")
64         }
65
66         splits := strings.Split(locator, "+")
67         return splits[0] + "+" + splits[1]
68 }
69
70 func getBlock(t *testing.T, locator string, data string) {
71         reader, blocklen, _, err := keepClient.Get(locator)
72         if err != nil {
73                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
74         }
75         if reader == nil {
76                 t.Fatalf("No reader found after putting test data")
77         }
78         if blocklen != int64(len(data)) {
79                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
80         }
81
82         all, err := ioutil.ReadAll(reader)
83         if string(all) != data {
84                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
85         }
86 }
87
88 // Create a collection using arv-put
89 func createCollection(t *testing.T, data string) string {
90         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
91         defer os.Remove(tempfile.Name())
92
93         _, err = tempfile.Write([]byte(data))
94         if err != nil {
95                 t.Fatalf("Error writing to tempfile %v", err)
96         }
97
98         // arv-put
99         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
100         if err != nil {
101                 t.Fatalf("Error running arv-put %s", err)
102         }
103
104         uuid := string(output[0:27]) // trim terminating char
105         return uuid
106 }
107
108 // Get collection locator
109 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
110
111 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
112         manifest := getCollection(t, uuid)["manifest_text"].(string)
113
114         locator := strings.Split(manifest, " ")[1]
115         match := locatorMatcher.FindStringSubmatch(locator)
116         if match == nil {
117                 t.Fatalf("No locator found in collection manifest %s", manifest)
118         }
119
120         return match[1] + "+" + match[2]
121 }
122
123 func switchToken(t string) func() {
124         orig := arv.ApiToken
125         restore := func() {
126                 arv.ApiToken = orig
127         }
128         arv.ApiToken = t
129         return restore
130 }
131
132 func getCollection(t *testing.T, uuid string) Dict {
133         defer switchToken(arvadostest.AdminToken)()
134
135         getback := make(Dict)
136         err := arv.Get("collections", uuid, nil, &getback)
137         if err != nil {
138                 t.Fatalf("Error getting collection %s", err)
139         }
140         if getback["uuid"] != uuid {
141                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
142         }
143
144         return getback
145 }
146
147 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
148         defer switchToken(arvadostest.AdminToken)()
149
150         err := arv.Update("collections", uuid, arvadosclient.Dict{
151                 "collection": arvadosclient.Dict{
152                         paramName: paramValue,
153                 },
154         }, &arvadosclient.Dict{})
155
156         if err != nil {
157                 t.Fatalf("Error updating collection %s", err)
158         }
159 }
160
161 type Dict map[string]interface{}
162
163 func deleteCollection(t *testing.T, uuid string) {
164         defer switchToken(arvadostest.AdminToken)()
165
166         getback := make(Dict)
167         err := arv.Delete("collections", uuid, nil, &getback)
168         if err != nil {
169                 t.Fatalf("Error deleting collection %s", err)
170         }
171         if getback["uuid"] != uuid {
172                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
173         }
174 }
175
176 func dataManagerSingleRun(t *testing.T) {
177         err := singlerun(arv)
178         if err != nil {
179                 t.Fatalf("Error during singlerun %s", err)
180         }
181 }
182
183 func getBlockIndexesForServer(t *testing.T, i int) []string {
184         var indexes []string
185
186         path := keepServers[i] + "/index"
187         client := http.Client{}
188         req, err := http.NewRequest("GET", path, nil)
189         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
190         req.Header.Add("Content-Type", "application/octet-stream")
191         resp, err := client.Do(req)
192         defer resp.Body.Close()
193
194         if err != nil {
195                 t.Fatalf("Error during %s %s", path, err)
196         }
197
198         body, err := ioutil.ReadAll(resp.Body)
199         if err != nil {
200                 t.Fatalf("Error reading response from %s %s", path, err)
201         }
202
203         lines := strings.Split(string(body), "\n")
204         for _, line := range lines {
205                 indexes = append(indexes, strings.Split(line, " ")...)
206         }
207
208         return indexes
209 }
210
211 func getBlockIndexes(t *testing.T) [][]string {
212         var indexes [][]string
213
214         for i := 0; i < len(keepServers); i++ {
215                 indexes = append(indexes, getBlockIndexesForServer(t, i))
216         }
217         return indexes
218 }
219
220 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
221         blocks := getBlockIndexes(t)
222
223         for _, block := range notExpected {
224                 for _, idx := range blocks {
225                         if valueInArray(block, idx) {
226                                 t.Fatalf("Found unexpected block %s", block)
227                         }
228                 }
229         }
230
231         for _, block := range expected {
232                 nFound := 0
233                 for _, idx := range blocks {
234                         if valueInArray(block, idx) {
235                                 nFound++
236                         }
237                 }
238                 if nFound < minReplication {
239                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
240                 }
241         }
242 }
243
244 func valueInArray(value string, list []string) bool {
245         for _, v := range list {
246                 if value == v {
247                         return true
248                 }
249         }
250         return false
251 }
252
253 /*
254 Test env uses two keep volumes. The volume names can be found by reading the files
255   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
256
257 The keep volumes are of the dir structure:
258   volumeN/subdir/locator
259 */
260 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
261         // First get rid of any size hints in the locators
262         var trimmedBlockLocators []string
263         for _, block := range oldUnusedBlockLocators {
264                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
265         }
266
267         // Get the working dir so that we can read keep{n}.volume files
268         wd, err := os.Getwd()
269         if err != nil {
270                 t.Fatalf("Error getting working dir %s", err)
271         }
272
273         // Now cycle through the two keep volumes
274         oldTime := time.Now().AddDate(0, -2, 0)
275         for i := 0; i < 2; i++ {
276                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
277                 volumeDir, err := ioutil.ReadFile(filename)
278                 if err != nil {
279                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
280                 }
281
282                 // Read the keep volume dir structure
283                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
284                 if err != nil {
285                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
286                 }
287
288                 // Read each subdir for each of the keep volume dir
289                 for _, subdir := range volumeContents {
290                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
291                         subdirContents, err := ioutil.ReadDir(string(subdirName))
292                         if err != nil {
293                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
294                         }
295
296                         // Now we got to the files. The files are names are the block locators
297                         for _, fileInfo := range subdirContents {
298                                 blockName := fileInfo.Name()
299                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
300                                 if valueInArray(blockName, trimmedBlockLocators) {
301                                         err = os.Chtimes(myname, oldTime, oldTime)
302                                 }
303                         }
304                 }
305         }
306 }
307
308 func getStatus(t *testing.T, path string) interface{} {
309         client := http.Client{}
310         req, err := http.NewRequest("GET", path, nil)
311         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
312         req.Header.Add("Content-Type", "application/octet-stream")
313         resp, err := client.Do(req)
314         if err != nil {
315                 t.Fatalf("Error during %s %s", path, err)
316         }
317         defer resp.Body.Close()
318
319         var s interface{}
320         json.NewDecoder(resp.Body).Decode(&s)
321
322         return s
323 }
324
325 // Wait until PullQueue and TrashQueue are empty on all keepServers.
326 func waitUntilQueuesFinishWork(t *testing.T) {
327         for _, ks := range keepServers {
328                 for done := false; !done; {
329                         time.Sleep(100 * time.Millisecond)
330                         s := getStatus(t, ks+"/status.json")
331                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
332                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
333                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
334                                         done = true
335                                 }
336                         }
337                 }
338         }
339 }
340
341 /*
342 Create some blocks and backdate some of them.
343 Also create some collections and delete some of them.
344 Verify block indexes.
345 */
346 func TestPutAndGetBlocks(t *testing.T) {
347         defer TearDownDataManagerTest(t)
348         SetupDataManagerTest(t)
349
350         // Put some blocks which will be backdated later on
351         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
352         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
353         var oldUnusedBlockLocators []string
354         oldUnusedBlockData := "this block will have older mtime"
355         for i := 0; i < 5; i++ {
356                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
357         }
358         for i := 0; i < 5; i++ {
359                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
360         }
361
362         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
363         oldUsedBlockData := "this collection block will have older mtime"
364         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
365         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
366
367         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
368         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
369         var newBlockLocators []string
370         newBlockData := "this block is newer"
371         for i := 0; i < 5; i++ {
372                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
373         }
374         for i := 0; i < 5; i++ {
375                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
376         }
377
378         // Create a collection that would be deleted later on
379         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
380         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
381
382         // Create another collection that has the same data as the one of the old blocks
383         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
384         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
385         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
386                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
387         }
388
389         // Create another collection whose replication level will be changed
390         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
391         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
392
393         // Create two collections with same data; one will be deleted later on
394         dataForTwoCollections := "one of these collections will be deleted"
395         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
396         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
397         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
398         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
399         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
400                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
401         }
402
403         // create collection with empty manifest text
404         emptyBlockLocator := putBlock(t, "")
405         emptyCollection := createCollection(t, "")
406
407         // Verify blocks before doing any backdating / deleting.
408         var expected []string
409         expected = append(expected, oldUnusedBlockLocators...)
410         expected = append(expected, newBlockLocators...)
411         expected = append(expected, toBeDeletedCollectionLocator)
412         expected = append(expected, replicationCollectionLocator)
413         expected = append(expected, oneOfTwoWithSameDataLocator)
414         expected = append(expected, secondOfTwoWithSameDataLocator)
415         expected = append(expected, emptyBlockLocator)
416
417         verifyBlocks(t, nil, expected, 2)
418
419         // Run datamanager in singlerun mode
420         dataManagerSingleRun(t)
421         waitUntilQueuesFinishWork(t)
422
423         verifyBlocks(t, nil, expected, 2)
424
425         // Backdate the to-be old blocks and delete the collections
426         backdateBlocks(t, oldUnusedBlockLocators)
427         deleteCollection(t, toBeDeletedCollectionUUID)
428         deleteCollection(t, secondOfTwoWithSameDataUUID)
429         backdateBlocks(t, []string{emptyBlockLocator})
430         deleteCollection(t, emptyCollection)
431
432         // Run data manager again
433         dataManagerSingleRun(t)
434         waitUntilQueuesFinishWork(t)
435
436         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
437         expected = expected[:0]
438         expected = append(expected, oldUsedBlockLocator)
439         expected = append(expected, newBlockLocators...)
440         expected = append(expected, toBeDeletedCollectionLocator)
441         expected = append(expected, oneOfTwoWithSameDataLocator)
442         expected = append(expected, secondOfTwoWithSameDataLocator)
443         expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
444
445         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
446
447         // Reduce desired replication on replicationCollectionUUID
448         // collection, and verify that Data Manager does not reduce
449         // actual replication any further than that. (It might not
450         // reduce actual replication at all; that's OK for this test.)
451
452         // Reduce desired replication level.
453         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
454         collection := getCollection(t, replicationCollectionUUID)
455         if collection["replication_desired"].(interface{}) != float64(1) {
456                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
457         }
458
459         // Verify data is currently overreplicated.
460         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
461
462         // Run data manager again
463         dataManagerSingleRun(t)
464         waitUntilQueuesFinishWork(t)
465
466         // Verify data is not underreplicated.
467         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
468
469         // Verify *other* collections' data is not underreplicated.
470         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
471 }
472
473 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
474         defer TearDownDataManagerTest(t)
475         SetupDataManagerTest(t)
476
477         for i := 0; i < 10; i++ {
478                 err := singlerun(arv)
479                 if err != nil {
480                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
481                 }
482         }
483 }
484
485 func TestGetStatusRepeatedly(t *testing.T) {
486         defer TearDownDataManagerTest(t)
487         SetupDataManagerTest(t)
488
489         for i := 0; i < 10; i++ {
490                 for j := 0; j < 2; j++ {
491                         s := getStatus(t, keepServers[j]+"/status.json")
492
493                         var pullQueueStatus interface{}
494                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
495                         var trashQueueStatus interface{}
496                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
497
498                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
499                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
500                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
501                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
502                                 t.Fatalf("PullQueue and TrashQueue status not found")
503                         }
504
505                         time.Sleep(100 * time.Millisecond)
506                 }
507         }
508 }
509
510 func TestRunDatamanagerWithBogusServer(t *testing.T) {
511         defer TearDownDataManagerTest(t)
512         SetupDataManagerTest(t)
513
514         arv.ApiServer = "bogus-server"
515
516         err := singlerun(arv)
517         if err == nil {
518                 t.Fatalf("Expected error during singlerun with bogus server")
519         }
520 }
521
522 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
523         defer TearDownDataManagerTest(t)
524         SetupDataManagerTest(t)
525
526         arv.ApiToken = arvadostest.ActiveToken
527
528         err := singlerun(arv)
529         if err == nil {
530                 t.Fatalf("Expected error during singlerun as non-admin user")
531         }
532 }
533
534 // Create a collection with multiple streams and blocks
535 func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
536         defer switchToken(arvadostest.AdminToken)()
537
538         manifest := ""
539         locators := make(map[string]bool)
540         for s := 0; s < numStreams; s++ {
541                 manifest += fmt.Sprintf("./stream%d ", s)
542                 for b := 0; b < numBlocks; b++ {
543                         locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
544                         if err != nil {
545                                 t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
546                         }
547                         locators[strings.Split(locator, "+A")[0]] = true
548                         manifest += locator + " "
549                 }
550                 manifest += "0:1:dummyfile.txt\n"
551         }
552
553         collection := make(Dict)
554         err := arv.Create("collections",
555                 arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
556                 &collection)
557
558         if err != nil {
559                 t.Fatalf("Error creating collection %v", err)
560         }
561
562         var locs []string
563         for k, _ := range locators {
564                 locs = append(locs, k)
565         }
566
567         return collection["uuid"].(string), locs
568 }
569
570 /*
571   Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
572   Create another collection with multiple streams and blocks; backdate it's blocks and delete the collection.
573   After datamanager run: expect blocks from the first collection, but none from the second collection.
574 */
575 func TestPutAndGetCollectionsWithMultipleStreamsAndBlocks(t *testing.T) {
576         defer TearDownDataManagerTest(t)
577         SetupDataManagerTest(t)
578
579         // create collection whose blocks will be backdated
580         collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", 100, 10)
581         if collectionWithOldBlocks == "" {
582                 t.Fatalf("Failed to create collection with 1000 blocks")
583         }
584         if len(oldBlocks) != 1000 {
585                 t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
586         }
587
588         // create another collection, whose blocks will be backdated and the collection will be deleted
589         toBeDeletedCollection, toBeDeletedCollectionBlocks := createMultiStreamBlockCollection(t, "new block", 2, 5)
590         if toBeDeletedCollection == "" {
591                 t.Fatalf("Failed to create collection with 10 blocks")
592         }
593         if len(toBeDeletedCollectionBlocks) != 10 {
594                 t.Fatalf("Not all blocks are created: expected %v, found %v", 10, len(toBeDeletedCollectionBlocks))
595         }
596
597         // create a stray block that will be backdated
598         strayOldBlock := putBlock(t, "this stray block is old")
599
600         // create another block that will not be backdated
601         strayNewBlock := putBlock(t, "this stray block is new")
602
603         expected := []string{}
604         expected = append(expected, oldBlocks...)
605         expected = append(expected, toBeDeletedCollectionBlocks...)
606         expected = append(expected, strayOldBlock)
607         expected = append(expected, strayNewBlock)
608         verifyBlocks(t, nil, expected, 2)
609
610         // Backdate old blocks; but the collection still references these blocks
611         backdateBlocks(t, oldBlocks)
612
613         // Backdate first block from the newer blocks and delete the collection; the rest are still be reachable
614         backdateBlocks(t, toBeDeletedCollectionBlocks)
615         deleteCollection(t, toBeDeletedCollection)
616
617         // also backdate the stray old block
618         backdateBlocks(t, []string{strayOldBlock})
619
620         // run datamanager
621         dataManagerSingleRun(t)
622
623         expected = []string{strayNewBlock}
624         expected = append(expected, oldBlocks...)
625
626         notExpected := []string{strayOldBlock}
627         notExpected = append(notExpected, toBeDeletedCollectionBlocks...)
628
629         verifyBlocks(t, notExpected, expected, 2)
630 }