c76d481f2362269f0d85c2242cd09c4fd61a76cd
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "os/exec"
13         "regexp"
14         "strings"
15         "testing"
16         "time"
17 )
18
19 const (
20         ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
21         AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
22 )
23
24 var arv arvadosclient.ArvadosClient
25 var keepClient *keepclient.KeepClient
26 var keepServers []string
27
28 func SetupDataManagerTest(t *testing.T) {
29         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
30
31         // start api and keep servers
32         arvadostest.ResetEnv()
33         arvadostest.StartAPI()
34         arvadostest.StartKeep()
35
36         arv = makeArvadosClient()
37
38         // keep client
39         keepClient = &keepclient.KeepClient{
40                 Arvados:       &arv,
41                 Want_replicas: 2,
42                 Using_proxy:   true,
43                 Client:        &http.Client{},
44         }
45
46         // discover keep services
47         if err := keepClient.DiscoverKeepServers(); err != nil {
48                 t.Fatalf("Error discovering keep services: %s", err)
49         }
50         for _, host := range keepClient.LocalRoots() {
51                 keepServers = append(keepServers, host)
52         }
53 }
54
55 func TearDownDataManagerTest(t *testing.T) {
56         arvadostest.StopKeep()
57         arvadostest.StopAPI()
58 }
59
60 func putBlock(t *testing.T, data string) string {
61         locator, _, err := keepClient.PutB([]byte(data))
62         if err != nil {
63                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
64         }
65         if locator == "" {
66                 t.Fatalf("No locator found after putting test data")
67         }
68
69         splits := strings.Split(locator, "+")
70         return splits[0] + "+" + splits[1]
71 }
72
73 func getBlock(t *testing.T, locator string, data string) {
74         reader, blocklen, _, err := keepClient.Get(locator)
75         if err != nil {
76                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
77         }
78         if reader == nil {
79                 t.Fatalf("No reader found after putting test data")
80         }
81         if blocklen != int64(len(data)) {
82                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
83         }
84
85         all, err := ioutil.ReadAll(reader)
86         if string(all) != data {
87                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
88         }
89 }
90
91 // Create a collection using arv-put
92 func createCollection(t *testing.T, data string) string {
93         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
94         defer os.Remove(tempfile.Name())
95
96         _, err = tempfile.Write([]byte(data))
97         if err != nil {
98                 t.Fatalf("Error writing to tempfile %v", err)
99         }
100
101         // arv-put
102         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
103         if err != nil {
104                 t.Fatalf("Error running arv-put %s", err)
105         }
106
107         uuid := string(output[0:27]) // trim terminating char
108         return uuid
109 }
110
111 // Get collection locator
112 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
113
114 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
115         manifest := getCollection(t, uuid)["manifest_text"].(string)
116
117         locator := strings.Split(manifest, " ")[1]
118         match := locatorMatcher.FindStringSubmatch(locator)
119         if match == nil {
120                 t.Fatalf("No locator found in collection manifest %s", manifest)
121         }
122
123         return match[1] + "+" + match[2]
124 }
125
126 func getCollection(t *testing.T, uuid string) Dict {
127         getback := make(Dict)
128         err := arv.Get("collections", uuid, nil, &getback)
129         if err != nil {
130                 t.Fatalf("Error getting collection %s", err)
131         }
132         if getback["uuid"] != uuid {
133                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
134         }
135
136         return getback
137 }
138
139 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
140         err := arv.Update("collections", uuid, arvadosclient.Dict{
141                 "collection": arvadosclient.Dict{
142                         paramName: paramValue,
143                 },
144         }, &arvadosclient.Dict{})
145
146         if err != nil {
147                 t.Fatalf("Error updating collection %s", err)
148         }
149 }
150
151 type Dict map[string]interface{}
152
153 func deleteCollection(t *testing.T, uuid string) {
154         getback := make(Dict)
155         err := arv.Delete("collections", uuid, nil, &getback)
156         if err != nil {
157                 t.Fatalf("Error deleting collection %s", err)
158         }
159         if getback["uuid"] != uuid {
160                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
161         }
162 }
163
164 func dataManagerSingleRun(t *testing.T) {
165         err := singlerun(arv)
166         if err != nil {
167                 t.Fatalf("Error during singlerun %s", err)
168         }
169 }
170
171 func getBlockIndexesForServer(t *testing.T, i int) []string {
172         var indexes []string
173
174         path := keepServers[i] + "/index"
175         client := http.Client{}
176         req, err := http.NewRequest("GET", path, nil)
177         req.Header.Add("Authorization", "OAuth2 " + AdminToken)
178         req.Header.Add("Content-Type", "application/octet-stream")
179         resp, err := client.Do(req)
180         defer resp.Body.Close()
181
182         if err != nil {
183                 t.Fatalf("Error during %s %s", path, err)
184         }
185
186         body, err := ioutil.ReadAll(resp.Body)
187         if err != nil {
188                 t.Fatalf("Error reading response from %s %s", path, err)
189         }
190
191         lines := strings.Split(string(body), "\n")
192         for _, line := range lines {
193                 indexes = append(indexes, strings.Split(line, " ")...)
194         }
195
196         return indexes
197 }
198
199 func getBlockIndexes(t *testing.T) [][]string {
200         var indexes [][]string
201
202         for i := 0; i < len(keepServers); i++ {
203                 indexes = append(indexes, getBlockIndexesForServer(t, i))
204         }
205         return indexes
206 }
207
208 func verifyBlocks(t *testing.T, notExpected []string, expected []string) {
209         blocks := getBlockIndexes(t)
210
211         for _, block := range notExpected {
212                 for _, idx := range blocks {
213                         if valueInArray(block, idx) {
214                                 t.Fatalf("Found unexpected block %s", block)
215                         }
216                 }
217         }
218
219         for _, block := range expected {
220                 nFound := 0
221                 for _, idx := range blocks {
222                         if valueInArray(block, idx) {
223                                 nFound++
224                         }
225                 }
226                 if nFound < 2 {
227                         t.Fatalf("Found %d replicas of block %s, expected >= 2", nFound, block)
228                 }
229         }
230 }
231
232 func valueInArray(value string, list []string) bool {
233         for _, v := range list {
234                 if value == v {
235                         return true
236                 }
237         }
238         return false
239 }
240
241 /*
242 Test env uses two keep volumes. The volume names can be found by reading the files
243   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
244
245 The keep volumes are of the dir structure:
246   volumeN/subdir/locator
247 */
248 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
249         // First get rid of any size hints in the locators
250         var trimmedBlockLocators []string
251         for _, block := range oldUnusedBlockLocators {
252                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
253         }
254
255         // Get the working dir so that we can read keep{n}.volume files
256         wd, err := os.Getwd()
257         if err != nil {
258                 t.Fatalf("Error getting working dir %s", err)
259         }
260
261         // Now cycle through the two keep volumes
262         oldTime := time.Now().AddDate(0, -2, 0)
263         for i := 0; i < 2; i++ {
264                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
265                 volumeDir, err := ioutil.ReadFile(filename)
266                 if err != nil {
267                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
268                 }
269
270                 // Read the keep volume dir structure
271                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
272                 if err != nil {
273                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
274                 }
275
276                 // Read each subdir for each of the keep volume dir
277                 for _, subdir := range volumeContents {
278                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
279                         subdirContents, err := ioutil.ReadDir(string(subdirName))
280                         if err != nil {
281                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
282                         }
283
284                         // Now we got to the files. The files are names are the block locators
285                         for _, fileInfo := range subdirContents {
286                                 blockName := fileInfo.Name()
287                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
288                                 if valueInArray(blockName, trimmedBlockLocators) {
289                                         err = os.Chtimes(myname, oldTime, oldTime)
290                                 }
291                         }
292                 }
293         }
294 }
295
296 func getStatus(t *testing.T, path string) interface{} {
297         client := http.Client{}
298         req, err := http.NewRequest("GET", path, nil)
299         req.Header.Add("Authorization", "OAuth2 " + AdminToken)
300         req.Header.Add("Content-Type", "application/octet-stream")
301         resp, err := client.Do(req)
302         defer resp.Body.Close()
303
304         if err != nil {
305                 t.Fatalf("Error during %s %s", path, err)
306         }
307
308         var s interface{}
309         json.NewDecoder(resp.Body).Decode(&s)
310
311         return s
312 }
313
314 func waitUntilQueuesFinishWork(t *testing.T) {
315         // Wait until PullQueue and TrashQueue finish their work
316         for {
317                 var done [2]bool
318                 for i := 0; i < 2; i++ {
319                         s := getStatus(t, keepServers[i]+"/status.json")
320                         var pullQueueStatus interface{}
321                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
322                         var trashQueueStatus interface{}
323                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
324
325                         if pullQueueStatus.(map[string]interface{})["Queued"] == float64(0) &&
326                                 pullQueueStatus.(map[string]interface{})["InProgress"] == float64(0) &&
327                                 trashQueueStatus.(map[string]interface{})["Queued"] == float64(0) &&
328                                 trashQueueStatus.(map[string]interface{})["InProgress"] == float64(0) {
329                                 done[i] = true
330                         }
331                 }
332                 if done[0] && done[1] {
333                         break
334                 } else {
335                         time.Sleep(100 * time.Millisecond)
336                 }
337         }
338 }
339
340 /*
341 Create some blocks and backdate some of them.
342 Also create some collections and delete some of them.
343 Verify block indexes.
344 */
345 func TestPutAndGetBlocks(t *testing.T) {
346         defer TearDownDataManagerTest(t)
347         SetupDataManagerTest(t)
348
349         // Put some blocks which will be backdated later on
350         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
351         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
352         var oldUnusedBlockLocators []string
353         oldUnusedBlockData := "this block will have older mtime"
354         for i := 0; i < 5; i++ {
355                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
356         }
357         for i := 0; i < 5; i++ {
358                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
359         }
360
361         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
362         oldUsedBlockData := "this collection block will have older mtime"
363         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
364         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
365
366         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
367         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
368         var newBlockLocators []string
369         newBlockData := "this block is newer"
370         for i := 0; i < 5; i++ {
371                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
372         }
373         for i := 0; i < 5; i++ {
374                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
375         }
376
377         // Create a collection that would be deleted later on
378         toBeDeletedCollectionUuid := createCollection(t, "some data for collection creation")
379         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUuid)
380
381         // Create another collection that has the same data as the one of the old blocks
382         oldUsedBlockCollectionUuid := createCollection(t, oldUsedBlockData)
383         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUuid)
384         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
385                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
386         }
387
388         // Create another collection whose replication level will be changed
389         replicationCollectionUuid := createCollection(t, "replication level on this collection will be reduced")
390         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUuid)
391
392         // Create two collections with same data; one will be deleted later on
393         dataForTwoCollections := "one of these collections will be deleted"
394         oneOfTwoWithSameDataUuid := createCollection(t, dataForTwoCollections)
395         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUuid)
396         secondOfTwoWithSameDataUuid := createCollection(t, dataForTwoCollections)
397         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUuid)
398         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
399                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
400         }
401
402         // Verify blocks before doing any backdating / deleting.
403         var expected []string
404         expected = append(expected, oldUnusedBlockLocators...)
405         expected = append(expected, newBlockLocators...)
406         expected = append(expected, toBeDeletedCollectionLocator)
407         expected = append(expected, replicationCollectionLocator)
408         expected = append(expected, oneOfTwoWithSameDataLocator)
409         expected = append(expected, secondOfTwoWithSameDataLocator)
410
411         verifyBlocks(t, nil, expected)
412
413         // Run datamanager in singlerun mode
414         dataManagerSingleRun(t)
415         waitUntilQueuesFinishWork(t)
416
417         verifyBlocks(t, nil, expected)
418
419         // Backdate the to-be old blocks and delete the collections
420         backdateBlocks(t, oldUnusedBlockLocators)
421         deleteCollection(t, toBeDeletedCollectionUuid)
422         deleteCollection(t, secondOfTwoWithSameDataUuid)
423
424         // Run data manager again
425         dataManagerSingleRun(t)
426         waitUntilQueuesFinishWork(t)
427
428         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
429         expected = expected[:0]
430         expected = append(expected, oldUsedBlockLocator)
431         expected = append(expected, newBlockLocators...)
432         expected = append(expected, toBeDeletedCollectionLocator)
433         expected = append(expected, replicationCollectionLocator)
434         expected = append(expected, oneOfTwoWithSameDataLocator)
435         expected = append(expected, secondOfTwoWithSameDataLocator)
436
437         verifyBlocks(t, oldUnusedBlockLocators, expected)
438
439         // Reduce replication on replicationCollectionUuid collection and verify that the overreplicated blocks are untouched.
440
441         // Default replication level is 2; first verify that the replicationCollectionLocator appears in both volumes
442         for i := 0; i < len(keepServers); i++ {
443                 indexes := getBlockIndexesForServer(t, i)
444                 if !valueInArray(replicationCollectionLocator, indexes) {
445                         t.Fatalf("Not found block in index %s", replicationCollectionLocator)
446                 }
447         }
448
449         // Now reduce replication level on this collection and verify that it still appears in both volumes
450         updateCollection(t, replicationCollectionUuid, "replication_desired", "1")
451         collection := getCollection(t, replicationCollectionUuid)
452         if collection["replication_desired"].(interface{}) != float64(1) {
453                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
454         }
455
456         // Run data manager again
457         time.Sleep(100 * time.Millisecond)
458         dataManagerSingleRun(t)
459         waitUntilQueuesFinishWork(t)
460
461         for i := 0; i < len(keepServers); i++ {
462                 indexes := getBlockIndexesForServer(t, i)
463                 if !valueInArray(replicationCollectionLocator, indexes) {
464                         t.Fatalf("Not found block in index %s", replicationCollectionLocator)
465                 }
466         }
467         // Done testing reduce replication on collection
468
469         // Verify blocks one more time
470         verifyBlocks(t, oldUnusedBlockLocators, expected)
471 }
472
473 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
474         defer TearDownDataManagerTest(t)
475         SetupDataManagerTest(t)
476
477         for i := 0; i < 10; i++ {
478                 err := singlerun(arv)
479                 if err != nil {
480                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
481                 }
482                 time.Sleep(100 * time.Millisecond)
483         }
484 }
485
486 func TestGetStatusRepeatedly(t *testing.T) {
487         t.Skip("This test still fails. Skip it until it is fixed.")
488
489         defer TearDownDataManagerTest(t)
490         SetupDataManagerTest(t)
491
492         for i := 0; i < 10; i++ {
493                 for j := 0; j < 2; j++ {
494                         s := getStatus(t, keepServers[j]+"/status.json")
495
496                         var pullQueueStatus interface{}
497                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
498                         var trashQueueStatus interface{}
499                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
500
501                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
502                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
503                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
504                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
505                                 t.Fatalf("PullQueue and TrashQueue status not found")
506                         }
507
508                         time.Sleep(100 * time.Millisecond)
509                 }
510         }
511 }
512
513 func TestRunDatamanagerWithBogusServer(t *testing.T) {
514         defer TearDownDataManagerTest(t)
515         SetupDataManagerTest(t)
516
517         arv.ApiServer = "bogus-server"
518
519         err := singlerun(arv)
520         if err == nil {
521                 t.Fatalf("Expected error during singlerun with bogus server")
522         }
523 }
524
525 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
526         defer TearDownDataManagerTest(t)
527         SetupDataManagerTest(t)
528
529         arv.ApiToken = ActiveUserToken
530
531         err := singlerun(arv)
532         if err == nil {
533                 t.Fatalf("Expected error during singlerun as non-admin user")
534         }
535 }