Merge branch 'master' into 7492-keepproxy-upstream-errors
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "os/exec"
13         "regexp"
14         "strings"
15         "testing"
16         "time"
17 )
18
19 const (
20         ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
21         AdminToken      = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
22 )
23
24 var arv arvadosclient.ArvadosClient
25 var keepClient *keepclient.KeepClient
26 var keepServers []string
27
28 func SetupDataManagerTest(t *testing.T) {
29         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
30
31         // start api and keep servers
32         arvadostest.ResetEnv()
33         arvadostest.StartAPI()
34         arvadostest.StartKeep(2, false)
35
36         arv = makeArvadosClient()
37
38         // keep client
39         keepClient = &keepclient.KeepClient{
40                 Arvados:       &arv,
41                 Want_replicas: 2,
42                 Using_proxy:   true,
43                 Client:        &http.Client{},
44         }
45
46         // discover keep services
47         if err := keepClient.DiscoverKeepServers(); err != nil {
48                 t.Fatalf("Error discovering keep services: %s", err)
49         }
50         keepServers = []string{}
51         for _, host := range keepClient.LocalRoots() {
52                 keepServers = append(keepServers, host)
53         }
54 }
55
56 func TearDownDataManagerTest(t *testing.T) {
57         arvadostest.StopKeep(2)
58         arvadostest.StopAPI()
59 }
60
61 func putBlock(t *testing.T, data string) string {
62         locator, _, err := keepClient.PutB([]byte(data))
63         if err != nil {
64                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
65         }
66         if locator == "" {
67                 t.Fatalf("No locator found after putting test data")
68         }
69
70         splits := strings.Split(locator, "+")
71         return splits[0] + "+" + splits[1]
72 }
73
74 func getBlock(t *testing.T, locator string, data string) {
75         reader, blocklen, _, err := keepClient.Get(locator)
76         if err != nil {
77                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
78         }
79         if reader == nil {
80                 t.Fatalf("No reader found after putting test data")
81         }
82         if blocklen != int64(len(data)) {
83                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
84         }
85
86         all, err := ioutil.ReadAll(reader)
87         if string(all) != data {
88                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
89         }
90 }
91
92 // Create a collection using arv-put
93 func createCollection(t *testing.T, data string) string {
94         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
95         defer os.Remove(tempfile.Name())
96
97         _, err = tempfile.Write([]byte(data))
98         if err != nil {
99                 t.Fatalf("Error writing to tempfile %v", err)
100         }
101
102         // arv-put
103         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
104         if err != nil {
105                 t.Fatalf("Error running arv-put %s", err)
106         }
107
108         uuid := string(output[0:27]) // trim terminating char
109         return uuid
110 }
111
112 // Get collection locator
113 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
114
115 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
116         manifest := getCollection(t, uuid)["manifest_text"].(string)
117
118         locator := strings.Split(manifest, " ")[1]
119         match := locatorMatcher.FindStringSubmatch(locator)
120         if match == nil {
121                 t.Fatalf("No locator found in collection manifest %s", manifest)
122         }
123
124         return match[1] + "+" + match[2]
125 }
126
127 func getCollection(t *testing.T, uuid string) Dict {
128         getback := make(Dict)
129         err := arv.Get("collections", uuid, nil, &getback)
130         if err != nil {
131                 t.Fatalf("Error getting collection %s", err)
132         }
133         if getback["uuid"] != uuid {
134                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
135         }
136
137         return getback
138 }
139
140 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
141         err := arv.Update("collections", uuid, arvadosclient.Dict{
142                 "collection": arvadosclient.Dict{
143                         paramName: paramValue,
144                 },
145         }, &arvadosclient.Dict{})
146
147         if err != nil {
148                 t.Fatalf("Error updating collection %s", err)
149         }
150 }
151
152 type Dict map[string]interface{}
153
154 func deleteCollection(t *testing.T, uuid string) {
155         getback := make(Dict)
156         err := arv.Delete("collections", uuid, nil, &getback)
157         if err != nil {
158                 t.Fatalf("Error deleting collection %s", err)
159         }
160         if getback["uuid"] != uuid {
161                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
162         }
163 }
164
165 func dataManagerSingleRun(t *testing.T) {
166         err := singlerun(arv)
167         if err != nil {
168                 t.Fatalf("Error during singlerun %s", err)
169         }
170 }
171
172 func getBlockIndexesForServer(t *testing.T, i int) []string {
173         var indexes []string
174
175         path := keepServers[i] + "/index"
176         client := http.Client{}
177         req, err := http.NewRequest("GET", path, nil)
178         req.Header.Add("Authorization", "OAuth2 "+AdminToken)
179         req.Header.Add("Content-Type", "application/octet-stream")
180         resp, err := client.Do(req)
181         defer resp.Body.Close()
182
183         if err != nil {
184                 t.Fatalf("Error during %s %s", path, err)
185         }
186
187         body, err := ioutil.ReadAll(resp.Body)
188         if err != nil {
189                 t.Fatalf("Error reading response from %s %s", path, err)
190         }
191
192         lines := strings.Split(string(body), "\n")
193         for _, line := range lines {
194                 indexes = append(indexes, strings.Split(line, " ")...)
195         }
196
197         return indexes
198 }
199
200 func getBlockIndexes(t *testing.T) [][]string {
201         var indexes [][]string
202
203         for i := 0; i < len(keepServers); i++ {
204                 indexes = append(indexes, getBlockIndexesForServer(t, i))
205         }
206         return indexes
207 }
208
209 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
210         blocks := getBlockIndexes(t)
211
212         for _, block := range notExpected {
213                 for _, idx := range blocks {
214                         if valueInArray(block, idx) {
215                                 t.Fatalf("Found unexpected block %s", block)
216                         }
217                 }
218         }
219
220         for _, block := range expected {
221                 nFound := 0
222                 for _, idx := range blocks {
223                         if valueInArray(block, idx) {
224                                 nFound++
225                         }
226                 }
227                 if nFound < minReplication {
228                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
229                 }
230         }
231 }
232
233 func valueInArray(value string, list []string) bool {
234         for _, v := range list {
235                 if value == v {
236                         return true
237                 }
238         }
239         return false
240 }
241
242 /*
243 Test env uses two keep volumes. The volume names can be found by reading the files
244   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
245
246 The keep volumes are of the dir structure:
247   volumeN/subdir/locator
248 */
249 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
250         // First get rid of any size hints in the locators
251         var trimmedBlockLocators []string
252         for _, block := range oldUnusedBlockLocators {
253                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
254         }
255
256         // Get the working dir so that we can read keep{n}.volume files
257         wd, err := os.Getwd()
258         if err != nil {
259                 t.Fatalf("Error getting working dir %s", err)
260         }
261
262         // Now cycle through the two keep volumes
263         oldTime := time.Now().AddDate(0, -2, 0)
264         for i := 0; i < 2; i++ {
265                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
266                 volumeDir, err := ioutil.ReadFile(filename)
267                 if err != nil {
268                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
269                 }
270
271                 // Read the keep volume dir structure
272                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
273                 if err != nil {
274                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
275                 }
276
277                 // Read each subdir for each of the keep volume dir
278                 for _, subdir := range volumeContents {
279                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
280                         subdirContents, err := ioutil.ReadDir(string(subdirName))
281                         if err != nil {
282                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
283                         }
284
285                         // Now we got to the files. The files are names are the block locators
286                         for _, fileInfo := range subdirContents {
287                                 blockName := fileInfo.Name()
288                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
289                                 if valueInArray(blockName, trimmedBlockLocators) {
290                                         err = os.Chtimes(myname, oldTime, oldTime)
291                                 }
292                         }
293                 }
294         }
295 }
296
297 func getStatus(t *testing.T, path string) interface{} {
298         client := http.Client{}
299         req, err := http.NewRequest("GET", path, nil)
300         req.Header.Add("Authorization", "OAuth2 "+AdminToken)
301         req.Header.Add("Content-Type", "application/octet-stream")
302         resp, err := client.Do(req)
303         if err != nil {
304                 t.Fatalf("Error during %s %s", path, err)
305         }
306         defer resp.Body.Close()
307
308         var s interface{}
309         json.NewDecoder(resp.Body).Decode(&s)
310
311         return s
312 }
313
314 // Wait until PullQueue and TrashQueue are empty on all keepServers.
315 func waitUntilQueuesFinishWork(t *testing.T) {
316         for _, ks := range keepServers {
317                 for done := false; !done; {
318                         time.Sleep(100 * time.Millisecond)
319                         s := getStatus(t, ks+"/status.json")
320                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
321                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
322                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
323                                         done = true
324                                 }
325                         }
326                 }
327         }
328 }
329
330 /*
331 Create some blocks and backdate some of them.
332 Also create some collections and delete some of them.
333 Verify block indexes.
334 */
335 func TestPutAndGetBlocks(t *testing.T) {
336         defer TearDownDataManagerTest(t)
337         SetupDataManagerTest(t)
338
339         // Put some blocks which will be backdated later on
340         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
341         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
342         var oldUnusedBlockLocators []string
343         oldUnusedBlockData := "this block will have older mtime"
344         for i := 0; i < 5; i++ {
345                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
346         }
347         for i := 0; i < 5; i++ {
348                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
349         }
350
351         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
352         oldUsedBlockData := "this collection block will have older mtime"
353         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
354         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
355
356         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
357         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
358         var newBlockLocators []string
359         newBlockData := "this block is newer"
360         for i := 0; i < 5; i++ {
361                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
362         }
363         for i := 0; i < 5; i++ {
364                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
365         }
366
367         // Create a collection that would be deleted later on
368         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
369         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
370
371         // Create another collection that has the same data as the one of the old blocks
372         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
373         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
374         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
375                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
376         }
377
378         // Create another collection whose replication level will be changed
379         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
380         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
381
382         // Create two collections with same data; one will be deleted later on
383         dataForTwoCollections := "one of these collections will be deleted"
384         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
385         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
386         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
387         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
388         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
389                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
390         }
391
392         // Verify blocks before doing any backdating / deleting.
393         var expected []string
394         expected = append(expected, oldUnusedBlockLocators...)
395         expected = append(expected, newBlockLocators...)
396         expected = append(expected, toBeDeletedCollectionLocator)
397         expected = append(expected, replicationCollectionLocator)
398         expected = append(expected, oneOfTwoWithSameDataLocator)
399         expected = append(expected, secondOfTwoWithSameDataLocator)
400
401         verifyBlocks(t, nil, expected, 2)
402
403         // Run datamanager in singlerun mode
404         dataManagerSingleRun(t)
405         waitUntilQueuesFinishWork(t)
406
407         verifyBlocks(t, nil, expected, 2)
408
409         // Backdate the to-be old blocks and delete the collections
410         backdateBlocks(t, oldUnusedBlockLocators)
411         deleteCollection(t, toBeDeletedCollectionUUID)
412         deleteCollection(t, secondOfTwoWithSameDataUUID)
413
414         // Run data manager again
415         dataManagerSingleRun(t)
416         waitUntilQueuesFinishWork(t)
417
418         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
419         expected = expected[:0]
420         expected = append(expected, oldUsedBlockLocator)
421         expected = append(expected, newBlockLocators...)
422         expected = append(expected, toBeDeletedCollectionLocator)
423         expected = append(expected, oneOfTwoWithSameDataLocator)
424         expected = append(expected, secondOfTwoWithSameDataLocator)
425
426         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
427
428         // Reduce desired replication on replicationCollectionUUID
429         // collection, and verify that Data Manager does not reduce
430         // actual replication any further than that. (It might not
431         // reduce actual replication at all; that's OK for this test.)
432
433         // Reduce desired replication level.
434         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
435         collection := getCollection(t, replicationCollectionUUID)
436         if collection["replication_desired"].(interface{}) != float64(1) {
437                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
438         }
439
440         // Verify data is currently overreplicated.
441         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
442
443         // Run data manager again
444         dataManagerSingleRun(t)
445         waitUntilQueuesFinishWork(t)
446
447         // Verify data is not underreplicated.
448         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
449
450         // Verify *other* collections' data is not underreplicated.
451         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
452 }
453
454 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
455         defer TearDownDataManagerTest(t)
456         SetupDataManagerTest(t)
457
458         for i := 0; i < 10; i++ {
459                 err := singlerun(arv)
460                 if err != nil {
461                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
462                 }
463         }
464 }
465
466 func TestGetStatusRepeatedly(t *testing.T) {
467         defer TearDownDataManagerTest(t)
468         SetupDataManagerTest(t)
469
470         for i := 0; i < 10; i++ {
471                 for j := 0; j < 2; j++ {
472                         s := getStatus(t, keepServers[j]+"/status.json")
473
474                         var pullQueueStatus interface{}
475                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
476                         var trashQueueStatus interface{}
477                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
478
479                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
480                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
481                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
482                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
483                                 t.Fatalf("PullQueue and TrashQueue status not found")
484                         }
485
486                         time.Sleep(100 * time.Millisecond)
487                 }
488         }
489 }
490
491 func TestRunDatamanagerWithBogusServer(t *testing.T) {
492         defer TearDownDataManagerTest(t)
493         SetupDataManagerTest(t)
494
495         arv.ApiServer = "bogus-server"
496
497         err := singlerun(arv)
498         if err == nil {
499                 t.Fatalf("Expected error during singlerun with bogus server")
500         }
501 }
502
503 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
504         defer TearDownDataManagerTest(t)
505         SetupDataManagerTest(t)
506
507         arv.ApiToken = ActiveUserToken
508
509         err := singlerun(arv)
510         if err == nil {
511                 t.Fatalf("Expected error during singlerun as non-admin user")
512         }
513 }