Merge branch 'master' into 3761-pull-list-worker
[arvados.git] / services / keepstore / pull_worker_integration_test.go
1 package main
2
3 import (
4         "crypto/tls"
5         "encoding/json"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "net/http"
11         "os"
12         "strings"
13         "testing"
14 )
15
16 var keepClient keepclient.KeepClient
17
18 type PullWorkIntegrationTestData struct {
19         Name     string
20         Locator  string
21         Content  string
22         GetError string
23 }
24
25 func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTestData, wantData bool) PullRequest {
26         os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
27
28         arvadostest.StartAPI()
29         arvadostest.StartKeep()
30
31         arv, err := arvadosclient.MakeArvadosClient()
32         if err != nil {
33                 t.Error("Error creating arv")
34         }
35
36         servers := GetKeepServices(t)
37
38         random_token := GenerateRandomApiToken()
39
40         // Put content if the test needs it
41         if wantData {
42                 keepClient = keepclient.KeepClient{
43                         Arvados:       &arv,
44                         Want_replicas: 1,
45                         Using_proxy:   true,
46                         Client:        &http.Client{},
47                 }
48                 keepClient.Arvados.ApiToken = random_token
49
50                 service_roots := make(map[string]string)
51                 for _, addr := range servers {
52                         service_roots[addr] = addr
53                 }
54                 keepClient.SetServiceRoots(service_roots)
55
56                 locator, _, err := keepClient.PutB([]byte(testData.Content))
57                 if err != nil {
58                         t.Errorf("Error putting test data in setup for %s %s %v", testData.Content, locator, err)
59                 }
60                 if locator == "" {
61                         t.Errorf("No locator found after putting test data")
62                 }
63         }
64
65         // Create pullRequest for the test
66         keepClient = keepclient.KeepClient{
67                 Arvados:       &arv,
68                 Want_replicas: 1,
69                 Using_proxy:   true,
70                 Client:        &http.Client{},
71         }
72         keepClient.Arvados.ApiToken = random_token
73
74         pullRequest := PullRequest{
75                 Locator: testData.Locator,
76                 Servers: servers,
77         }
78         return pullRequest
79 }
80
81 func GetKeepServices(t *testing.T) []string {
82         client := &http.Client{Transport: &http.Transport{
83                 TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
84
85         req, err := http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")), nil)
86         if err != nil {
87                 t.Errorf("Error getting keep services: ", err)
88         }
89         req.Header.Set("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
90
91         resp, err := client.Do(req)
92         if err != nil {
93                 t.Errorf("Error getting keep services: ", err)
94         }
95         if resp.StatusCode != 200 {
96                 t.Errorf("Error status code getting keep services", resp.StatusCode)
97         }
98
99         defer resp.Body.Close()
100         var servers []string
101
102         decoder := json.NewDecoder(resp.Body)
103
104         var respJSON map[string]interface{}
105         err = decoder.Decode(&respJSON)
106         if err != nil {
107                 t.Errorf("Error decoding response for keep services: ", err)
108         }
109
110         var service_names []string
111         var service_ports []string
112         for _, v1 := range respJSON {
113                 switch v1_type := v1.(type) {
114                 case []interface{}:
115                         for _, v2 := range v1_type {
116                                 switch v2_type := v2.(type) {
117                                 case map[string]interface{}:
118                                         for name, value := range v2_type {
119                                                 if name == "service_host" {
120                                                         service_names = append(service_names, fmt.Sprintf("%s", value))
121                                                 } else if name == "service_port" {
122                                                         service_ports = append(service_ports, strings.Split(fmt.Sprintf("%f", value), ".")[0])
123                                                 }
124                                         }
125                                 default:
126                                 }
127                         }
128                 default:
129                 }
130         }
131
132         for i, port := range service_ports {
133                 servers = append(servers, "http://"+service_names[i]+":"+port)
134         }
135
136         return servers
137 }
138
139 // Do a get on a block that is not existing in any of the keep servers.
140 // Expect "block not found" error.
141 func TestPullWorkerIntegration_GetNonExistingLocator(t *testing.T) {
142         testData := PullWorkIntegrationTestData{
143                 Name:     "TestPullWorkerIntegration_GetLocator",
144                 Locator:  "5d41402abc4b2a76b9719d911017c592",
145                 Content:  "hello",
146                 GetError: "Block not found",
147         }
148
149         pullRequest := SetupPullWorkerIntegrationTest(t, testData, false)
150
151         performPullWorkerIntegrationTest(testData, pullRequest, t)
152 }
153
154 // Do a get on a block that exists on one of the keep servers.
155 // The setup method will create this block before doing the get.
156 func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
157         testData := PullWorkIntegrationTestData{
158                 Name:     "TestPullWorkerIntegration_GetLocator",
159                 Locator:  "5d41402abc4b2a76b9719d911017c592",
160                 Content:  "hello",
161                 GetError: "",
162         }
163
164         pullRequest := SetupPullWorkerIntegrationTest(t, testData, true)
165
166         performPullWorkerIntegrationTest(testData, pullRequest, t)
167 }
168
169 // Perform the test.
170 // The test directly invokes the "PullItemAndProcess" rather than
171 // putting an item on the pullq so that the errors can be verified.
172 func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
173
174         // Override PutContent to mock PutBlock functionality
175         PutContent = func(content []byte, locator string) (err error) {
176                 if string(content) != testData.Content {
177                         t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
178                 }
179                 return
180         }
181
182         err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
183
184         if len(testData.GetError) > 0 {
185                 if (err == nil) || (!strings.Contains(err.Error(), testData.GetError)) {
186                         t.Errorf("Got error %v", err)
187                 }
188         } else {
189                 if err != nil {
190                         t.Errorf("Got error %v", err)
191                 }
192         }
193 }