Merge branch 'master' into 7607-getting-started-flag
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/services/datamanager/collection"
10         "git.curoverse.com/arvados.git/services/datamanager/summary"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "os/exec"
15         "regexp"
16         "strings"
17         "testing"
18         "time"
19 )
20
21 var arv arvadosclient.ArvadosClient
22 var keepClient *keepclient.KeepClient
23 var keepServers []string
24
25 func SetupDataManagerTest(t *testing.T) {
26         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
27
28         // start api and keep servers
29         arvadostest.ResetEnv()
30         arvadostest.StartAPI()
31         arvadostest.StartKeep(2, false)
32
33         var err error
34         arv, err = arvadosclient.MakeArvadosClient()
35         if err != nil {
36                 t.Fatalf("Error making arvados client: %s", err)
37         }
38         arv.ApiToken = arvadostest.DataManagerToken
39
40         // keep client
41         keepClient = &keepclient.KeepClient{
42                 Arvados:       &arv,
43                 Want_replicas: 2,
44                 Client:        &http.Client{},
45         }
46
47         // discover keep services
48         if err = keepClient.DiscoverKeepServers(); err != nil {
49                 t.Fatalf("Error discovering keep services: %s", err)
50         }
51         keepServers = []string{}
52         for _, host := range keepClient.LocalRoots() {
53                 keepServers = append(keepServers, host)
54         }
55 }
56
57 func TearDownDataManagerTest(t *testing.T) {
58         arvadostest.StopKeep(2)
59         arvadostest.StopAPI()
60         summary.WriteDataTo = ""
61         collection.HeapProfileFilename = ""
62 }
63
64 func putBlock(t *testing.T, data string) string {
65         locator, _, err := keepClient.PutB([]byte(data))
66         if err != nil {
67                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
68         }
69         if locator == "" {
70                 t.Fatalf("No locator found after putting test data")
71         }
72
73         splits := strings.Split(locator, "+")
74         return splits[0] + "+" + splits[1]
75 }
76
77 func getBlock(t *testing.T, locator string, data string) {
78         reader, blocklen, _, err := keepClient.Get(locator)
79         if err != nil {
80                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
81         }
82         if reader == nil {
83                 t.Fatalf("No reader found after putting test data")
84         }
85         if blocklen != int64(len(data)) {
86                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
87         }
88
89         all, err := ioutil.ReadAll(reader)
90         if string(all) != data {
91                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
92         }
93 }
94
95 // Create a collection using arv-put
96 func createCollection(t *testing.T, data string) string {
97         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
98         defer os.Remove(tempfile.Name())
99
100         _, err = tempfile.Write([]byte(data))
101         if err != nil {
102                 t.Fatalf("Error writing to tempfile %v", err)
103         }
104
105         // arv-put
106         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
107         if err != nil {
108                 t.Fatalf("Error running arv-put %s", err)
109         }
110
111         uuid := string(output[0:27]) // trim terminating char
112         return uuid
113 }
114
115 // Get collection locator
116 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
117
118 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
119         manifest := getCollection(t, uuid)["manifest_text"].(string)
120
121         locator := strings.Split(manifest, " ")[1]
122         match := locatorMatcher.FindStringSubmatch(locator)
123         if match == nil {
124                 t.Fatalf("No locator found in collection manifest %s", manifest)
125         }
126
127         return match[1] + "+" + match[2]
128 }
129
130 func switchToken(t string) func() {
131         orig := arv.ApiToken
132         restore := func() {
133                 arv.ApiToken = orig
134         }
135         arv.ApiToken = t
136         return restore
137 }
138
139 func getCollection(t *testing.T, uuid string) Dict {
140         defer switchToken(arvadostest.AdminToken)()
141
142         getback := make(Dict)
143         err := arv.Get("collections", uuid, nil, &getback)
144         if err != nil {
145                 t.Fatalf("Error getting collection %s", err)
146         }
147         if getback["uuid"] != uuid {
148                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
149         }
150
151         return getback
152 }
153
154 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
155         defer switchToken(arvadostest.AdminToken)()
156
157         err := arv.Update("collections", uuid, arvadosclient.Dict{
158                 "collection": arvadosclient.Dict{
159                         paramName: paramValue,
160                 },
161         }, &arvadosclient.Dict{})
162
163         if err != nil {
164                 t.Fatalf("Error updating collection %s", err)
165         }
166 }
167
168 type Dict map[string]interface{}
169
170 func deleteCollection(t *testing.T, uuid string) {
171         defer switchToken(arvadostest.AdminToken)()
172
173         getback := make(Dict)
174         err := arv.Delete("collections", uuid, nil, &getback)
175         if err != nil {
176                 t.Fatalf("Error deleting collection %s", err)
177         }
178         if getback["uuid"] != uuid {
179                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
180         }
181 }
182
183 func dataManagerSingleRun(t *testing.T) {
184         err := singlerun(arv)
185         if err != nil {
186                 t.Fatalf("Error during singlerun %s", err)
187         }
188 }
189
190 func getBlockIndexesForServer(t *testing.T, i int) []string {
191         var indexes []string
192
193         path := keepServers[i] + "/index"
194         client := http.Client{}
195         req, err := http.NewRequest("GET", path, nil)
196         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
197         req.Header.Add("Content-Type", "application/octet-stream")
198         resp, err := client.Do(req)
199         defer resp.Body.Close()
200
201         if err != nil {
202                 t.Fatalf("Error during %s %s", path, err)
203         }
204
205         body, err := ioutil.ReadAll(resp.Body)
206         if err != nil {
207                 t.Fatalf("Error reading response from %s %s", path, err)
208         }
209
210         lines := strings.Split(string(body), "\n")
211         for _, line := range lines {
212                 indexes = append(indexes, strings.Split(line, " ")...)
213         }
214
215         return indexes
216 }
217
218 func getBlockIndexes(t *testing.T) [][]string {
219         var indexes [][]string
220
221         for i := 0; i < len(keepServers); i++ {
222                 indexes = append(indexes, getBlockIndexesForServer(t, i))
223         }
224         return indexes
225 }
226
227 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
228         blocks := getBlockIndexes(t)
229
230         for _, block := range notExpected {
231                 for _, idx := range blocks {
232                         if valueInArray(block, idx) {
233                                 t.Fatalf("Found unexpected block %s", block)
234                         }
235                 }
236         }
237
238         for _, block := range expected {
239                 nFound := 0
240                 for _, idx := range blocks {
241                         if valueInArray(block, idx) {
242                                 nFound++
243                         }
244                 }
245                 if nFound < minReplication {
246                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
247                 }
248         }
249 }
250
251 func valueInArray(value string, list []string) bool {
252         for _, v := range list {
253                 if value == v {
254                         return true
255                 }
256         }
257         return false
258 }
259
260 // Test env uses two keep volumes. The volume names can be found by reading the files
261 // ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
262 //
263 // The keep volumes are of the dir structure: volumeN/subdir/locator
264 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
265         // First get rid of any size hints in the locators
266         var trimmedBlockLocators []string
267         for _, block := range oldUnusedBlockLocators {
268                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
269         }
270
271         // Get the working dir so that we can read keep{n}.volume files
272         wd, err := os.Getwd()
273         if err != nil {
274                 t.Fatalf("Error getting working dir %s", err)
275         }
276
277         // Now cycle through the two keep volumes
278         oldTime := time.Now().AddDate(0, -2, 0)
279         for i := 0; i < 2; i++ {
280                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
281                 volumeDir, err := ioutil.ReadFile(filename)
282                 if err != nil {
283                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
284                 }
285
286                 // Read the keep volume dir structure
287                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
288                 if err != nil {
289                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
290                 }
291
292                 // Read each subdir for each of the keep volume dir
293                 for _, subdir := range volumeContents {
294                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
295                         subdirContents, err := ioutil.ReadDir(string(subdirName))
296                         if err != nil {
297                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
298                         }
299
300                         // Now we got to the files. The files are names are the block locators
301                         for _, fileInfo := range subdirContents {
302                                 blockName := fileInfo.Name()
303                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
304                                 if valueInArray(blockName, trimmedBlockLocators) {
305                                         err = os.Chtimes(myname, oldTime, oldTime)
306                                 }
307                         }
308                 }
309         }
310 }
311
312 func getStatus(t *testing.T, path string) interface{} {
313         client := http.Client{}
314         req, err := http.NewRequest("GET", path, nil)
315         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
316         req.Header.Add("Content-Type", "application/octet-stream")
317         resp, err := client.Do(req)
318         if err != nil {
319                 t.Fatalf("Error during %s %s", path, err)
320         }
321         defer resp.Body.Close()
322
323         var s interface{}
324         json.NewDecoder(resp.Body).Decode(&s)
325
326         return s
327 }
328
329 // Wait until PullQueue and TrashQueue are empty on all keepServers.
330 func waitUntilQueuesFinishWork(t *testing.T) {
331         for _, ks := range keepServers {
332                 for done := false; !done; {
333                         time.Sleep(100 * time.Millisecond)
334                         s := getStatus(t, ks+"/status.json")
335                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
336                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
337                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
338                                         done = true
339                                 }
340                         }
341                 }
342         }
343 }
344
345 // Create some blocks and backdate some of them.
346 // Also create some collections and delete some of them.
347 // Verify block indexes.
348 func TestPutAndGetBlocks(t *testing.T) {
349         defer TearDownDataManagerTest(t)
350         SetupDataManagerTest(t)
351
352         // Put some blocks which will be backdated later on
353         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
354         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
355         var oldUnusedBlockLocators []string
356         oldUnusedBlockData := "this block will have older mtime"
357         for i := 0; i < 5; i++ {
358                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
359         }
360         for i := 0; i < 5; i++ {
361                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
362         }
363
364         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
365         oldUsedBlockData := "this collection block will have older mtime"
366         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
367         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
368
369         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
370         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
371         var newBlockLocators []string
372         newBlockData := "this block is newer"
373         for i := 0; i < 5; i++ {
374                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
375         }
376         for i := 0; i < 5; i++ {
377                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
378         }
379
380         // Create a collection that would be deleted later on
381         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
382         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
383
384         // Create another collection that has the same data as the one of the old blocks
385         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
386         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
387         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
388                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
389         }
390
391         // Create another collection whose replication level will be changed
392         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
393         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
394
395         // Create two collections with same data; one will be deleted later on
396         dataForTwoCollections := "one of these collections will be deleted"
397         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
398         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
399         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
400         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
401         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
402                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
403         }
404
405         // create collection with empty manifest text
406         emptyBlockLocator := putBlock(t, "")
407         emptyCollection := createCollection(t, "")
408
409         // Verify blocks before doing any backdating / deleting.
410         var expected []string
411         expected = append(expected, oldUnusedBlockLocators...)
412         expected = append(expected, newBlockLocators...)
413         expected = append(expected, toBeDeletedCollectionLocator)
414         expected = append(expected, replicationCollectionLocator)
415         expected = append(expected, oneOfTwoWithSameDataLocator)
416         expected = append(expected, secondOfTwoWithSameDataLocator)
417         expected = append(expected, emptyBlockLocator)
418
419         verifyBlocks(t, nil, expected, 2)
420
421         // Run datamanager in singlerun mode
422         dataManagerSingleRun(t)
423         waitUntilQueuesFinishWork(t)
424
425         verifyBlocks(t, nil, expected, 2)
426
427         // Backdate the to-be old blocks and delete the collections
428         backdateBlocks(t, oldUnusedBlockLocators)
429         deleteCollection(t, toBeDeletedCollectionUUID)
430         deleteCollection(t, secondOfTwoWithSameDataUUID)
431         backdateBlocks(t, []string{emptyBlockLocator})
432         deleteCollection(t, emptyCollection)
433
434         // Run data manager again
435         dataManagerSingleRun(t)
436         waitUntilQueuesFinishWork(t)
437
438         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
439         expected = expected[:0]
440         expected = append(expected, oldUsedBlockLocator)
441         expected = append(expected, newBlockLocators...)
442         expected = append(expected, toBeDeletedCollectionLocator)
443         expected = append(expected, oneOfTwoWithSameDataLocator)
444         expected = append(expected, secondOfTwoWithSameDataLocator)
445         expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
446
447         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
448
449         // Reduce desired replication on replicationCollectionUUID
450         // collection, and verify that Data Manager does not reduce
451         // actual replication any further than that. (It might not
452         // reduce actual replication at all; that's OK for this test.)
453
454         // Reduce desired replication level.
455         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
456         collection := getCollection(t, replicationCollectionUUID)
457         if collection["replication_desired"].(interface{}) != float64(1) {
458                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
459         }
460
461         // Verify data is currently overreplicated.
462         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
463
464         // Run data manager again
465         dataManagerSingleRun(t)
466         waitUntilQueuesFinishWork(t)
467
468         // Verify data is not underreplicated.
469         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
470
471         // Verify *other* collections' data is not underreplicated.
472         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
473 }
474
475 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
476         defer TearDownDataManagerTest(t)
477         SetupDataManagerTest(t)
478
479         for i := 0; i < 10; i++ {
480                 err := singlerun(arv)
481                 if err != nil {
482                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
483                 }
484         }
485 }
486
487 func TestGetStatusRepeatedly(t *testing.T) {
488         defer TearDownDataManagerTest(t)
489         SetupDataManagerTest(t)
490
491         for i := 0; i < 10; i++ {
492                 for j := 0; j < 2; j++ {
493                         s := getStatus(t, keepServers[j]+"/status.json")
494
495                         var pullQueueStatus interface{}
496                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
497                         var trashQueueStatus interface{}
498                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
499
500                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
501                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
502                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
503                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
504                                 t.Fatalf("PullQueue and TrashQueue status not found")
505                         }
506
507                         time.Sleep(100 * time.Millisecond)
508                 }
509         }
510 }
511
512 func TestRunDatamanagerWithBogusServer(t *testing.T) {
513         defer TearDownDataManagerTest(t)
514         SetupDataManagerTest(t)
515
516         arv.ApiServer = "bogus-server"
517
518         err := singlerun(arv)
519         if err == nil {
520                 t.Fatalf("Expected error during singlerun with bogus server")
521         }
522 }
523
524 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
525         defer TearDownDataManagerTest(t)
526         SetupDataManagerTest(t)
527
528         arv.ApiToken = arvadostest.ActiveToken
529
530         err := singlerun(arv)
531         if err == nil {
532                 t.Fatalf("Expected error during singlerun as non-admin user")
533         }
534 }
535
536 func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
537         testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
538 }
539
540 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
541         testOldBlocksNotDeletedOnDataManagerError(t, "/badwritetofile", "", true, true)
542 }
543
544 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
545         testOldBlocksNotDeletedOnDataManagerError(t, "", "/badheapprofilefile", true, true)
546 }
547
548 // Create some blocks and backdate some of them.
549 // Run datamanager while producing an error condition.
550 // Verify that the blocks are hence not deleted.
551 func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
552         defer TearDownDataManagerTest(t)
553         SetupDataManagerTest(t)
554
555         // Put some blocks and backdate them.
556         var oldUnusedBlockLocators []string
557         oldUnusedBlockData := "this block will have older mtime"
558         for i := 0; i < 5; i++ {
559                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
560         }
561         backdateBlocks(t, oldUnusedBlockLocators)
562
563         // Run data manager
564         summary.WriteDataTo = writeDataTo
565         collection.HeapProfileFilename = heapProfileFile
566
567         err := singlerun(arv)
568         if !expectError {
569                 if err != nil {
570                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
571                 }
572         } else {
573                 if err == nil {
574                         t.Fatalf("Expected error during datamanager singlerun")
575                 }
576         }
577         waitUntilQueuesFinishWork(t)
578
579         // Get block indexes and verify that all backdated blocks are not/deleted as expected
580         if expectOldBlocks {
581                 verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
582         } else {
583                 verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
584         }
585 }
586
587 // Create a collection with multiple streams and blocks
588 func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
589         defer switchToken(arvadostest.AdminToken)()
590
591         manifest := ""
592         locators := make(map[string]bool)
593         for s := 0; s < numStreams; s++ {
594                 manifest += fmt.Sprintf("./stream%d ", s)
595                 for b := 0; b < numBlocks; b++ {
596                         locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
597                         if err != nil {
598                                 t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
599                         }
600                         locators[strings.Split(locator, "+A")[0]] = true
601                         manifest += locator + " "
602                 }
603                 manifest += "0:1:dummyfile.txt\n"
604         }
605
606         collection := make(Dict)
607         err := arv.Create("collections",
608                 arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
609                 &collection)
610
611         if err != nil {
612                 t.Fatalf("Error creating collection %v", err)
613         }
614
615         var locs []string
616         for k := range locators {
617                 locs = append(locs, k)
618         }
619
620         return collection["uuid"].(string), locs
621 }
622
623 // Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
624 // Also, create stray block and backdate it.
625 // After datamanager run: expect blocks from the collection, but not the stray block.
626 func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
627         testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
628 }
629
630 // Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
631 // keepstore of a service type other than "disk". Only the "disk" type services
632 // will be indexed by datamanager and hence should work the same way.
633 func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
634         testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
635 }
636
637 // Test datamanager with dry-run. Expect no block to be deleted.
638 func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
639         testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
640 }
641
642 func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
643         defer TearDownDataManagerTest(t)
644         SetupDataManagerTest(t)
645
646         // create collection whose blocks will be backdated
647         collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
648         if collectionWithOldBlocks == "" {
649                 t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
650         }
651         if len(oldBlocks) != numStreams*numBlocks {
652                 t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
653         }
654
655         // create a stray block that will be backdated
656         strayOldBlock := putBlock(t, "this stray block is old")
657
658         expected := []string{strayOldBlock}
659         expected = append(expected, oldBlocks...)
660         verifyBlocks(t, nil, expected, 2)
661
662         // Backdate old blocks; but the collection still references these blocks
663         backdateBlocks(t, oldBlocks)
664
665         // also backdate the stray old block
666         backdateBlocks(t, []string{strayOldBlock})
667
668         // If requested, create an extra keepserver with the given type
669         // This should be ignored during indexing and hence not change the datamanager outcome
670         var extraKeepServerUUID string
671         if createExtraKeepServerWithType != "" {
672                 extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
673                 defer deleteExtraKeepServer(extraKeepServerUUID)
674         }
675
676         // run datamanager
677         dryRun = isDryRun
678         dataManagerSingleRun(t)
679
680         if dryRun {
681                 // verify that all blocks, including strayOldBlock, are still to be found
682                 verifyBlocks(t, nil, expected, 2)
683         } else {
684                 // verify that strayOldBlock is not to be found, but the collections blocks are still there
685                 verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
686         }
687 }
688
689 // Add one more keepstore with the given service type
690 func addExtraKeepServer(t *testing.T, serviceType string) string {
691         defer switchToken(arvadostest.AdminToken)()
692
693         extraKeepService := make(arvadosclient.Dict)
694         err := arv.Create("keep_services",
695                 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
696                         "service_host":     "localhost",
697                         "service_port":     "21321",
698                         "service_ssl_flag": false,
699                         "service_type":     serviceType}},
700                 &extraKeepService)
701         if err != nil {
702                 t.Fatal(err)
703         }
704
705         return extraKeepService["uuid"].(string)
706 }
707
708 func deleteExtraKeepServer(uuid string) {
709         defer switchToken(arvadostest.AdminToken)()
710         arv.Delete("keep_services", uuid, nil, nil)
711 }