Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "os/exec"
13         "regexp"
14         "strings"
15         "testing"
16         "time"
17 )
18
19 var arv arvadosclient.ArvadosClient
20 var keepClient *keepclient.KeepClient
21 var keepServers []string
22
23 func SetupDataManagerTest(t *testing.T) {
24         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
25
26         // start api and keep servers
27         arvadostest.ResetEnv()
28         arvadostest.StartAPI()
29         arvadostest.StartKeep(2, false)
30
31         var err error
32         arv, err = makeArvadosClient()
33         if err != nil {
34                 t.Fatalf("Error making arvados client: %s", err)
35         }
36         arv.ApiToken = arvadostest.DataManagerToken
37
38         // keep client
39         keepClient = &keepclient.KeepClient{
40                 Arvados:       &arv,
41                 Want_replicas: 2,
42                 Using_proxy:   true,
43                 Client:        &http.Client{},
44         }
45
46         // discover keep services
47         if err = keepClient.DiscoverKeepServers(); err != nil {
48                 t.Fatalf("Error discovering keep services: %s", err)
49         }
50         keepServers = []string{}
51         for _, host := range keepClient.LocalRoots() {
52                 keepServers = append(keepServers, host)
53         }
54 }
55
56 func TearDownDataManagerTest(t *testing.T) {
57         arvadostest.StopKeep(2)
58         arvadostest.StopAPI()
59 }
60
61 func putBlock(t *testing.T, data string) string {
62         locator, _, err := keepClient.PutB([]byte(data))
63         if err != nil {
64                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
65         }
66         if locator == "" {
67                 t.Fatalf("No locator found after putting test data")
68         }
69
70         splits := strings.Split(locator, "+")
71         return splits[0] + "+" + splits[1]
72 }
73
74 func getBlock(t *testing.T, locator string, data string) {
75         reader, blocklen, _, err := keepClient.Get(locator)
76         if err != nil {
77                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
78         }
79         if reader == nil {
80                 t.Fatalf("No reader found after putting test data")
81         }
82         if blocklen != int64(len(data)) {
83                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
84         }
85
86         all, err := ioutil.ReadAll(reader)
87         if string(all) != data {
88                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
89         }
90 }
91
92 // Create a collection using arv-put
93 func createCollection(t *testing.T, data string) string {
94         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
95         defer os.Remove(tempfile.Name())
96
97         _, err = tempfile.Write([]byte(data))
98         if err != nil {
99                 t.Fatalf("Error writing to tempfile %v", err)
100         }
101
102         // arv-put
103         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
104         if err != nil {
105                 t.Fatalf("Error running arv-put %s", err)
106         }
107
108         uuid := string(output[0:27]) // trim terminating char
109         return uuid
110 }
111
112 // Get collection locator
113 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
114
115 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
116         manifest := getCollection(t, uuid)["manifest_text"].(string)
117
118         locator := strings.Split(manifest, " ")[1]
119         match := locatorMatcher.FindStringSubmatch(locator)
120         if match == nil {
121                 t.Fatalf("No locator found in collection manifest %s", manifest)
122         }
123
124         return match[1] + "+" + match[2]
125 }
126
127 func switchToken(t string) func() {
128         orig := arv.ApiToken
129         restore := func() {
130                 arv.ApiToken = orig
131         }
132         arv.ApiToken = t
133         return restore
134 }
135
136 func getCollection(t *testing.T, uuid string) Dict {
137         defer switchToken(arvadostest.AdminToken)()
138
139         getback := make(Dict)
140         err := arv.Get("collections", uuid, nil, &getback)
141         if err != nil {
142                 t.Fatalf("Error getting collection %s", err)
143         }
144         if getback["uuid"] != uuid {
145                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
146         }
147
148         return getback
149 }
150
151 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
152         defer switchToken(arvadostest.AdminToken)()
153
154         err := arv.Update("collections", uuid, arvadosclient.Dict{
155                 "collection": arvadosclient.Dict{
156                         paramName: paramValue,
157                 },
158         }, &arvadosclient.Dict{})
159
160         if err != nil {
161                 t.Fatalf("Error updating collection %s", err)
162         }
163 }
164
165 type Dict map[string]interface{}
166
167 func deleteCollection(t *testing.T, uuid string) {
168         defer switchToken(arvadostest.AdminToken)()
169
170         getback := make(Dict)
171         err := arv.Delete("collections", uuid, nil, &getback)
172         if err != nil {
173                 t.Fatalf("Error deleting collection %s", err)
174         }
175         if getback["uuid"] != uuid {
176                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
177         }
178 }
179
180 func dataManagerSingleRun(t *testing.T) {
181         err := singlerun(arv)
182         if err != nil {
183                 t.Fatalf("Error during singlerun %s", err)
184         }
185 }
186
187 func getBlockIndexesForServer(t *testing.T, i int) []string {
188         var indexes []string
189
190         path := keepServers[i] + "/index"
191         client := http.Client{}
192         req, err := http.NewRequest("GET", path, nil)
193         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
194         req.Header.Add("Content-Type", "application/octet-stream")
195         resp, err := client.Do(req)
196         defer resp.Body.Close()
197
198         if err != nil {
199                 t.Fatalf("Error during %s %s", path, err)
200         }
201
202         body, err := ioutil.ReadAll(resp.Body)
203         if err != nil {
204                 t.Fatalf("Error reading response from %s %s", path, err)
205         }
206
207         lines := strings.Split(string(body), "\n")
208         for _, line := range lines {
209                 indexes = append(indexes, strings.Split(line, " ")...)
210         }
211
212         return indexes
213 }
214
215 func getBlockIndexes(t *testing.T) [][]string {
216         var indexes [][]string
217
218         for i := 0; i < len(keepServers); i++ {
219                 indexes = append(indexes, getBlockIndexesForServer(t, i))
220         }
221         return indexes
222 }
223
224 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
225         blocks := getBlockIndexes(t)
226
227         for _, block := range notExpected {
228                 for _, idx := range blocks {
229                         if valueInArray(block, idx) {
230                                 t.Fatalf("Found unexpected block %s", block)
231                         }
232                 }
233         }
234
235         for _, block := range expected {
236                 nFound := 0
237                 for _, idx := range blocks {
238                         if valueInArray(block, idx) {
239                                 nFound++
240                         }
241                 }
242                 if nFound < minReplication {
243                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
244                 }
245         }
246 }
247
248 func valueInArray(value string, list []string) bool {
249         for _, v := range list {
250                 if value == v {
251                         return true
252                 }
253         }
254         return false
255 }
256
257 /*
258 Test env uses two keep volumes. The volume names can be found by reading the files
259   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
260
261 The keep volumes are of the dir structure:
262   volumeN/subdir/locator
263 */
264 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
265         // First get rid of any size hints in the locators
266         var trimmedBlockLocators []string
267         for _, block := range oldUnusedBlockLocators {
268                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
269         }
270
271         // Get the working dir so that we can read keep{n}.volume files
272         wd, err := os.Getwd()
273         if err != nil {
274                 t.Fatalf("Error getting working dir %s", err)
275         }
276
277         // Now cycle through the two keep volumes
278         oldTime := time.Now().AddDate(0, -2, 0)
279         for i := 0; i < 2; i++ {
280                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
281                 volumeDir, err := ioutil.ReadFile(filename)
282                 if err != nil {
283                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
284                 }
285
286                 // Read the keep volume dir structure
287                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
288                 if err != nil {
289                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
290                 }
291
292                 // Read each subdir for each of the keep volume dir
293                 for _, subdir := range volumeContents {
294                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
295                         subdirContents, err := ioutil.ReadDir(string(subdirName))
296                         if err != nil {
297                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
298                         }
299
300                         // Now we got to the files. The files are names are the block locators
301                         for _, fileInfo := range subdirContents {
302                                 blockName := fileInfo.Name()
303                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
304                                 if valueInArray(blockName, trimmedBlockLocators) {
305                                         err = os.Chtimes(myname, oldTime, oldTime)
306                                 }
307                         }
308                 }
309         }
310 }
311
312 func getStatus(t *testing.T, path string) interface{} {
313         client := http.Client{}
314         req, err := http.NewRequest("GET", path, nil)
315         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
316         req.Header.Add("Content-Type", "application/octet-stream")
317         resp, err := client.Do(req)
318         if err != nil {
319                 t.Fatalf("Error during %s %s", path, err)
320         }
321         defer resp.Body.Close()
322
323         var s interface{}
324         json.NewDecoder(resp.Body).Decode(&s)
325
326         return s
327 }
328
329 // Wait until PullQueue and TrashQueue are empty on all keepServers.
330 func waitUntilQueuesFinishWork(t *testing.T) {
331         for _, ks := range keepServers {
332                 for done := false; !done; {
333                         time.Sleep(100 * time.Millisecond)
334                         s := getStatus(t, ks+"/status.json")
335                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
336                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
337                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
338                                         done = true
339                                 }
340                         }
341                 }
342         }
343 }
344
345 /*
346 Create some blocks and backdate some of them.
347 Also create some collections and delete some of them.
348 Verify block indexes.
349 */
350 func TestPutAndGetBlocks(t *testing.T) {
351         defer TearDownDataManagerTest(t)
352         SetupDataManagerTest(t)
353
354         // Put some blocks which will be backdated later on
355         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
356         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
357         var oldUnusedBlockLocators []string
358         oldUnusedBlockData := "this block will have older mtime"
359         for i := 0; i < 5; i++ {
360                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
361         }
362         for i := 0; i < 5; i++ {
363                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
364         }
365
366         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
367         oldUsedBlockData := "this collection block will have older mtime"
368         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
369         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
370
371         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
372         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
373         var newBlockLocators []string
374         newBlockData := "this block is newer"
375         for i := 0; i < 5; i++ {
376                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
377         }
378         for i := 0; i < 5; i++ {
379                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
380         }
381
382         // Create a collection that would be deleted later on
383         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
384         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
385
386         // Create another collection that has the same data as the one of the old blocks
387         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
388         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
389         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
390                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
391         }
392
393         // Create another collection whose replication level will be changed
394         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
395         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
396
397         // Create two collections with same data; one will be deleted later on
398         dataForTwoCollections := "one of these collections will be deleted"
399         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
400         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
401         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
402         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
403         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
404                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
405         }
406
407         // Verify blocks before doing any backdating / deleting.
408         var expected []string
409         expected = append(expected, oldUnusedBlockLocators...)
410         expected = append(expected, newBlockLocators...)
411         expected = append(expected, toBeDeletedCollectionLocator)
412         expected = append(expected, replicationCollectionLocator)
413         expected = append(expected, oneOfTwoWithSameDataLocator)
414         expected = append(expected, secondOfTwoWithSameDataLocator)
415
416         verifyBlocks(t, nil, expected, 2)
417
418         // Run datamanager in singlerun mode
419         dataManagerSingleRun(t)
420         waitUntilQueuesFinishWork(t)
421
422         verifyBlocks(t, nil, expected, 2)
423
424         // Backdate the to-be old blocks and delete the collections
425         backdateBlocks(t, oldUnusedBlockLocators)
426         deleteCollection(t, toBeDeletedCollectionUUID)
427         deleteCollection(t, secondOfTwoWithSameDataUUID)
428
429         // Run data manager again
430         dataManagerSingleRun(t)
431         waitUntilQueuesFinishWork(t)
432
433         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
434         expected = expected[:0]
435         expected = append(expected, oldUsedBlockLocator)
436         expected = append(expected, newBlockLocators...)
437         expected = append(expected, toBeDeletedCollectionLocator)
438         expected = append(expected, oneOfTwoWithSameDataLocator)
439         expected = append(expected, secondOfTwoWithSameDataLocator)
440
441         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
442
443         // Reduce desired replication on replicationCollectionUUID
444         // collection, and verify that Data Manager does not reduce
445         // actual replication any further than that. (It might not
446         // reduce actual replication at all; that's OK for this test.)
447
448         // Reduce desired replication level.
449         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
450         collection := getCollection(t, replicationCollectionUUID)
451         if collection["replication_desired"].(interface{}) != float64(1) {
452                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
453         }
454
455         // Verify data is currently overreplicated.
456         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
457
458         // Run data manager again
459         dataManagerSingleRun(t)
460         waitUntilQueuesFinishWork(t)
461
462         // Verify data is not underreplicated.
463         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
464
465         // Verify *other* collections' data is not underreplicated.
466         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
467 }
468
469 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
470         defer TearDownDataManagerTest(t)
471         SetupDataManagerTest(t)
472
473         for i := 0; i < 10; i++ {
474                 err := singlerun(arv)
475                 if err != nil {
476                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
477                 }
478         }
479 }
480
481 func TestGetStatusRepeatedly(t *testing.T) {
482         defer TearDownDataManagerTest(t)
483         SetupDataManagerTest(t)
484
485         for i := 0; i < 10; i++ {
486                 for j := 0; j < 2; j++ {
487                         s := getStatus(t, keepServers[j]+"/status.json")
488
489                         var pullQueueStatus interface{}
490                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
491                         var trashQueueStatus interface{}
492                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
493
494                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
495                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
496                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
497                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
498                                 t.Fatalf("PullQueue and TrashQueue status not found")
499                         }
500
501                         time.Sleep(100 * time.Millisecond)
502                 }
503         }
504 }
505
506 func TestRunDatamanagerWithBogusServer(t *testing.T) {
507         defer TearDownDataManagerTest(t)
508         SetupDataManagerTest(t)
509
510         arv.ApiServer = "bogus-server"
511
512         err := singlerun(arv)
513         if err == nil {
514                 t.Fatalf("Expected error during singlerun with bogus server")
515         }
516 }
517
518 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
519         defer TearDownDataManagerTest(t)
520         SetupDataManagerTest(t)
521
522         arv.ApiToken = arvadostest.ActiveToken
523
524         err := singlerun(arv)
525         if err == nil {
526                 t.Fatalf("Expected error during singlerun as non-admin user")
527         }
528 }