7490: The makeArvadosClient func, which is invoked by singlerun, should return error...
[arvados.git] / services / datamanager / datamanager_test.go
1 package main
2
3 import (
4         "encoding/json"
5         "fmt"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "os/exec"
13         "regexp"
14         "strings"
15         "testing"
16         "time"
17 )
18
19 const (
20         ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
21         AdminToken      = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
22 )
23
24 var arv arvadosclient.ArvadosClient
25 var keepClient *keepclient.KeepClient
26 var keepServers []string
27
28 func SetupDataManagerTest(t *testing.T) {
29         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
30
31         // start api and keep servers
32         arvadostest.ResetEnv()
33         arvadostest.StartAPI()
34         arvadostest.StartKeep(2, false)
35
36         var err error
37         arv, err = makeArvadosClient()
38         if err != nil {
39                 t.Fatalf("Error making arvados client: %s", err)
40         }
41
42         // keep client
43         keepClient = &keepclient.KeepClient{
44                 Arvados:       &arv,
45                 Want_replicas: 2,
46                 Using_proxy:   true,
47                 Client:        &http.Client{},
48         }
49
50         // discover keep services
51         if err = keepClient.DiscoverKeepServers(); err != nil {
52                 t.Fatalf("Error discovering keep services: %s", err)
53         }
54         keepServers = []string{}
55         for _, host := range keepClient.LocalRoots() {
56                 keepServers = append(keepServers, host)
57         }
58 }
59
60 func TearDownDataManagerTest(t *testing.T) {
61         arvadostest.StopKeep(2)
62         arvadostest.StopAPI()
63 }
64
65 func putBlock(t *testing.T, data string) string {
66         locator, _, err := keepClient.PutB([]byte(data))
67         if err != nil {
68                 t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
69         }
70         if locator == "" {
71                 t.Fatalf("No locator found after putting test data")
72         }
73
74         splits := strings.Split(locator, "+")
75         return splits[0] + "+" + splits[1]
76 }
77
78 func getBlock(t *testing.T, locator string, data string) {
79         reader, blocklen, _, err := keepClient.Get(locator)
80         if err != nil {
81                 t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
82         }
83         if reader == nil {
84                 t.Fatalf("No reader found after putting test data")
85         }
86         if blocklen != int64(len(data)) {
87                 t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
88         }
89
90         all, err := ioutil.ReadAll(reader)
91         if string(all) != data {
92                 t.Fatalf("Data read %s did not match expected data %s", string(all), data)
93         }
94 }
95
96 // Create a collection using arv-put
97 func createCollection(t *testing.T, data string) string {
98         tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
99         defer os.Remove(tempfile.Name())
100
101         _, err = tempfile.Write([]byte(data))
102         if err != nil {
103                 t.Fatalf("Error writing to tempfile %v", err)
104         }
105
106         // arv-put
107         output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
108         if err != nil {
109                 t.Fatalf("Error running arv-put %s", err)
110         }
111
112         uuid := string(output[0:27]) // trim terminating char
113         return uuid
114 }
115
116 // Get collection locator
117 var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
118
119 func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
120         manifest := getCollection(t, uuid)["manifest_text"].(string)
121
122         locator := strings.Split(manifest, " ")[1]
123         match := locatorMatcher.FindStringSubmatch(locator)
124         if match == nil {
125                 t.Fatalf("No locator found in collection manifest %s", manifest)
126         }
127
128         return match[1] + "+" + match[2]
129 }
130
131 func getCollection(t *testing.T, uuid string) Dict {
132         getback := make(Dict)
133         err := arv.Get("collections", uuid, nil, &getback)
134         if err != nil {
135                 t.Fatalf("Error getting collection %s", err)
136         }
137         if getback["uuid"] != uuid {
138                 t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
139         }
140
141         return getback
142 }
143
144 func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
145         err := arv.Update("collections", uuid, arvadosclient.Dict{
146                 "collection": arvadosclient.Dict{
147                         paramName: paramValue,
148                 },
149         }, &arvadosclient.Dict{})
150
151         if err != nil {
152                 t.Fatalf("Error updating collection %s", err)
153         }
154 }
155
156 type Dict map[string]interface{}
157
158 func deleteCollection(t *testing.T, uuid string) {
159         getback := make(Dict)
160         err := arv.Delete("collections", uuid, nil, &getback)
161         if err != nil {
162                 t.Fatalf("Error deleting collection %s", err)
163         }
164         if getback["uuid"] != uuid {
165                 t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
166         }
167 }
168
169 func dataManagerSingleRun(t *testing.T) {
170         err := singlerun(arv)
171         if err != nil {
172                 t.Fatalf("Error during singlerun %s", err)
173         }
174 }
175
176 func getBlockIndexesForServer(t *testing.T, i int) []string {
177         var indexes []string
178
179         path := keepServers[i] + "/index"
180         client := http.Client{}
181         req, err := http.NewRequest("GET", path, nil)
182         req.Header.Add("Authorization", "OAuth2 "+AdminToken)
183         req.Header.Add("Content-Type", "application/octet-stream")
184         resp, err := client.Do(req)
185         defer resp.Body.Close()
186
187         if err != nil {
188                 t.Fatalf("Error during %s %s", path, err)
189         }
190
191         body, err := ioutil.ReadAll(resp.Body)
192         if err != nil {
193                 t.Fatalf("Error reading response from %s %s", path, err)
194         }
195
196         lines := strings.Split(string(body), "\n")
197         for _, line := range lines {
198                 indexes = append(indexes, strings.Split(line, " ")...)
199         }
200
201         return indexes
202 }
203
204 func getBlockIndexes(t *testing.T) [][]string {
205         var indexes [][]string
206
207         for i := 0; i < len(keepServers); i++ {
208                 indexes = append(indexes, getBlockIndexesForServer(t, i))
209         }
210         return indexes
211 }
212
213 func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
214         blocks := getBlockIndexes(t)
215
216         for _, block := range notExpected {
217                 for _, idx := range blocks {
218                         if valueInArray(block, idx) {
219                                 t.Fatalf("Found unexpected block %s", block)
220                         }
221                 }
222         }
223
224         for _, block := range expected {
225                 nFound := 0
226                 for _, idx := range blocks {
227                         if valueInArray(block, idx) {
228                                 nFound++
229                         }
230                 }
231                 if nFound < minReplication {
232                         t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
233                 }
234         }
235 }
236
237 func valueInArray(value string, list []string) bool {
238         for _, v := range list {
239                 if value == v {
240                         return true
241                 }
242         }
243         return false
244 }
245
246 /*
247 Test env uses two keep volumes. The volume names can be found by reading the files
248   ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
249
250 The keep volumes are of the dir structure:
251   volumeN/subdir/locator
252 */
253 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
254         // First get rid of any size hints in the locators
255         var trimmedBlockLocators []string
256         for _, block := range oldUnusedBlockLocators {
257                 trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
258         }
259
260         // Get the working dir so that we can read keep{n}.volume files
261         wd, err := os.Getwd()
262         if err != nil {
263                 t.Fatalf("Error getting working dir %s", err)
264         }
265
266         // Now cycle through the two keep volumes
267         oldTime := time.Now().AddDate(0, -2, 0)
268         for i := 0; i < 2; i++ {
269                 filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
270                 volumeDir, err := ioutil.ReadFile(filename)
271                 if err != nil {
272                         t.Fatalf("Error reading keep volume file %s %s", filename, err)
273                 }
274
275                 // Read the keep volume dir structure
276                 volumeContents, err := ioutil.ReadDir(string(volumeDir))
277                 if err != nil {
278                         t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
279                 }
280
281                 // Read each subdir for each of the keep volume dir
282                 for _, subdir := range volumeContents {
283                         subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
284                         subdirContents, err := ioutil.ReadDir(string(subdirName))
285                         if err != nil {
286                                 t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
287                         }
288
289                         // Now we got to the files. The files are names are the block locators
290                         for _, fileInfo := range subdirContents {
291                                 blockName := fileInfo.Name()
292                                 myname := fmt.Sprintf("%s/%s", subdirName, blockName)
293                                 if valueInArray(blockName, trimmedBlockLocators) {
294                                         err = os.Chtimes(myname, oldTime, oldTime)
295                                 }
296                         }
297                 }
298         }
299 }
300
301 func getStatus(t *testing.T, path string) interface{} {
302         client := http.Client{}
303         req, err := http.NewRequest("GET", path, nil)
304         req.Header.Add("Authorization", "OAuth2 "+AdminToken)
305         req.Header.Add("Content-Type", "application/octet-stream")
306         resp, err := client.Do(req)
307         if err != nil {
308                 t.Fatalf("Error during %s %s", path, err)
309         }
310         defer resp.Body.Close()
311
312         var s interface{}
313         json.NewDecoder(resp.Body).Decode(&s)
314
315         return s
316 }
317
318 // Wait until PullQueue and TrashQueue are empty on all keepServers.
319 func waitUntilQueuesFinishWork(t *testing.T) {
320         for _, ks := range keepServers {
321                 for done := false; !done; {
322                         time.Sleep(100 * time.Millisecond)
323                         s := getStatus(t, ks+"/status.json")
324                         for _, qName := range []string{"PullQueue", "TrashQueue"} {
325                                 qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
326                                 if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
327                                         done = true
328                                 }
329                         }
330                 }
331         }
332 }
333
334 /*
335 Create some blocks and backdate some of them.
336 Also create some collections and delete some of them.
337 Verify block indexes.
338 */
339 func TestPutAndGetBlocks(t *testing.T) {
340         defer TearDownDataManagerTest(t)
341         SetupDataManagerTest(t)
342
343         // Put some blocks which will be backdated later on
344         // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
345         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
346         var oldUnusedBlockLocators []string
347         oldUnusedBlockData := "this block will have older mtime"
348         for i := 0; i < 5; i++ {
349                 oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
350         }
351         for i := 0; i < 5; i++ {
352                 getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
353         }
354
355         // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
356         oldUsedBlockData := "this collection block will have older mtime"
357         oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
358         getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
359
360         // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
361         // Hence, even though unreferenced, these should not be deleted when datamanager runs.
362         var newBlockLocators []string
363         newBlockData := "this block is newer"
364         for i := 0; i < 5; i++ {
365                 newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
366         }
367         for i := 0; i < 5; i++ {
368                 getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
369         }
370
371         // Create a collection that would be deleted later on
372         toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
373         toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
374
375         // Create another collection that has the same data as the one of the old blocks
376         oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
377         oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
378         if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
379                 t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
380         }
381
382         // Create another collection whose replication level will be changed
383         replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
384         replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
385
386         // Create two collections with same data; one will be deleted later on
387         dataForTwoCollections := "one of these collections will be deleted"
388         oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
389         oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
390         secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
391         secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
392         if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
393                 t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
394         }
395
396         // Verify blocks before doing any backdating / deleting.
397         var expected []string
398         expected = append(expected, oldUnusedBlockLocators...)
399         expected = append(expected, newBlockLocators...)
400         expected = append(expected, toBeDeletedCollectionLocator)
401         expected = append(expected, replicationCollectionLocator)
402         expected = append(expected, oneOfTwoWithSameDataLocator)
403         expected = append(expected, secondOfTwoWithSameDataLocator)
404
405         verifyBlocks(t, nil, expected, 2)
406
407         // Run datamanager in singlerun mode
408         dataManagerSingleRun(t)
409         waitUntilQueuesFinishWork(t)
410
411         verifyBlocks(t, nil, expected, 2)
412
413         // Backdate the to-be old blocks and delete the collections
414         backdateBlocks(t, oldUnusedBlockLocators)
415         deleteCollection(t, toBeDeletedCollectionUUID)
416         deleteCollection(t, secondOfTwoWithSameDataUUID)
417
418         // Run data manager again
419         dataManagerSingleRun(t)
420         waitUntilQueuesFinishWork(t)
421
422         // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
423         expected = expected[:0]
424         expected = append(expected, oldUsedBlockLocator)
425         expected = append(expected, newBlockLocators...)
426         expected = append(expected, toBeDeletedCollectionLocator)
427         expected = append(expected, oneOfTwoWithSameDataLocator)
428         expected = append(expected, secondOfTwoWithSameDataLocator)
429
430         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
431
432         // Reduce desired replication on replicationCollectionUUID
433         // collection, and verify that Data Manager does not reduce
434         // actual replication any further than that. (It might not
435         // reduce actual replication at all; that's OK for this test.)
436
437         // Reduce desired replication level.
438         updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
439         collection := getCollection(t, replicationCollectionUUID)
440         if collection["replication_desired"].(interface{}) != float64(1) {
441                 t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
442         }
443
444         // Verify data is currently overreplicated.
445         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
446
447         // Run data manager again
448         dataManagerSingleRun(t)
449         waitUntilQueuesFinishWork(t)
450
451         // Verify data is not underreplicated.
452         verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
453
454         // Verify *other* collections' data is not underreplicated.
455         verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
456 }
457
458 func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
459         defer TearDownDataManagerTest(t)
460         SetupDataManagerTest(t)
461
462         for i := 0; i < 10; i++ {
463                 err := singlerun(arv)
464                 if err != nil {
465                         t.Fatalf("Got an error during datamanager singlerun: %v", err)
466                 }
467         }
468 }
469
470 func TestGetStatusRepeatedly(t *testing.T) {
471         defer TearDownDataManagerTest(t)
472         SetupDataManagerTest(t)
473
474         for i := 0; i < 10; i++ {
475                 for j := 0; j < 2; j++ {
476                         s := getStatus(t, keepServers[j]+"/status.json")
477
478                         var pullQueueStatus interface{}
479                         pullQueueStatus = s.(map[string]interface{})["PullQueue"]
480                         var trashQueueStatus interface{}
481                         trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
482
483                         if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
484                                 pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
485                                 trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
486                                 trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
487                                 t.Fatalf("PullQueue and TrashQueue status not found")
488                         }
489
490                         time.Sleep(100 * time.Millisecond)
491                 }
492         }
493 }
494
495 func TestRunDatamanagerWithBogusServer(t *testing.T) {
496         defer TearDownDataManagerTest(t)
497         SetupDataManagerTest(t)
498
499         arv.ApiServer = "bogus-server"
500
501         err := singlerun(arv)
502         if err == nil {
503                 t.Fatalf("Expected error during singlerun with bogus server")
504         }
505 }
506
507 func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
508         defer TearDownDataManagerTest(t)
509         SetupDataManagerTest(t)
510
511         arv.ApiToken = ActiveUserToken
512
513         err := singlerun(arv)
514         if err == nil {
515                 t.Fatalf("Expected error during singlerun as non-admin user")
516         }
517 }