7724: Use datamanager token in keep-rsync tests. refs #7724
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "os/exec"
13         "regexp"
14         "strings"
15         "testing"
16         "time"
17 )
18
19 var arv arvadosclient.ArvadosClient
20 var keepClient *keepclient.KeepClient
21 var keepServers []string
22
23 func SetupDataManagerTest(t *testing.T) {
24         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
25
26         // start api and keep servers
27         arvadostest.ResetEnv()
28         arvadostest.StartAPI()
29         arvadostest.StartKeep(2, false)
30
31         arv = makeArvadosClient()
32         arv.ApiToken = arvadostest.DataManagerToken
33
34         // keep client
35         keepClient = &keepclient.KeepClient{
36                 Arvados:       &arv,
37                 Want_replicas: 2,
38                 Using_proxy:   true,
39                 Client:        &http.Client{},
40         }
41
42         // discover keep services
43         if err := keepClient.DiscoverKeepServers(); err != nil {
44                 t.Fatalf("Error discovering keep services: %s", err)
45         }
46         keepServers = []string{}
47         for _, host := range keepClient.LocalRoots() {
48                 keepServers = append(keepServers, host)
49         }
50 }
51
52 func TearDownDataManagerTest(t *testing.T) {
53         arvadostest.StopKeep(2)
54         arvadostest.StopAPI()
55 }
56
57 func putBlock(t *testing.T, data string) string {
58         locator, _, err := keepClient.PutB([]byte(data))
59         if err != nil {
60                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
61         }
62         if locator == "" {
63                 t.Fatalf("No locator found after putting test data")
64         }
65
66         splits := strings.Split(locator, "+")
67         return splits[0] + "+" + splits[1]
68 }
69
70 func getBlock(t *testing.T, locator string, data string) {
71         reader, blocklen, _, err := keepClient.Get(locator)
72         if err != nil {
73                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
74         }
75         if reader == nil {
76                 t.Fatalf("No reader found after putting test data")
77         }
78         if blocklen != int64(len(data)) {
79                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
80         }
81
82         all, err := ioutil.ReadAll(reader)
83         if string(all) != data {
84                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
85         }
86 }
87
88 // Create a collection using arv-put
89 func createCollection(t *testing.T, data string) string {
90         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
91         defer os.Remove(tempfile.Name())
92
93         _, err = tempfile.Write([]byte(data))
94         if err != nil {
95                 t.Fatalf("Error writing to tempfile %v", err)
96         }
97
98         // arv-put
99         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
100         if err != nil {
101                 t.Fatalf("Error running arv-put %s", err)
102         }
103
104         uuid := string(output[0:27]) // trim terminating char
105         return uuid
106 }
107
108 // Get collection locator
109 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
110
111 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
112         manifest := getCollection(t, uuid)["manifest_text"].(string)
113
114         locator := strings.Split(manifest, " ")[1]
115         match := locatorMatcher.FindStringSubmatch(locator)
116         if match == nil {
117                 t.Fatalf("No locator found in collection manifest %s", manifest)
118         }
119
120         return match[1] + "+" + match[2]
121 }
122
123 func switchToken(t string) func() {
124         orig := arv.ApiToken
125         restore := func() {
126                 arv.ApiToken = orig
127         }
128         arv.ApiToken = t
129         return restore
130 }
131
132 func getCollection(t *testing.T, uuid string) Dict {
133         defer switchToken(arvadostest.AdminToken)()
134
135         getback := make(Dict)
136         err := arv.Get("collections", uuid, nil, &getback)
137         if err != nil {
138                 t.Fatalf("Error getting collection %s", err)
139         }
140         if getback["uuid"] != uuid {
141                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
142         }
143
144         return getback
145 }
146
147 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
148         defer switchToken(arvadostest.AdminToken)()
149
150         err := arv.Update("collections", uuid, arvadosclient.Dict{
151                 "collection": arvadosclient.Dict{
152                         paramName: paramValue,
153                 },
154         }, &arvadosclient.Dict{})
155
156         if err != nil {
157                 t.Fatalf("Error updating collection %s", err)
158         }
159 }
160
161 type Dict map[string]interface{}
162
163 func deleteCollection(t *testing.T, uuid string) {
164         defer switchToken(arvadostest.AdminToken)()
165
166         getback := make(Dict)
167         err := arv.Delete("collections", uuid, nil, &getback)
168         if err != nil {
169                 t.Fatalf("Error deleting collection %s", err)
170         }
171         if getback["uuid"] != uuid {
172                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
173         }
174 }
175
176 func dataManagerSingleRun(t *testing.T) {
177         err := singlerun(arv)
178         if err != nil {
179                 t.Fatalf("Error during singlerun %s", err)
180         }
181 }
182
183 func getBlockIndexesForServer(t *testing.T, i int) []string {
184         var indexes []string
185
186         path := keepServers[i] + "/index"
187         client := http.Client{}
188         req, err := http.NewRequest("GET", path, nil)
189         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
190         req.Header.Add("Content-Type", "application/octet-stream")
191         resp, err := client.Do(req)
192         defer resp.Body.Close()
193
194         if err != nil {
195                 t.Fatalf("Error during %s %s", path, err)
196         }
197
198         body, err := ioutil.ReadAll(resp.Body)
199         if err != nil {
200                 t.Fatalf("Error reading response from %s %s", path, err)
201         }
202
203         lines := strings.Split(string(body), "\n")
204         for _, line := range lines {
205                 indexes = append(indexes, strings.Split(line, " ")...)
206         }
207
208         return indexes
209 }
210
211 func getBlockIndexes(t *testing.T) [][]string {
212         var indexes [][]string
213
214         for i := 0; i < len(keepServers); i++ {
215                 indexes = append(indexes, getBlockIndexesForServer(t, i))
216         }
217         return indexes
218 }
219
220 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
221         blocks := getBlockIndexes(t)
222
223         for _, block := range notExpected {
224                 for _, idx := range blocks {
225                         if valueInArray(block, idx) {
226                                 t.Fatalf("Found unexpected block %s", block)
227                         }
228                 }
229         }
230
231         for _, block := range expected {
232                 nFound := 0
233                 for _, idx := range blocks {
234                         if valueInArray(block, idx) {
235                                 nFound++
236                         }
237                 }
238                 if nFound < minReplication {
239                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
240                 }
241         }
242 }
243
244 func valueInArray(value string, list []string) bool {
245         for _, v := range list {
246                 if value == v {
247                         return true
248                 }
249         }
250         return false
251 }
252
253 /*
254 Test env uses two keep volumes. The volume names can be found by reading the files
255   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
256
257 The keep volumes are of the dir structure:
258   volumeN/subdir/locator
259 */
260 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
261         // First get rid of any size hints in the locators
262         var trimmedBlockLocators []string
263         for _, block := range oldUnusedBlockLocators {
264                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
265         }
266
267         // Get the working dir so that we can read keep{n}.volume files
268         wd, err := os.Getwd()
269         if err != nil {
270                 t.Fatalf("Error getting working dir %s", err)
271         }
272
273         // Now cycle through the two keep volumes
274         oldTime := time.Now().AddDate(0, -2, 0)
275         for i := 0; i < 2; i++ {
276                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
277                 volumeDir, err := ioutil.ReadFile(filename)
278                 if err != nil {
279                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
280                 }
281
282                 // Read the keep volume dir structure
283                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
284                 if err != nil {
285                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
286                 }
287
288                 // Read each subdir for each of the keep volume dir
289                 for _, subdir := range volumeContents {
290                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
291                         subdirContents, err := ioutil.ReadDir(string(subdirName))
292                         if err != nil {
293                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
294                         }
295
296                         // Now we got to the files. The files are names are the block locators
297                         for _, fileInfo := range subdirContents {
298                                 blockName := fileInfo.Name()
299                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
300                                 if valueInArray(blockName, trimmedBlockLocators) {
301                                         err = os.Chtimes(myname, oldTime, oldTime)
302                                 }
303                         }
304                 }
305         }
306 }
307
308 func getStatus(t *testing.T, path string) interface{} {
309         client := http.Client{}
310         req, err := http.NewRequest("GET", path, nil)
311         req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
312         req.Header.Add("Content-Type", "application/octet-stream")
313         resp, err := client.Do(req)
314         if err != nil {
315                 t.Fatalf("Error during %s %s", path, err)
316         }
317         defer resp.Body.Close()
318
319         var s interface{}
320         json.NewDecoder(resp.Body).Decode(&s)
321
322         return s
323 }
324
325 // Wait until PullQueue and TrashQueue are empty on all keepServers.
326 func waitUntilQueuesFinishWork(t *testing.T) {
327         for _, ks := range keepServers {
328                 for done := false; !done; {
329                         time.Sleep(100 * time.Millisecond)
330                         s := getStatus(t, ks+"/status.json")
331                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
332                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
333                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
334                                         done = true
335                                 }
336                         }
337                 }
338         }
339 }
340
341 /*
342 Create some blocks and backdate some of them.
343 Also create some collections and delete some of them.
344 Verify block indexes.
345 */
346 func TestPutAndGetBlocks(t *testing.T) {
347         defer TearDownDataManagerTest(t)
348         SetupDataManagerTest(t)
349
350         // Put some blocks which will be backdated later on
351         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
352         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
353         var oldUnusedBlockLocators []string
354         oldUnusedBlockData := "this block will have older mtime"
355         for i := 0; i < 5; i++ {
356                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
357         }
358         for i := 0; i < 5; i++ {
359                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
360         }
361
362         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
363         oldUsedBlockData := "this collection block will have older mtime"
364         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
365         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
366
367         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
368         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
369         var newBlockLocators []string
370         newBlockData := "this block is newer"
371         for i := 0; i < 5; i++ {
372                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
373         }
374         for i := 0; i < 5; i++ {
375                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
376         }
377
378         // Create a collection that would be deleted later on
379         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
380         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
381
382         // Create another collection that has the same data as the one of the old blocks
383         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
384         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
385         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
386                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
387         }
388
389         // Create another collection whose replication level will be changed
390         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
391         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
392
393         // Create two collections with same data; one will be deleted later on
394         dataForTwoCollections := "one of these collections will be deleted"
395         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
396         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
397         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
398         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
399         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
400                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
401         }
402
403         // Verify blocks before doing any backdating / deleting.
404         var expected []string
405         expected = append(expected, oldUnusedBlockLocators...)
406         expected = append(expected, newBlockLocators...)
407         expected = append(expected, toBeDeletedCollectionLocator)
408         expected = append(expected, replicationCollectionLocator)
409         expected = append(expected, oneOfTwoWithSameDataLocator)
410         expected = append(expected, secondOfTwoWithSameDataLocator)
411
412         verifyBlocks(t, nil, expected, 2)
413
414         // Run datamanager in singlerun mode
415         dataManagerSingleRun(t)
416         waitUntilQueuesFinishWork(t)
417
418         verifyBlocks(t, nil, expected, 2)
419
420         // Backdate the to-be old blocks and delete the collections
421         backdateBlocks(t, oldUnusedBlockLocators)
422         deleteCollection(t, toBeDeletedCollectionUUID)
423         deleteCollection(t, secondOfTwoWithSameDataUUID)
424
425         // Run data manager again
426         dataManagerSingleRun(t)
427         waitUntilQueuesFinishWork(t)
428
429         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
430         expected = expected[:0]
431         expected = append(expected, oldUsedBlockLocator)
432         expected = append(expected, newBlockLocators...)
433         expected = append(expected, toBeDeletedCollectionLocator)
434         expected = append(expected, oneOfTwoWithSameDataLocator)
435         expected = append(expected, secondOfTwoWithSameDataLocator)
436
437         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
438
439         // Reduce desired replication on replicationCollectionUUID
440         // collection, and verify that Data Manager does not reduce
441         // actual replication any further than that. (It might not
442         // reduce actual replication at all; that's OK for this test.)
443
444         // Reduce desired replication level.
445         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
446         collection := getCollection(t, replicationCollectionUUID)
447         if collection["replication_desired"].(interface{}) != float64(1) {
448                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
449         }
450
451         // Verify data is currently overreplicated.
452         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
453
454         // Run data manager again
455         dataManagerSingleRun(t)
456         waitUntilQueuesFinishWork(t)
457
458         // Verify data is not underreplicated.
459         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
460
461         // Verify *other* collections' data is not underreplicated.
462         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
463 }
464
465 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
466         defer TearDownDataManagerTest(t)
467         SetupDataManagerTest(t)
468
469         for i := 0; i < 10; i++ {
470                 err := singlerun(arv)
471                 if err != nil {
472                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
473                 }
474         }
475 }
476
477 func TestGetStatusRepeatedly(t *testing.T) {
478         defer TearDownDataManagerTest(t)
479         SetupDataManagerTest(t)
480
481         for i := 0; i < 10; i++ {
482                 for j := 0; j < 2; j++ {
483                         s := getStatus(t, keepServers[j]+"/status.json")
484
485                         var pullQueueStatus interface{}
486                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
487                         var trashQueueStatus interface{}
488                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
489
490                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
491                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
492                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
493                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
494                                 t.Fatalf("PullQueue and TrashQueue status not found")
495                         }
496
497                         time.Sleep(100 * time.Millisecond)
498                 }
499         }
500 }
501
502 func TestRunDatamanagerWithBogusServer(t *testing.T) {
503         defer TearDownDataManagerTest(t)
504         SetupDataManagerTest(t)
505
506         arv.ApiServer = "bogus-server"
507
508         err := singlerun(arv)
509         if err == nil {
510                 t.Fatalf("Expected error during singlerun with bogus server")
511         }
512 }
513
514 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
515         defer TearDownDataManagerTest(t)
516         SetupDataManagerTest(t)
517
518         arv.ApiToken = arvadostest.ActiveToken
519
520         err := singlerun(arv)
521         if err == nil {
522                 t.Fatalf("Expected error during singlerun as non-admin user")
523         }
524 }