6 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7 "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "git.curoverse.com/arvados.git/services/datamanager/collection"
10 "git.curoverse.com/arvados.git/services/datamanager/summary"
21 var arv arvadosclient.ArvadosClient
22 var keepClient *keepclient.KeepClient
23 var keepServers []string
25 func SetupDataManagerTest(t *testing.T) {
26 os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
28 // start api and keep servers
29 arvadostest.ResetEnv()
30 arvadostest.StartAPI()
31 arvadostest.StartKeep(2, false)
34 arv, err = arvadosclient.MakeArvadosClient()
36 t.Fatalf("Error making arvados client: %s", err)
38 arv.ApiToken = arvadostest.DataManagerToken
41 keepClient = &keepclient.KeepClient{
44 Client: &http.Client{},
47 // discover keep services
48 if err = keepClient.DiscoverKeepServers(); err != nil {
49 t.Fatalf("Error discovering keep services: %s", err)
51 keepServers = []string{}
52 for _, host := range keepClient.LocalRoots() {
53 keepServers = append(keepServers, host)
57 func TearDownDataManagerTest(t *testing.T) {
58 arvadostest.StopKeep(2)
60 summary.WriteDataTo = ""
61 collection.HeapProfileFilename = ""
64 func putBlock(t *testing.T, data string) string {
65 locator, _, err := keepClient.PutB([]byte(data))
67 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
70 t.Fatalf("No locator found after putting test data")
73 splits := strings.Split(locator, "+")
74 return splits[0] + "+" + splits[1]
77 func getBlock(t *testing.T, locator string, data string) {
78 reader, blocklen, _, err := keepClient.Get(locator)
80 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
83 t.Fatalf("No reader found after putting test data")
85 if blocklen != int64(len(data)) {
86 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
89 all, err := ioutil.ReadAll(reader)
90 if string(all) != data {
91 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
// Create a collection using arv-put. Returns the new collection's UUID.
func createCollection(t *testing.T, data string) string {
	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
	if err != nil {
		// previously unchecked: a nil tempfile would panic below
		t.Fatalf("Error creating tempfile %v", err)
	}
	defer os.Remove(tempfile.Name())

	_, err = tempfile.Write([]byte(data))
	if err != nil {
		t.Fatalf("Error writing to tempfile %v", err)
	}

	// invoke arv-put; its stdout starts with the new collection UUID
	output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
	if err != nil {
		t.Fatalf("Error running arv-put %s", err)
	}

	uuid := string(output[0:27]) // trim terminating char
	return uuid
}
// locatorMatcher splits a keep locator into its md5 hash, size, and
// trailing hints ("hash+size+hints").
var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
118 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
119 manifest := getCollection(t, uuid)["manifest_text"].(string)
121 locator := strings.Split(manifest, " ")[1]
122 match := locatorMatcher.FindStringSubmatch(locator)
124 t.Fatalf("No locator found in collection manifest %s", manifest)
127 return match[1] + "+" + match[2]
130 func switchToken(t string) func() {
139 func getCollection(t *testing.T, uuid string) Dict {
140 defer switchToken(arvadostest.AdminToken)()
142 getback := make(Dict)
143 err := arv.Get("collections", uuid, nil, &getback)
145 t.Fatalf("Error getting collection %s", err)
147 if getback["uuid"] != uuid {
148 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
154 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
155 defer switchToken(arvadostest.AdminToken)()
157 err := arv.Update("collections", uuid, arvadosclient.Dict{
158 "collection": arvadosclient.Dict{
159 paramName: paramValue,
161 }, &arvadosclient.Dict{})
164 t.Fatalf("Error updating collection %s", err)
168 type Dict map[string]interface{}
170 func deleteCollection(t *testing.T, uuid string) {
171 defer switchToken(arvadostest.AdminToken)()
173 getback := make(Dict)
174 err := arv.Delete("collections", uuid, nil, &getback)
176 t.Fatalf("Error deleting collection %s", err)
178 if getback["uuid"] != uuid {
179 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
183 func dataManagerSingleRun(t *testing.T) {
184 err := singlerun(arv)
186 t.Fatalf("Error during singlerun %s", err)
190 func getBlockIndexesForServer(t *testing.T, i int) []string {
193 path := keepServers[i] + "/index"
194 client := http.Client{}
195 req, err := http.NewRequest("GET", path, nil)
196 req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
197 req.Header.Add("Content-Type", "application/octet-stream")
198 resp, err := client.Do(req)
199 defer resp.Body.Close()
202 t.Fatalf("Error during %s %s", path, err)
205 body, err := ioutil.ReadAll(resp.Body)
207 t.Fatalf("Error reading response from %s %s", path, err)
210 lines := strings.Split(string(body), "\n")
211 for _, line := range lines {
212 indexes = append(indexes, strings.Split(line, " ")...)
218 func getBlockIndexes(t *testing.T) [][]string {
219 var indexes [][]string
221 for i := 0; i < len(keepServers); i++ {
222 indexes = append(indexes, getBlockIndexesForServer(t, i))
227 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
228 blocks := getBlockIndexes(t)
230 for _, block := range notExpected {
231 for _, idx := range blocks {
232 if valueInArray(block, idx) {
233 t.Fatalf("Found unexpected block %s", block)
238 for _, block := range expected {
240 for _, idx := range blocks {
241 if valueInArray(block, idx) {
245 if nFound < minReplication {
246 t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
// valueInArray reports whether value occurs in list.
func valueInArray(value string, list []string) bool {
	for _, v := range list {
		if v == value {
			return true
		}
	}
	return false
}
260 // Test env uses two keep volumes. The volume names can be found by reading the files
261 // ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
263 // The keep volumes are of the dir structure: volumeN/subdir/locator
264 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
265 // First get rid of any size hints in the locators
266 var trimmedBlockLocators []string
267 for _, block := range oldUnusedBlockLocators {
268 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
271 // Get the working dir so that we can read keep{n}.volume files
272 wd, err := os.Getwd()
274 t.Fatalf("Error getting working dir %s", err)
277 // Now cycle through the two keep volumes
278 oldTime := time.Now().AddDate(0, -2, 0)
279 for i := 0; i < 2; i++ {
280 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
281 volumeDir, err := ioutil.ReadFile(filename)
283 t.Fatalf("Error reading keep volume file %s %s", filename, err)
286 // Read the keep volume dir structure
287 volumeContents, err := ioutil.ReadDir(string(volumeDir))
289 t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
292 // Read each subdir for each of the keep volume dir
293 for _, subdir := range volumeContents {
294 subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
295 subdirContents, err := ioutil.ReadDir(string(subdirName))
297 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
300 // Now we got to the files. The files are names are the block locators
301 for _, fileInfo := range subdirContents {
302 blockName := fileInfo.Name()
303 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
304 if valueInArray(blockName, trimmedBlockLocators) {
305 err = os.Chtimes(myname, oldTime, oldTime)
312 func getStatus(t *testing.T, path string) interface{} {
313 client := http.Client{}
314 req, err := http.NewRequest("GET", path, nil)
315 req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
316 req.Header.Add("Content-Type", "application/octet-stream")
317 resp, err := client.Do(req)
319 t.Fatalf("Error during %s %s", path, err)
321 defer resp.Body.Close()
324 json.NewDecoder(resp.Body).Decode(&s)
329 // Wait until PullQueue and TrashQueue are empty on all keepServers.
330 func waitUntilQueuesFinishWork(t *testing.T) {
331 for _, ks := range keepServers {
332 for done := false; !done; {
333 time.Sleep(100 * time.Millisecond)
334 s := getStatus(t, ks+"/status.json")
335 for _, qName := range []string{"PullQueue", "TrashQueue"} {
336 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
337 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
345 // Create some blocks and backdate some of them.
346 // Also create some collections and delete some of them.
347 // Verify block indexes.
348 func TestPutAndGetBlocks(t *testing.T) {
349 defer TearDownDataManagerTest(t)
350 SetupDataManagerTest(t)
352 // Put some blocks which will be backdated later on
353 // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
354 // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
355 var oldUnusedBlockLocators []string
356 oldUnusedBlockData := "this block will have older mtime"
357 for i := 0; i < 5; i++ {
358 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
360 for i := 0; i < 5; i++ {
361 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
364 // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
365 oldUsedBlockData := "this collection block will have older mtime"
366 oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
367 getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
369 // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
370 // Hence, even though unreferenced, these should not be deleted when datamanager runs.
371 var newBlockLocators []string
372 newBlockData := "this block is newer"
373 for i := 0; i < 5; i++ {
374 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
376 for i := 0; i < 5; i++ {
377 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
380 // Create a collection that would be deleted later on
381 toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
382 toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
384 // Create another collection that has the same data as the one of the old blocks
385 oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
386 oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
387 if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
388 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
391 // Create another collection whose replication level will be changed
392 replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
393 replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
395 // Create two collections with same data; one will be deleted later on
396 dataForTwoCollections := "one of these collections will be deleted"
397 oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
398 oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
399 secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
400 secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
401 if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
402 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
405 // create collection with empty manifest text
406 emptyBlockLocator := putBlock(t, "")
407 emptyCollection := createCollection(t, "")
409 // Verify blocks before doing any backdating / deleting.
410 var expected []string
411 expected = append(expected, oldUnusedBlockLocators...)
412 expected = append(expected, newBlockLocators...)
413 expected = append(expected, toBeDeletedCollectionLocator)
414 expected = append(expected, replicationCollectionLocator)
415 expected = append(expected, oneOfTwoWithSameDataLocator)
416 expected = append(expected, secondOfTwoWithSameDataLocator)
417 expected = append(expected, emptyBlockLocator)
419 verifyBlocks(t, nil, expected, 2)
421 // Run datamanager in singlerun mode
422 dataManagerSingleRun(t)
423 waitUntilQueuesFinishWork(t)
425 verifyBlocks(t, nil, expected, 2)
427 // Backdate the to-be old blocks and delete the collections
428 backdateBlocks(t, oldUnusedBlockLocators)
429 deleteCollection(t, toBeDeletedCollectionUUID)
430 deleteCollection(t, secondOfTwoWithSameDataUUID)
431 backdateBlocks(t, []string{emptyBlockLocator})
432 deleteCollection(t, emptyCollection)
434 // Run data manager again
435 dataManagerSingleRun(t)
436 waitUntilQueuesFinishWork(t)
438 // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
439 expected = expected[:0]
440 expected = append(expected, oldUsedBlockLocator)
441 expected = append(expected, newBlockLocators...)
442 expected = append(expected, toBeDeletedCollectionLocator)
443 expected = append(expected, oneOfTwoWithSameDataLocator)
444 expected = append(expected, secondOfTwoWithSameDataLocator)
445 expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
447 verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
449 // Reduce desired replication on replicationCollectionUUID
450 // collection, and verify that Data Manager does not reduce
451 // actual replication any further than that. (It might not
452 // reduce actual replication at all; that's OK for this test.)
454 // Reduce desired replication level.
455 updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
456 collection := getCollection(t, replicationCollectionUUID)
457 if collection["replication_desired"].(interface{}) != float64(1) {
458 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
461 // Verify data is currently overreplicated.
462 verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
464 // Run data manager again
465 dataManagerSingleRun(t)
466 waitUntilQueuesFinishWork(t)
468 // Verify data is not underreplicated.
469 verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
471 // Verify *other* collections' data is not underreplicated.
472 verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
475 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
476 defer TearDownDataManagerTest(t)
477 SetupDataManagerTest(t)
479 for i := 0; i < 10; i++ {
480 err := singlerun(arv)
482 t.Fatalf("Got an error during datamanager singlerun: %v", err)
487 func TestGetStatusRepeatedly(t *testing.T) {
488 defer TearDownDataManagerTest(t)
489 SetupDataManagerTest(t)
491 for i := 0; i < 10; i++ {
492 for j := 0; j < 2; j++ {
493 s := getStatus(t, keepServers[j]+"/status.json")
495 var pullQueueStatus interface{}
496 pullQueueStatus = s.(map[string]interface{})["PullQueue"]
497 var trashQueueStatus interface{}
498 trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
500 if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
501 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
502 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
503 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
504 t.Fatalf("PullQueue and TrashQueue status not found")
507 time.Sleep(100 * time.Millisecond)
512 func TestRunDatamanagerWithBogusServer(t *testing.T) {
513 defer TearDownDataManagerTest(t)
514 SetupDataManagerTest(t)
516 arv.ApiServer = "bogus-server"
518 err := singlerun(arv)
520 t.Fatalf("Expected error during singlerun with bogus server")
524 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
525 defer TearDownDataManagerTest(t)
526 SetupDataManagerTest(t)
528 arv.ApiToken = arvadostest.ActiveToken
530 err := singlerun(arv)
532 t.Fatalf("Expected error during singlerun as non-admin user")
536 func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
537 testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
540 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
541 testOldBlocksNotDeletedOnDataManagerError(t, "/badwritetofile", "", true, true)
544 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
545 testOldBlocksNotDeletedOnDataManagerError(t, "", "/badheapprofilefile", true, true)
548 // Create some blocks and backdate some of them.
549 // Run datamanager while producing an error condition.
550 // Verify that the blocks are hence not deleted.
551 func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
552 defer TearDownDataManagerTest(t)
553 SetupDataManagerTest(t)
555 // Put some blocks and backdate them.
556 var oldUnusedBlockLocators []string
557 oldUnusedBlockData := "this block will have older mtime"
558 for i := 0; i < 5; i++ {
559 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
561 backdateBlocks(t, oldUnusedBlockLocators)
564 summary.WriteDataTo = writeDataTo
565 collection.HeapProfileFilename = heapProfileFile
567 err := singlerun(arv)
570 t.Fatalf("Got an error during datamanager singlerun: %v", err)
574 t.Fatalf("Expected error during datamanager singlerun")
577 waitUntilQueuesFinishWork(t)
579 // Get block indexes and verify that all backdated blocks are not/deleted as expected
581 verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
583 verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
587 // Create a collection with multiple streams and blocks
588 func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
589 defer switchToken(arvadostest.AdminToken)()
592 locators := make(map[string]bool)
593 for s := 0; s < numStreams; s++ {
594 manifest += fmt.Sprintf("./stream%d ", s)
595 for b := 0; b < numBlocks; b++ {
596 locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
598 t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
600 locators[strings.Split(locator, "+A")[0]] = true
601 manifest += locator + " "
603 manifest += "0:1:dummyfile.txt\n"
606 collection := make(Dict)
607 err := arv.Create("collections",
608 arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
612 t.Fatalf("Error creating collection %v", err)
616 for k := range locators {
617 locs = append(locs, k)
620 return collection["uuid"].(string), locs
623 // Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
624 // Also, create stray block and backdate it.
625 // After datamanager run: expect blocks from the collection, but not the stray block.
626 func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
627 testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
630 // Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
631 // keepstore of a service type other than "disk". Only the "disk" type services
632 // will be indexed by datamanager and hence should work the same way.
633 func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
634 testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
637 // Test datamanager with dry-run. Expect no block to be deleted.
638 func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
639 testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
642 func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
643 defer TearDownDataManagerTest(t)
644 SetupDataManagerTest(t)
646 // create collection whose blocks will be backdated
647 collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
648 if collectionWithOldBlocks == "" {
649 t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
651 if len(oldBlocks) != numStreams*numBlocks {
652 t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
655 // create a stray block that will be backdated
656 strayOldBlock := putBlock(t, "this stray block is old")
658 expected := []string{strayOldBlock}
659 expected = append(expected, oldBlocks...)
660 verifyBlocks(t, nil, expected, 2)
662 // Backdate old blocks; but the collection still references these blocks
663 backdateBlocks(t, oldBlocks)
665 // also backdate the stray old block
666 backdateBlocks(t, []string{strayOldBlock})
668 // If requested, create an extra keepserver with the given type
669 // This should be ignored during indexing and hence not change the datamanager outcome
670 var extraKeepServerUUID string
671 if createExtraKeepServerWithType != "" {
672 extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
673 defer deleteExtraKeepServer(extraKeepServerUUID)
678 dataManagerSingleRun(t)
681 // verify that all blocks, including strayOldBlock, are still to be found
682 verifyBlocks(t, nil, expected, 2)
684 // verify that strayOldBlock is not to be found, but the collections blocks are still there
685 verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
689 // Add one more keepstore with the given service type
690 func addExtraKeepServer(t *testing.T, serviceType string) string {
691 defer switchToken(arvadostest.AdminToken)()
693 extraKeepService := make(arvadosclient.Dict)
694 err := arv.Create("keep_services",
695 arvadosclient.Dict{"keep_service": arvadosclient.Dict{
696 "service_host": "localhost",
697 "service_port": "21321",
698 "service_ssl_flag": false,
699 "service_type": serviceType}},
705 return extraKeepService["uuid"].(string)
708 func deleteExtraKeepServer(uuid string) {
709 defer switchToken(arvadostest.AdminToken)()
710 arv.Delete("keep_services", uuid, nil, nil)