8831: Add crunchrunner to shell node dependencies.
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/services/datamanager/collection"
10         "git.curoverse.com/arvados.git/services/datamanager/summary"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "os/exec"
15         "path"
16         "regexp"
17         "strings"
18         "testing"
19         "time"
20 )
21
22 var arv arvadosclient.ArvadosClient
23 var keepClient *keepclient.KeepClient
24 var keepServers []string
25
26 func SetupDataManagerTest(t *testing.T) {
27         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
28
29         // start api and keep servers
30         arvadostest.ResetEnv()
31         arvadostest.StartAPI()
32         arvadostest.StartKeep(2, false)
33
34         var err error
35         arv, err = arvadosclient.MakeArvadosClient()
36         if err != nil {
37                 t.Fatalf("Error making arvados client: %s", err)
38         }
39         arv.ApiToken = arvadostest.DataManagerToken
40
41         // keep client
42         keepClient = &keepclient.KeepClient{
43                 Arvados:       &arv,
44                 Want_replicas: 2,
45                 Client:        &http.Client{},
46         }
47
48         // discover keep services
49         if err = keepClient.DiscoverKeepServers(); err != nil {
50                 t.Fatalf("Error discovering keep services: %s", err)
51         }
52         keepServers = []string{}
53         for _, host := range keepClient.LocalRoots() {
54                 keepServers = append(keepServers, host)
55         }
56 }
57
58 func TearDownDataManagerTest(t *testing.T) {
59         arvadostest.StopKeep(2)
60         arvadostest.StopAPI()
61         summary.WriteDataTo = ""
62         collection.HeapProfileFilename = ""
63 }
64
65 func putBlock(t *testing.T, data string) string {
66         locator, _, err := keepClient.PutB([]byte(data))
67         if err != nil {
68                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
69         }
70         if locator == "" {
71                 t.Fatalf("No locator found after putting test data")
72         }
73
74         splits := strings.Split(locator, "+")
75         return splits[0] + "+" + splits[1]
76 }
77
78 func getBlock(t *testing.T, locator string, data string) {
79         reader, blocklen, _, err := keepClient.Get(locator)
80         if err != nil {
81                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
82         }
83         if reader == nil {
84                 t.Fatalf("No reader found after putting test data")
85         }
86         if blocklen != int64(len(data)) {
87                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
88         }
89
90         all, err := ioutil.ReadAll(reader)
91         if string(all) != data {
92                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
93         }
94 }
95
96 // Create a collection using arv-put
97 func createCollection(t *testing.T, data string) string {
98         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
99         defer os.Remove(tempfile.Name())
100
101         _, err = tempfile.Write([]byte(data))
102         if err != nil {
103                 t.Fatalf("Error writing to tempfile %v", err)
104         }
105
106         // arv-put
107         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
108         if err != nil {
109                 t.Fatalf("Error running arv-put %s", err)
110         }
111
112         uuid := string(output[0:27]) // trim terminating char
113         return uuid
114 }
115
116 // Get collection locator
117 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
118
119 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
120         manifest := getCollection(t, uuid)["manifest_text"].(string)
121
122         locator := strings.Split(manifest, " ")[1]
123         match := locatorMatcher.FindStringSubmatch(locator)
124         if match == nil {
125                 t.Fatalf("No locator found in collection manifest %s", manifest)
126         }
127
128         return match[1] + "+" + match[2]
129 }
130
131 func switchToken(t string) func() {
132         orig := arv.ApiToken
133         restore := func() {
134                 arv.ApiToken = orig
135         }
136         arv.ApiToken = t
137         return restore
138 }
139
140 func getCollection(t *testing.T, uuid string) Dict {
141         defer switchToken(arvadostest.AdminToken)()
142
143         getback := make(Dict)
144         err := arv.Get("collections", uuid, nil, &getback)
145         if err != nil {
146                 t.Fatalf("Error getting collection %s", err)
147         }
148         if getback["uuid"] != uuid {
149                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
150         }
151
152         return getback
153 }
154
155 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
156         defer switchToken(arvadostest.AdminToken)()
157
158         err := arv.Update("collections", uuid, arvadosclient.Dict{
159                 "collection": arvadosclient.Dict{
160                         paramName: paramValue,
161                 },
162         }, &arvadosclient.Dict{})
163
164         if err != nil {
165                 t.Fatalf("Error updating collection %s", err)
166         }
167 }
168
169 type Dict map[string]interface{}
170
171 func deleteCollection(t *testing.T, uuid string) {
172         defer switchToken(arvadostest.AdminToken)()
173
174         getback := make(Dict)
175         err := arv.Delete("collections", uuid, nil, &getback)
176         if err != nil {
177                 t.Fatalf("Error deleting collection %s", err)
178         }
179         if getback["uuid"] != uuid {
180                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
181         }
182 }
183
184 func dataManagerSingleRun(t *testing.T) {
185         err := singlerun(arv)
186         if err != nil {
187                 t.Fatalf("Error during singlerun %s", err)
188         }
189 }
190
191 func getBlockIndexesForServer(t *testing.T, i int) []string {
192         var indexes []string
193
194         path := keepServers[i] + "/index"
195         client := http.Client{}
196         req, err := http.NewRequest("GET", path, nil)
197         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
198         req.Header.Add("Content-Type", "application/octet-stream")
199         resp, err := client.Do(req)
200         defer resp.Body.Close()
201
202         if err != nil {
203                 t.Fatalf("Error during %s %s", path, err)
204         }
205
206         body, err := ioutil.ReadAll(resp.Body)
207         if err != nil {
208                 t.Fatalf("Error reading response from %s %s", path, err)
209         }
210
211         lines := strings.Split(string(body), "\n")
212         for _, line := range lines {
213                 indexes = append(indexes, strings.Split(line, " ")...)
214         }
215
216         return indexes
217 }
218
219 func getBlockIndexes(t *testing.T) [][]string {
220         var indexes [][]string
221
222         for i := 0; i < len(keepServers); i++ {
223                 indexes = append(indexes, getBlockIndexesForServer(t, i))
224         }
225         return indexes
226 }
227
228 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
229         blocks := getBlockIndexes(t)
230
231         for _, block := range notExpected {
232                 for _, idx := range blocks {
233                         if valueInArray(block, idx) {
234                                 t.Fatalf("Found unexpected block %s", block)
235                         }
236                 }
237         }
238
239         for _, block := range expected {
240                 nFound := 0
241                 for _, idx := range blocks {
242                         if valueInArray(block, idx) {
243                                 nFound++
244                         }
245                 }
246                 if nFound < minReplication {
247                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
248                 }
249         }
250 }
251
252 func valueInArray(value string, list []string) bool {
253         for _, v := range list {
254                 if value == v {
255                         return true
256                 }
257         }
258         return false
259 }
260
261 // Test env uses two keep volumes. The volume names can be found by reading the files
262 // ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
263 //
264 // The keep volumes are of the dir structure: volumeN/subdir/locator
265 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
266         // First get rid of any size hints in the locators
267         var trimmedBlockLocators []string
268         for _, block := range oldUnusedBlockLocators {
269                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
270         }
271
272         // Get the working dir so that we can read keep{n}.volume files
273         wd, err := os.Getwd()
274         if err != nil {
275                 t.Fatalf("Error getting working dir %s", err)
276         }
277
278         // Now cycle through the two keep volumes
279         oldTime := time.Now().AddDate(0, -2, 0)
280         for i := 0; i < 2; i++ {
281                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
282                 volumeDir, err := ioutil.ReadFile(filename)
283                 if err != nil {
284                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
285                 }
286
287                 // Read the keep volume dir structure
288                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
289                 if err != nil {
290                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
291                 }
292
293                 // Read each subdir for each of the keep volume dir
294                 for _, subdir := range volumeContents {
295                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
296                         subdirContents, err := ioutil.ReadDir(string(subdirName))
297                         if err != nil {
298                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
299                         }
300
301                         // Now we got to the files. The files are names are the block locators
302                         for _, fileInfo := range subdirContents {
303                                 blockName := fileInfo.Name()
304                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
305                                 if valueInArray(blockName, trimmedBlockLocators) {
306                                         err = os.Chtimes(myname, oldTime, oldTime)
307                                 }
308                         }
309                 }
310         }
311 }
312
313 func getStatus(t *testing.T, path string) interface{} {
314         client := http.Client{}
315         req, err := http.NewRequest("GET", path, nil)
316         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
317         req.Header.Add("Content-Type", "application/octet-stream")
318         resp, err := client.Do(req)
319         if err != nil {
320                 t.Fatalf("Error during %s %s", path, err)
321         }
322         defer resp.Body.Close()
323
324         var s interface{}
325         json.NewDecoder(resp.Body).Decode(&s)
326
327         return s
328 }
329
330 // Wait until PullQueue and TrashQueue are empty on all keepServers.
331 func waitUntilQueuesFinishWork(t *testing.T) {
332         for _, ks := range keepServers {
333                 for done := false; !done; {
334                         time.Sleep(100 * time.Millisecond)
335                         s := getStatus(t, ks+"/status.json")
336                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
337                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
338                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
339                                         done = true
340                                 }
341                         }
342                 }
343         }
344 }
345
346 // Create some blocks and backdate some of them.
347 // Also create some collections and delete some of them.
348 // Verify block indexes.
349 func TestPutAndGetBlocks(t *testing.T) {
350         defer TearDownDataManagerTest(t)
351         SetupDataManagerTest(t)
352
353         // Put some blocks which will be backdated later on
354         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
355         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
356         var oldUnusedBlockLocators []string
357         oldUnusedBlockData := "this block will have older mtime"
358         for i := 0; i < 5; i++ {
359                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
360         }
361         for i := 0; i < 5; i++ {
362                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
363         }
364
365         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
366         oldUsedBlockData := "this collection block will have older mtime"
367         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
368         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
369
370         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
371         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
372         var newBlockLocators []string
373         newBlockData := "this block is newer"
374         for i := 0; i < 5; i++ {
375                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
376         }
377         for i := 0; i < 5; i++ {
378                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
379         }
380
381         // Create a collection that would be deleted later on
382         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
383         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
384
385         // Create another collection that has the same data as the one of the old blocks
386         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
387         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
388         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
389                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
390         }
391
392         // Create another collection whose replication level will be changed
393         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
394         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
395
396         // Create two collections with same data; one will be deleted later on
397         dataForTwoCollections := "one of these collections will be deleted"
398         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
399         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
400         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
401         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
402         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
403                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
404         }
405
406         // create collection with empty manifest text
407         emptyBlockLocator := putBlock(t, "")
408         emptyCollection := createCollection(t, "")
409
410         // Verify blocks before doing any backdating / deleting.
411         var expected []string
412         expected = append(expected, oldUnusedBlockLocators...)
413         expected = append(expected, newBlockLocators...)
414         expected = append(expected, toBeDeletedCollectionLocator)
415         expected = append(expected, replicationCollectionLocator)
416         expected = append(expected, oneOfTwoWithSameDataLocator)
417         expected = append(expected, secondOfTwoWithSameDataLocator)
418         expected = append(expected, emptyBlockLocator)
419
420         verifyBlocks(t, nil, expected, 2)
421
422         // Run datamanager in singlerun mode
423         dataManagerSingleRun(t)
424         waitUntilQueuesFinishWork(t)
425
426         verifyBlocks(t, nil, expected, 2)
427
428         // Backdate the to-be old blocks and delete the collections
429         backdateBlocks(t, oldUnusedBlockLocators)
430         deleteCollection(t, toBeDeletedCollectionUUID)
431         deleteCollection(t, secondOfTwoWithSameDataUUID)
432         backdateBlocks(t, []string{emptyBlockLocator})
433         deleteCollection(t, emptyCollection)
434
435         // Run data manager again
436         dataManagerSingleRun(t)
437         waitUntilQueuesFinishWork(t)
438
439         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
440         expected = expected[:0]
441         expected = append(expected, oldUsedBlockLocator)
442         expected = append(expected, newBlockLocators...)
443         expected = append(expected, toBeDeletedCollectionLocator)
444         expected = append(expected, oneOfTwoWithSameDataLocator)
445         expected = append(expected, secondOfTwoWithSameDataLocator)
446         expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
447
448         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
449
450         // Reduce desired replication on replicationCollectionUUID
451         // collection, and verify that Data Manager does not reduce
452         // actual replication any further than that. (It might not
453         // reduce actual replication at all; that's OK for this test.)
454
455         // Reduce desired replication level.
456         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
457         collection := getCollection(t, replicationCollectionUUID)
458         if collection["replication_desired"].(interface{}) != float64(1) {
459                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
460         }
461
462         // Verify data is currently overreplicated.
463         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
464
465         // Run data manager again
466         dataManagerSingleRun(t)
467         waitUntilQueuesFinishWork(t)
468
469         // Verify data is not underreplicated.
470         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
471
472         // Verify *other* collections' data is not underreplicated.
473         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
474 }
475
476 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
477         defer TearDownDataManagerTest(t)
478         SetupDataManagerTest(t)
479
480         for i := 0; i < 10; i++ {
481                 err := singlerun(arv)
482                 if err != nil {
483                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
484                 }
485         }
486 }
487
488 func TestGetStatusRepeatedly(t *testing.T) {
489         defer TearDownDataManagerTest(t)
490         SetupDataManagerTest(t)
491
492         for i := 0; i < 10; i++ {
493                 for j := 0; j < 2; j++ {
494                         s := getStatus(t, keepServers[j]+"/status.json")
495
496                         var pullQueueStatus interface{}
497                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
498                         var trashQueueStatus interface{}
499                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
500
501                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
502                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
503                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
504                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
505                                 t.Fatalf("PullQueue and TrashQueue status not found")
506                         }
507
508                         time.Sleep(100 * time.Millisecond)
509                 }
510         }
511 }
512
513 func TestRunDatamanagerWithBogusServer(t *testing.T) {
514         defer TearDownDataManagerTest(t)
515         SetupDataManagerTest(t)
516
517         arv.ApiServer = "bogus-server"
518
519         err := singlerun(arv)
520         if err == nil {
521                 t.Fatalf("Expected error during singlerun with bogus server")
522         }
523 }
524
525 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
526         defer TearDownDataManagerTest(t)
527         SetupDataManagerTest(t)
528
529         arv.ApiToken = arvadostest.ActiveToken
530
531         err := singlerun(arv)
532         if err == nil {
533                 t.Fatalf("Expected error during singlerun as non-admin user")
534         }
535 }
536
537 func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
538         testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
539 }
540
541 func createBadPath(t *testing.T) (badpath string) {
542         tempdir, err := ioutil.TempDir("", "bad")
543         if err != nil {
544                 t.Fatalf("Could not create temporary directory for bad path: %v", err)
545         }
546         badpath = path.Join(tempdir, "bad")
547         return
548 }
549
550 func destroyBadPath(t *testing.T, badpath string) {
551         tempdir := path.Join(badpath, "..")
552         err := os.Remove(tempdir)
553         if err != nil {
554                 t.Fatalf("Could not remove bad path temporary directory %v: %v", tempdir, err)
555         }
556 }
557
558 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
559         badpath := createBadPath(t)
560         defer destroyBadPath(t, badpath)
561         testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
562 }
563
564 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
565         badpath := createBadPath(t)
566         defer destroyBadPath(t, badpath)
567         testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
568 }
569
570 // Create some blocks and backdate some of them.
571 // Run datamanager while producing an error condition.
572 // Verify that the blocks are hence not deleted.
573 func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
574         defer TearDownDataManagerTest(t)
575         SetupDataManagerTest(t)
576
577         // Put some blocks and backdate them.
578         var oldUnusedBlockLocators []string
579         oldUnusedBlockData := "this block will have older mtime"
580         for i := 0; i < 5; i++ {
581                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
582         }
583         backdateBlocks(t, oldUnusedBlockLocators)
584
585         // Run data manager
586         summary.WriteDataTo = writeDataTo
587         collection.HeapProfileFilename = heapProfileFile
588
589         err := singlerun(arv)
590         if !expectError {
591                 if err != nil {
592                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
593                 }
594         } else {
595                 if err == nil {
596                         t.Fatalf("Expected error during datamanager singlerun")
597                 }
598         }
599         waitUntilQueuesFinishWork(t)
600
601         // Get block indexes and verify that all backdated blocks are not/deleted as expected
602         if expectOldBlocks {
603                 verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
604         } else {
605                 verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
606         }
607 }
608
609 // Create a collection with multiple streams and blocks
610 func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
611         defer switchToken(arvadostest.AdminToken)()
612
613         manifest := ""
614         locators := make(map[string]bool)
615         for s := 0; s < numStreams; s++ {
616                 manifest += fmt.Sprintf("./stream%d ", s)
617                 for b := 0; b < numBlocks; b++ {
618                         locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
619                         if err != nil {
620                                 t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
621                         }
622                         locators[strings.Split(locator, "+A")[0]] = true
623                         manifest += locator + " "
624                 }
625                 manifest += "0:1:dummyfile.txt\n"
626         }
627
628         collection := make(Dict)
629         err := arv.Create("collections",
630                 arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
631                 &collection)
632
633         if err != nil {
634                 t.Fatalf("Error creating collection %v", err)
635         }
636
637         var locs []string
638         for k := range locators {
639                 locs = append(locs, k)
640         }
641
642         return collection["uuid"].(string), locs
643 }
644
645 // Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
646 // Also, create stray block and backdate it.
647 // After datamanager run: expect blocks from the collection, but not the stray block.
648 func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
649         testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
650 }
651
652 // Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
653 // keepstore of a service type other than "disk". Only the "disk" type services
654 // will be indexed by datamanager and hence should work the same way.
655 func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
656         testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
657 }
658
659 // Test datamanager with dry-run. Expect no block to be deleted.
660 func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
661         testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
662 }
663
664 func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
665         defer TearDownDataManagerTest(t)
666         SetupDataManagerTest(t)
667
668         // create collection whose blocks will be backdated
669         collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
670         if collectionWithOldBlocks == "" {
671                 t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
672         }
673         if len(oldBlocks) != numStreams*numBlocks {
674                 t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
675         }
676
677         // create a stray block that will be backdated
678         strayOldBlock := putBlock(t, "this stray block is old")
679
680         expected := []string{strayOldBlock}
681         expected = append(expected, oldBlocks...)
682         verifyBlocks(t, nil, expected, 2)
683
684         // Backdate old blocks; but the collection still references these blocks
685         backdateBlocks(t, oldBlocks)
686
687         // also backdate the stray old block
688         backdateBlocks(t, []string{strayOldBlock})
689
690         // If requested, create an extra keepserver with the given type
691         // This should be ignored during indexing and hence not change the datamanager outcome
692         var extraKeepServerUUID string
693         if createExtraKeepServerWithType != "" {
694                 extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
695                 defer deleteExtraKeepServer(extraKeepServerUUID)
696         }
697
698         // run datamanager
699         dryRun = isDryRun
700         dataManagerSingleRun(t)
701
702         if dryRun {
703                 // verify that all blocks, including strayOldBlock, are still to be found
704                 verifyBlocks(t, nil, expected, 2)
705         } else {
706                 // verify that strayOldBlock is not to be found, but the collections blocks are still there
707                 verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
708         }
709 }
710
711 // Add one more keepstore with the given service type
712 func addExtraKeepServer(t *testing.T, serviceType string) string {
713         defer switchToken(arvadostest.AdminToken)()
714
715         extraKeepService := make(arvadosclient.Dict)
716         err := arv.Create("keep_services",
717                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
718                         "service_host":     "localhost",
719                         "service_port":     "21321",
720                         "service_ssl_flag": false,
721                         "service_type":     serviceType}},
722                 &extraKeepService)
723         if err != nil {
724                 t.Fatal(err)
725         }
726
727         return extraKeepService["uuid"].(string)
728 }
729
730 func deleteExtraKeepServer(uuid string) {
731         defer switchToken(arvadostest.AdminToken)()
732         arv.Delete("keep_services", uuid, nil, nil)
733 }