e876fa4b25eb309dc0af3debae9212550a9dca86
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/services/datamanager/keep"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "os"
14         "os/exec"
15         "regexp"
16         "strings"
17         "testing"
18         "time"
19 )
20
21 var arv arvadosclient.ArvadosClient
22 var keepClient *keepclient.KeepClient
23 var keepServers []string
24
25 func SetupDataManagerTest(t *testing.T) {
26         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
27
28         // start api and keep servers
29         arvadostest.ResetEnv()
30         arvadostest.StartAPI()
31         arvadostest.StartKeep()
32
33         // make arvadosclient
34         var err error
35         arv, err = arvadosclient.MakeArvadosClient()
36         if err != nil {
37                 t.Fatalf("Error setting up arvados client: %s", err)
38         }
39
40         // keep client
41         keepClient = &keepclient.KeepClient{
42                 Arvados:       &arv,
43                 Want_replicas: 2,
44                 Using_proxy:   true,
45                 Client:        &http.Client{},
46         }
47
48         // discover keep services
49         if err := keepClient.DiscoverKeepServers(); err != nil {
50                 t.Fatalf("Error discovering keep services: %s", err)
51         }
52         for _, host := range keepClient.LocalRoots() {
53                 keepServers = append(keepServers, host)
54         }
55 }
56
57 func TearDownDataManagerTest(t *testing.T) {
58         arvadostest.StopKeep()
59         arvadostest.StopAPI()
60 }
61
62 func putBlock(t *testing.T, data string) string {
63         locator, _, err := keepClient.PutB([]byte(data))
64         if err != nil {
65                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
66         }
67         if locator == "" {
68                 t.Fatalf("No locator found after putting test data")
69         }
70
71         splits := strings.Split(locator, "+")
72         return splits[0] + "+" + splits[1]
73 }
74
75 func getBlock(t *testing.T, locator string, data string) {
76         reader, blocklen, _, err := keepClient.Get(locator)
77         if err != nil {
78                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
79         }
80         if reader == nil {
81                 t.Fatalf("No reader found after putting test data")
82         }
83         if blocklen != int64(len(data)) {
84                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
85         }
86
87         all, err := ioutil.ReadAll(reader)
88         if string(all) != data {
89                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
90         }
91 }
92
93 // Create a collection using arv-put
94 func createCollection(t *testing.T, data string) string {
95         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
96         defer os.Remove(tempfile.Name())
97
98         _, err = tempfile.Write([]byte(data))
99         if err != nil {
100                 t.Fatalf("Error writing to tempfile %v", err)
101         }
102
103         // arv-put
104         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
105         if err != nil {
106                 t.Fatalf("Error running arv-put %s", err)
107         }
108
109         uuid := string(output[0:27]) // trim terminating char
110         return uuid
111 }
112
113 // Get collection locator
114 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
115
116 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
117         manifest := getCollection(t, uuid)["manifest_text"].(string)
118
119         locator := strings.Split(manifest, " ")[1]
120         match := locatorMatcher.FindStringSubmatch(locator)
121         if match == nil {
122                 t.Fatalf("No locator found in collection manifest %s", manifest)
123         }
124
125         return match[1] + "+" + match[2]
126 }
127
128 func getCollection(t *testing.T, uuid string) Dict {
129         getback := make(Dict)
130         err := arv.Get("collections", uuid, nil, &getback)
131         if err != nil {
132                 t.Fatalf("Error getting collection %s", err)
133         }
134         if getback["uuid"] != uuid {
135                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
136         }
137
138         return getback
139 }
140
141 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
142         err := arv.Update("collections", uuid, arvadosclient.Dict{
143                 "collection": arvadosclient.Dict{
144                         paramName: paramValue,
145                 },
146         }, &arvadosclient.Dict{})
147
148         if err != nil {
149                 t.Fatalf("Error updating collection %s", err)
150         }
151 }
152
153 type Dict map[string]interface{}
154
155 func deleteCollection(t *testing.T, uuid string) {
156         getback := make(Dict)
157         err := arv.Delete("collections", uuid, nil, &getback)
158         if err != nil {
159                 t.Fatalf("Error deleting collection %s", err)
160         }
161         if getback["uuid"] != uuid {
162                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
163         }
164 }
165
166 func dataManagerSingleRun(t *testing.T) {
167         err := singlerun()
168         if err != nil {
169                 t.Fatalf("Error during singlerun %s", err)
170         }
171 }
172
173 func getBlockIndexesForServer(t *testing.T, i int) []string {
174         var indexes []string
175
176         path := keepServers[i] + "/index"
177         client := http.Client{}
178         req, err := http.NewRequest("GET", path, nil)
179         req.Header.Add("Authorization", "OAuth2 "+keep.GetDataManagerToken(nil))
180         req.Header.Add("Content-Type", "application/octet-stream")
181         resp, err := client.Do(req)
182         defer resp.Body.Close()
183
184         if err != nil {
185                 t.Fatalf("Error during %s %s", path, err)
186         }
187
188         body, err := ioutil.ReadAll(resp.Body)
189         if err != nil {
190                 t.Fatalf("Error reading response from %s %s", path, err)
191         }
192
193         lines := strings.Split(string(body), "\n")
194         for _, line := range lines {
195                 indexes = append(indexes, strings.Split(line, " ")...)
196         }
197
198         return indexes
199 }
200
201 func getBlockIndexes(t *testing.T) [][]string {
202         var indexes [][]string
203
204         for i := 0; i < len(keepServers); i++ {
205                 indexes = append(indexes, getBlockIndexesForServer(t, i))
206         }
207         return indexes
208 }
209
210 func verifyBlocks(t *testing.T, notExpected []string, expected []string) {
211         blocks := getBlockIndexes(t)
212         for _, block := range notExpected {
213                 for i := 0; i < len(blocks); i++ {
214                         exists := valueInArray(block, blocks[i])
215                         if exists {
216                                 t.Fatalf("Found unexpected block in index %s", block)
217                         }
218                 }
219         }
220
221         //      var blockExists [][]string
222         blockExists := make(map[string][]string)
223         for _, block := range expected {
224                 var blockArray []string
225                 for i := 0; i < len(blocks); i++ {
226                         exists := valueInArray(block, blocks[i])
227                         if exists {
228                                 blockArray = append(blockArray, block)
229                         }
230                 }
231                 blockExists[block] = blockArray
232         }
233
234         for _, block := range expected {
235                 if blockExists[block] == nil || len(blockExists[block]) < 2 {
236                         t.Fatalf("Expected to find two replicas for block %s; found %d", block, len(blockExists[block]))
237                 }
238         }
239 }
240
241 func valueInArray(value string, list []string) bool {
242         for _, v := range list {
243                 if value == v {
244                         return true
245                 }
246         }
247         return false
248 }
249
250 /*
251 Test env uses two keep volumes. The volume names can be found by reading the files
252   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
253
254 The keep volumes are of the dir structure:
255   volumeN/subdir/locator
256 */
257 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
258         // First get rid of any size hints in the locators
259         var trimmedBlockLocators []string
260         for _, block := range oldUnusedBlockLocators {
261                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
262         }
263
264         // Get the working dir so that we can read keep{n}.volume files
265         wd, err := os.Getwd()
266         if err != nil {
267                 t.Fatalf("Error getting working dir %s", err)
268         }
269
270         // Now cycle through the two keep volumes
271         oldTime := time.Now().AddDate(0, -2, 0)
272         for i := 0; i < 2; i++ {
273                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
274                 volumeDir, err := ioutil.ReadFile(filename)
275                 if err != nil {
276                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
277                 }
278
279                 // Read the keep volume dir structure
280                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
281                 if err != nil {
282                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
283                 }
284
285                 // Read each subdir for each of the keep volume dir
286                 for _, subdir := range volumeContents {
287                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
288                         subdirContents, err := ioutil.ReadDir(string(subdirName))
289                         if err != nil {
290                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
291                         }
292
293                         // Now we got to the files. The files are names are the block locators
294                         for _, fileInfo := range subdirContents {
295                                 blockName := fileInfo.Name()
296                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
297                                 if valueInArray(blockName, trimmedBlockLocators) {
298                                         err = os.Chtimes(myname, oldTime, oldTime)
299                                 }
300                         }
301                 }
302         }
303 }
304
305 func getStatus(t *testing.T, path string) interface{} {
306         client := http.Client{}
307         req, err := http.NewRequest("GET", path, nil)
308         req.Header.Add("Authorization", "OAuth2 "+keep.GetDataManagerToken(nil))
309         req.Header.Add("Content-Type", "application/octet-stream")
310         resp, err := client.Do(req)
311         defer resp.Body.Close()
312
313         if err != nil {
314                 t.Fatalf("Error during %s %s", path, err)
315         }
316
317         var s interface{}
318         json.NewDecoder(resp.Body).Decode(&s)
319
320         return s
321 }
322
323 func waitUntilQueuesFinishWork(t *testing.T) {
324         // Wait until PullQueue and TrashQueue finish their work
325         for {
326                 var done [2]bool
327                 for i := 0; i < 2; i++ {
328                         s := getStatus(t, keepServers[i]+"/status.json")
329                         var pullQueueStatus interface{}
330                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
331                         var trashQueueStatus interface{}
332                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
333
334                         if pullQueueStatus.(map[string]interface{})["Queued"] == float64(0) &&
335                                 pullQueueStatus.(map[string]interface{})["InProgress"] == float64(0) &&
336                                 trashQueueStatus.(map[string]interface{})["Queued"] == float64(0) &&
337                                 trashQueueStatus.(map[string]interface{})["InProgress"] == float64(0) {
338                                 done[i] = true
339                         }
340                 }
341                 if done[0] && done[1] {
342                         break
343                 } else {
344                         time.Sleep(100 * time.Millisecond)
345                 }
346         }
347 }
348
349 /*
350 Create some blocks and backdate some of them.
351 Also create some collections and delete some of them.
352 Verify block indexes.
353 */
354 func TestPutAndGetBlocks(t *testing.T) {
355         log.Print("TestPutAndGetBlocks start")
356         defer TearDownDataManagerTest(t)
357         SetupDataManagerTest(t)
358
359         // Put some blocks which will be backdated later on
360         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
361         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
362         var oldUnusedBlockLocators []string
363         oldUnusedBlockData := "this block will have older mtime"
364         for i := 0; i < 5; i++ {
365                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
366         }
367         for i := 0; i < 5; i++ {
368                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
369         }
370
371         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
372         oldUsedBlockData := "this collection block will have older mtime"
373         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
374         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
375
376         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
377         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
378         var newBlockLocators []string
379         newBlockData := "this block is newer"
380         for i := 0; i < 5; i++ {
381                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
382         }
383         for i := 0; i < 5; i++ {
384                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
385         }
386
387         // Create a collection that would be deleted later on
388         toBeDeletedCollectionUuid := createCollection(t, "some data for collection creation")
389         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUuid)
390
391         // Create another collection that has the same data as the one of the old blocks
392         oldUsedBlockCollectionUuid := createCollection(t, oldUsedBlockData)
393         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUuid)
394         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
395                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
396         }
397
398         // Create another collection whose replication level will be changed
399         replicationCollectionUuid := createCollection(t, "replication level on this collection will be reduced")
400         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUuid)
401
402         // Create two collections with same data; one will be deleted later on
403         dataForTwoCollections := "one of these collections will be deleted"
404         oneOfTwoWithSameDataUuid := createCollection(t, dataForTwoCollections)
405         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUuid)
406         secondOfTwoWithSameDataUuid := createCollection(t, dataForTwoCollections)
407         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUuid)
408         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
409                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
410         }
411
412         // Verify blocks before doing any backdating / deleting.
413         var expected []string
414         expected = append(expected, oldUnusedBlockLocators...)
415         expected = append(expected, newBlockLocators...)
416         expected = append(expected, toBeDeletedCollectionLocator)
417         expected = append(expected, replicationCollectionLocator)
418         expected = append(expected, oneOfTwoWithSameDataLocator)
419         expected = append(expected, secondOfTwoWithSameDataLocator)
420
421         verifyBlocks(t, nil, expected)
422
423         // Run datamanager in singlerun mode
424         dataManagerSingleRun(t)
425         waitUntilQueuesFinishWork(t)
426
427         verifyBlocks(t, nil, expected)
428
429         // Backdate the to-be old blocks and delete the collections
430         backdateBlocks(t, oldUnusedBlockLocators)
431         deleteCollection(t, toBeDeletedCollectionUuid)
432         deleteCollection(t, secondOfTwoWithSameDataUuid)
433
434         // Run data manager again
435         dataManagerSingleRun(t)
436         waitUntilQueuesFinishWork(t)
437
438         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
439         expected = expected[:0]
440         expected = append(expected, oldUsedBlockLocator)
441         expected = append(expected, newBlockLocators...)
442         expected = append(expected, toBeDeletedCollectionLocator)
443         expected = append(expected, replicationCollectionLocator)
444         expected = append(expected, oneOfTwoWithSameDataLocator)
445         expected = append(expected, secondOfTwoWithSameDataLocator)
446
447         verifyBlocks(t, oldUnusedBlockLocators, expected)
448
449         // Reduce replication on replicationCollectionUuid collection and verify that the overreplicated blocks are untouched.
450
451         // Default replication level is 2; first verify that the replicationCollectionLocator appears in both volumes
452         for i := 0; i < len(keepServers); i++ {
453                 indexes := getBlockIndexesForServer(t, i)
454                 if !valueInArray(replicationCollectionLocator, indexes) {
455                         t.Fatalf("Not found block in index %s", replicationCollectionLocator)
456                 }
457         }
458
459         // Now reduce replication level on this collection and verify that it still appears in both volumes
460         updateCollection(t, replicationCollectionUuid, "replication_desired", "1")
461         collection := getCollection(t, replicationCollectionUuid)
462         if collection["replication_desired"].(interface{}) != float64(1) {
463                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
464         }
465
466         // Run data manager again
467         time.Sleep(100 * time.Millisecond)
468         dataManagerSingleRun(t)
469         waitUntilQueuesFinishWork(t)
470
471         for i := 0; i < len(keepServers); i++ {
472                 indexes := getBlockIndexesForServer(t, i)
473                 if !valueInArray(replicationCollectionLocator, indexes) {
474                         t.Fatalf("Not found block in index %s", replicationCollectionLocator)
475                 }
476         }
477         // Done testing reduce replication on collection
478
479         // Verify blocks one more time
480         verifyBlocks(t, oldUnusedBlockLocators, expected)
481 }
482
483 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
484         log.Print("TestDatamanagerSingleRunRepeatedly start")
485
486         defer TearDownDataManagerTest(t)
487         SetupDataManagerTest(t)
488
489         for i := 0; i < 10; i++ {
490                 err := singlerun()
491                 if err != nil {
492                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
493                 }
494                 time.Sleep(100 * time.Millisecond)
495         }
496 }
497
498 func TestGetStatusRepeatedly(t *testing.T) {
499         t.Skip("This test still fails. Skip it until it is fixed.")
500
501         defer TearDownDataManagerTest(t)
502         SetupDataManagerTest(t)
503
504         for i := 0; i < 10; i++ {
505                 for j := 0; j < 2; j++ {
506                         s := getStatus(t, keepServers[j]+"/status.json")
507
508                         var pullQueueStatus interface{}
509                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
510                         var trashQueueStatus interface{}
511                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
512
513                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
514                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
515                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
516                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
517                                 t.Fatalf("PullQueue and TrashQueue status not found")
518                         }
519
520                         time.Sleep(100 * time.Millisecond)
521                 }
522         }
523 }