7490: use loggerutil to log any datamanager errors.
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/services/datamanager/collection"
10         "git.curoverse.com/arvados.git/services/datamanager/summary"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "os/exec"
15         "regexp"
16         "strings"
17         "testing"
18         "time"
19 )
20
21 var arv arvadosclient.ArvadosClient
22 var keepClient *keepclient.KeepClient
23 var keepServers []string
24
25 func SetupDataManagerTest(t *testing.T) {
26         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
27
28         // start api and keep servers
29         arvadostest.ResetEnv()
30         arvadostest.StartAPI()
31         arvadostest.StartKeep(2, false)
32
33         var err error
34         arv, err = arvadosclient.MakeArvadosClient()
35         if err != nil {
36                 t.Fatalf("Error making arvados client: %s", err)
37         }
38         arv.ApiToken = arvadostest.DataManagerToken
39
40         // keep client
41         keepClient = &keepclient.KeepClient{
42                 Arvados:       &arv,
43                 Want_replicas: 2,
44                 Using_proxy:   true,
45                 Client:        &http.Client{},
46         }
47
48         // discover keep services
49         if err = keepClient.DiscoverKeepServers(); err != nil {
50                 t.Fatalf("Error discovering keep services: %s", err)
51         }
52         keepServers = []string{}
53         for _, host := range keepClient.LocalRoots() {
54                 keepServers = append(keepServers, host)
55         }
56 }
57
58 func TearDownDataManagerTest(t *testing.T) {
59         arvadostest.StopKeep(2)
60         arvadostest.StopAPI()
61 }
62
63 func putBlock(t *testing.T, data string) string {
64         locator, _, err := keepClient.PutB([]byte(data))
65         if err != nil {
66                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
67         }
68         if locator == "" {
69                 t.Fatalf("No locator found after putting test data")
70         }
71
72         splits := strings.Split(locator, "+")
73         return splits[0] + "+" + splits[1]
74 }
75
76 func getBlock(t *testing.T, locator string, data string) {
77         reader, blocklen, _, err := keepClient.Get(locator)
78         if err != nil {
79                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
80         }
81         if reader == nil {
82                 t.Fatalf("No reader found after putting test data")
83         }
84         if blocklen != int64(len(data)) {
85                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
86         }
87
88         all, err := ioutil.ReadAll(reader)
89         if string(all) != data {
90                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
91         }
92 }
93
94 // Create a collection using arv-put
95 func createCollection(t *testing.T, data string) string {
96         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
97         defer os.Remove(tempfile.Name())
98
99         _, err = tempfile.Write([]byte(data))
100         if err != nil {
101                 t.Fatalf("Error writing to tempfile %v", err)
102         }
103
104         // arv-put
105         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
106         if err != nil {
107                 t.Fatalf("Error running arv-put %s", err)
108         }
109
110         uuid := string(output[0:27]) // trim terminating char
111         return uuid
112 }
113
114 // Get collection locator
115 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
116
117 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
118         manifest := getCollection(t, uuid)["manifest_text"].(string)
119
120         locator := strings.Split(manifest, " ")[1]
121         match := locatorMatcher.FindStringSubmatch(locator)
122         if match == nil {
123                 t.Fatalf("No locator found in collection manifest %s", manifest)
124         }
125
126         return match[1] + "+" + match[2]
127 }
128
129 func switchToken(t string) func() {
130         orig := arv.ApiToken
131         restore := func() {
132                 arv.ApiToken = orig
133         }
134         arv.ApiToken = t
135         return restore
136 }
137
138 func getCollection(t *testing.T, uuid string) Dict {
139         defer switchToken(arvadostest.AdminToken)()
140
141         getback := make(Dict)
142         err := arv.Get("collections", uuid, nil, &getback)
143         if err != nil {
144                 t.Fatalf("Error getting collection %s", err)
145         }
146         if getback["uuid"] != uuid {
147                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
148         }
149
150         return getback
151 }
152
153 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
154         defer switchToken(arvadostest.AdminToken)()
155
156         err := arv.Update("collections", uuid, arvadosclient.Dict{
157                 "collection": arvadosclient.Dict{
158                         paramName: paramValue,
159                 },
160         }, &arvadosclient.Dict{})
161
162         if err != nil {
163                 t.Fatalf("Error updating collection %s", err)
164         }
165 }
166
167 type Dict map[string]interface{}
168
169 func deleteCollection(t *testing.T, uuid string) {
170         defer switchToken(arvadostest.AdminToken)()
171
172         getback := make(Dict)
173         err := arv.Delete("collections", uuid, nil, &getback)
174         if err != nil {
175                 t.Fatalf("Error deleting collection %s", err)
176         }
177         if getback["uuid"] != uuid {
178                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
179         }
180 }
181
182 func dataManagerSingleRun(t *testing.T) {
183         err := singlerun(arv)
184         if err != nil {
185                 t.Fatalf("Error during singlerun %s", err)
186         }
187 }
188
189 func getBlockIndexesForServer(t *testing.T, i int) []string {
190         var indexes []string
191
192         path := keepServers[i] + "/index"
193         client := http.Client{}
194         req, err := http.NewRequest("GET", path, nil)
195         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
196         req.Header.Add("Content-Type", "application/octet-stream")
197         resp, err := client.Do(req)
198         defer resp.Body.Close()
199
200         if err != nil {
201                 t.Fatalf("Error during %s %s", path, err)
202         }
203
204         body, err := ioutil.ReadAll(resp.Body)
205         if err != nil {
206                 t.Fatalf("Error reading response from %s %s", path, err)
207         }
208
209         lines := strings.Split(string(body), "\n")
210         for _, line := range lines {
211                 indexes = append(indexes, strings.Split(line, " ")...)
212         }
213
214         return indexes
215 }
216
217 func getBlockIndexes(t *testing.T) [][]string {
218         var indexes [][]string
219
220         for i := 0; i < len(keepServers); i++ {
221                 indexes = append(indexes, getBlockIndexesForServer(t, i))
222         }
223         return indexes
224 }
225
226 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
227         blocks := getBlockIndexes(t)
228
229         for _, block := range notExpected {
230                 for _, idx := range blocks {
231                         if valueInArray(block, idx) {
232                                 t.Fatalf("Found unexpected block %s", block)
233                         }
234                 }
235         }
236
237         for _, block := range expected {
238                 nFound := 0
239                 for _, idx := range blocks {
240                         if valueInArray(block, idx) {
241                                 nFound++
242                         }
243                 }
244                 if nFound < minReplication {
245                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
246                 }
247         }
248 }
249
250 func valueInArray(value string, list []string) bool {
251         for _, v := range list {
252                 if value == v {
253                         return true
254                 }
255         }
256         return false
257 }
258
259 /*
260 Test env uses two keep volumes. The volume names can be found by reading the files
261   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
262
263 The keep volumes are of the dir structure:
264   volumeN/subdir/locator
265 */
266 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
267         // First get rid of any size hints in the locators
268         var trimmedBlockLocators []string
269         for _, block := range oldUnusedBlockLocators {
270                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
271         }
272
273         // Get the working dir so that we can read keep{n}.volume files
274         wd, err := os.Getwd()
275         if err != nil {
276                 t.Fatalf("Error getting working dir %s", err)
277         }
278
279         // Now cycle through the two keep volumes
280         oldTime := time.Now().AddDate(0, -2, 0)
281         for i := 0; i < 2; i++ {
282                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
283                 volumeDir, err := ioutil.ReadFile(filename)
284                 if err != nil {
285                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
286                 }
287
288                 // Read the keep volume dir structure
289                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
290                 if err != nil {
291                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
292                 }
293
294                 // Read each subdir for each of the keep volume dir
295                 for _, subdir := range volumeContents {
296                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
297                         subdirContents, err := ioutil.ReadDir(string(subdirName))
298                         if err != nil {
299                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
300                         }
301
302                         // Now we got to the files. The files are names are the block locators
303                         for _, fileInfo := range subdirContents {
304                                 blockName := fileInfo.Name()
305                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
306                                 if valueInArray(blockName, trimmedBlockLocators) {
307                                         err = os.Chtimes(myname, oldTime, oldTime)
308                                 }
309                         }
310                 }
311         }
312 }
313
314 func getStatus(t *testing.T, path string) interface{} {
315         client := http.Client{}
316         req, err := http.NewRequest("GET", path, nil)
317         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
318         req.Header.Add("Content-Type", "application/octet-stream")
319         resp, err := client.Do(req)
320         if err != nil {
321                 t.Fatalf("Error during %s %s", path, err)
322         }
323         defer resp.Body.Close()
324
325         var s interface{}
326         json.NewDecoder(resp.Body).Decode(&s)
327
328         return s
329 }
330
331 // Wait until PullQueue and TrashQueue are empty on all keepServers.
332 func waitUntilQueuesFinishWork(t *testing.T) {
333         for _, ks := range keepServers {
334                 for done := false; !done; {
335                         time.Sleep(100 * time.Millisecond)
336                         s := getStatus(t, ks+"/status.json")
337                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
338                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
339                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
340                                         done = true
341                                 }
342                         }
343                 }
344         }
345 }
346
347 /*
348 Create some blocks and backdate some of them.
349 Also create some collections and delete some of them.
350 Verify block indexes.
351 */
352 func TestPutAndGetBlocks(t *testing.T) {
353         defer TearDownDataManagerTest(t)
354         SetupDataManagerTest(t)
355
356         // Put some blocks which will be backdated later on
357         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
358         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
359         var oldUnusedBlockLocators []string
360         oldUnusedBlockData := "this block will have older mtime"
361         for i := 0; i < 5; i++ {
362                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
363         }
364         for i := 0; i < 5; i++ {
365                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
366         }
367
368         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
369         oldUsedBlockData := "this collection block will have older mtime"
370         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
371         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
372
373         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
374         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
375         var newBlockLocators []string
376         newBlockData := "this block is newer"
377         for i := 0; i < 5; i++ {
378                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
379         }
380         for i := 0; i < 5; i++ {
381                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
382         }
383
384         // Create a collection that would be deleted later on
385         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
386         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
387
388         // Create another collection that has the same data as the one of the old blocks
389         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
390         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
391         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
392                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
393         }
394
395         // Create another collection whose replication level will be changed
396         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
397         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
398
399         // Create two collections with same data; one will be deleted later on
400         dataForTwoCollections := "one of these collections will be deleted"
401         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
402         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
403         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
404         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
405         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
406                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
407         }
408
409         // Verify blocks before doing any backdating / deleting.
410         var expected []string
411         expected = append(expected, oldUnusedBlockLocators...)
412         expected = append(expected, newBlockLocators...)
413         expected = append(expected, toBeDeletedCollectionLocator)
414         expected = append(expected, replicationCollectionLocator)
415         expected = append(expected, oneOfTwoWithSameDataLocator)
416         expected = append(expected, secondOfTwoWithSameDataLocator)
417
418         verifyBlocks(t, nil, expected, 2)
419
420         // Run datamanager in singlerun mode
421         dataManagerSingleRun(t)
422         waitUntilQueuesFinishWork(t)
423
424         verifyBlocks(t, nil, expected, 2)
425
426         // Backdate the to-be old blocks and delete the collections
427         backdateBlocks(t, oldUnusedBlockLocators)
428         deleteCollection(t, toBeDeletedCollectionUUID)
429         deleteCollection(t, secondOfTwoWithSameDataUUID)
430
431         // Run data manager again
432         dataManagerSingleRun(t)
433         waitUntilQueuesFinishWork(t)
434
435         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
436         expected = expected[:0]
437         expected = append(expected, oldUsedBlockLocator)
438         expected = append(expected, newBlockLocators...)
439         expected = append(expected, toBeDeletedCollectionLocator)
440         expected = append(expected, oneOfTwoWithSameDataLocator)
441         expected = append(expected, secondOfTwoWithSameDataLocator)
442
443         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
444
445         // Reduce desired replication on replicationCollectionUUID
446         // collection, and verify that Data Manager does not reduce
447         // actual replication any further than that. (It might not
448         // reduce actual replication at all; that's OK for this test.)
449
450         // Reduce desired replication level.
451         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
452         collection := getCollection(t, replicationCollectionUUID)
453         if collection["replication_desired"].(interface{}) != float64(1) {
454                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
455         }
456
457         // Verify data is currently overreplicated.
458         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
459
460         // Run data manager again
461         dataManagerSingleRun(t)
462         waitUntilQueuesFinishWork(t)
463
464         // Verify data is not underreplicated.
465         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
466
467         // Verify *other* collections' data is not underreplicated.
468         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
469 }
470
471 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
472         defer TearDownDataManagerTest(t)
473         SetupDataManagerTest(t)
474
475         for i := 0; i < 10; i++ {
476                 err := singlerun(arv)
477                 if err != nil {
478                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
479                 }
480         }
481 }
482
483 func TestGetStatusRepeatedly(t *testing.T) {
484         defer TearDownDataManagerTest(t)
485         SetupDataManagerTest(t)
486
487         for i := 0; i < 10; i++ {
488                 for j := 0; j < 2; j++ {
489                         s := getStatus(t, keepServers[j]+"/status.json")
490
491                         var pullQueueStatus interface{}
492                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
493                         var trashQueueStatus interface{}
494                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
495
496                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
497                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
498                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
499                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
500                                 t.Fatalf("PullQueue and TrashQueue status not found")
501                         }
502
503                         time.Sleep(100 * time.Millisecond)
504                 }
505         }
506 }
507
508 func TestRunDatamanagerWithBogusServer(t *testing.T) {
509         defer TearDownDataManagerTest(t)
510         SetupDataManagerTest(t)
511
512         arv.ApiServer = "bogus-server"
513
514         err := singlerun(arv)
515         if err == nil {
516                 t.Fatalf("Expected error during singlerun with bogus server")
517         }
518 }
519
520 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
521         defer TearDownDataManagerTest(t)
522         SetupDataManagerTest(t)
523
524         arv.ApiToken = arvadostest.ActiveToken
525
526         err := singlerun(arv)
527         if err == nil {
528                 t.Fatalf("Expected error during singlerun as non-admin user")
529         }
530 }
531
532 func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
533         testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
534 }
535
536 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
537         testOldBlocksNotDeletedOnDataManagerError(t, "/badwritetofile", "", true, true)
538 }
539
540 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
541         testOldBlocksNotDeletedOnDataManagerError(t, "", "/badheapprofilefile", true, true)
542 }
543
544 /*
545   Create some blocks and backdate some of them.
546   Run datamanager while producing an error condition.
547   Verify that the blocks are hence not deleted.
548 */
549 func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
550         defer TearDownDataManagerTest(t)
551         SetupDataManagerTest(t)
552
553         // Put some blocks and backdate them.
554         var oldUnusedBlockLocators []string
555         oldUnusedBlockData := "this block will have older mtime"
556         for i := 0; i < 5; i++ {
557                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
558         }
559         backdateBlocks(t, oldUnusedBlockLocators)
560
561         // Run data manager
562         summary.WriteDataTo = writeDataTo
563         collection.HeapProfileFilename = heapProfileFile
564
565         err := singlerun(arv)
566         if !expectError {
567                 if err != nil {
568                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
569                 }
570         } else {
571                 if err == nil {
572                         t.Fatalf("Expected error during datamanager singlerun")
573                 }
574         }
575         waitUntilQueuesFinishWork(t)
576
577         // Get block indexes and verify that all backdated blocks are not/deleted as expected
578         if expectOldBlocks {
579                 verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
580         } else {
581                 verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
582         }
583 }