import (
"crypto/tls"
+ "encoding/json"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
}
func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTestData, wantData bool) PullRequest {
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
+
arvadostest.StartAPI()
arvadostest.StartKeep()
- os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
-
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
t.Error("Error creating arv")
}
- client := &http.Client{Transport: &http.Transport{
- TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+ servers := GetKeepServices(t)
+
+ random_token := GenerateRandomApiToken()
+
+ // Put content if the test needs it
+ if wantData {
+ keepClient = keepclient.KeepClient{
+ Arvados: &arv,
+ Want_replicas: 1,
+ Using_proxy: true,
+ Client: &http.Client{},
+ }
+ keepClient.Arvados.ApiToken = random_token
+
+ service_roots := make(map[string]string)
+ for _, addr := range servers {
+ service_roots[addr] = addr
+ }
+ keepClient.SetServiceRoots(service_roots)
+
+ locator, _, err := keepClient.PutB([]byte(testData.Content))
+ if err != nil {
+ t.Errorf("Error putting test data in setup for %s %s %v", testData.Content, locator, err)
+ }
+ if locator == "" {
+ t.Errorf("No locator found after putting test data")
+ }
+ }
+ // Create pullRequest for the test
keepClient = keepclient.KeepClient{
Arvados: &arv,
Want_replicas: 1,
Using_proxy: true,
- Client: client,
+ Client: &http.Client{},
}
-
- random_token := GenerateRandomApiToken()
keepClient.Arvados.ApiToken = random_token
- if err != nil {
- t.Error("Error creating keepclient")
- }
-
- servers := make([]string, 1)
- servers[0] = "https://" + os.Getenv("ARVADOS_API_HOST")
pullRequest := PullRequest{
Locator: testData.Locator,
Servers: servers,
}
+ return pullRequest
+}
+
+func GetKeepServices(t *testing.T) []string {
+ client := &http.Client{Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+
+ req, err := http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")), nil)
+ if err != nil {
+ t.Errorf("Error getting keep services: ", err)
+ }
+ req.Header.Set("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
- service_roots := make(map[string]string)
- for _, addr := range pullRequest.Servers {
- service_roots[addr] = addr
+ resp, err := client.Do(req)
+ if err != nil {
+ t.Errorf("Error getting keep services: ", err)
+ }
+ if resp.StatusCode != 200 {
+ t.Errorf("Error status code getting keep services", resp.StatusCode)
}
- keepClient.SetServiceRoots(service_roots)
- if wantData {
- locator, _, err := keepClient.PutB([]byte(testData.Content))
- if err != nil {
- t.Errorf("Error putting test data in setup for %s %s", testData.Content, locator)
+ defer resp.Body.Close()
+ var servers []string
+
+ decoder := json.NewDecoder(resp.Body)
+
+ var respJSON map[string]interface{}
+ err = decoder.Decode(&respJSON)
+ if err != nil {
+ t.Errorf("Error decoding response for keep services: ", err)
+ }
+
+ var service_names []string
+ var service_ports []string
+ for _, v1 := range respJSON {
+ switch v1_type := v1.(type) {
+ case []interface{}:
+ for _, v2 := range v1_type {
+ switch v2_type := v2.(type) {
+ case map[string]interface{}:
+ for name, value := range v2_type {
+ if name == "service_host" {
+ service_names = append(service_names, fmt.Sprintf("%s", value))
+ } else if name == "service_port" {
+ service_ports = append(service_ports, strings.Split(fmt.Sprintf("%f", value), ".")[0])
+ }
+ }
+ default:
+ }
+ }
+ default:
}
}
- return pullRequest
+
+ for i, port := range service_ports {
+ servers = append(servers, "http://"+service_names[i]+":"+port)
+ }
+
+ return servers
}
+// Do a get on a block that is not existing in any of the keep servers.
+// Expect "block not found" error.
func TestPullWorkerIntegration_GetNonExistingLocator(t *testing.T) {
testData := PullWorkIntegrationTestData{
Name: "TestPullWorkerIntegration_GetLocator",
performPullWorkerIntegrationTest(testData, pullRequest, t)
}
+// Do a get on a block that exists on one of the keep servers.
+// The setup method will create this block before doing the get.
func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
testData := PullWorkIntegrationTestData{
Name: "TestPullWorkerIntegration_GetLocator",
performPullWorkerIntegrationTest(testData, pullRequest, t)
}
+// Perform the test.
+// The test directly invokes the "PullItemAndProcess" rather than
+// putting an item on the pullq so that the errors can be verified.
func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
+
+ // Override PutContent to mock PutBlock functionality
+ PutContent = func(content []byte, locator string) (err error) {
+ if string(content) != testData.Content {
+ t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
+ }
+ return
+ }
+
err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
if len(testData.GetError) > 0 {
if (err == nil) || (!strings.Contains(err.Error(), testData.GetError)) {
- t.Fail()
+ t.Errorf("Got error %v", err)
}
} else {
- t.Fail()
- }
-
- // Override PutContent to mock PutBlock functionality
- PutContent = func(content []byte, locator string) (err error) {
- // do nothing
- return
+ if err != nil {
+ t.Errorf("Got error %v", err)
+ }
}
}